This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new dffff6b43 [#3973] improvement(core):support audit log (#4575)
dffff6b43 is described below
commit dffff6b43913152e1594ba4afa012cc4861b50a7
Author: hanwxx <[email protected]>
AuthorDate: Mon Oct 21 22:02:27 2024 +0800
[#3973] improvement(core):support audit log (#4575)
### What changes were proposed in this pull request?
Design a gravitino audit framework.
### Why are the changes needed?
fixes: #3973
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
1. Add UT
---------
Co-authored-by: han <[email protected]>
Co-authored-by: Qi Yu <[email protected]>
---
.../main/java/org/apache/gravitino/Configs.java | 25 +
.../java/org/apache/gravitino/GravitinoEnv.java | 6 +
.../java/org/apache/gravitino/audit/AuditLog.java | 299 +++++++++++
.../apache/gravitino/audit/AuditLogManager.java | 121 +++++
.../org/apache/gravitino/audit/AuditLogWriter.java | 65 +++
.../apache/gravitino/audit/FileAuditWriter.java | 99 ++++
.../java/org/apache/gravitino/audit/Formatter.java | 34 ++
.../org/apache/gravitino/audit/SimpleAuditLog.java | 76 +++
.../apache/gravitino/audit/SimpleFormatter.java | 40 ++
.../gravitino/listener/EventListenerManager.java | 4 +
.../gravitino/audit/DummyAuditFormatter.java | 38 ++
.../org/apache/gravitino/audit/DummyAuditLog.java | 67 +++
.../apache/gravitino/audit/DummyAuditWriter.java | 53 ++
.../apache/gravitino/audit/TestAuditManager.java | 247 +++++++++
.../org/apache/gravitino/audit/TestOperation.java | 572 +++++++++++++++++++++
docs/gravitino-server-config.md | 27 +
16 files changed, 1773 insertions(+)
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java
b/core/src/main/java/org/apache/gravitino/Configs.java
index 587d85ada..81388608d 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -23,6 +23,8 @@ import java.io.File;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.audit.FileAuditWriter;
+import org.apache.gravitino.audit.SimpleFormatter;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
@@ -342,4 +344,27 @@ public class Configs {
.stringConf()
.toSequence()
.createWithDefault(Collections.emptyList());
+
+ public static final String AUDIT_LOG_WRITER_CONFIG_PREFIX =
"gravitino.audit.writer.";
+
+ public static final ConfigEntry<Boolean> AUDIT_LOG_ENABLED_CONF =
+ new ConfigBuilder("gravitino.audit.enable")
+ .doc("Gravitino audit log enable flag")
+ .version(ConfigConstants.VERSION_0_7_0)
+ .booleanConf()
+ .createWithDefault(false);
+
+ public static final ConfigEntry<String> AUDIT_LOG_WRITER_CLASS_NAME =
+ new ConfigBuilder("gravitino.audit.writer.className")
+ .doc("Gravitino audit log writer class name")
+ .version(ConfigConstants.VERSION_0_7_0)
+ .stringConf()
+ .createWithDefault(FileAuditWriter.class.getName());
+
+ public static final ConfigEntry<String> AUDIT_LOG_FORMATTER_CLASS_NAME =
+ new ConfigBuilder("gravitino.audit.formatter.className")
+ .doc("Gravitino event log formatter class name")
+ .version(ConfigConstants.VERSION_0_7_0)
+ .stringConf()
+ .createWithDefault(SimpleFormatter.class.getName());
}
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 77d76b474..516e9d9d3 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -19,6 +19,7 @@
package org.apache.gravitino;
import com.google.common.base.Preconditions;
+import org.apache.gravitino.audit.AuditLogManager;
import org.apache.gravitino.authorization.AccessControlDispatcher;
import org.apache.gravitino.authorization.AccessControlManager;
import org.apache.gravitino.authorization.FutureGrantManager;
@@ -109,6 +110,8 @@ public class GravitinoEnv {
private EventListenerManager eventListenerManager;
+ private AuditLogManager auditLogManager;
+
private TagManager tagManager;
private EventBus eventBus;
private OwnerManager ownerManager;
@@ -356,6 +359,9 @@ public class GravitinoEnv {
eventListenerManager.init(
config.getConfigsWithPrefix(EventListenerManager.GRAVITINO_EVENT_LISTENER_PREFIX));
this.eventBus = eventListenerManager.createEventBus();
+
+ this.auditLogManager = new AuditLogManager();
+ auditLogManager.init(config, eventListenerManager);
}
private void initGravitinoServerComponents() {
diff --git a/core/src/main/java/org/apache/gravitino/audit/AuditLog.java
b/core/src/main/java/org/apache/gravitino/audit/AuditLog.java
new file mode 100644
index 000000000..86286c0f6
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/AuditLog.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import org.apache.gravitino.listener.api.event.AlterCatalogEvent;
+import org.apache.gravitino.listener.api.event.AlterCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterFilesetEvent;
+import org.apache.gravitino.listener.api.event.AlterFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterMetalakeEvent;
+import org.apache.gravitino.listener.api.event.AlterMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterSchemaEvent;
+import org.apache.gravitino.listener.api.event.AlterSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterTableEvent;
+import org.apache.gravitino.listener.api.event.AlterTableFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterTopicEvent;
+import org.apache.gravitino.listener.api.event.AlterTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateCatalogEvent;
+import org.apache.gravitino.listener.api.event.CreateCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateFilesetEvent;
+import org.apache.gravitino.listener.api.event.CreateFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateMetalakeEvent;
+import org.apache.gravitino.listener.api.event.CreateMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateSchemaEvent;
+import org.apache.gravitino.listener.api.event.CreateSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateTableEvent;
+import org.apache.gravitino.listener.api.event.CreateTableFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateTopicEvent;
+import org.apache.gravitino.listener.api.event.CreateTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.DropCatalogEvent;
+import org.apache.gravitino.listener.api.event.DropCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.DropFilesetEvent;
+import org.apache.gravitino.listener.api.event.DropFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.DropMetalakeEvent;
+import org.apache.gravitino.listener.api.event.DropMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.DropSchemaEvent;
+import org.apache.gravitino.listener.api.event.DropSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.DropTableEvent;
+import org.apache.gravitino.listener.api.event.DropTableFailureEvent;
+import org.apache.gravitino.listener.api.event.DropTopicEvent;
+import org.apache.gravitino.listener.api.event.DropTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.GetFileLocationEvent;
+import org.apache.gravitino.listener.api.event.GetFileLocationFailureEvent;
+import org.apache.gravitino.listener.api.event.GetPartitionEvent;
+import org.apache.gravitino.listener.api.event.GetPartitionFailureEvent;
+import org.apache.gravitino.listener.api.event.ListCatalogEvent;
+import org.apache.gravitino.listener.api.event.ListCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.ListFilesetEvent;
+import org.apache.gravitino.listener.api.event.ListFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.ListMetalakeEvent;
+import org.apache.gravitino.listener.api.event.ListMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.ListPartitionEvent;
+import org.apache.gravitino.listener.api.event.ListPartitionFailureEvent;
+import org.apache.gravitino.listener.api.event.ListSchemaEvent;
+import org.apache.gravitino.listener.api.event.ListSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.ListTableEvent;
+import org.apache.gravitino.listener.api.event.ListTableFailureEvent;
+import org.apache.gravitino.listener.api.event.ListTopicEvent;
+import org.apache.gravitino.listener.api.event.ListTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadCatalogEvent;
+import org.apache.gravitino.listener.api.event.LoadCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadFilesetEvent;
+import org.apache.gravitino.listener.api.event.LoadFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadMetalakeEvent;
+import org.apache.gravitino.listener.api.event.LoadMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadSchemaEvent;
+import org.apache.gravitino.listener.api.event.LoadSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadTableEvent;
+import org.apache.gravitino.listener.api.event.LoadTableFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadTopicEvent;
+import org.apache.gravitino.listener.api.event.LoadTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.PartitionExistsEvent;
+import org.apache.gravitino.listener.api.event.PartitionExistsFailureEvent;
+import org.apache.gravitino.listener.api.event.PurgePartitionEvent;
+import org.apache.gravitino.listener.api.event.PurgePartitionFailureEvent;
+import org.apache.gravitino.listener.api.event.PurgeTableEvent;
+import org.apache.gravitino.listener.api.event.PurgeTableFailureEvent;
+
+/** Defines the unified audit log schema. */
+public interface AuditLog {
+ /**
+ * The user who performed the operation.
+ *
+ * @return user name.
+ */
+ String user();
+
+ /**
+ * The operation name.
+ *
+ * @return operation name.
+ */
+ Operation operation();
+
+ /**
+ * The identifier of the resource.
+ *
+ * @return resource identifier name.
+ */
+ String identifier();
+
+ /**
+ * The timestamp of the operation.
+ *
+ * @return operation timestamp.
+ */
+ long timestamp();
+
+ /**
+ * The status of the operation.
+ *
+ * @return operation status.
+ */
+ Status status();
+
+ /** Defines the user metadata operations recorded in the audit log. */
+ enum Operation {
+ CREATE_METALAKE,
+
+ ALTER_METALAKE,
+
+ DROP_METALAKE,
+
+ LOAD_METALAKE,
+
+ LIST_METALAKE,
+
+ CREATE_CATALOG,
+
+ LOAD_CATALOG,
+
+ ALTER_CATALOG,
+
+ DROP_CATALOG,
+
+ LIST_CATALOG,
+
+ CREATE_SCHEMA,
+
+ ALTER_SCHEMA,
+
+ DROP_SCHEMA,
+
+ LOAD_SCHEMA,
+
+ LIST_SCHEMA,
+
+ CREATE_TABLE,
+
+ ALTER_TABLE,
+
+ DROP_TABLE,
+
+ LOAD_TABLE,
+
+ PURGE_TABLE,
+
+ LIST_TABLE,
+
+ PARTITION_EXIST,
+
+ PURGE_PARTITION,
+
+ LIST_PARTITION,
+
+ GET_PARTITION,
+
+ CREATE_TOPIC,
+
+ ALTER_TOPIC,
+
+ DROP_TOPIC,
+
+ LOAD_TOPIC,
+
+ LIST_TOPIC,
+
+ GET_FILE_LOCATION,
+
+ CREATE_FILESET,
+
+ ALTER_FILESET,
+
+ DROP_FILESET,
+
+ LOAD_FILESET,
+
+ LIST_FILESET,
+
+ UNKNOWN_OPERATION;
+
+ public static Operation fromEvent(Event event) {
+ if (event instanceof CreateMetalakeEvent || event instanceof
CreateMetalakeFailureEvent) {
+ return CREATE_METALAKE;
+ } else if (event instanceof AlterMetalakeEvent
+ || event instanceof AlterMetalakeFailureEvent) {
+ return ALTER_METALAKE;
+ } else if (event instanceof DropMetalakeEvent || event instanceof
DropMetalakeFailureEvent) {
+ return DROP_METALAKE;
+ } else if (event instanceof LoadMetalakeEvent || event instanceof
LoadMetalakeFailureEvent) {
+ return LOAD_METALAKE;
+ } else if (event instanceof ListMetalakeEvent || event instanceof
ListMetalakeFailureEvent) {
+ return LIST_METALAKE;
+ } else if (event instanceof CreateCatalogEvent
+ || event instanceof CreateCatalogFailureEvent) {
+ return CREATE_CATALOG;
+ } else if (event instanceof AlterCatalogEvent || event instanceof
AlterCatalogFailureEvent) {
+ return ALTER_CATALOG;
+ } else if (event instanceof DropCatalogEvent || event instanceof
DropCatalogFailureEvent) {
+ return DROP_CATALOG;
+ } else if (event instanceof LoadCatalogEvent || event instanceof
LoadCatalogFailureEvent) {
+ return LOAD_CATALOG;
+ } else if (event instanceof ListCatalogEvent || event instanceof
ListCatalogFailureEvent) {
+ return LIST_CATALOG;
+ } else if (event instanceof CreateSchemaEvent || event instanceof
CreateSchemaFailureEvent) {
+ return CREATE_SCHEMA;
+ } else if (event instanceof AlterSchemaEvent || event instanceof
AlterSchemaFailureEvent) {
+ return ALTER_SCHEMA;
+ } else if (event instanceof DropSchemaEvent || event instanceof
DropSchemaFailureEvent) {
+ return DROP_SCHEMA;
+ } else if (event instanceof LoadSchemaEvent || event instanceof
LoadSchemaFailureEvent) {
+ return LOAD_SCHEMA;
+ } else if (event instanceof ListSchemaEvent || event instanceof
ListSchemaFailureEvent) {
+ return LIST_SCHEMA;
+ } else if (event instanceof CreateTableEvent || event instanceof
CreateTableFailureEvent) {
+ return CREATE_TABLE;
+ } else if (event instanceof AlterTableEvent || event instanceof
AlterTableFailureEvent) {
+ return ALTER_TABLE;
+ } else if (event instanceof DropTableEvent || event instanceof
DropTableFailureEvent) {
+ return DROP_TABLE;
+ } else if (event instanceof LoadTableEvent || event instanceof
LoadTableFailureEvent) {
+ return LOAD_TABLE;
+ } else if (event instanceof PurgeTableEvent || event instanceof
PurgeTableFailureEvent) {
+ return PURGE_TABLE;
+ } else if (event instanceof ListTableEvent || event instanceof
ListTableFailureEvent) {
+ return LIST_TABLE;
+ } else if (event instanceof PartitionExistsEvent
+ || event instanceof PartitionExistsFailureEvent) {
+ return PARTITION_EXIST;
+ } else if (event instanceof PurgePartitionEvent
+ || event instanceof PurgePartitionFailureEvent) {
+ return PURGE_PARTITION;
+ } else if (event instanceof ListPartitionEvent
+ || event instanceof ListPartitionFailureEvent) {
+ return LIST_PARTITION;
+ } else if (event instanceof GetPartitionEvent || event instanceof
GetPartitionFailureEvent) {
+ return GET_PARTITION;
+ } else if (event instanceof CreateTopicEvent || event instanceof
CreateTopicFailureEvent) {
+ return CREATE_TOPIC;
+ } else if (event instanceof AlterTopicEvent || event instanceof
AlterTopicFailureEvent) {
+ return ALTER_TOPIC;
+ } else if (event instanceof DropTopicEvent || event instanceof
DropTopicFailureEvent) {
+ return DROP_TOPIC;
+ } else if (event instanceof LoadTopicEvent || event instanceof
LoadTopicFailureEvent) {
+ return LOAD_TOPIC;
+ } else if (event instanceof ListTopicEvent || event instanceof
ListTopicFailureEvent) {
+ return LIST_TOPIC;
+ } else if (event instanceof CreateFilesetEvent
+ || event instanceof CreateFilesetFailureEvent) {
+ return CREATE_FILESET;
+ } else if (event instanceof AlterFilesetEvent || event instanceof
AlterFilesetFailureEvent) {
+ return ALTER_FILESET;
+ } else if (event instanceof DropFilesetEvent || event instanceof
DropFilesetFailureEvent) {
+ return DROP_FILESET;
+ } else if (event instanceof GetFileLocationEvent
+ || event instanceof GetFileLocationFailureEvent) {
+ return GET_FILE_LOCATION;
+ } else if (event instanceof LoadFilesetEvent || event instanceof
LoadFilesetFailureEvent) {
+ return LOAD_FILESET;
+ } else if (event instanceof ListFilesetEvent || event instanceof
ListFilesetFailureEvent) {
+ return LIST_FILESET;
+ } else {
+ return UNKNOWN_OPERATION;
+ }
+ }
+ }
+
+ enum Status {
+ SUCCESS,
+ FAILURE
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/audit/AuditLogManager.java
b/core/src/main/java/org/apache/gravitino/audit/AuditLogManager.java
new file mode 100644
index 000000000..ac9d24bd8
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/AuditLogManager.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.listener.EventListenerManager;
+import org.apache.gravitino.listener.api.EventListenerPlugin;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.utils.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * AuditLogManager is responsible for initializing the audit log writer and
formatter,
+ * which are used to write metadata audit logs.
+ * */
+public class AuditLogManager {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AuditLogManager.class);
+
+ @VisibleForTesting private AuditLogWriter auditLogWriter;
+
+ public void init(Config config, EventListenerManager eventBusManager) {
+ if (!config.get(Configs.AUDIT_LOG_ENABLED_CONF)) {
+ LOG.info("Audit log is not enabled");
+ return;
+ }
+ String formatterClassName =
config.get(Configs.AUDIT_LOG_FORMATTER_CLASS_NAME);
+ Formatter formatter = loadFormatter(formatterClassName);
+ LOG.info("Audit log formatter class name:{}", formatterClassName);
+
+ String writerClassName = config.get(Configs.AUDIT_LOG_WRITER_CLASS_NAME);
+ auditLogWriter =
+ loadAuditLogWriter(
+ writerClassName,
+
config.getConfigsWithPrefix(Configs.AUDIT_LOG_WRITER_CONFIG_PREFIX),
+ formatter);
+ LOG.info("Audit log writer class name:{}", writerClassName);
+
+ eventBusManager.addEventListener(
+ "audit-log",
+ new EventListenerPlugin() {
+
+ @Override
+ public void init(Map<String, String> properties) throws
RuntimeException {}
+
+ @Override
+ public void start() throws RuntimeException {}
+
+ @Override
+ public void stop() throws RuntimeException {
+ try {
+ auditLogWriter.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onPostEvent(Event event) throws RuntimeException {
+ try {
+ auditLogWriter.write(event);
+ } catch (Exception e) {
+ LOG.warn("Failed to write audit log {}.", event, e);
+ }
+ }
+
+ @Override
+ public Mode mode() {
+ return Mode.ASYNC_ISOLATED;
+ }
+ });
+ }
+
+ private AuditLogWriter loadAuditLogWriter(
+ String className, Map<String, String> config, Formatter formatter) {
+ try {
+ AuditLogWriter auditLogWriter =
+ (AuditLogWriter)
Class.forName(className).getDeclaredConstructor().newInstance();
+ Map<String, String> writerConfig = MapUtils.getPrefixMap(config,
auditLogWriter.name() + ".");
+ auditLogWriter.init(formatter, writerConfig);
+ return auditLogWriter;
+ } catch (Exception e) {
+ throw new GravitinoRuntimeException(e, "Failed to load audit log writer
%s", className);
+ }
+ }
+
+ private Formatter loadFormatter(String className) {
+ try {
+ return (Formatter)
Class.forName(className).getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ throw new GravitinoRuntimeException(e, "Failed to load formatter class
name %s", className);
+ }
+ }
+
+ AuditLogWriter getAuditLogWriter() {
+ return auditLogWriter;
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/audit/AuditLogWriter.java
b/core/src/main/java/org/apache/gravitino/audit/AuditLogWriter.java
new file mode 100644
index 000000000..d68fc72c9
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/AuditLogWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.gravitino.listener.api.event.Event;
+
+/**
+ * Interface for writing the audit log, which can write to different storage,
such as file,
+ * database, or message queue.
+ */
+public interface AuditLogWriter extends Closeable {
+
+ /** @return formatter. */
+ Formatter getFormatter();
+
+ /**
+ * Initialize the writer with the given configuration.
+ *
+ * @param properties
+ */
+ void init(Formatter formatter, Map<String, String> properties);
+
+ /**
+ * Write the audit event to storage.
+ *
+ * @param auditLog
+ */
+ void doWrite(AuditLog auditLog);
+
+ /**
+ * Format the event into an audit log and write it to storage.
+ *
+ * @param event
+ */
+ default void write(Event event) {
+ doWrite(getFormatter().format(event));
+ }
+
+ /**
+ * Define the name of the writer, which is related to the audit writer
configuration. Audit log writer
+ * configuration starts with: gravitino.audit.writer.${name}.*
+ *
+ * @return
+ */
+ String name();
+}
diff --git a/core/src/main/java/org/apache/gravitino/audit/FileAuditWriter.java
b/core/src/main/java/org/apache/gravitino/audit/FileAuditWriter.java
new file mode 100644
index 000000000..cc3d93078
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/FileAuditWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileAuditWriter is the default implementation of AuditLogWriter,
which writes audit logs
+ * to a file.
+ */
+public class FileAuditWriter implements AuditLogWriter {
+ private static final Logger Log =
LoggerFactory.getLogger(FileAuditWriter.class);
+
+ public static final String AUDIT_LOG_FILE_NAME = "fileName";
+
+ public static final String APPEND = "append";
+
+ public static final String LINE_SEPARATOR = System.lineSeparator();
+
+ Formatter formatter;
+ @VisibleForTesting Writer outWriter;
+ @VisibleForTesting String fileName;
+
+ boolean append;
+
+ @Override
+ public Formatter getFormatter() {
+ return formatter;
+ }
+
+ @Override
+ public void init(Formatter formatter, Map<String, String> properties) {
+ this.formatter = formatter;
+ this.fileName =
+ System.getProperty("gravitino.log.path")
+ + "/"
+ + properties.getOrDefault(AUDIT_LOG_FILE_NAME,
"gravitino_audit.log");
+ this.append = Boolean.parseBoolean(properties.getOrDefault(APPEND,
"true"));
+ try {
+ OutputStream outputStream = new FileOutputStream(fileName, append);
+ this.outWriter = new OutputStreamWriter(outputStream,
StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ throw new GravitinoRuntimeException(
+ e, "Init audit log writer fail, filename is %s", fileName);
+ }
+ }
+
+ @Override
+ public void doWrite(AuditLog auditLog) {
+ String log = auditLog.toString();
+ try {
+ outWriter.write(log + LINE_SEPARATOR);
+ } catch (Exception e) {
+ Log.warn("Failed to write audit log: {}", log, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (outWriter != null) {
+ try {
+ outWriter.close();
+ } catch (Exception e) {
+ Log.warn("Failed to close writer", e);
+ }
+ }
+ }
+
+ @Override
+ public String name() {
+ return "file";
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/audit/Formatter.java
b/core/src/main/java/org/apache/gravitino/audit/Formatter.java
new file mode 100644
index 000000000..3650ffb02
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/Formatter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import org.apache.gravitino.listener.api.event.Event;
+
+/** The interface defines the conversion of metadata change events to the unified
log format. */
+public interface Formatter {
+
+ /**
+ * Format the event, returning the unified audit log format.
+ *
+ * @param event The event to format.
+ * @return The formatted AuditLog.
+ */
+ AuditLog format(Event event);
+}
diff --git a/core/src/main/java/org/apache/gravitino/audit/SimpleAuditLog.java
b/core/src/main/java/org/apache/gravitino/audit/SimpleAuditLog.java
new file mode 100644
index 000000000..75ee08aad
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/SimpleAuditLog.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import java.text.SimpleDateFormat;
+import javax.annotation.Nullable;
+import lombok.Builder;
+
+/** The default implementation of the audit log. */
+@Builder
+public class SimpleAuditLog implements AuditLog {
+
+ private String user;
+
+ private Operation operation;
+
+ private String identifier;
+
+ private long timestamp;
+
+ private Status status;
+
+ @Override
+ public String user() {
+ return user;
+ }
+
+ @Override
+ public Operation operation() {
+ return operation;
+ }
+
+ @Override
+ @Nullable
+ public String identifier() {
+ return identifier;
+ }
+
+ @Override
+ public long timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public Status status() {
+ return status;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "[%s]\t%s\t%s\t%s\t%s",
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(timestamp),
+ user,
+ operation,
+ identifier,
+ status);
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/audit/SimpleFormatter.java
b/core/src/main/java/org/apache/gravitino/audit/SimpleFormatter.java
new file mode 100644
index 000000000..7b54d460e
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/audit/SimpleFormatter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import org.apache.gravitino.audit.AuditLog.Status;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.FailureEvent;
+
+/** The default formatter implementation of the audit log. */
+public class SimpleFormatter implements Formatter {
+
+ @Override
+ public SimpleAuditLog format(Event event) {
+ Status status = event instanceof FailureEvent ? Status.FAILURE :
Status.SUCCESS;
+ return SimpleAuditLog.builder()
+ .user(event.user())
+ .operation(AuditLog.Operation.fromEvent(event))
+ .identifier(event.identifier() != null ? event.identifier().toString()
: null)
+ .timestamp(event.eventTime())
+ .status(status)
+ .build();
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
b/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
index ebe08637f..e018f8c13 100644
--- a/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
+++ b/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
@@ -99,6 +99,10 @@ public class EventListenerManager {
return new EventBus(eventListeners);
}
+  /**
+   * Registers an additional listener with this manager.
+   *
+   * <p>The plugin is wrapped in an {@link EventListenerPluginWrapper} carrying {@code
+   * listenerName} and appended to the manager's listener collection.
+   *
+   * <p>NOTE(review): presumably this has to be called before {@code createEventBus()} for the
+   * listener to receive events - confirm against the event bus implementation.
+   *
+   * @param listenerName the name to associate with the listener
+   * @param listener the listener plugin to register
+   */
+  public void addEventListener(String listenerName, EventListenerPlugin listener) {
+    EventListenerPluginWrapper wrapped = new EventListenerPluginWrapper(listenerName, listener);
+    eventListeners.add(wrapped);
+  }
+
private List<EventListenerPlugin> assembleEventListeners(
Map<String, EventListenerPlugin> userEventListeners) {
List<EventListenerPlugin> sharedQueueListeners = new ArrayList<>();
diff --git
a/core/src/test/java/org/apache/gravitino/audit/DummyAuditFormatter.java
b/core/src/test/java/org/apache/gravitino/audit/DummyAuditFormatter.java
new file mode 100644
index 000000000..32094cd33
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/audit/DummyAuditFormatter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import static org.apache.gravitino.audit.AuditLog.Status;
+
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.FailureEvent;
+
+/** Test-only {@link Formatter} that turns an {@link Event} into a {@link DummyAuditLog}. */
+public class DummyAuditFormatter implements Formatter {
+
+  /**
+   * Builds a {@link DummyAuditLog} from the given event.
+   *
+   * @param event the event to describe; its identifier may be {@code null}
+   * @return an audit log marked {@link Status#FAILURE} for a {@link FailureEvent} and {@link
+   *     Status#SUCCESS} for any other event
+   */
+  @Override
+  public DummyAuditLog format(Event event) {
+    boolean failed = event instanceof FailureEvent;
+    Status status = failed ? Status.FAILURE : Status.SUCCESS;
+
+    // A missing identifier is carried through as null rather than as an empty string.
+    String identifier = event.identifier() == null ? null : event.identifier().toString();
+
+    return DummyAuditLog.builder()
+        .user(event.user())
+        .operation(AuditLog.Operation.fromEvent(event))
+        .identifier(identifier)
+        .timestamp(event.eventTime())
+        .status(status)
+        .build();
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/audit/DummyAuditLog.java
b/core/src/test/java/org/apache/gravitino/audit/DummyAuditLog.java
new file mode 100644
index 000000000..d75565b22
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/audit/DummyAuditLog.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Test-only {@link AuditLog} value holder.
+ *
+ * <p>Instances are created through the Lombok-generated builder. Lombok also generates {@code
+ * equals}/{@code hashCode}/{@code toString} and JavaBean-style getters; the accessors required by
+ * {@link AuditLog} are written out by hand below.
+ */
+@Getter
+@ToString
+@EqualsAndHashCode
+@Builder
+public class DummyAuditLog implements AuditLog {
+
+  // Field order is kept stable because the Lombok-generated members are derived from it.
+
+  /** The user recorded for the audited event. */
+  private String user;
+
+  /** The operation recorded for the audited event. */
+  private Operation operation;
+
+  /** String form of the identifier of the audited event; may be {@code null}. */
+  private String identifier;
+
+  /** The event time recorded for the audited event. */
+  private long timestamp;
+
+  /** Outcome recorded for the audited event. */
+  private Status status;
+
+  /** @return the recorded user */
+  @Override
+  public String user() {
+    return this.user;
+  }
+
+  /** @return the recorded operation */
+  @Override
+  public Operation operation() {
+    return this.operation;
+  }
+
+  /** @return the recorded identifier string, possibly {@code null} */
+  @Override
+  public String identifier() {
+    return this.identifier;
+  }
+
+  /** @return the recorded event time */
+  @Override
+  public long timestamp() {
+    return this.timestamp;
+  }
+
+  /** @return the recorded status */
+  @Override
+  public Status status() {
+    return this.status;
+  }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/audit/DummyAuditWriter.java
b/core/src/test/java/org/apache/gravitino/audit/DummyAuditWriter.java
new file mode 100644
index 000000000..a6ca2d627
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/audit/DummyAuditWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import java.util.LinkedList;
+import java.util.Map;
+import lombok.Getter;
+
+/**
+ * Test-only {@link AuditLogWriter} that keeps every written audit log in memory so tests can
+ * inspect them through {@code getAuditLogs()}.
+ */
+public class DummyAuditWriter implements AuditLogWriter {
+
+  /** Audit logs in the order they were written. */
+  @Getter private final LinkedList<DummyAuditLog> auditLogs = new LinkedList<>();
+
+  /** Formatter handed over in {@link #init(Formatter, Map)}; {@code null} until then. */
+  private Formatter formatter;
+
+  /**
+   * Stores the formatter for later retrieval.
+   *
+   * @param formatter the formatter to remember
+   * @param properties writer properties; not used by this implementation
+   */
+  @Override
+  public void init(Formatter formatter, Map<String, String> properties) {
+    this.formatter = formatter;
+  }
+
+  /** @return the formatter passed to {@link #init(Formatter, Map)} */
+  @Override
+  public Formatter getFormatter() {
+    return this.formatter;
+  }
+
+  /**
+   * Appends the audit log to the in-memory list.
+   *
+   * @param auditLog the log to record; it is cast to {@link DummyAuditLog}, so any other
+   *     implementation fails with a {@link ClassCastException}
+   */
+  @Override
+  public void doWrite(AuditLog auditLog) {
+    DummyAuditLog dummyAuditLog = (DummyAuditLog) auditLog;
+    auditLogs.add(dummyAuditLog);
+  }
+
+  /** @return the fixed writer name {@code "dummy"} */
+  @Override
+  public String name() {
+    return "dummy";
+  }
+
+  /** Nothing to release: the logs only live in memory. */
+  @Override
+  public void close() {}
+}
diff --git
a/core/src/test/java/org/apache/gravitino/audit/TestAuditManager.java
b/core/src/test/java/org/apache/gravitino/audit/TestAuditManager.java
new file mode 100644
index 000000000..2db57b48e
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/audit/TestAuditManager.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import static org.apache.gravitino.audit.AuditLog.Operation;
+import static org.apache.gravitino.audit.AuditLog.Status;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.stream.Stream;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.listener.EventBus;
+import org.apache.gravitino.listener.EventListenerManager;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.FailureEvent;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link AuditLogManager}: wiring of custom and default writers/formatters, and the
+ * content the default file writer produces.
+ *
+ * <p>The tests read the audit file under the directory named by the {@code gravitino.log.path}
+ * system property; that file is removed before the class runs and after every test.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class TestAuditManager {
+  private static final Logger LOG = LoggerFactory.getLogger(TestAuditManager.class);
+
+  private static final String DEFAULT_FILE_NAME = "gravitino_audit.log";
+
+  private static final int EVENT_NUM = 2000;
+
+  private Path logPath;
+
+  /**
+   * Makes sure the log directory exists and that no audit file from an earlier run is left over.
+   *
+   * @throws IOException if the directory cannot be created or the stale file cannot be deleted
+   */
+  @BeforeAll
+  public void setup() throws IOException {
+    String logDir = System.getProperty("gravitino.log.path");
+    // Paths.get(null) would fail with a bare NullPointerException; fail with an actionable
+    // message instead when the build does not pass the property to the test JVM.
+    Assertions.assertNotNull(
+        logDir, "System property 'gravitino.log.path' must be set to run the audit log tests");
+
+    Path logDirPath = Paths.get(logDir);
+    if (!Files.exists(logDirPath)) {
+      Files.createDirectories(logDirPath);
+    }
+
+    this.logPath = Paths.get(logDir, DEFAULT_FILE_NAME);
+    deleteLogFileIfPresent();
+  }
+
+  /** Test audit log with custom audit writer and formatter. */
+  @Test
+  public void testAuditLog() {
+    Config config = new Config(false) {};
+    config.set(Configs.AUDIT_LOG_ENABLED_CONF, true);
+    config.set(Configs.AUDIT_LOG_WRITER_CLASS_NAME, "org.apache.gravitino.audit.DummyAuditWriter");
+    config.set(
+        Configs.AUDIT_LOG_FORMATTER_CLASS_NAME, "org.apache.gravitino.audit.DummyAuditFormatter");
+
+    EventListenerManager eventListenerManager = mockEventListenerManager();
+    AuditLogManager auditLogManager = mockAuditLogManager(config, eventListenerManager);
+    EventBus eventBus = eventListenerManager.createEventBus();
+
+    // dispatch success event
+    DummyEvent dummyEvent = mockDummyEvent();
+    eventBus.dispatchEvent(dummyEvent);
+
+    Assertions.assertInstanceOf(DummyAuditWriter.class, auditLogManager.getAuditLogWriter());
+    Assertions.assertInstanceOf(
+        DummyAuditFormatter.class, (auditLogManager.getAuditLogWriter()).getFormatter());
+
+    DummyAuditWriter dummyAuditWriter = (DummyAuditWriter) auditLogManager.getAuditLogWriter();
+
+    DummyAuditFormatter formatter = (DummyAuditFormatter) dummyAuditWriter.getFormatter();
+    DummyAuditLog formattedAuditLog = formatter.format(dummyEvent);
+
+    // JUnit takes (expected, actual); keeping that order makes failure messages read correctly.
+    Assertions.assertNotNull(formattedAuditLog);
+    Assertions.assertEquals(Operation.UNKNOWN_OPERATION, formattedAuditLog.operation());
+    Assertions.assertEquals(Status.SUCCESS, formattedAuditLog.status());
+    Assertions.assertEquals("user", formattedAuditLog.user());
+    Assertions.assertEquals("a.b.c.d", formattedAuditLog.identifier());
+    Assertions.assertEquals(dummyEvent.eventTime(), formattedAuditLog.timestamp());
+    Assertions.assertEquals(formattedAuditLog, dummyAuditWriter.getAuditLogs().get(0));
+
+    // dispatch fail event
+    DummyFailEvent dummyFailEvent = mockDummyFailEvent();
+    eventBus.dispatchEvent(dummyFailEvent);
+    DummyAuditLog formattedFailAuditLog = formatter.format(dummyFailEvent);
+    Assertions.assertEquals(Operation.UNKNOWN_OPERATION, formattedFailAuditLog.operation());
+    Assertions.assertEquals(Status.FAILURE, formattedFailAuditLog.status());
+    Assertions.assertEquals(formattedFailAuditLog, dummyAuditWriter.getAuditLogs().get(1));
+  }
+
+  /**
+   * Test audit log with default audit writer and formatter.
+   *
+   * @throws IOException if flushing or reading the audit file fails
+   */
+  @Test
+  public void testFileAuditLog() throws IOException {
+    Config config = new Config(false) {};
+    config.set(Configs.AUDIT_LOG_ENABLED_CONF, true);
+    DummyEvent dummyEvent = mockDummyEvent();
+    EventListenerManager eventListenerManager = mockEventListenerManager();
+    AuditLogManager auditLogManager = mockAuditLogManager(config, eventListenerManager);
+    EventBus eventBus = eventListenerManager.createEventBus();
+    eventBus.dispatchEvent(dummyEvent);
+    Assertions.assertInstanceOf(FileAuditWriter.class, auditLogManager.getAuditLogWriter());
+    Assertions.assertInstanceOf(
+        SimpleFormatter.class, (auditLogManager.getAuditLogWriter()).getFormatter());
+
+    FileAuditWriter fileAuditWriter = (FileAuditWriter) auditLogManager.getAuditLogWriter();
+    String fileName = fileAuditWriter.fileName;
+    // Flush explicitly so the record is on disk before it is read back.
+    fileAuditWriter.outWriter.flush();
+
+    String auditLog = readAuditLog(fileName);
+    Formatter formatter = fileAuditWriter.getFormatter();
+    SimpleAuditLog formattedAuditLog = (SimpleAuditLog) formatter.format(dummyEvent);
+
+    Assertions.assertNotNull(formattedAuditLog);
+    Assertions.assertEquals(formattedAuditLog.toString(), auditLog);
+  }
+
+  /**
+   * Dispatches {@link #EVENT_NUM} events through the default file writer and checks that the
+   * audit file ends up with exactly that many lines.
+   *
+   * @throws IOException if flushing or reading the audit file fails
+   */
+  @Test
+  public void testBatchEvents() throws IOException {
+    Config config = new Config(false) {};
+    config.set(Configs.AUDIT_LOG_ENABLED_CONF, true);
+    // set immediate flush to true for testing, so that the audit log will be read immediately
+    config.set(
+        new ConfigBuilder("gravitino.audit.writer.file.immediateFlush").stringConf(), "true");
+
+    EventListenerManager eventListenerManager = mockEventListenerManager();
+    AuditLogManager auditLogManager = mockAuditLogManager(config, eventListenerManager);
+    EventBus eventBus = eventListenerManager.createEventBus();
+
+    for (int i = 0; i < EVENT_NUM; i++) {
+      eventBus.dispatchEvent(mockDummyEvent());
+    }
+
+    FileAuditWriter fileAuditWriter = (FileAuditWriter) auditLogManager.getAuditLogWriter();
+    String fileName = fileAuditWriter.fileName;
+    fileAuditWriter.outWriter.flush();
+
+    long auditSize = getAuditSize(fileName);
+    Assertions.assertEquals(EVENT_NUM, auditSize);
+  }
+
+  /**
+   * Removes the audit file so that each test starts from an empty log.
+   *
+   * @throws IOException if the file exists but cannot be deleted
+   */
+  @AfterEach
+  public void cleanup() throws IOException {
+    deleteLogFileIfPresent();
+  }
+
+  /** Deletes the audit file when it exists and logs the deletion. */
+  private void deleteLogFileIfPresent() throws IOException {
+    if (Files.deleteIfExists(logPath)) {
+      LOG.warn("delete tmp audit log file: {} success", DEFAULT_FILE_NAME);
+    }
+  }
+
+  /** Returns the first line of the given file, or {@code null} when the file is empty. */
+  private String readAuditLog(String fileName) throws IOException {
+    try (BufferedReader reader =
+        Files.newBufferedReader(Paths.get(fileName), StandardCharsets.UTF_8)) {
+      return reader.readLine();
+    }
+  }
+
+  /** Returns the number of lines in the given file. */
+  private long getAuditSize(String fileName) throws IOException {
+    // Files.lines keeps the file open until the stream is closed.
+    try (Stream<String> lines = Files.lines(Paths.get(fileName), StandardCharsets.UTF_8)) {
+      return lines.count();
+    }
+  }
+
+  /** Creates an {@link AuditLogManager} initialised with the given config and listener manager. */
+  private AuditLogManager mockAuditLogManager(
+      Config config, EventListenerManager eventListenerManager) {
+    AuditLogManager auditLogManager = new AuditLogManager();
+    auditLogManager.init(config, eventListenerManager);
+    return auditLogManager;
+  }
+
+  /** Creates a started {@link EventListenerManager} with no user-configured listeners. */
+  private EventListenerManager mockEventListenerManager() {
+    EventListenerManager eventListenerManager = new EventListenerManager();
+    eventListenerManager.init(new HashMap<>());
+    eventListenerManager.start();
+    return eventListenerManager;
+  }
+
+  private DummyEvent mockDummyEvent() {
+    return new DummyEvent("user", NameIdentifier.of("a", "b", "c", "d"));
+  }
+
+  private DummyFailEvent mockDummyFailEvent() {
+    return new DummyFailEvent("user", NameIdentifier.of("a", "b", "c", "d"));
+  }
+
+  /** Plain event type used by these tests for the success path. */
+  static class DummyEvent extends Event {
+    protected DummyEvent(String user, NameIdentifier identifier) {
+      super(user, identifier);
+    }
+  }
+
+  /** Failure event type used by these tests; always carries a dummy exception. */
+  static class DummyFailEvent extends FailureEvent {
+    protected DummyFailEvent(String user, NameIdentifier identifier) {
+      super(user, identifier, new RuntimeException("dummy exception"));
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/audit/TestOperation.java
b/core/src/test/java/org/apache/gravitino/audit/TestOperation.java
new file mode 100644
index 000000000..cd0bc9da9
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/audit/TestOperation.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.audit;
+
+import com.google.common.collect.ImmutableMap;
+import java.time.LocalDate;
+import java.util.HashMap;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.CatalogChange;
+import org.apache.gravitino.MetalakeChange;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.file.FilesetChange;
+import org.apache.gravitino.listener.api.event.AlterCatalogEvent;
+import org.apache.gravitino.listener.api.event.AlterCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterFilesetEvent;
+import org.apache.gravitino.listener.api.event.AlterFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterMetalakeEvent;
+import org.apache.gravitino.listener.api.event.AlterMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterSchemaEvent;
+import org.apache.gravitino.listener.api.event.AlterSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterTableEvent;
+import org.apache.gravitino.listener.api.event.AlterTableFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterTopicEvent;
+import org.apache.gravitino.listener.api.event.AlterTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateCatalogEvent;
+import org.apache.gravitino.listener.api.event.CreateCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateFilesetEvent;
+import org.apache.gravitino.listener.api.event.CreateFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateMetalakeEvent;
+import org.apache.gravitino.listener.api.event.CreateMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateSchemaEvent;
+import org.apache.gravitino.listener.api.event.CreateSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateTableEvent;
+import org.apache.gravitino.listener.api.event.CreateTableFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateTopicEvent;
+import org.apache.gravitino.listener.api.event.CreateTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.DropCatalogEvent;
+import org.apache.gravitino.listener.api.event.DropCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.DropFilesetEvent;
+import org.apache.gravitino.listener.api.event.DropFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.DropMetalakeEvent;
+import org.apache.gravitino.listener.api.event.DropMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.DropSchemaEvent;
+import org.apache.gravitino.listener.api.event.DropSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.DropTableEvent;
+import org.apache.gravitino.listener.api.event.DropTableFailureEvent;
+import org.apache.gravitino.listener.api.event.DropTopicEvent;
+import org.apache.gravitino.listener.api.event.DropTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.GetFileLocationEvent;
+import org.apache.gravitino.listener.api.event.GetFileLocationFailureEvent;
+import org.apache.gravitino.listener.api.event.GetPartitionEvent;
+import org.apache.gravitino.listener.api.event.GetPartitionFailureEvent;
+import org.apache.gravitino.listener.api.event.ListCatalogEvent;
+import org.apache.gravitino.listener.api.event.ListCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.ListFilesetEvent;
+import org.apache.gravitino.listener.api.event.ListFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.ListPartitionEvent;
+import org.apache.gravitino.listener.api.event.ListPartitionFailureEvent;
+import org.apache.gravitino.listener.api.event.ListSchemaEvent;
+import org.apache.gravitino.listener.api.event.ListSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.ListTableEvent;
+import org.apache.gravitino.listener.api.event.ListTableFailureEvent;
+import org.apache.gravitino.listener.api.event.ListTopicEvent;
+import org.apache.gravitino.listener.api.event.ListTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadCatalogEvent;
+import org.apache.gravitino.listener.api.event.LoadCatalogFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadFilesetEvent;
+import org.apache.gravitino.listener.api.event.LoadFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadMetalakeEvent;
+import org.apache.gravitino.listener.api.event.LoadMetalakeFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadSchemaEvent;
+import org.apache.gravitino.listener.api.event.LoadSchemaFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadTableEvent;
+import org.apache.gravitino.listener.api.event.LoadTableFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadTopicEvent;
+import org.apache.gravitino.listener.api.event.LoadTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.PartitionExistsEvent;
+import org.apache.gravitino.listener.api.event.PurgePartitionEvent;
+import org.apache.gravitino.listener.api.event.PurgePartitionFailureEvent;
+import org.apache.gravitino.listener.api.event.PurgeTableEvent;
+import org.apache.gravitino.listener.api.info.CatalogInfo;
+import org.apache.gravitino.listener.api.info.FilesetInfo;
+import org.apache.gravitino.listener.api.info.MetalakeInfo;
+import org.apache.gravitino.listener.api.info.SchemaInfo;
+import org.apache.gravitino.listener.api.info.TableInfo;
+import org.apache.gravitino.listener.api.info.TopicInfo;
+import org.apache.gravitino.listener.api.info.partitions.IdentityPartitionInfo;
+import org.apache.gravitino.listener.api.info.partitions.PartitionInfo;
+import org.apache.gravitino.messaging.TopicChange;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class TestOperation {
+
+ private NameIdentifier metalakeIdentifier;
+
+ private MetalakeInfo metalakeInfo;
+
+ private NameIdentifier catalogIdentifier;
+
+ private CatalogInfo catalogInfo;
+
+ private NameIdentifier schemaIdentifier;
+
+ private SchemaInfo schemaInfo;
+
+ private NameIdentifier tableIdentifier;
+
+ private TableInfo tableInfo;
+
+ private NameIdentifier filesetIdentifier;
+
+ private FilesetInfo filesetInfo;
+
+ private NameIdentifier topicIdentifier;
+
+ private TopicInfo topicInfo;
+
+ private NameIdentifier partitionIdentifier;
+
+ private PartitionInfo partitionInfo;
+
+ private static final String USER = "user";
+
+ private static final String PARTITION_NAME = "dt=2008-08-08/country=us";
+
+  /**
+   * Populates the identifier and info fixtures shared by all tests in this class.
+   *
+   * <p>The assignments are kept in their original order in case a later {@code mock*} helper
+   * reads a fixture that was assigned earlier.
+   */
+  @BeforeAll
+  public void init() {
+    metalakeIdentifier = mockMetalakeIdentifier();
+    metalakeInfo = mockMetalakeInfo();
+
+    catalogIdentifier = mockCatalogIndentifier();
+    catalogInfo = mockCatalogInfo();
+
+    schemaIdentifier = mockSchemaIdentifier();
+    schemaInfo = mockSchemaInfo();
+
+    tableIdentifier = mockTableIdentifier();
+    tableInfo = mockTableInfo();
+
+    topicIdentifier = mockTopicIdentifier();
+    topicInfo = mockTopicInfo();
+
+    filesetIdentifier = mockFilesetIdentifier();
+    filesetInfo = mockFilesetInfo();
+
+    partitionIdentifier = mockPartitionIdentifier();
+    partitionInfo = mockPartitionInfo();
+  }
+
+  /**
+   * Verifies that every create event, successful or failed, maps to the matching {@code
+   * CREATE_*} operation.
+   */
+  @Test
+  public void testCreateOperation() {
+    // The metalake event is built with the metalake identifier, like its failure counterpart.
+    Event createMetalakeEvent = new CreateMetalakeEvent(USER, metalakeIdentifier, metalakeInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_METALAKE, AuditLog.Operation.fromEvent(createMetalakeEvent));
+    Event createMetalakeFailureEvent =
+        new CreateMetalakeFailureEvent(USER, metalakeIdentifier, new Exception(), metalakeInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_METALAKE,
+        AuditLog.Operation.fromEvent(createMetalakeFailureEvent));
+
+    Event createCatalogEvent = new CreateCatalogEvent(USER, catalogIdentifier, catalogInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_CATALOG, AuditLog.Operation.fromEvent(createCatalogEvent));
+    Event createCatalogFailureEvent =
+        new CreateCatalogFailureEvent(USER, catalogIdentifier, new Exception(), catalogInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_CATALOG, AuditLog.Operation.fromEvent(createCatalogFailureEvent));
+
+    Event createSchemaEvent = new CreateSchemaEvent(USER, schemaIdentifier, schemaInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_SCHEMA, AuditLog.Operation.fromEvent(createSchemaEvent));
+    Event createSchemaFailureEvent =
+        new CreateSchemaFailureEvent(USER, schemaIdentifier, new Exception(), schemaInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_SCHEMA, AuditLog.Operation.fromEvent(createSchemaFailureEvent));
+
+    Event createTableEvent = new CreateTableEvent(USER, tableIdentifier, tableInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_TABLE, AuditLog.Operation.fromEvent(createTableEvent));
+    Event createTableFailureEvent =
+        new CreateTableFailureEvent(USER, tableIdentifier, new Exception(), tableInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_TABLE, AuditLog.Operation.fromEvent(createTableFailureEvent));
+
+    Event createFilesetEvent = new CreateFilesetEvent(USER, filesetIdentifier, filesetInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_FILESET, AuditLog.Operation.fromEvent(createFilesetEvent));
+    Event createFilesetFailureEvent =
+        new CreateFilesetFailureEvent(USER, filesetIdentifier, new Exception(), filesetInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_FILESET, AuditLog.Operation.fromEvent(createFilesetFailureEvent));
+
+    Event createTopicEvent = new CreateTopicEvent(USER, topicIdentifier, topicInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_TOPIC, AuditLog.Operation.fromEvent(createTopicEvent));
+    Event createTopicFailureEvent =
+        new CreateTopicFailureEvent(USER, topicIdentifier, new Exception(), topicInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.CREATE_TOPIC, AuditLog.Operation.fromEvent(createTopicFailureEvent));
+  }
+
+  /**
+   * Verifies that every alter event, successful or failed, maps to the matching {@code ALTER_*}
+   * operation.
+   */
+  @Test
+  public void testAlterOperation() {
+    Event alterMetalakeEvent =
+        new AlterMetalakeEvent(USER, metalakeIdentifier, new MetalakeChange[] {}, metalakeInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_METALAKE, AuditLog.Operation.fromEvent(alterMetalakeEvent));
+    Event alterMetalakeFailureEvent =
+        new AlterMetalakeFailureEvent(
+            USER, metalakeIdentifier, new Exception(), new MetalakeChange[] {});
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_METALAKE, AuditLog.Operation.fromEvent(alterMetalakeFailureEvent));
+
+    // The catalog event is built with the catalog identifier, like its failure counterpart.
+    Event alterCatalogEvent =
+        new AlterCatalogEvent(USER, catalogIdentifier, new CatalogChange[] {}, catalogInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_CATALOG, AuditLog.Operation.fromEvent(alterCatalogEvent));
+    Event alterCatalogFailureEvent =
+        new AlterCatalogFailureEvent(
+            USER, catalogIdentifier, new Exception(), new CatalogChange[] {});
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_CATALOG, AuditLog.Operation.fromEvent(alterCatalogFailureEvent));
+
+    Event alterSchemaEvent =
+        new AlterSchemaEvent(USER, schemaIdentifier, new SchemaChange[] {}, schemaInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_SCHEMA, AuditLog.Operation.fromEvent(alterSchemaEvent));
+    Event alterSchemaFailureEvent =
+        new AlterSchemaFailureEvent(USER, schemaIdentifier, new Exception(), new SchemaChange[] {});
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_SCHEMA, AuditLog.Operation.fromEvent(alterSchemaFailureEvent));
+
+    Event alterTableEvent =
+        new AlterTableEvent(USER, tableIdentifier, new TableChange[] {}, tableInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_TABLE, AuditLog.Operation.fromEvent(alterTableEvent));
+    Event alterTableFailureEvent =
+        new AlterTableFailureEvent(USER, tableIdentifier, new Exception(), new TableChange[] {});
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_TABLE, AuditLog.Operation.fromEvent(alterTableFailureEvent));
+
+    Event alterFilesetEvent =
+        new AlterFilesetEvent(USER, filesetIdentifier, new FilesetChange[] {}, filesetInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_FILESET, AuditLog.Operation.fromEvent(alterFilesetEvent));
+    Event alterFilesetFailureEvent =
+        new AlterFilesetFailureEvent(
+            USER, filesetIdentifier, new Exception(), new FilesetChange[] {});
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_FILESET, AuditLog.Operation.fromEvent(alterFilesetFailureEvent));
+
+    Event alterTopicEvent =
+        new AlterTopicEvent(USER, topicIdentifier, new TopicChange[] {}, topicInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_TOPIC, AuditLog.Operation.fromEvent(alterTopicEvent));
+    Event alterTopicFailureEvent =
+        new AlterTopicFailureEvent(USER, topicIdentifier, new Exception(), new TopicChange[] {});
+    Assertions.assertEquals(
+        AuditLog.Operation.ALTER_TOPIC, AuditLog.Operation.fromEvent(alterTopicFailureEvent));
+  }
+
+  /**
+   * Verifies that every drop event, successful or failed, maps to the matching {@code DROP_*}
+   * operation.
+   */
+  @Test
+  public void testDropOperation() {
+    Event dropMetalakeEvent = new DropMetalakeEvent(USER, metalakeIdentifier, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_METALAKE, AuditLog.Operation.fromEvent(dropMetalakeEvent));
+    // Failure events carry the exception that caused them; pass a real one, as below.
+    Event dropMetalakeFailureEvent =
+        new DropMetalakeFailureEvent(USER, metalakeIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_METALAKE, AuditLog.Operation.fromEvent(dropMetalakeFailureEvent));
+
+    Event dropCatalogEvent = new DropCatalogEvent(USER, catalogIdentifier, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_CATALOG, AuditLog.Operation.fromEvent(dropCatalogEvent));
+    Event dropCatalogFailureEvent =
+        new DropCatalogFailureEvent(USER, catalogIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_CATALOG, AuditLog.Operation.fromEvent(dropCatalogFailureEvent));
+
+    Event dropSchemaEvent = new DropSchemaEvent(USER, schemaIdentifier, true, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_SCHEMA, AuditLog.Operation.fromEvent(dropSchemaEvent));
+    Event dropSchemaFailureEvent =
+        new DropSchemaFailureEvent(USER, schemaIdentifier, new Exception(), true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_SCHEMA, AuditLog.Operation.fromEvent(dropSchemaFailureEvent));
+
+    Event dropTableEvent = new DropTableEvent(USER, tableIdentifier, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_TABLE, AuditLog.Operation.fromEvent(dropTableEvent));
+    Event dropTableFailureEvent = new DropTableFailureEvent(USER, tableIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_TABLE, AuditLog.Operation.fromEvent(dropTableFailureEvent));
+
+    Event dropFilesetEvent = new DropFilesetEvent(USER, filesetIdentifier, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_FILESET, AuditLog.Operation.fromEvent(dropFilesetEvent));
+    Event dropFilesetFailureEvent =
+        new DropFilesetFailureEvent(USER, filesetIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_FILESET, AuditLog.Operation.fromEvent(dropFilesetFailureEvent));
+
+    Event dropTopicEvent = new DropTopicEvent(USER, topicIdentifier, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_TOPIC, AuditLog.Operation.fromEvent(dropTopicEvent));
+    Event dropTopicFailureEvent = new DropTopicFailureEvent(USER, topicIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.DROP_TOPIC, AuditLog.Operation.fromEvent(dropTopicFailureEvent));
+  }
+
+  /**
+   * Checks that purge events for partitions and tables, successful or failed, resolve to
+   * PURGE_PARTITION and PURGE_TABLE respectively.
+   */
+  @Test
+  public void testPurgeOperation() {
+    // Assertions.assertEquals takes (expected, actual); the expected operation goes first.
+    // The partition event is built with the partition identifier (the original passed the
+    // metalake identifier and named the variable after a metalake).
+    Event purgePartitionEvent =
+        new PurgePartitionEvent(USER, partitionIdentifier, true, PARTITION_NAME);
+    Assertions.assertEquals(
+        AuditLog.Operation.PURGE_PARTITION, AuditLog.Operation.fromEvent(purgePartitionEvent));
+    Event purgePartitionFailureEvent =
+        new PurgePartitionFailureEvent(
+            USER, partitionIdentifier, new Exception(), PARTITION_NAME);
+    Assertions.assertEquals(
+        AuditLog.Operation.PURGE_PARTITION,
+        AuditLog.Operation.fromEvent(purgePartitionFailureEvent));
+
+    Event purgeTableEvent = new PurgeTableEvent(USER, tableIdentifier, true);
+    Assertions.assertEquals(
+        AuditLog.Operation.PURGE_TABLE, AuditLog.Operation.fromEvent(purgeTableEvent));
+    // The original repeated the partition-failure check here, leaving the table-failure path
+    // untested.
+    // NOTE(review): fully qualified because this file's import list is not visible from here;
+    // assumes the (user, identifier, exception) constructor used by the other table failure
+    // events and a PURGE_TABLE mapping in fromEvent - confirm.
+    Event purgeTableFailureEvent =
+        new org.apache.gravitino.listener.api.event.PurgeTableFailureEvent(
+            USER, tableIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.PURGE_TABLE, AuditLog.Operation.fromEvent(purgeTableFailureEvent));
+  }
+
+  /**
+   * Checks that get-file-location and get-partition events, successful or failed, resolve to
+   * GET_FILE_LOCATION and GET_PARTITION respectively.
+   */
+  @Test
+  public void testGetOperation() {
+    // Assertions.assertEquals takes (expected, actual); the expected operation goes first so a
+    // failure message reports the two values the right way round.
+    Event getFileLocationEvent =
+        new GetFileLocationEvent(USER, filesetIdentifier, "location", "subPath", new HashMap<>());
+    Assertions.assertEquals(
+        AuditLog.Operation.GET_FILE_LOCATION, AuditLog.Operation.fromEvent(getFileLocationEvent));
+    Event getFileLocationFailureEvent =
+        new GetFileLocationFailureEvent(USER, filesetIdentifier, "subPath", new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.GET_FILE_LOCATION,
+        AuditLog.Operation.fromEvent(getFileLocationFailureEvent));
+
+    Event getPartitionEvent = new GetPartitionEvent(USER, partitionIdentifier, partitionInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.GET_PARTITION, AuditLog.Operation.fromEvent(getPartitionEvent));
+    Event getPartitionFailureEvent =
+        new GetPartitionFailureEvent(USER, partitionIdentifier, new Exception(), PARTITION_NAME);
+    Assertions.assertEquals(
+        AuditLog.Operation.GET_PARTITION, AuditLog.Operation.fromEvent(getPartitionFailureEvent));
+  }
+
+  /**
+   * Checks that each list event, successful or failed, resolves to the LIST_* operation of the
+   * entity type being listed (catalog, schema, table, topic, fileset, partition).
+   */
+  @Test
+  public void testListOperation() {
+    // The same namespace is reused for every namespace-based list event below.
+    Namespace namespace = Namespace.of("metalake", "catalog");
+
+    // Assertions.assertEquals takes (expected, actual); the expected operation goes first so a
+    // failure message reports the two values the right way round.
+    Event listCatalogEvent = new ListCatalogEvent(USER, namespace);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_CATALOG, AuditLog.Operation.fromEvent(listCatalogEvent));
+    // Note the argument order: this failure event takes the exception before the namespace.
+    Event listCatalogFailureEvent = new ListCatalogFailureEvent(USER, new Exception(), namespace);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_CATALOG, AuditLog.Operation.fromEvent(listCatalogFailureEvent));
+
+    Event listSchemaEvent = new ListSchemaEvent(USER, namespace);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_SCHEMA, AuditLog.Operation.fromEvent(listSchemaEvent));
+    Event listSchemaFailureEvent = new ListSchemaFailureEvent(USER, namespace, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_SCHEMA, AuditLog.Operation.fromEvent(listSchemaFailureEvent));
+
+    Event listTableEvent = new ListTableEvent(USER, namespace);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_TABLE, AuditLog.Operation.fromEvent(listTableEvent));
+    Event listTableFailureEvent = new ListTableFailureEvent(USER, namespace, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_TABLE, AuditLog.Operation.fromEvent(listTableFailureEvent));
+
+    Event listTopicEvent = new ListTopicEvent(USER, namespace);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_TOPIC, AuditLog.Operation.fromEvent(listTopicEvent));
+    Event listTopicFailureEvent = new ListTopicFailureEvent(USER, namespace, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_TOPIC, AuditLog.Operation.fromEvent(listTopicFailureEvent));
+
+    Event listFilesetEvent = new ListFilesetEvent(USER, namespace);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_FILESET, AuditLog.Operation.fromEvent(listFilesetEvent));
+    Event listFilesetFailureEvent = new ListFilesetFailureEvent(USER, namespace, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_FILESET, AuditLog.Operation.fromEvent(listFilesetFailureEvent));
+
+    // Partition list events are built from an identifier rather than a namespace.
+    Event listPartitionEvent = new ListPartitionEvent(USER, partitionIdentifier);
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_PARTITION, AuditLog.Operation.fromEvent(listPartitionEvent));
+    Event listPartitionFailureEvent =
+        new ListPartitionFailureEvent(USER, partitionIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LIST_PARTITION,
+        AuditLog.Operation.fromEvent(listPartitionFailureEvent));
+  }
+
+  /**
+   * Checks that each load event, successful or failed, resolves to the LOAD_* operation of the
+   * entity type being loaded (metalake, catalog, schema, table, fileset, topic).
+   */
+  @Test
+  public void testLoadOperation() {
+    // Assertions.assertEquals takes (expected, actual); the expected operation goes first so a
+    // failure message reports the two values the right way round.
+    Event loadMetalakeEvent = new LoadMetalakeEvent(USER, metalakeIdentifier, metalakeInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_METALAKE, AuditLog.Operation.fromEvent(loadMetalakeEvent));
+    Event loadMetalakeFailureEvent =
+        new LoadMetalakeFailureEvent(USER, metalakeIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_METALAKE,
+        AuditLog.Operation.fromEvent(loadMetalakeFailureEvent));
+
+    Event loadCatalogEvent = new LoadCatalogEvent(USER, catalogIdentifier, catalogInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_CATALOG, AuditLog.Operation.fromEvent(loadCatalogEvent));
+    Event loadCatalogFailureEvent =
+        new LoadCatalogFailureEvent(USER, catalogIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_CATALOG, AuditLog.Operation.fromEvent(loadCatalogFailureEvent));
+
+    Event loadSchemaEvent = new LoadSchemaEvent(USER, schemaIdentifier, schemaInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_SCHEMA, AuditLog.Operation.fromEvent(loadSchemaEvent));
+    Event loadSchemaFailureEvent =
+        new LoadSchemaFailureEvent(USER, schemaIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_SCHEMA, AuditLog.Operation.fromEvent(loadSchemaFailureEvent));
+
+    Event loadTableEvent = new LoadTableEvent(USER, tableIdentifier, tableInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_TABLE, AuditLog.Operation.fromEvent(loadTableEvent));
+    Event loadTableFailureEvent =
+        new LoadTableFailureEvent(USER, tableIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_TABLE, AuditLog.Operation.fromEvent(loadTableFailureEvent));
+
+    Event loadFilesetEvent = new LoadFilesetEvent(USER, filesetIdentifier, filesetInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_FILESET, AuditLog.Operation.fromEvent(loadFilesetEvent));
+    Event loadFilesetFailureEvent =
+        new LoadFilesetFailureEvent(USER, filesetIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_FILESET, AuditLog.Operation.fromEvent(loadFilesetFailureEvent));
+
+    Event loadTopicEvent = new LoadTopicEvent(USER, topicIdentifier, topicInfo);
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_TOPIC, AuditLog.Operation.fromEvent(loadTopicEvent));
+    Event loadTopicFailureEvent =
+        new LoadTopicFailureEvent(USER, topicIdentifier, new Exception());
+    Assertions.assertEquals(
+        AuditLog.Operation.LOAD_TOPIC, AuditLog.Operation.fromEvent(loadTopicFailureEvent));
+  }
+
+  /**
+   * Checks that partition-exists events, successful or failed, resolve to the PARTITION_EXIST
+   * operation.
+   */
+  @Test
+  public void testExistsOperation() {
+    // Assertions.assertEquals takes (expected, actual); the expected operation goes first.
+    Event partitionExistsEvent =
+        new PartitionExistsEvent(USER, partitionIdentifier, true, partitionIdentifier.name());
+    Assertions.assertEquals(
+        AuditLog.Operation.PARTITION_EXIST, AuditLog.Operation.fromEvent(partitionExistsEvent));
+
+    // The original built a second PartitionExistsEvent here, so the failure event was never
+    // exercised.
+    // NOTE(review): fully qualified because this file's import list is not visible from here;
+    // assumes the (user, identifier, exception, partitionName) constructor used by the other
+    // partition failure events and a PARTITION_EXIST mapping in fromEvent - confirm.
+    Event partitionExistsFailureEvent =
+        new org.apache.gravitino.listener.api.event.PartitionExistsFailureEvent(
+            USER, partitionIdentifier, new Exception(), partitionIdentifier.name());
+    Assertions.assertEquals(
+        AuditLog.Operation.PARTITION_EXIST,
+        AuditLog.Operation.fromEvent(partitionExistsFailureEvent));
+  }
+
+ private NameIdentifier mockMetalakeIdentifier() {
+ return NameIdentifier.of("metalake");
+ }
+
+ private MetalakeInfo mockMetalakeInfo() {
+ return new MetalakeInfo("metalake", "comment", ImmutableMap.of("a", "b"),
null);
+ }
+
+ private NameIdentifier mockCatalogIndentifier() {
+ return NameIdentifier.of("metalake", "catalog");
+ }
+
+ private CatalogInfo mockCatalogInfo() {
+ return new CatalogInfo(
+ "catalog", Catalog.Type.RELATIONAL, "hive", "comment",
ImmutableMap.of("a", "b"), null);
+ }
+
+ private NameIdentifier mockSchemaIdentifier() {
+ return NameIdentifier.of("metalake", "catalog", "schema");
+ }
+
+ private SchemaInfo mockSchemaInfo() {
+ return new SchemaInfo("schema", "comment", ImmutableMap.of("a", "b"),
null);
+ }
+
+  /**
+   * Builds the identifier used by the table events.
+   *
+   * <p>The schema level is included so the path agrees with the partition identifier built in
+   * this class (metalake/catalog/schema/table/partition); the original skipped it.
+   */
+  private NameIdentifier mockTableIdentifier() {
+    return NameIdentifier.of("metalake", "catalog", "schema", "table");
+  }
+
+ private TableInfo mockTableInfo() {
+ return new TableInfo(
+ "table",
+ new Column[] {Column.of("a", Types.IntegerType.get())},
+ "comment",
+ ImmutableMap.of("a", "b"),
+ new Transform[] {Transforms.identity("a")},
+ Distributions.of(Strategy.HASH, 10, NamedReference.field("a")),
+ new SortOrder[] {SortOrders.ascending(NamedReference.field("a"))},
+ new Index[] {Indexes.primary("p", new String[][] {{"a"}, {"b"}})},
+ null);
+ }
+
+  /**
+   * Builds the identifier used by the fileset events.
+   *
+   * <p>The schema level is included so the path has the same metalake/catalog/schema prefix as
+   * the partition identifier built in this class; the original skipped it.
+   */
+  private NameIdentifier mockFilesetIdentifier() {
+    return NameIdentifier.of("metalake", "catalog", "schema", "fileset");
+  }
+
+ private FilesetInfo mockFilesetInfo() {
+ return new FilesetInfo(
+ "fileset", "comment", Fileset.Type.MANAGED, "location",
ImmutableMap.of("a", "b"), null);
+ }
+
+  /**
+   * Builds the identifier used by the topic events.
+   *
+   * <p>The schema level is included so the path has the same metalake/catalog/schema prefix as
+   * the partition identifier built in this class; the original skipped it.
+   */
+  private NameIdentifier mockTopicIdentifier() {
+    return NameIdentifier.of("metalake", "catalog", "schema", "topic");
+  }
+
+ private TopicInfo mockTopicInfo() {
+ return new TopicInfo("topic", "comment", ImmutableMap.of("a", "b"), null);
+ }
+
+ private NameIdentifier mockPartitionIdentifier() {
+ return NameIdentifier.of("metalake", "catalog", "schema", "table",
PARTITION_NAME);
+ }
+
+ private PartitionInfo mockPartitionInfo() {
+ return new IdentityPartitionInfo(
+ PARTITION_NAME,
+ new String[][] {{"dt"}, {"country"}},
+ new Literal[] {
+ Literals.dateLiteral(LocalDate.parse("2008-08-08")),
Literals.stringLiteral("us")
+ },
+ ImmutableMap.of("location",
"/user/hive/warehouse/tpch_flat_orc_2.db/orders"));
+ }
+}
diff --git a/docs/gravitino-server-config.md b/docs/gravitino-server-config.md
index efc842813..9d319d289 100644
--- a/docs/gravitino-server-config.md
+++ b/docs/gravitino-server-config.md
@@ -138,6 +138,33 @@ The plugin provides several operational modes for how to
process event, supporti
For more details, please refer to the definition of the plugin.
+### Audit log configuration
+
+The audit log framework defines how audit logs are formatted and written to various storages. The formatter defines an interface that transforms different `Event` types into a unified `AuditLog`. The writer defines an interface for writing `AuditLog` entries to different storages.
+
+Gravitino provides a default implementation that logs basic audit information to a file. You can extend the audit system by implementing the corresponding interfaces.
+
+| Property name | Description
| Default value | Required | Since Version
|
+|---------------------------------------|----------------------------------------|---------------------------------------------|----------|----------------------------|
+| `gravitino.audit.enabled` | The audit log enable flag.
| false | NO | 0.7.0-incubating
|
+| `gravitino.audit.writer.className` | The class name of audit log writer.
| org.apache.gravitino.audit.FileAuditWriter | NO | 0.7.0-incubating
|
+| `gravitino.audit.formatter.className` | The class name of audit log
formatter. | org.apache.gravitino.audit.SimpleFormatter | NO |
0.7.0-incubating |
+
+#### Audit log formatter
+
+The `Formatter` defines an interface that formats metadata audit logs into a unified format. `SimpleFormatter` is the default implementation for formatting audit information; it requires no extra configuration.
+
+#### Audit log writer
+
+The `AuditLogWriter` defines an interface that enables the writing of metadata
audit logs to different storage mediums such as files, databases, etc.
+
+Writer configuration begins with `gravitino.audit.writer.${name}`, where `${name}` is replaced with the actual writer name defined in the method `name()`. `FileAuditWriter` is the default implementation for logging audit information; its name is `file`.
+
+| Property name | Description
| Default
value | Required | Since Version |
+|-----------------------------------------------------|-----------------------------------------------------------------------------------------|---------------------|----------|----------------------------|
+| `gravitino.audit.writer.file.fileName` | The audit log file
name, the path is ${sys:gravitino.log.path}/${fileName}. |
gravitino_audit.log | NO | 0.7.0-incubating |
+| `gravitino.audit.writer.file.append` | Whether the log will
be written to the end or the beginning of the file. | true
| NO | 0.7.0-incubating |
+
### Security configuration
Refer to [security](security/security.md) for HTTPS and authentication
configurations.