This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 704c70c3c [#1873] feat(server): Add audit log support for write and
delete operations (#1874)
704c70c3c is described below
commit 704c70c3ca9e33e195708e3a93067438ea12aa9a
Author: RickyMa <[email protected]>
AuthorDate: Tue Jul 9 14:16:03 2024 +0800
[#1873] feat(server): Add audit log support for write and delete operations
(#1874)
### What changes were proposed in this pull request?
Add audit log support for write and delete operations.
Introduce a switch, `rss.server.audit.log.enabled`, whose default value is
`false`.
### Why are the changes needed?
For https://github.com/apache/incubator-uniffle/issues/1873.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
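For write operations, each audit.log entry has the form
`w|appId|shuffleId|startPartition_endPartition|storageHost|storagePath|dataSize|startTime|endTime|durationMs`,
for example: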
```
w|application_1716779728283_7775663_1720409662460|1|858_858|local|/data7/rssdata|56401402|2024-07-08 11:36:32|2024-07-08 11:36:32|85
w|application_1716779728283_7775663_1720409662460|1|860_860|local|/data13/rssdata|56365595|2024-07-08 11:36:32|2024-07-08 11:36:32|76
w|application_1703049085550_15349071_1720381124695|0|520_520|qy-xxx-30-v3|hdfs://qy-xxx-30-v3/rss/online|203300442|2024-07-08 03:41:11|2024-07-08 03:41:14|2543
```
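For delete operations, an application-level purge is logged as
`d|appId|storageHost|storagePath`. A shuffle-level purge is logged as
`d|appId|shuffleIds|storageHost|storagePath`, with the shuffle IDs separated
by commas. Example application-level entries: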
```
d|application_1716779728283_7775663_1720409662460|local|/data1/rssdata
d|application_1716779728283_7775663_1720409662460|local|/data2/rssdata
d|application_1703049085550_15349071_1720381124695|qy-xxx-30-v3|hdfs://qy-xxx-30-v3/rss/online
```
---
bin/start-shuffle-server.sh | 3 +-
.../java/org/apache/uniffle/common/AuditType.java | 32 ++++++++++++++++++++++
conf/log4j2.xml | 10 +++++++
.../apache/uniffle/server/ShuffleFlushManager.java | 24 ++++++++++++++++
.../apache/uniffle/server/ShuffleServerConf.java | 11 ++++++++
.../server/storage/HadoopStorageManager.java | 25 +++++++++++++++++
.../server/storage/LocalStorageManager.java | 23 ++++++++++++++++
7 files changed, 127 insertions(+), 1 deletion(-)
diff --git a/bin/start-shuffle-server.sh b/bin/start-shuffle-server.sh
index 1b1afe524..d6186b386 100755
--- a/bin/start-shuffle-server.sh
+++ b/bin/start-shuffle-server.sh
@@ -30,6 +30,7 @@ SHUFFLE_SERVER_CONF_FILE="${RSS_CONF_DIR}/server.conf"
JAR_DIR="${RSS_HOME}/jars"
LOG_CONF_FILE="${RSS_CONF_DIR}/log4j2.xml"
LOG_PATH="${RSS_LOG_DIR}/shuffle_server.log"
+AUDIT_LOG_PATH="${RSS_LOG_DIR}/audit.log"
if [ -z "${XMX_SIZE:-}" ]; then
echo "No env XMX_SIZE."
@@ -131,7 +132,7 @@ GC_LOG_ARGS_NEW=" -XX:+IgnoreUnrecognizedVMOptions \
ARGS=""
if [ -f ${LOG_CONF_FILE} ]; then
- ARGS="$ARGS -Dlog4j2.configurationFile=file:${LOG_CONF_FILE}
-Dlog.path=${LOG_PATH}"
+ ARGS="$ARGS -Dlog4j2.configurationFile=file:${LOG_CONF_FILE}
-Dlog.path=${LOG_PATH} -Daudit.log.path=${AUDIT_LOG_PATH}"
else
echo "Exit with error: ${LOG_CONF_FILE} file doesn't exist."
exit 1
diff --git a/common/src/main/java/org/apache/uniffle/common/AuditType.java
b/common/src/main/java/org/apache/uniffle/common/AuditType.java
new file mode 100644
index 000000000..7ed9bda3e
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/AuditType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+/**
+ * Kinds of operations recorded in the shuffle server audit log. Each constant carries the short
+ * code that is written as the first field of an audit record.
+ */
+public enum AuditType {
+  /** A shuffle data write to storage; logged with the code {@code "w"}. */
+  WRITE("w"),
+  /** A deletion of application or shuffle data; logged with the code {@code "d"}. */
+  DELETE("d");
+
+  // final: enum constants are shared singletons, so their state must never change after creation.
+  private final String value;
+
+  AuditType(String value) {
+    this.value = value;
+  }
+
+  /**
+   * Returns the short code used for this operation in audit records.
+   *
+   * @return the audit code, e.g. {@code "w"} for {@link #WRITE}
+   */
+  public String getValue() {
+    return value;
+  }
+}
diff --git a/conf/log4j2.xml b/conf/log4j2.xml
index 6ea652a08..b05d669ac 100644
--- a/conf/log4j2.xml
+++ b/conf/log4j2.xml
@@ -27,6 +27,13 @@
</Policies>
<DefaultRolloverStrategy max="10"/>
</RollingFile>
+ <RollingFile name="auditAppender" fileName="${sys:audit.log.path}"
filePattern="${sys:audit.log.path}.%i">
+ <PatternLayout pattern="%m%n"/>
+ <Policies>
+ <SizeBasedTriggeringPolicy size="2GB"/>
+ </Policies>
+ <DefaultRolloverStrategy max="10"/>
+ </RollingFile>
</Appenders>
<Loggers>
<Root level="info">
@@ -45,5 +52,8 @@
<AppenderRef ref="console"/>
<AppenderRef ref="RollingAppender"/>
</Logger>
+ <Logger name="audit" level="INFO" additivity="false">
+ <AppenderRef ref="auditAppender"/>
+ </Logger>
</Loggers>
</Configuration>
\ No newline at end of file
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 9237884fd..92a3da081 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -26,11 +26,13 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.AuditType;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
@@ -50,6 +52,8 @@ import static
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY
public class ShuffleFlushManager {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleFlushManager.class);
+ private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
+ private static final String AUDIT_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
public static final AtomicLong ATOMIC_EVENT_ID = new AtomicLong(0);
private final ShuffleServer shuffleServer;
private final List<String> storageBasePaths;
@@ -65,6 +69,7 @@ public class ShuffleFlushManager {
private final StorageManager storageManager;
private final long pendingEventTimeoutSec;
private FlushEventHandler eventHandler;
+ private final boolean isAuditLogEnabled;
public ShuffleFlushManager(
ShuffleServerConf shuffleServerConf,
@@ -83,6 +88,8 @@ public class ShuffleFlushManager {
eventHandler =
new DefaultFlushEventHandler(
shuffleServerConf, storageManager, shuffleServer,
this::processFlushEvent);
+ isAuditLogEnabled =
+
this.shuffleServerConf.getBoolean(ShuffleServerConf.SERVER_AUDIT_LOG_ENABLED);
}
public void addToFlushQueue(ShuffleDataFlushEvent event) {
@@ -162,15 +169,32 @@ public class ShuffleFlushManager {
user,
maxConcurrencyPerPartitionToWrite);
ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
+ long startTime = System.currentTimeMillis();
boolean writeSuccess = storageManager.write(storage, handler, event);
if (!writeSuccess) {
throw new EventRetryException();
}
+ long endTime = System.currentTimeMillis();
// update some metrics for shuffle task
updateCommittedBlockIds(event.getAppId(), event.getShuffleId(),
event.getShuffleBlocks());
ShuffleTaskInfo shuffleTaskInfo =
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());
+ if (isAuditLogEnabled) {
+ AUDIT_LOGGER.info(
+ String.format(
+ "%s|%s|%d|%s|%s|%s|%d|%s|%s|%d",
+ AuditType.WRITE.getValue(),
+ event.getAppId(),
+ event.getShuffleId(),
+ event.getStartPartition() + "_" + event.getEndPartition(),
+ event.getUnderStorage().getStorageHost(),
+ event.getUnderStorage().getStoragePath(),
+ event.getSize(),
+ DateFormatUtils.format(startTime, AUDIT_DATE_PATTERN),
+ DateFormatUtils.format(endTime, AUDIT_DATE_PATTERN),
+ endTime - startTime));
+ }
if (null != shuffleTaskInfo) {
String storageHost = event.getUnderStorage().getStorageHost();
if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index ebc4aaccb..55a43928d 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -158,6 +158,17 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(10000L)
.withDescription("Threshold for write slow defined");
+ public static final ConfigOption<Boolean> SERVER_AUDIT_LOG_ENABLED =
+ ConfigOptions.key("rss.server.audit.log.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "When set to true, for auditing purposes, the server will log
audit records for every disk write and delete operation. "
+ + "Each file write is logged, while delete operations are
specific to application ID/shuffle ID, "
+ + "removing all associated files and recording the deletion
of the entire application ID or shuffle ID. "
+ + "For a write operation, it includes the size of the data
written, the storage type and the specific disk to which it is written "
+ + "(for instance, in scenarios where multiple local disks
are mounted).");
+
public static final ConfigOption<Long> SERVER_EVENT_SIZE_THRESHOLD_L1 =
ConfigOptions.key("rss.server.event.size.threshold.l1")
.longType()
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index 7ab9ef639..90d62a1ed 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.AuditType;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
@@ -55,16 +56,19 @@ import org.apache.uniffle.storage.util.StorageType;
public class HadoopStorageManager extends SingleStorageManager {
private static final Logger LOG =
LoggerFactory.getLogger(HadoopStorageManager.class);
+ private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
private final Configuration hadoopConf;
private final String shuffleServerId;
private Map<String, HadoopStorage> appIdToStorages =
JavaUtils.newConcurrentMap();
private Map<String, HadoopStorage> pathToStorages =
JavaUtils.newConcurrentMap();
+ private final boolean isAuditLogEnabled;
HadoopStorageManager(ShuffleServerConf conf) {
super(conf);
hadoopConf = conf.getHadoopConf();
shuffleServerId = conf.getString(ShuffleServerConf.SHUFFLE_SERVER_ID,
"shuffleServerId");
+ isAuditLogEnabled =
conf.getBoolean(ShuffleServerConf.SERVER_AUDIT_LOG_ENABLED);
}
@Override
@@ -116,11 +120,32 @@ public class HadoopStorageManager extends
SingleStorageManager {
if (event instanceof AppPurgeEvent) {
deletePaths.add(basicPath);
+ if (isAuditLogEnabled) {
+ AUDIT_LOGGER.info(
+ String.format(
+ "%s|%s|%s|%s",
+ AuditType.DELETE.getValue(),
+ appId,
+ storage.getStorageHost(),
+ storage.getStoragePath()));
+ }
} else {
for (Integer shuffleId : event.getShuffleIds()) {
deletePaths.add(
ShuffleStorageUtils.getFullShuffleDataFolder(basicPath,
String.valueOf(shuffleId)));
}
+ if (isAuditLogEnabled) {
+ AUDIT_LOGGER.info(
+ String.format(
+ "%s|%s|%s|%s|%s",
+ AuditType.DELETE.getValue(),
+ appId,
+ event.getShuffleIds().stream()
+ .map(Object::toString)
+ .collect(Collectors.joining(",")),
+ storage.getStorageHost(),
+ storage.getStoragePath()));
+ }
}
deleteHandler.delete(deletePaths.toArray(new String[0]), appId,
event.getUser());
} else {
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 724201231..c55fe9060 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.AuditType;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.UnionKey;
import org.apache.uniffle.common.exception.RssException;
@@ -77,6 +78,7 @@ import static
org.apache.uniffle.server.ShuffleServerConf.LOCAL_STORAGE_INITIALI
public class LocalStorageManager extends SingleStorageManager {
private static final Logger LOG =
LoggerFactory.getLogger(LocalStorageManager.class);
+ private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
private static final String UNKNOWN_USER_NAME = "unknown";
private final List<LocalStorage> localStorages;
@@ -86,6 +88,8 @@ public class LocalStorageManager extends SingleStorageManager
{
private final ConcurrentSkipListMap<String, LocalStorage>
sortedPartitionsOfStorageMap;
private final List<StorageMediaProvider> typeProviders =
Lists.newArrayList();
+ private final boolean isAuditLogEnabled;
+
@VisibleForTesting
LocalStorageManager(ShuffleServerConf conf) {
super(conf);
@@ -169,6 +173,7 @@ public class LocalStorageManager extends
SingleStorageManager {
StringUtils.join(
localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList())));
this.checker = new LocalStorageChecker(conf, localStorages);
+ isAuditLogEnabled =
conf.getBoolean(ShuffleServerConf.SERVER_AUDIT_LOG_ENABLED);
}
private StorageMedia getStorageTypeForBasePath(String basePath) {
@@ -295,8 +300,26 @@ public class LocalStorageManager extends
SingleStorageManager {
ShuffleStorageUtils.getFullShuffleDataFolder(
basicPath, String.valueOf(shuffleId)));
}
+ if (isAuditLogEnabled) {
+ AUDIT_LOGGER.info(
+ String.format(
+ "%s|%s|%s|%s|%s",
+ AuditType.DELETE.getValue(),
+ appId,
+ shuffleSet.stream()
+ .map(Object::toString)
+ .collect(Collectors.joining(",")),
+ LocalStorage.STORAGE_HOST,
+ path));
+ }
return paths.stream();
} else {
+ if (isAuditLogEnabled) {
+ AUDIT_LOGGER.info(
+ String.format(
+ "%s|%s|%s|%s",
+ AuditType.DELETE.getValue(), appId,
LocalStorage.STORAGE_HOST, path));
+ }
return Stream.of(basicPath);
}
})