This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new dffff6b43 [#3973] improvement(core):support audit log (#4575)
dffff6b43 is described below

commit dffff6b43913152e1594ba4afa012cc4861b50a7
Author: hanwxx <[email protected]>
AuthorDate: Mon Oct 21 22:02:27 2024 +0800

    [#3973] improvement(core):support audit log (#4575)
    
    ### What changes were proposed in this pull request?
    
    Design a gravitino audit framework.
    
    ### Why are the changes needed?
    
    fixes: #3973
    
    ### Does this PR introduce _any_ user-facing change?
    No
    ### How was this patch tested?
    1. Add UT
    
    ---------
    
    Co-authored-by: han <[email protected]>
    Co-authored-by: Qi Yu <[email protected]>
---
 .../main/java/org/apache/gravitino/Configs.java    |  25 +
 .../java/org/apache/gravitino/GravitinoEnv.java    |   6 +
 .../java/org/apache/gravitino/audit/AuditLog.java  | 299 +++++++++++
 .../apache/gravitino/audit/AuditLogManager.java    | 121 +++++
 .../org/apache/gravitino/audit/AuditLogWriter.java |  65 +++
 .../apache/gravitino/audit/FileAuditWriter.java    |  99 ++++
 .../java/org/apache/gravitino/audit/Formatter.java |  34 ++
 .../org/apache/gravitino/audit/SimpleAuditLog.java |  76 +++
 .../apache/gravitino/audit/SimpleFormatter.java    |  40 ++
 .../gravitino/listener/EventListenerManager.java   |   4 +
 .../gravitino/audit/DummyAuditFormatter.java       |  38 ++
 .../org/apache/gravitino/audit/DummyAuditLog.java  |  67 +++
 .../apache/gravitino/audit/DummyAuditWriter.java   |  53 ++
 .../apache/gravitino/audit/TestAuditManager.java   | 247 +++++++++
 .../org/apache/gravitino/audit/TestOperation.java  | 572 +++++++++++++++++++++
 docs/gravitino-server-config.md                    |  27 +
 16 files changed, 1773 insertions(+)

diff --git a/core/src/main/java/org/apache/gravitino/Configs.java 
b/core/src/main/java/org/apache/gravitino/Configs.java
index 587d85ada..81388608d 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -23,6 +23,8 @@ import java.io.File;
 import java.util.Collections;
 import java.util.List;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.audit.FileAuditWriter;
+import org.apache.gravitino.audit.SimpleFormatter;
 import org.apache.gravitino.config.ConfigBuilder;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.config.ConfigEntry;
@@ -342,4 +344,27 @@ public class Configs {
           .stringConf()
           .toSequence()
           .createWithDefault(Collections.emptyList());
+
+  public static final String AUDIT_LOG_WRITER_CONFIG_PREFIX = 
"gravitino.audit.writer.";
+
+  public static final ConfigEntry<Boolean> AUDIT_LOG_ENABLED_CONF =
+      new ConfigBuilder("gravitino.audit.enable")
+          .doc("Gravitino audit log enable flag")
+          .version(ConfigConstants.VERSION_0_7_0)
+          .booleanConf()
+          .createWithDefault(false);
+
+  public static final ConfigEntry<String> AUDIT_LOG_WRITER_CLASS_NAME =
+      new ConfigBuilder("gravitino.audit.writer.className")
+          .doc("Gravitino audit log writer class name")
+          .version(ConfigConstants.VERSION_0_7_0)
+          .stringConf()
+          .createWithDefault(FileAuditWriter.class.getName());
+
+  public static final ConfigEntry<String> AUDIT_LOG_FORMATTER_CLASS_NAME =
+      new ConfigBuilder("gravitino.audit.formatter.className")
+          .doc("Gravitino event log formatter class name")
+          .version(ConfigConstants.VERSION_0_7_0)
+          .stringConf()
+          .createWithDefault(SimpleFormatter.class.getName());
 }
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 77d76b474..516e9d9d3 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino;
 
 import com.google.common.base.Preconditions;
+import org.apache.gravitino.audit.AuditLogManager;
 import org.apache.gravitino.authorization.AccessControlDispatcher;
 import org.apache.gravitino.authorization.AccessControlManager;
 import org.apache.gravitino.authorization.FutureGrantManager;
@@ -109,6 +110,8 @@ public class GravitinoEnv {
 
   private EventListenerManager eventListenerManager;
 
+  private AuditLogManager auditLogManager;
+
   private TagManager tagManager;
   private EventBus eventBus;
   private OwnerManager ownerManager;
@@ -356,6 +359,9 @@ public class GravitinoEnv {
     eventListenerManager.init(
         
config.getConfigsWithPrefix(EventListenerManager.GRAVITINO_EVENT_LISTENER_PREFIX));
     this.eventBus = eventListenerManager.createEventBus();
+
+    this.auditLogManager = new AuditLogManager();
+    auditLogManager.init(config, eventListenerManager);
   }
 
   private void initGravitinoServerComponents() {
diff --git a/core/src/main/java/org/apache/gravitino/audit/AuditLog.java 
b/core/src/main/java/org/apache/gravitino/audit/AuditLog.java
new file mode 100644
index 000000000..86286c0f6
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/AuditLog.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import org.apache.gravitino.listener.api.event.AlterCatalogEvent;
+import org.apache.gravitino.listener.api.event.AlterCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterFilesetEvent;
+import org.apache.gravitino.listener.api.event.AlterFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterMetalakeEvent;
+import org.apache.gravitino.listener.api.event.AlterMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterSchemaEvent;
+import org.apache.gravitino.listener.api.event.AlterSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterTableEvent;
+import org.apache.gravitino.listener.api.event.AlterTableFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterTopicEvent;
+import org.apache.gravitino.listener.api.event.AlterTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateCatalogEvent;
+import org.apache.gravitino.listener.api.event.CreateCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateFilesetEvent;
+import org.apache.gravitino.listener.api.event.CreateFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateMetalakeEvent;
+import org.apache.gravitino.listener.api.event.CreateMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateSchemaEvent;
+import org.apache.gravitino.listener.api.event.CreateSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateTableEvent;
+import org.apache.gravitino.listener.api.event.CreateTableFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateTopicEvent;
+import org.apache.gravitino.listener.api.event.CreateTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.DropCatalogEvent;
+import org.apache.gravitino.listener.api.event.DropCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.DropFilesetEvent;
+import org.apache.gravitino.listener.api.event.DropFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.DropMetalakeEvent;
+import org.apache.gravitino.listener.api.event.DropMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.DropSchemaEvent;
+import org.apache.gravitino.listener.api.event.DropSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.DropTableEvent;
+import org.apache.gravitino.listener.api.event.DropTableFailureEvent;
+import org.apache.gravitino.listener.api.event.DropTopicEvent;
+import org.apache.gravitino.listener.api.event.DropTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.GetFileLocationEvent;
+import org.apache.gravitino.listener.api.event.GetFileLocationFailureEvent;
+import org.apache.gravitino.listener.api.event.GetPartitionEvent;
+import org.apache.gravitino.listener.api.event.GetPartitionFailureEvent;
+import org.apache.gravitino.listener.api.event.ListCatalogEvent;
+import org.apache.gravitino.listener.api.event.ListCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.ListFilesetEvent;
+import org.apache.gravitino.listener.api.event.ListFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.ListMetalakeEvent;
+import org.apache.gravitino.listener.api.event.ListMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.ListPartitionEvent;
+import org.apache.gravitino.listener.api.event.ListPartitionFailureEvent;
+import org.apache.gravitino.listener.api.event.ListSchemaEvent;
+import org.apache.gravitino.listener.api.event.ListSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.ListTableEvent;
+import org.apache.gravitino.listener.api.event.ListTableFailureEvent;
+import org.apache.gravitino.listener.api.event.ListTopicEvent;
+import org.apache.gravitino.listener.api.event.ListTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadCatalogEvent;
+import org.apache.gravitino.listener.api.event.LoadCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadFilesetEvent;
+import org.apache.gravitino.listener.api.event.LoadFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadMetalakeEvent;
+import org.apache.gravitino.listener.api.event.LoadMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadSchemaEvent;
+import org.apache.gravitino.listener.api.event.LoadSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadTableEvent;
+import org.apache.gravitino.listener.api.event.LoadTableFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadTopicEvent;
+import org.apache.gravitino.listener.api.event.LoadTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.PartitionExistsEvent;
+import org.apache.gravitino.listener.api.event.PartitionExistsFailureEvent;
+import org.apache.gravitino.listener.api.event.PurgePartitionEvent;
+import org.apache.gravitino.listener.api.event.PurgePartitionFailureEvent;
+import org.apache.gravitino.listener.api.event.PurgeTableEvent;
+import org.apache.gravitino.listener.api.event.PurgeTableFailureEvent;
+
+/** The interface that defines the unified audit log schema. */
+public interface AuditLog {
+  /**
+   * The user who performed the operation.
+   *
+   * @return user name.
+   */
+  String user();
+
+  /**
+   * The operation name.
+   *
+   * @return operation name.
+   */
+  Operation operation();
+
+  /**
+   * The identifier of the resource.
+   *
+   * @return resource identifier name.
+   */
+  String identifier();
+
+  /**
+   * The timestamp of the operation.
+   *
+   * @return operation timestamp.
+   */
+  long timestamp();
+
+  /**
+   * The status of the operation.
+   *
+   * @return operation status.
+   */
+  Status status();
+
+  /** Defines the user metadata operations. */
+  enum Operation {
+    CREATE_METALAKE,
+
+    ALTER_METALAKE,
+
+    DROP_METALAKE,
+
+    LOAD_METALAKE,
+
+    LIST_METALAKE,
+
+    CREATE_CATALOG,
+
+    LOAD_CATALOG,
+
+    ALTER_CATALOG,
+
+    DROP_CATALOG,
+
+    LIST_CATALOG,
+
+    CREATE_SCHEMA,
+
+    ALTER_SCHEMA,
+
+    DROP_SCHEMA,
+
+    LOAD_SCHEMA,
+
+    LIST_SCHEMA,
+
+    CREATE_TABLE,
+
+    ALTER_TABLE,
+
+    DROP_TABLE,
+
+    LOAD_TABLE,
+
+    PURGE_TABLE,
+
+    LIST_TABLE,
+
+    PARTITION_EXIST,
+
+    PURGE_PARTITION,
+
+    LIST_PARTITION,
+
+    GET_PARTITION,
+
+    CREATE_TOPIC,
+
+    ALTER_TOPIC,
+
+    DROP_TOPIC,
+
+    LOAD_TOPIC,
+
+    LIST_TOPIC,
+
+    GET_FILE_LOCATION,
+
+    CREATE_FILESET,
+
+    ALTER_FILESET,
+
+    DROP_FILESET,
+
+    LOAD_FILESET,
+
+    LIST_FILESET,
+
+    UNKNOWN_OPERATION;
+
+    public static Operation fromEvent(Event event) {
+      if (event instanceof CreateMetalakeEvent || event instanceof 
CreateMetalakeFailureEvent) {
+        return CREATE_METALAKE;
+      } else if (event instanceof AlterMetalakeEvent
+          || event instanceof AlterMetalakeFailureEvent) {
+        return ALTER_METALAKE;
+      } else if (event instanceof DropMetalakeEvent || event instanceof 
DropMetalakeFailureEvent) {
+        return DROP_METALAKE;
+      } else if (event instanceof LoadMetalakeEvent || event instanceof 
LoadMetalakeFailureEvent) {
+        return LOAD_METALAKE;
+      } else if (event instanceof ListMetalakeEvent || event instanceof 
ListMetalakeFailureEvent) {
+        return LIST_METALAKE;
+      } else if (event instanceof CreateCatalogEvent
+          || event instanceof CreateCatalogFailureEvent) {
+        return CREATE_CATALOG;
+      } else if (event instanceof AlterCatalogEvent || event instanceof 
AlterCatalogFailureEvent) {
+        return ALTER_CATALOG;
+      } else if (event instanceof DropCatalogEvent || event instanceof 
DropCatalogFailureEvent) {
+        return DROP_CATALOG;
+      } else if (event instanceof LoadCatalogEvent || event instanceof 
LoadCatalogFailureEvent) {
+        return LOAD_CATALOG;
+      } else if (event instanceof ListCatalogEvent || event instanceof 
ListCatalogFailureEvent) {
+        return LIST_CATALOG;
+      } else if (event instanceof CreateSchemaEvent || event instanceof 
CreateSchemaFailureEvent) {
+        return CREATE_SCHEMA;
+      } else if (event instanceof AlterSchemaEvent || event instanceof 
AlterSchemaFailureEvent) {
+        return ALTER_SCHEMA;
+      } else if (event instanceof DropSchemaEvent || event instanceof 
DropSchemaFailureEvent) {
+        return DROP_SCHEMA;
+      } else if (event instanceof LoadSchemaEvent || event instanceof 
LoadSchemaFailureEvent) {
+        return LOAD_SCHEMA;
+      } else if (event instanceof ListSchemaEvent || event instanceof 
ListSchemaFailureEvent) {
+        return LIST_SCHEMA;
+      } else if (event instanceof CreateTableEvent || event instanceof 
CreateTableFailureEvent) {
+        return CREATE_TABLE;
+      } else if (event instanceof AlterTableEvent || event instanceof 
AlterTableFailureEvent) {
+        return ALTER_TABLE;
+      } else if (event instanceof DropTableEvent || event instanceof 
DropTableFailureEvent) {
+        return DROP_TABLE;
+      } else if (event instanceof LoadTableEvent || event instanceof 
LoadTableFailureEvent) {
+        return LOAD_TABLE;
+      } else if (event instanceof PurgeTableEvent || event instanceof 
PurgeTableFailureEvent) {
+        return PURGE_TABLE;
+      } else if (event instanceof ListTableEvent || event instanceof 
ListTableFailureEvent) {
+        return LIST_TABLE;
+      } else if (event instanceof PartitionExistsEvent
+          || event instanceof PartitionExistsFailureEvent) {
+        return PARTITION_EXIST;
+      } else if (event instanceof PurgePartitionEvent
+          || event instanceof PurgePartitionFailureEvent) {
+        return PURGE_PARTITION;
+      } else if (event instanceof ListPartitionEvent
+          || event instanceof ListPartitionFailureEvent) {
+        return LIST_PARTITION;
+      } else if (event instanceof GetPartitionEvent || event instanceof 
GetPartitionFailureEvent) {
+        return GET_PARTITION;
+      } else if (event instanceof CreateTopicEvent || event instanceof 
CreateTopicFailureEvent) {
+        return CREATE_TOPIC;
+      } else if (event instanceof AlterTopicEvent || event instanceof 
AlterTopicFailureEvent) {
+        return ALTER_TOPIC;
+      } else if (event instanceof DropTopicEvent || event instanceof 
DropTopicFailureEvent) {
+        return DROP_TOPIC;
+      } else if (event instanceof LoadTopicEvent || event instanceof 
LoadTopicFailureEvent) {
+        return LOAD_TOPIC;
+      } else if (event instanceof ListTopicEvent || event instanceof 
ListTopicFailureEvent) {
+        return LIST_TOPIC;
+      } else if (event instanceof CreateFilesetEvent
+          || event instanceof CreateFilesetFailureEvent) {
+        return CREATE_FILESET;
+      } else if (event instanceof AlterFilesetEvent || event instanceof 
AlterFilesetFailureEvent) {
+        return ALTER_FILESET;
+      } else if (event instanceof DropFilesetEvent || event instanceof 
DropFilesetFailureEvent) {
+        return DROP_FILESET;
+      } else if (event instanceof GetFileLocationEvent
+          || event instanceof GetFileLocationFailureEvent) {
+        return GET_FILE_LOCATION;
+      } else if (event instanceof LoadFilesetEvent || event instanceof 
LoadFilesetFailureEvent) {
+        return LOAD_FILESET;
+      } else if (event instanceof ListFilesetEvent || event instanceof 
ListFilesetFailureEvent) {
+        return LIST_FILESET;
+      } else {
+        return UNKNOWN_OPERATION;
+      }
+    }
+  }
+
+  enum Status {
+    SUCCESS,
+    FAILURE
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/audit/AuditLogManager.java 
b/core/src/main/java/org/apache/gravitino/audit/AuditLogManager.java
new file mode 100644
index 000000000..ac9d24bd8
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/AuditLogManager.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.listener.EventListenerManager;
+import org.apache.gravitino.listener.api.EventListenerPlugin;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.utils.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AuditLogManager is responsible for initializing the audit log writer and formatter,
+ * which are used to write metadata audit logs.
+ */
+public class AuditLogManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AuditLogManager.class);
+
+  @VisibleForTesting private AuditLogWriter auditLogWriter;
+
+  public void init(Config config, EventListenerManager eventBusManager) {
+    if (!config.get(Configs.AUDIT_LOG_ENABLED_CONF)) {
+      LOG.info("Audit log is not enabled");
+      return;
+    }
+    String formatterClassName = 
config.get(Configs.AUDIT_LOG_FORMATTER_CLASS_NAME);
+    Formatter formatter = loadFormatter(formatterClassName);
+    LOG.info("Audit log formatter class name:{}", formatterClassName);
+
+    String writerClassName = config.get(Configs.AUDIT_LOG_WRITER_CLASS_NAME);
+    auditLogWriter =
+        loadAuditLogWriter(
+            writerClassName,
+            
config.getConfigsWithPrefix(Configs.AUDIT_LOG_WRITER_CONFIG_PREFIX),
+            formatter);
+    LOG.info("Audit log writer class name:{}", writerClassName);
+
+    eventBusManager.addEventListener(
+        "audit-log",
+        new EventListenerPlugin() {
+
+          @Override
+          public void init(Map<String, String> properties) throws 
RuntimeException {}
+
+          @Override
+          public void start() throws RuntimeException {}
+
+          @Override
+          public void stop() throws RuntimeException {
+            try {
+              auditLogWriter.close();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          @Override
+          public void onPostEvent(Event event) throws RuntimeException {
+            try {
+              auditLogWriter.write(event);
+            } catch (Exception e) {
+              LOG.warn("Failed to write audit log {}.", event, e);
+            }
+          }
+
+          @Override
+          public Mode mode() {
+            return Mode.ASYNC_ISOLATED;
+          }
+        });
+  }
+
+  private AuditLogWriter loadAuditLogWriter(
+      String className, Map<String, String> config, Formatter formatter) {
+    try {
+      AuditLogWriter auditLogWriter =
+          (AuditLogWriter) 
Class.forName(className).getDeclaredConstructor().newInstance();
+      Map<String, String> writerConfig = MapUtils.getPrefixMap(config, 
auditLogWriter.name() + ".");
+      auditLogWriter.init(formatter, writerConfig);
+      return auditLogWriter;
+    } catch (Exception e) {
+      throw new GravitinoRuntimeException(e, "Failed to load audit log writer 
%s", className);
+    }
+  }
+
+  private Formatter loadFormatter(String className) {
+    try {
+      return (Formatter) 
Class.forName(className).getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      throw new GravitinoRuntimeException(e, "Failed to load formatter class 
name %s", className);
+    }
+  }
+
+  AuditLogWriter getAuditLogWriter() {
+    return auditLogWriter;
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/audit/AuditLogWriter.java 
b/core/src/main/java/org/apache/gravitino/audit/AuditLogWriter.java
new file mode 100644
index 000000000..d68fc72c9
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/AuditLogWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.gravitino.listener.api.event.Event;
+
+/**
+ * Interface for writing the audit log, which can write to different storage, such as a file,
+ * a database, or a message queue (MQ).
+ */
+public interface AuditLogWriter extends Closeable {
+
+  /** @return formatter. */
+  Formatter getFormatter();
+
+  /**
+   * Initialize the writer with the given configuration.
+   *
+   * @param properties the configuration properties of the writer.
+   */
+  void init(Formatter formatter, Map<String, String> properties);
+
+  /**
+   * Write the audit event to storage.
+   *
+   * @param auditLog the audit log to write.
+   */
+  void doWrite(AuditLog auditLog);
+
+  /**
+   * Write the audit event to storage.
+   *
+   * @param event the event to format and write as an audit log.
+   */
+  default void write(Event event) {
+    doWrite(getFormatter().format(event));
+  }
+
+  /**
+   * Define the name of the writer, which is related to the audit writer configuration. Audit log
+   * writer configuration starts with: gravitino.audit.writer.${name}.*
+   *
+   * @return the name of the writer.
+   */
+  String name();
+}
diff --git a/core/src/main/java/org/apache/gravitino/audit/FileAuditWriter.java 
b/core/src/main/java/org/apache/gravitino/audit/FileAuditWriter.java
new file mode 100644
index 000000000..cc3d93078
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/FileAuditWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileAuditWriter is the default implementation of AuditLogWriter, which writes audit logs
+ * to a file.
+ */
+public class FileAuditWriter implements AuditLogWriter {
+  private static final Logger Log = 
LoggerFactory.getLogger(FileAuditWriter.class);
+
+  public static final String AUDIT_LOG_FILE_NAME = "fileName";
+
+  public static final String APPEND = "append";
+
+  public static final String LINE_SEPARATOR = System.lineSeparator();
+
+  Formatter formatter;
+  @VisibleForTesting Writer outWriter;
+  @VisibleForTesting String fileName;
+
+  boolean append;
+
+  @Override
+  public Formatter getFormatter() {
+    return formatter;
+  }
+
+  @Override
+  public void init(Formatter formatter, Map<String, String> properties) {
+    this.formatter = formatter;
+    this.fileName =
+        System.getProperty("gravitino.log.path")
+            + "/"
+            + properties.getOrDefault(AUDIT_LOG_FILE_NAME, 
"gravitino_audit.log");
+    this.append = Boolean.parseBoolean(properties.getOrDefault(APPEND, 
"true"));
+    try {
+      OutputStream outputStream = new FileOutputStream(fileName, append);
+      this.outWriter = new OutputStreamWriter(outputStream, 
StandardCharsets.UTF_8);
+    } catch (Exception e) {
+      throw new GravitinoRuntimeException(
+          e, "Init audit log writer fail, filename is %s", fileName);
+    }
+  }
+
+  @Override
+  public void doWrite(AuditLog auditLog) {
+    String log = auditLog.toString();
+    try {
+      outWriter.write(log + LINE_SEPARATOR);
+    } catch (Exception e) {
+      Log.warn("Failed to write audit log: {}", log, e);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (outWriter != null) {
+      try {
+        outWriter.close();
+      } catch (Exception e) {
+        Log.warn("Failed to close writer", e);
+      }
+    }
+  }
+
+  @Override
+  public String name() {
+    return "file";
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/audit/Formatter.java 
b/core/src/main/java/org/apache/gravitino/audit/Formatter.java
new file mode 100644
index 000000000..3650ffb02
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/Formatter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import org.apache.gravitino.listener.api.event.Event;
+
+/** The interface defining the conversion of metadata change events to a unified log format. */
+public interface Formatter {
+
+  /**
+   * Format the event, returning the unified audit log format.
+   *
+   * @param event The event to format.
+   * @return The formatted AuditLog.
+   */
+  AuditLog format(Event event);
+}
diff --git a/core/src/main/java/org/apache/gravitino/audit/SimpleAuditLog.java 
b/core/src/main/java/org/apache/gravitino/audit/SimpleAuditLog.java
new file mode 100644
index 000000000..75ee08aad
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/SimpleAuditLog.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import java.text.SimpleDateFormat;
+import javax.annotation.Nullable;
+import lombok.Builder;
+
+/** The default implementation of the audit log. */
+@Builder
+public class SimpleAuditLog implements AuditLog {
+
+  private String user;
+
+  private Operation operation;
+
+  private String identifier;
+
+  private long timestamp;
+
+  private Status status;
+
+  @Override
+  public String user() {
+    return user;
+  }
+
+  @Override
+  public Operation operation() {
+    return operation;
+  }
+
+  @Override
+  @Nullable
+  public String identifier() {
+    return identifier;
+  }
+
+  @Override
+  public long timestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public Status status() {
+    return status;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "[%s]\t%s\t%s\t%s\t%s",
+        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(timestamp),
+        user,
+        operation,
+        identifier,
+        status);
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/audit/SimpleFormatter.java 
b/core/src/main/java/org/apache/gravitino/audit/SimpleFormatter.java
new file mode 100644
index 000000000..7b54d460e
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/SimpleFormatter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import org.apache.gravitino.audit.AuditLog.Status;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.FailureEvent;
+
+/** The default formatter implementation of the audit log. */
+public class SimpleFormatter implements Formatter {
+
+  /**
+   * Converts an event into a {@link SimpleAuditLog}.
+   *
+   * @param event the event to convert
+   * @return an audit log carrying the event's user, operation, identifier (or {@code null} when
+   *     the event has none), event time and status
+   */
+  @Override
+  public SimpleAuditLog format(Event event) {
+    // Any FailureEvent is recorded as FAILURE; every other event counts as SUCCESS.
+    Status status;
+    if (event instanceof FailureEvent) {
+      status = Status.FAILURE;
+    } else {
+      status = Status.SUCCESS;
+    }
+
+    return SimpleAuditLog.builder()
+        .user(event.user())
+        .operation(AuditLog.Operation.fromEvent(event))
+        .identifier(event.identifier() == null ? null : event.identifier().toString())
+        .timestamp(event.eventTime())
+        .status(status)
+        .build();
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java 
b/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
index ebe08637f..e018f8c13 100644
--- a/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
+++ b/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
@@ -99,6 +99,10 @@ public class EventListenerManager {
     return new EventBus(eventListeners);
   }
 
+  /**
+   * Registers an additional event listener by wrapping it and appending it to the listener list.
+   *
+   * @param listenerName the name handed to the {@link EventListenerPluginWrapper}
+   * @param listener the listener plugin to wrap and add
+   */
+  public void addEventListener(String listenerName, EventListenerPlugin listener) {
+    EventListenerPluginWrapper wrapper =
+        new EventListenerPluginWrapper(listenerName, listener);
+    eventListeners.add(wrapper);
+  }
+
   private List<EventListenerPlugin> assembleEventListeners(
       Map<String, EventListenerPlugin> userEventListeners) {
     List<EventListenerPlugin> sharedQueueListeners = new ArrayList<>();
diff --git 
a/core/src/test/java/org/apache/gravitino/audit/DummyAuditFormatter.java 
b/core/src/test/java/org/apache/gravitino/audit/DummyAuditFormatter.java
new file mode 100644
index 000000000..32094cd33
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/audit/DummyAuditFormatter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import static org.apache.gravitino.audit.AuditLog.Status;
+
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.FailureEvent;
+
+/** Test-only {@link Formatter} that turns events into {@link DummyAuditLog} instances. */
+public class DummyAuditFormatter implements Formatter {
+
+  /**
+   * Builds a {@link DummyAuditLog} from the given event.
+   *
+   * @param event the event to convert
+   * @return the audit log; its status is FAILURE for a {@link FailureEvent}, SUCCESS otherwise
+   */
+  @Override
+  public DummyAuditLog format(Event event) {
+    Status status;
+    if (event instanceof FailureEvent) {
+      status = Status.FAILURE;
+    } else {
+      status = Status.SUCCESS;
+    }
+
+    return DummyAuditLog.builder()
+        .user(event.user())
+        .operation(AuditLog.Operation.fromEvent(event))
+        .identifier(event.identifier() == null ? null : event.identifier().toString())
+        .timestamp(event.eventTime())
+        .status(status)
+        .build();
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/audit/DummyAuditLog.java 
b/core/src/test/java/org/apache/gravitino/audit/DummyAuditLog.java
new file mode 100644
index 000000000..d75565b22
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/audit/DummyAuditLog.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Test-only {@link AuditLog} implementation.
+ *
+ * <p>Equality, hash code and string form are generated by Lombok from all fields; the fields are
+ * final so that an instance cannot change after it has been built.
+ */
+@Builder
+@ToString
+@Getter
+@EqualsAndHashCode
+public class DummyAuditLog implements AuditLog {
+
+  private final String user;
+
+  private final Operation operation;
+
+  private final String identifier;
+
+  private final long timestamp;
+
+  private final Status status;
+
+  @Override
+  public String user() {
+    return user;
+  }
+
+  @Override
+  public Operation operation() {
+    return operation;
+  }
+
+  @Override
+  public String identifier() {
+    return identifier;
+  }
+
+  @Override
+  public long timestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public Status status() {
+    return status;
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/audit/DummyAuditWriter.java 
b/core/src/test/java/org/apache/gravitino/audit/DummyAuditWriter.java
new file mode 100644
index 000000000..a6ca2d627
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/audit/DummyAuditWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import java.util.LinkedList;
+import java.util.Map;
+import lombok.Getter;
+
+/** Test-only {@link AuditLogWriter} that keeps every written audit log in memory. */
+public class DummyAuditWriter implements AuditLogWriter {
+
+  /** Audit logs in the order they were passed to {@link #doWrite(AuditLog)}. */
+  @Getter private final LinkedList<DummyAuditLog> auditLogs = new LinkedList<>();
+
+  /** Formatter handed over in {@link #init(Formatter, Map)}. */
+  private Formatter formatter;
+
+  @Override
+  public String name() {
+    return "dummy";
+  }
+
+  /** Stores the formatter; the properties are not used by this writer. */
+  @Override
+  public void init(Formatter formatter, Map<String, String> properties) {
+    this.formatter = formatter;
+  }
+
+  @Override
+  public Formatter getFormatter() {
+    return this.formatter;
+  }
+
+  /** Appends the audit log to the in-memory list; it must be a {@link DummyAuditLog}. */
+  @Override
+  public void doWrite(AuditLog auditLog) {
+    DummyAuditLog dummyAuditLog = (DummyAuditLog) auditLog;
+    auditLogs.add(dummyAuditLog);
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() {}
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/audit/TestAuditManager.java 
b/core/src/test/java/org/apache/gravitino/audit/TestAuditManager.java
new file mode 100644
index 000000000..2db57b48e
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/audit/TestAuditManager.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import static org.apache.gravitino.audit.AuditLog.Operation;
+import static org.apache.gravitino.audit.AuditLog.Status;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.stream.Stream;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.listener.EventBus;
+import org.apache.gravitino.listener.EventListenerManager;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.FailureEvent;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link AuditLogManager} using both a custom and the default writer/formatter. */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class TestAuditManager {
+  private static final Logger LOG = LoggerFactory.getLogger(TestAuditManager.class);
+
+  private static final String DEFAULT_FILE_NAME = "gravitino_audit.log";
+
+  private static final int EVENT_NUM = 2000;
+
+  private Path logPath;
+
+  /**
+   * Ensures the directory named by the {@code gravitino.log.path} system property exists and
+   * removes an audit log file left over from a previous run.
+   */
+  @BeforeAll
+  public void setup() {
+    String logDir = System.getProperty("gravitino.log.path");
+    // Fail with a readable message instead of a bare NullPointerException from Paths.get.
+    Assertions.assertNotNull(logDir, "system property gravitino.log.path must be set");
+    try {
+      // createDirectories does nothing when the directory already exists.
+      Files.createDirectories(Paths.get(logDir));
+      this.logPath = Paths.get(logDir, DEFAULT_FILE_NAME);
+      if (Files.exists(logPath)) {
+        LOG.warn("tmp audit log file: {} already exists, delete it", DEFAULT_FILE_NAME);
+        Files.delete(logPath);
+        LOG.warn("delete tmp audit log file: {} success", DEFAULT_FILE_NAME);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Test audit log with custom audit writer and formatter. */
+  @Test
+  public void testAuditLog() {
+    Config config = new Config(false) {};
+    config.set(Configs.AUDIT_LOG_ENABLED_CONF, true);
+    config.set(Configs.AUDIT_LOG_WRITER_CLASS_NAME, "org.apache.gravitino.audit.DummyAuditWriter");
+    config.set(
+        Configs.AUDIT_LOG_FORMATTER_CLASS_NAME, "org.apache.gravitino.audit.DummyAuditFormatter");
+
+    EventListenerManager eventListenerManager = mockEventListenerManager();
+    AuditLogManager auditLogManager = mockAuditLogManager(config, eventListenerManager);
+    EventBus eventBus = eventListenerManager.createEventBus();
+
+    // dispatch success event
+    DummyEvent dummyEvent = mockDummyEvent();
+    eventBus.dispatchEvent(dummyEvent);
+
+    Assertions.assertInstanceOf(DummyAuditWriter.class, auditLogManager.getAuditLogWriter());
+    Assertions.assertInstanceOf(
+        DummyAuditFormatter.class, auditLogManager.getAuditLogWriter().getFormatter());
+
+    DummyAuditWriter dummyAuditWriter = (DummyAuditWriter) auditLogManager.getAuditLogWriter();
+    DummyAuditFormatter formatter = (DummyAuditFormatter) dummyAuditWriter.getFormatter();
+    DummyAuditLog formattedAuditLog = formatter.format(dummyEvent);
+
+    // JUnit expects (expected, actual); keeping that order makes failure messages accurate.
+    Assertions.assertNotNull(formattedAuditLog);
+    Assertions.assertEquals(Operation.UNKNOWN_OPERATION, formattedAuditLog.operation());
+    Assertions.assertEquals(Status.SUCCESS, formattedAuditLog.status());
+    Assertions.assertEquals("user", formattedAuditLog.user());
+    Assertions.assertEquals("a.b.c.d", formattedAuditLog.identifier());
+    Assertions.assertEquals(dummyEvent.eventTime(), formattedAuditLog.timestamp());
+    Assertions.assertEquals(formattedAuditLog, dummyAuditWriter.getAuditLogs().get(0));
+
+    // dispatch fail event
+    DummyFailEvent dummyFailEvent = mockDummyFailEvent();
+    eventBus.dispatchEvent(dummyFailEvent);
+    DummyAuditLog formattedFailAuditLog = formatter.format(dummyFailEvent);
+    Assertions.assertEquals(Operation.UNKNOWN_OPERATION, formattedFailAuditLog.operation());
+    Assertions.assertEquals(Status.FAILURE, formattedFailAuditLog.status());
+    Assertions.assertEquals(formattedFailAuditLog, dummyAuditWriter.getAuditLogs().get(1));
+  }
+
+  /** Test audit log with default audit writer and formatter. */
+  @Test
+  public void testFileAuditLog() {
+    Config config = new Config(false) {};
+    config.set(Configs.AUDIT_LOG_ENABLED_CONF, true);
+    DummyEvent dummyEvent = mockDummyEvent();
+    EventListenerManager eventListenerManager = mockEventListenerManager();
+    AuditLogManager auditLogManager = mockAuditLogManager(config, eventListenerManager);
+    EventBus eventBus = eventListenerManager.createEventBus();
+    eventBus.dispatchEvent(dummyEvent);
+
+    Assertions.assertInstanceOf(FileAuditWriter.class, auditLogManager.getAuditLogWriter());
+    Assertions.assertInstanceOf(
+        SimpleFormatter.class, auditLogManager.getAuditLogWriter().getFormatter());
+
+    FileAuditWriter fileAuditWriter = (FileAuditWriter) auditLogManager.getAuditLogWriter();
+    flush(fileAuditWriter);
+
+    // Only the first line of the file is compared with the formatted event.
+    String auditLog = readAuditLog(fileAuditWriter.fileName);
+    Formatter formatter = fileAuditWriter.getFormatter();
+    SimpleAuditLog formattedAuditLog = (SimpleAuditLog) formatter.format(dummyEvent);
+
+    Assertions.assertNotNull(formattedAuditLog);
+    Assertions.assertEquals(formattedAuditLog.toString(), auditLog);
+  }
+
+  /** Test that a batch of {@code EVENT_NUM} events produces exactly that many audit log lines. */
+  @Test
+  public void testBathEvents() {
+    Config config = new Config(false) {};
+    config.set(Configs.AUDIT_LOG_ENABLED_CONF, true);
+    // set immediate flush to true for testing, so that the audit log will be read immediately
+    config.set(
+        new ConfigBuilder("gravitino.audit.writer.file.immediateFlush").stringConf(), "true");
+
+    EventListenerManager eventListenerManager = mockEventListenerManager();
+    AuditLogManager auditLogManager = mockAuditLogManager(config, eventListenerManager);
+    EventBus eventBus = eventListenerManager.createEventBus();
+
+    for (int i = 0; i < EVENT_NUM; i++) {
+      eventBus.dispatchEvent(mockDummyEvent());
+    }
+
+    FileAuditWriter fileAuditWriter = (FileAuditWriter) auditLogManager.getAuditLogWriter();
+    flush(fileAuditWriter);
+
+    Assertions.assertEquals(EVENT_NUM, getAuditSize(fileAuditWriter.fileName));
+  }
+
+  /** Removes the audit log file after each test so the tests do not see each other's output. */
+  @AfterEach
+  public void cleanup() {
+    try {
+      if (Files.exists(logPath)) {
+        Files.delete(logPath);
+        LOG.warn("delete tmp audit log file: {} success", DEFAULT_FILE_NAME);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Flushes the file writer's underlying output so written audit logs become readable. */
+  private void flush(FileAuditWriter fileAuditWriter) {
+    try {
+      fileAuditWriter.outWriter.flush();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Returns the first line of the given file, or {@code null} when the file is empty. */
+  private String readAuditLog(String fileName) {
+    try (BufferedReader reader =
+        Files.newBufferedReader(Paths.get(fileName), StandardCharsets.UTF_8)) {
+      return reader.readLine();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Returns the number of lines in the given file. */
+  private long getAuditSize(String fileName) {
+    try (Stream<String> lines = Files.lines(Paths.get(fileName))) {
+      return lines.count();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Creates an {@link AuditLogManager} initialized with the given config and listener manager. */
+  private AuditLogManager mockAuditLogManager(
+      Config config, EventListenerManager eventListenerManager) {
+    AuditLogManager auditLogManager = new AuditLogManager();
+    auditLogManager.init(config, eventListenerManager);
+    return auditLogManager;
+  }
+
+  /** Creates and starts an {@link EventListenerManager} initialized with empty properties. */
+  private EventListenerManager mockEventListenerManager() {
+    EventListenerManager eventListenerManager = new EventListenerManager();
+    eventListenerManager.init(new HashMap<>());
+    eventListenerManager.start();
+    return eventListenerManager;
+  }
+
+  private DummyEvent mockDummyEvent() {
+    return new DummyEvent("user", NameIdentifier.of("a", "b", "c", "d"));
+  }
+
+  private DummyFailEvent mockDummyFailEvent() {
+    return new DummyFailEvent("user", NameIdentifier.of("a", "b", "c", "d"));
+  }
+
+  /** Plain event type used by these tests; the tests expect UNKNOWN_OPERATION for it. */
+  static class DummyEvent extends Event {
+    protected DummyEvent(String user, NameIdentifier identifier) {
+      super(user, identifier);
+    }
+  }
+
+  /** Failure event type used by these tests; it carries a fixed dummy exception. */
+  static class DummyFailEvent extends FailureEvent {
+    protected DummyFailEvent(String user, NameIdentifier identifier) {
+      super(user, identifier, new RuntimeException("dummy exception"));
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/audit/TestOperation.java 
b/core/src/test/java/org/apache/gravitino/audit/TestOperation.java
new file mode 100644
index 000000000..cd0bc9da9
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/audit/TestOperation.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import com.google.common.collect.ImmutableMap;
+import java.time.LocalDate;
+import java.util.HashMap;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.CatalogChange;
+import org.apache.gravitino.MetalakeChange;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.file.FilesetChange;
+import org.apache.gravitino.listener.api.event.AlterCatalogEvent;
+import org.apache.gravitino.listener.api.event.AlterCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterFilesetEvent;
+import org.apache.gravitino.listener.api.event.AlterFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterMetalakeEvent;
+import org.apache.gravitino.listener.api.event.AlterMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterSchemaEvent;
+import org.apache.gravitino.listener.api.event.AlterSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterTableEvent;
+import org.apache.gravitino.listener.api.event.AlterTableFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterTopicEvent;
+import org.apache.gravitino.listener.api.event.AlterTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateCatalogEvent;
+import org.apache.gravitino.listener.api.event.CreateCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateFilesetEvent;
+import org.apache.gravitino.listener.api.event.CreateFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateMetalakeEvent;
+import org.apache.gravitino.listener.api.event.CreateMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateSchemaEvent;
+import org.apache.gravitino.listener.api.event.CreateSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateTableEvent;
+import org.apache.gravitino.listener.api.event.CreateTableFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateTopicEvent;
+import org.apache.gravitino.listener.api.event.CreateTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.DropCatalogEvent;
+import org.apache.gravitino.listener.api.event.DropCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.DropFilesetEvent;
+import org.apache.gravitino.listener.api.event.DropFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.DropMetalakeEvent;
+import org.apache.gravitino.listener.api.event.DropMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.DropSchemaEvent;
+import org.apache.gravitino.listener.api.event.DropSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.DropTableEvent;
+import org.apache.gravitino.listener.api.event.DropTableFailureEvent;
+import org.apache.gravitino.listener.api.event.DropTopicEvent;
+import org.apache.gravitino.listener.api.event.DropTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.GetFileLocationEvent;
+import org.apache.gravitino.listener.api.event.GetFileLocationFailureEvent;
+import org.apache.gravitino.listener.api.event.GetPartitionEvent;
+import org.apache.gravitino.listener.api.event.GetPartitionFailureEvent;
+import org.apache.gravitino.listener.api.event.ListCatalogEvent;
+import org.apache.gravitino.listener.api.event.ListCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.ListFilesetEvent;
+import org.apache.gravitino.listener.api.event.ListFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.ListPartitionEvent;
+import org.apache.gravitino.listener.api.event.ListPartitionFailureEvent;
+import org.apache.gravitino.listener.api.event.ListSchemaEvent;
+import org.apache.gravitino.listener.api.event.ListSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.ListTableEvent;
+import org.apache.gravitino.listener.api.event.ListTableFailureEvent;
+import org.apache.gravitino.listener.api.event.ListTopicEvent;
+import org.apache.gravitino.listener.api.event.ListTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadCatalogEvent;
+import org.apache.gravitino.listener.api.event.LoadCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadFilesetEvent;
+import org.apache.gravitino.listener.api.event.LoadFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadMetalakeEvent;
+import org.apache.gravitino.listener.api.event.LoadMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadSchemaEvent;
+import org.apache.gravitino.listener.api.event.LoadSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadTableEvent;
+import org.apache.gravitino.listener.api.event.LoadTableFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadTopicEvent;
+import org.apache.gravitino.listener.api.event.LoadTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.PartitionExistsEvent;
+import org.apache.gravitino.listener.api.event.PurgePartitionEvent;
+import org.apache.gravitino.listener.api.event.PurgePartitionFailureEvent;
+import org.apache.gravitino.listener.api.event.PurgeTableEvent;
+import org.apache.gravitino.listener.api.info.CatalogInfo;
+import org.apache.gravitino.listener.api.info.FilesetInfo;
+import org.apache.gravitino.listener.api.info.MetalakeInfo;
+import org.apache.gravitino.listener.api.info.SchemaInfo;
+import org.apache.gravitino.listener.api.info.TableInfo;
+import org.apache.gravitino.listener.api.info.TopicInfo;
+import org.apache.gravitino.listener.api.info.partitions.IdentityPartitionInfo;
+import org.apache.gravitino.listener.api.info.partitions.PartitionInfo;
+import org.apache.gravitino.messaging.TopicChange;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class TestOperation {
+
+  // Identifier/info fixtures: each pair is assigned once in init() and then passed to the event
+  // constructors in the test methods.
+  private NameIdentifier metalakeIdentifier;
+
+  private MetalakeInfo metalakeInfo;
+
+  private NameIdentifier catalogIdentifier;
+
+  private CatalogInfo catalogInfo;
+
+  private NameIdentifier schemaIdentifier;
+
+  private SchemaInfo schemaInfo;
+
+  private NameIdentifier tableIdentifier;
+
+  private TableInfo tableInfo;
+
+  private NameIdentifier filesetIdentifier;
+
+  private FilesetInfo filesetInfo;
+
+  private NameIdentifier topicIdentifier;
+
+  private TopicInfo topicInfo;
+
+  private NameIdentifier partitionIdentifier;
+
+  private PartitionInfo partitionInfo;
+
+  // User name passed as the first argument of the events built in the tests.
+  private static final String USER = "user";
+
+  private static final String PARTITION_NAME = "dt=2008-08-08/country=us";
+
+  /**
+   * Populates the identifier/info fixtures once for the whole test class. The assignments are
+   * kept in their original order: metalake, catalog, schema, table, topic, fileset, partition.
+   */
+  @BeforeAll
+  public void init() {
+    metalakeIdentifier = mockMetalakeIdentifier();
+    metalakeInfo = mockMetalakeInfo();
+
+    catalogIdentifier = mockCatalogIndentifier();
+    catalogInfo = mockCatalogInfo();
+
+    schemaIdentifier = mockSchemaIdentifier();
+    schemaInfo = mockSchemaInfo();
+
+    tableIdentifier = mockTableIdentifier();
+    tableInfo = mockTableInfo();
+
+    topicIdentifier = mockTopicIdentifier();
+    topicInfo = mockTopicInfo();
+
+    filesetIdentifier = mockFilesetIdentifier();
+    filesetInfo = mockFilesetInfo();
+
+    partitionIdentifier = mockPartitionIdentifier();
+    partitionInfo = mockPartitionInfo();
+  }
+
+  /**
+   * Verifies that every create event and its failure counterpart map to the matching {@code
+   * CREATE_*} operation.
+   */
+  @Test
+  public void testCreateOperation() {
+    // The metalake event is built with the metalake identifier (it used the catalog one before).
+    Event createMetalakeEvent = new CreateMetalakeEvent(USER, metalakeIdentifier, metalakeInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_METALAKE, AuditLog.Operation.fromEvent(createMetalakeEvent));
+    Event createMetalakeFailureEvent =
+        new CreateMetalakeFailureEvent(USER, metalakeIdentifier, new Exception(), metalakeInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_METALAKE,
+        AuditLog.Operation.fromEvent(createMetalakeFailureEvent));
+
+    Event createCatalogEvent = new CreateCatalogEvent(USER, catalogIdentifier, catalogInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_CATALOG, AuditLog.Operation.fromEvent(createCatalogEvent));
+    Event createCatalogFailureEvent =
+        new CreateCatalogFailureEvent(USER, catalogIdentifier, new Exception(), catalogInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_CATALOG,
+        AuditLog.Operation.fromEvent(createCatalogFailureEvent));
+
+    Event createSchemaEvent = new CreateSchemaEvent(USER, schemaIdentifier, schemaInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_SCHEMA, AuditLog.Operation.fromEvent(createSchemaEvent));
+    Event createSchemaFailureEvent =
+        new CreateSchemaFailureEvent(USER, schemaIdentifier, new Exception(), schemaInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_SCHEMA, AuditLog.Operation.fromEvent(createSchemaFailureEvent));
+
+    Event createTableEvent = new CreateTableEvent(USER, tableIdentifier, tableInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_TABLE, AuditLog.Operation.fromEvent(createTableEvent));
+    Event createTableFailureEvent =
+        new CreateTableFailureEvent(USER, tableIdentifier, new Exception(), tableInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_TABLE, AuditLog.Operation.fromEvent(createTableFailureEvent));
+
+    Event createFilesetEvent = new CreateFilesetEvent(USER, filesetIdentifier, filesetInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_FILESET, AuditLog.Operation.fromEvent(createFilesetEvent));
+    Event createFilesetFailureEvent =
+        new CreateFilesetFailureEvent(USER, filesetIdentifier, new Exception(), filesetInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_FILESET,
+        AuditLog.Operation.fromEvent(createFilesetFailureEvent));
+
+    Event createTopicEvent = new CreateTopicEvent(USER, topicIdentifier, topicInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_TOPIC, AuditLog.Operation.fromEvent(createTopicEvent));
+    Event createTopicFailureEvent =
+        new CreateTopicFailureEvent(USER, topicIdentifier, new Exception(), topicInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_TOPIC, AuditLog.Operation.fromEvent(createTopicFailureEvent));
+  }
+
+  /**
+   * Verifies that every alter event and its failure counterpart map to the matching {@code
+   * ALTER_*} operation.
+   */
+  @Test
+  public void testAlterOperation() {
+    Event alterMetalakeEvent =
+        new AlterMetalakeEvent(USER, metalakeIdentifier, new MetalakeChange[] {}, metalakeInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_METALAKE, AuditLog.Operation.fromEvent(alterMetalakeEvent));
+    Event alterMetalakeFailureEvent =
+        new AlterMetalakeFailureEvent(
+            USER, metalakeIdentifier, new Exception(), new MetalakeChange[] {});
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_METALAKE,
+        AuditLog.Operation.fromEvent(alterMetalakeFailureEvent));
+
+    // The catalog event is built with the catalog identifier (it used the schema one before).
+    Event alterCatalogEvent =
+        new AlterCatalogEvent(USER, catalogIdentifier, new CatalogChange[] {}, catalogInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_CATALOG, AuditLog.Operation.fromEvent(alterCatalogEvent));
+    Event alterCatalogFailureEvent =
+        new AlterCatalogFailureEvent(
+            USER, catalogIdentifier, new Exception(), new CatalogChange[] {});
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_CATALOG, AuditLog.Operation.fromEvent(alterCatalogFailureEvent));
+
+    Event alterSchemaEvent =
+        new AlterSchemaEvent(USER, schemaIdentifier, new SchemaChange[] {}, schemaInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_SCHEMA, AuditLog.Operation.fromEvent(alterSchemaEvent));
+    Event alterSchemaFailureEvent =
+        new AlterSchemaFailureEvent(USER, schemaIdentifier, new Exception(), new SchemaChange[] {});
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_SCHEMA, AuditLog.Operation.fromEvent(alterSchemaFailureEvent));
+
+    Event alterTableEvent =
+        new AlterTableEvent(USER, tableIdentifier, new TableChange[] {}, tableInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_TABLE, AuditLog.Operation.fromEvent(alterTableEvent));
+    Event alterTableFailureEvent =
+        new AlterTableFailureEvent(USER, tableIdentifier, new Exception(), new TableChange[] {});
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_TABLE, AuditLog.Operation.fromEvent(alterTableFailureEvent));
+
+    Event alterFilesetEvent =
+        new AlterFilesetEvent(USER, filesetIdentifier, new FilesetChange[] {}, filesetInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_FILESET, AuditLog.Operation.fromEvent(alterFilesetEvent));
+    Event alterFilesetFailureEvent =
+        new AlterFilesetFailureEvent(
+            USER, filesetIdentifier, new Exception(), new FilesetChange[] {});
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_FILESET, AuditLog.Operation.fromEvent(alterFilesetFailureEvent));
+
+    Event alterTopicEvent =
+        new AlterTopicEvent(USER, topicIdentifier, new TopicChange[] {}, topicInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_TOPIC, AuditLog.Operation.fromEvent(alterTopicEvent));
+    Event alterTopicFailureEvent =
+        new AlterTopicFailureEvent(USER, topicIdentifier, new Exception(), new TopicChange[] {});
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_TOPIC, AuditLog.Operation.fromEvent(alterTopicFailureEvent));
+  }
+
+  /**
+   * Checks that each drop event and its failure counterpart resolve to the matching
+   * {@code DROP_*} audit operation.
+   */
+  @Test
+  public void testDropOperation() {
+    // assertEquals takes (expected, actual), so the expected operation is passed first.
+    Event dropMetalakeEvent = new DropMetalakeEvent(USER, metalakeIdentifier, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_METALAKE, AuditLog.Operation.fromEvent(dropMetalakeEvent));
+    // Pass a real exception, like every other failure event in this test, instead of null.
+    Event dropMetalakeFailureEvent =
+        new DropMetalakeFailureEvent(USER, metalakeIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_METALAKE,
+        AuditLog.Operation.fromEvent(dropMetalakeFailureEvent));
+
+    Event dropCatalogEvent = new DropCatalogEvent(USER, catalogIdentifier, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_CATALOG, AuditLog.Operation.fromEvent(dropCatalogEvent));
+    Event dropCatalogFailureEvent =
+        new DropCatalogFailureEvent(USER, catalogIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_CATALOG, AuditLog.Operation.fromEvent(dropCatalogFailureEvent));
+
+    Event dropSchemaEvent = new DropSchemaEvent(USER, schemaIdentifier, true, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_SCHEMA, AuditLog.Operation.fromEvent(dropSchemaEvent));
+    Event dropSchemaFailureEvent =
+        new DropSchemaFailureEvent(USER, schemaIdentifier, new Exception(), true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_SCHEMA, AuditLog.Operation.fromEvent(dropSchemaFailureEvent));
+
+    Event dropTableEvent = new DropTableEvent(USER, tableIdentifier, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_TABLE, AuditLog.Operation.fromEvent(dropTableEvent));
+    Event dropTableFailureEvent = new DropTableFailureEvent(USER, tableIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_TABLE, AuditLog.Operation.fromEvent(dropTableFailureEvent));
+
+    Event dropFilesetEvent = new DropFilesetEvent(USER, filesetIdentifier, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_FILESET, AuditLog.Operation.fromEvent(dropFilesetEvent));
+    Event dropFilesetFailureEvent =
+        new DropFilesetFailureEvent(USER, filesetIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_FILESET, AuditLog.Operation.fromEvent(dropFilesetFailureEvent));
+
+    Event dropTopicEvent = new DropTopicEvent(USER, topicIdentifier, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_TOPIC, AuditLog.Operation.fromEvent(dropTopicEvent));
+    Event dropTopicFailureEvent = new DropTopicFailureEvent(USER, topicIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_TOPIC, AuditLog.Operation.fromEvent(dropTopicFailureEvent));
+  }
+
+  /**
+   * Checks that purge events resolve to the matching {@code PURGE_*} audit operation.
+   */
+  @Test
+  public void testPurgeOperation() {
+    // assertEquals takes (expected, actual), so the expected operation is passed first.
+    // Partition events use the partition identifier (this one previously used the metalake's).
+    Event purgePartitionEvent =
+        new PurgePartitionEvent(USER, partitionIdentifier, true, PARTITION_NAME);
+    Assertions.assertEquals(
+        AuditLog.Operation.PURGE_PARTITION, AuditLog.Operation.fromEvent(purgePartitionEvent));
+    Event purgePartitionFailureEvent =
+        new PurgePartitionFailureEvent(USER, partitionIdentifier, new Exception(), PARTITION_NAME);
+    Assertions.assertEquals(
+        AuditLog.Operation.PURGE_PARTITION,
+        AuditLog.Operation.fromEvent(purgePartitionFailureEvent));
+
+    Event purgeTableEvent = new PurgeTableEvent(USER, tableIdentifier, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.PURGE_TABLE, AuditLog.Operation.fromEvent(purgeTableEvent));
+    // NOTE(review): this spot used to repeat the PurgePartitionFailureEvent assertion, so the
+    // table failure path is not exercised. Presumably a purge-table failure event was intended;
+    // add that case once the event class is confirmed to be imported by this test.
+  }
+
+  /**
+   * Checks that get events and their failure counterparts resolve to the matching
+   * {@code GET_*} audit operation.
+   */
+  @Test
+  public void testGetOperation() {
+    // assertEquals takes (expected, actual), so the expected operation is passed first.
+    Event getFileLocationEvent =
+        new GetFileLocationEvent(USER, filesetIdentifier, "location", "subPath", new HashMap<>());
+    Assertions.assertEquals(
+        AuditLog.Operation.GET_FILE_LOCATION,
+        AuditLog.Operation.fromEvent(getFileLocationEvent));
+    Event getFileLocationFailureEvent =
+        new GetFileLocationFailureEvent(USER, filesetIdentifier, "subPath", new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.GET_FILE_LOCATION,
+        AuditLog.Operation.fromEvent(getFileLocationFailureEvent));
+
+    Event getPartitionEvent = new GetPartitionEvent(USER, partitionIdentifier, partitionInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.GET_PARTITION, AuditLog.Operation.fromEvent(getPartitionEvent));
+    Event getPartitionFailureEvent =
+        new GetPartitionFailureEvent(USER, partitionIdentifier, new Exception(), PARTITION_NAME);
+    Assertions.assertEquals(
+        AuditLog.Operation.GET_PARTITION,
+        AuditLog.Operation.fromEvent(getPartitionFailureEvent));
+  }
+
+  /**
+   * Checks that each list event and its failure counterpart resolve to the matching
+   * {@code LIST_*} audit operation.
+   */
+  @Test
+  public void testListOperation() {
+    // assertEquals takes (expected, actual), so the expected operation is passed first.
+    Namespace namespace = Namespace.of("metalake", "catalog");
+    Event listCatalogEvent = new ListCatalogEvent(USER, namespace);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_CATALOG, AuditLog.Operation.fromEvent(listCatalogEvent));
+    // Unlike the other list failure events here, this one takes the exception before the namespace.
+    Event listCatalogFailureEvent = new ListCatalogFailureEvent(USER, new Exception(), namespace);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_CATALOG, AuditLog.Operation.fromEvent(listCatalogFailureEvent));
+
+    Event listSchemaEvent = new ListSchemaEvent(USER, namespace);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_SCHEMA, AuditLog.Operation.fromEvent(listSchemaEvent));
+    Event listSchemaFailureEvent = new ListSchemaFailureEvent(USER, namespace, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_SCHEMA, AuditLog.Operation.fromEvent(listSchemaFailureEvent));
+
+    Event listTableEvent = new ListTableEvent(USER, namespace);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_TABLE, AuditLog.Operation.fromEvent(listTableEvent));
+    Event listTableFailureEvent = new ListTableFailureEvent(USER, namespace, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_TABLE, AuditLog.Operation.fromEvent(listTableFailureEvent));
+
+    Event listTopicEvent = new ListTopicEvent(USER, namespace);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_TOPIC, AuditLog.Operation.fromEvent(listTopicEvent));
+    Event listTopicFailureEvent = new ListTopicFailureEvent(USER, namespace, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_TOPIC, AuditLog.Operation.fromEvent(listTopicFailureEvent));
+
+    Event listFilesetEvent = new ListFilesetEvent(USER, namespace);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_FILESET, AuditLog.Operation.fromEvent(listFilesetEvent));
+    Event listFilesetFailureEvent = new ListFilesetFailureEvent(USER, namespace, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_FILESET, AuditLog.Operation.fromEvent(listFilesetFailureEvent));
+
+    Event listPartitionEvent = new ListPartitionEvent(USER, partitionIdentifier);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_PARTITION, AuditLog.Operation.fromEvent(listPartitionEvent));
+    Event listPartitionFailureEvent =
+        new ListPartitionFailureEvent(USER, partitionIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_PARTITION,
+        AuditLog.Operation.fromEvent(listPartitionFailureEvent));
+  }
+
+  /**
+   * Checks that each load event and its failure counterpart resolve to the matching
+   * {@code LOAD_*} audit operation.
+   */
+  @Test
+  public void testLoadOperation() {
+    // assertEquals takes (expected, actual), so the expected operation is passed first.
+    Event loadMetalakeEvent = new LoadMetalakeEvent(USER, metalakeIdentifier, metalakeInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_METALAKE, AuditLog.Operation.fromEvent(loadMetalakeEvent));
+    Event loadMetalakeFailureEvent =
+        new LoadMetalakeFailureEvent(USER, metalakeIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_METALAKE,
+        AuditLog.Operation.fromEvent(loadMetalakeFailureEvent));
+
+    Event loadCatalogEvent = new LoadCatalogEvent(USER, catalogIdentifier, catalogInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_CATALOG, AuditLog.Operation.fromEvent(loadCatalogEvent));
+    Event loadCatalogFailureEvent =
+        new LoadCatalogFailureEvent(USER, catalogIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_CATALOG, AuditLog.Operation.fromEvent(loadCatalogFailureEvent));
+
+    Event loadSchemaEvent = new LoadSchemaEvent(USER, schemaIdentifier, schemaInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_SCHEMA, AuditLog.Operation.fromEvent(loadSchemaEvent));
+    Event loadSchemaFailureEvent =
+        new LoadSchemaFailureEvent(USER, schemaIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_SCHEMA, AuditLog.Operation.fromEvent(loadSchemaFailureEvent));
+
+    Event loadTableEvent = new LoadTableEvent(USER, tableIdentifier, tableInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_TABLE, AuditLog.Operation.fromEvent(loadTableEvent));
+    Event loadTableFailureEvent = new LoadTableFailureEvent(USER, tableIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_TABLE, AuditLog.Operation.fromEvent(loadTableFailureEvent));
+
+    Event loadFilesetEvent = new LoadFilesetEvent(USER, filesetIdentifier, filesetInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_FILESET, AuditLog.Operation.fromEvent(loadFilesetEvent));
+    Event loadFilesetFailureEvent =
+        new LoadFilesetFailureEvent(USER, filesetIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_FILESET, AuditLog.Operation.fromEvent(loadFilesetFailureEvent));
+
+    Event loadTopicEvent = new LoadTopicEvent(USER, topicIdentifier, topicInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_TOPIC, AuditLog.Operation.fromEvent(loadTopicEvent));
+    Event loadTopicFailureEvent = new LoadTopicFailureEvent(USER, topicIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_TOPIC, AuditLog.Operation.fromEvent(loadTopicFailureEvent));
+  }
+
+  /**
+   * Checks that a partition-exists event resolves to the {@code PARTITION_EXIST} audit
+   * operation.
+   */
+  @Test
+  public void testExistsOperation() {
+    // assertEquals takes (expected, actual), so the expected operation is passed first.
+    Event partitionExistsEvent =
+        new PartitionExistsEvent(USER, partitionIdentifier, true, partitionIdentifier.name());
+    Assertions.assertEquals(
+        AuditLog.Operation.PARTITION_EXIST, AuditLog.Operation.fromEvent(partitionExistsEvent));
+    // NOTE(review): a second local named partitionExistsFailureEvent used to follow, but it was
+    // built from PartitionExistsEvent again, so it only repeated the assertion above. The failure
+    // path is therefore untested; add it once the failure event class is confirmed to be imported.
+  }
+
+  /** Returns a one-level {@link NameIdentifier} whose only name is {@code metalake}. */
+  private NameIdentifier mockMetalakeIdentifier() {
+    final NameIdentifier metalakeIdent = NameIdentifier.of("metalake");
+    return metalakeIdent;
+  }
+
+  /**
+   * Builds the {@link MetalakeInfo} fixture: name {@code metalake}, comment {@code comment}, one
+   * {@code a=b} property, and {@code null} as the last constructor argument.
+   */
+  private MetalakeInfo mockMetalakeInfo() {
+    final String name = "metalake";
+    final String comment = "comment";
+    return new MetalakeInfo(name, comment, ImmutableMap.of("a", "b"), null);
+  }
+
+  /** Returns the two-level {@link NameIdentifier} {@code metalake.catalog}. */
+  private NameIdentifier mockCatalogIndentifier() {
+    final NameIdentifier catalogIdent = NameIdentifier.of("metalake", "catalog");
+    return catalogIdent;
+  }
+
+  /**
+   * Builds the {@link CatalogInfo} fixture: a relational catalog named {@code catalog} with
+   * provider string {@code hive}, comment {@code comment}, one {@code a=b} property, and
+   * {@code null} as the last constructor argument.
+   */
+  private CatalogInfo mockCatalogInfo() {
+    final String name = "catalog";
+    final String provider = "hive";
+    final String comment = "comment";
+    return new CatalogInfo(
+        name, Catalog.Type.RELATIONAL, provider, comment, ImmutableMap.of("a", "b"), null);
+  }
+
+  /** Returns the three-level {@link NameIdentifier} {@code metalake.catalog.schema}. */
+  private NameIdentifier mockSchemaIdentifier() {
+    final NameIdentifier schemaIdent = NameIdentifier.of("metalake", "catalog", "schema");
+    return schemaIdent;
+  }
+
+  /**
+   * Builds the {@link SchemaInfo} fixture: name {@code schema}, comment {@code comment}, one
+   * {@code a=b} property, and {@code null} as the last constructor argument.
+   */
+  private SchemaInfo mockSchemaInfo() {
+    final String name = "schema";
+    final String comment = "comment";
+    return new SchemaInfo(name, comment, ImmutableMap.of("a", "b"), null);
+  }
+
+  /**
+   * Returns the {@link NameIdentifier} {@code metalake.catalog.schema.table}.
+   *
+   * <p>The schema level is included so the table identifier has the same parent path that the
+   * partition identifier fixture in this class uses.
+   */
+  private NameIdentifier mockTableIdentifier() {
+    return NameIdentifier.of("metalake", "catalog", "schema", "table");
+  }
+
+  /**
+   * Builds the {@link TableInfo} fixture: a table named {@code table} with one integer column
+   * {@code a}, an identity transform on {@code a}, a hash distribution of 10 over {@code a}, an
+   * ascending sort order on {@code a}, a primary index {@code p} over {@code a} and {@code b},
+   * and {@code null} as the last constructor argument.
+   */
+  private TableInfo mockTableInfo() {
+    final Column[] columns = new Column[] {Column.of("a", Types.IntegerType.get())};
+    final Transform[] partitioning = new Transform[] {Transforms.identity("a")};
+    final SortOrder[] sortOrders =
+        new SortOrder[] {SortOrders.ascending(NamedReference.field("a"))};
+    final Index[] indexes = new Index[] {Indexes.primary("p", new String[][] {{"a"}, {"b"}})};
+    return new TableInfo(
+        "table",
+        columns,
+        "comment",
+        ImmutableMap.of("a", "b"),
+        partitioning,
+        Distributions.of(Strategy.HASH, 10, NamedReference.field("a")),
+        sortOrders,
+        indexes,
+        null);
+  }
+
+  /**
+   * Returns the {@link NameIdentifier} {@code metalake.catalog.schema.fileset}.
+   *
+   * <p>The schema level is included so the fileset sits below a schema, matching the
+   * {@code metalake.catalog.schema} path used by the schema identifier fixture in this class.
+   */
+  private NameIdentifier mockFilesetIdentifier() {
+    return NameIdentifier.of("metalake", "catalog", "schema", "fileset");
+  }
+
+  /**
+   * Builds the {@link FilesetInfo} fixture: a managed fileset named {@code fileset} with comment
+   * {@code comment}, storage location string {@code location}, one {@code a=b} property, and
+   * {@code null} as the last constructor argument.
+   */
+  private FilesetInfo mockFilesetInfo() {
+    final String name = "fileset";
+    final String comment = "comment";
+    final String storageLocation = "location";
+    return new FilesetInfo(
+        name, comment, Fileset.Type.MANAGED, storageLocation, ImmutableMap.of("a", "b"), null);
+  }
+
+  /**
+   * Returns the {@link NameIdentifier} {@code metalake.catalog.schema.topic}.
+   *
+   * <p>The schema level is included so the topic sits below a schema, matching the
+   * {@code metalake.catalog.schema} path used by the schema identifier fixture in this class.
+   */
+  private NameIdentifier mockTopicIdentifier() {
+    return NameIdentifier.of("metalake", "catalog", "schema", "topic");
+  }
+
+  /**
+   * Builds the {@link TopicInfo} fixture: name {@code topic}, comment {@code comment}, one
+   * {@code a=b} property, and {@code null} as the last constructor argument.
+   */
+  private TopicInfo mockTopicInfo() {
+    final String name = "topic";
+    final String comment = "comment";
+    return new TopicInfo(name, comment, ImmutableMap.of("a", "b"), null);
+  }
+
+  /**
+   * Returns the five-level {@link NameIdentifier} {@code metalake.catalog.schema.table} followed
+   * by {@code PARTITION_NAME}.
+   */
+  private NameIdentifier mockPartitionIdentifier() {
+    final NameIdentifier partitionIdent =
+        NameIdentifier.of("metalake", "catalog", "schema", "table", PARTITION_NAME);
+    return partitionIdent;
+  }
+
+  /**
+   * Builds the {@link IdentityPartitionInfo} fixture named {@code PARTITION_NAME}, with field
+   * names {@code dt} and {@code country}, the values 2008-08-08 and {@code us}, and a single
+   * {@code location} property.
+   */
+  private PartitionInfo mockPartitionInfo() {
+    final String[][] fieldNames = new String[][] {{"dt"}, {"country"}};
+    final Literal[] values =
+        new Literal[] {
+          Literals.dateLiteral(LocalDate.parse("2008-08-08")), Literals.stringLiteral("us")
+        };
+    return new IdentityPartitionInfo(
+        PARTITION_NAME,
+        fieldNames,
+        values,
+        ImmutableMap.of("location", "/user/hive/warehouse/tpch_flat_orc_2.db/orders"));
+  }
+}
diff --git a/docs/gravitino-server-config.md b/docs/gravitino-server-config.md
index efc842813..9d319d289 100644
--- a/docs/gravitino-server-config.md
+++ b/docs/gravitino-server-config.md
@@ -138,6 +138,33 @@ The plugin provides several operational modes for how to 
process event, supporti
 
 For more details, please refer to the definition of the plugin.
 
+### Audit log configuration
+
+The audit log framework defines how audit logs are formatted and written to various storages. The formatter defines an interface that transforms different `Event` types into a unified `AuditLog`. The writer defines an interface for writing `AuditLog` entries to different storages.
+
+Gravitino provides a default implementation that logs basic audit information to a file. You can extend the audit system by implementing the corresponding interfaces.
+
+| Property name                         | Description                            | Default value                               | Required | Since Version              |
+|---------------------------------------|----------------------------------------|---------------------------------------------|----------|----------------------------|
+| `gravitino.audit.enabled`             | The audit log enable flag.             | false                                       | NO       | 0.7.0-incubating           |
+| `gravitino.audit.writer.className`    | The class name of audit log writer.    | org.apache.gravitino.audit.FileAuditWriter  | NO       | 0.7.0-incubating           |
+| `gravitino.audit.formatter.className` | The class name of audit log formatter. | org.apache.gravitino.audit.SimpleFormatter  | NO       | 0.7.0-incubating           |
+
+#### Audit log formatter
+
+The `Formatter` defines an interface that formats metadata audit logs into a unified format. `SimpleFormatter` is the default implementation for formatting audit information; it requires no extra configuration.
+
+#### Audit log writer
+
+The `AuditLogWriter` defines an interface for writing metadata audit logs to different storage media such as files, databases, etc.
+
+Writer configuration begins with `gravitino.audit.writer.${name}`, where `${name}` is replaced with the actual writer name defined by the writer's `name()` method. `FileAuditWriter` is the default implementation for logging audit information; its name is `file`.
+
+| Property name                          | Description                                                                                                 | Default value       | Required | Since Version    |
+|----------------------------------------|-------------------------------------------------------------------------------------------------------------|---------------------|----------|------------------|
+| `gravitino.audit.writer.file.fileName` | The audit log file name; the full path is `${sys:gravitino.log.path}/${fileName}`.                          | gravitino_audit.log | NO       | 0.7.0-incubating |
+| `gravitino.audit.writer.file.append`   | Whether the log is appended to the end of the file (`true`) or written from the beginning of the file (`false`). | true                | NO       | 0.7.0-incubating |
+
 ### Security configuration
 
 Refer to [security](security/security.md) for HTTPS and authentication 
configurations.

Reply via email to