This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1f1c14dae [#2158] feat(common): Introduce reconfigure listener for
server/coordinator conf (#2159)
1f1c14dae is described below
commit 1f1c14dae685a442d0d698f1a501d70d0528db08
Author: maobaolong <[email protected]>
AuthorDate: Sun Oct 6 10:43:12 2024 +0800
[#2158] feat(common): Introduce reconfigure listener for server/coordinator
conf (#2159)
### What changes were proposed in this pull request?
Introduce the ReconfigRegistry and ReconfigureListener to support update
the changed config value dynamically.
### Why are the changes needed?
For: #2158
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
new UT.
---
.../uniffle/common/ReconfigurableConfManager.java | 11 +-
.../uniffle/common/ReconfigurableRegistry.java | 186 +++++++++++++++++++++
.../org/apache/uniffle/common/config/RssConf.java | 4 +
.../uniffle/common/ReconfigurableRegistryTest.java | 139 +++++++++++++++
.../coordinator/CoordinatorGrpcService.java | 41 +++--
.../apache/uniffle/server/ShuffleFlushManager.java | 2 +-
.../uniffle/server/ShuffleServerGrpcService.java | 39 +++--
.../server/netty/ShuffleServerNettyHandler.java | 40 +++--
.../server/storage/LocalStorageManager.java | 24 ++-
9 files changed, 446 insertions(+), 40 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
index 1c9219ebd..cd750b0aa 100644
---
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
+++
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
@@ -19,7 +19,10 @@ package org.apache.uniffle.common;
import java.io.File;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
@@ -107,6 +110,7 @@ public class ReconfigurableConfManager<T> {
if (latestConf == null) {
return;
}
+ Map<String, Object> changedProperties = new HashMap<>();
for (ConfigOption<T> configOption : updateConfOptions) {
Optional<T> valOptional = latestConf.getOptional(configOption);
if (valOptional.isPresent()) {
@@ -118,11 +122,16 @@ public class ReconfigurableConfManager<T> {
rssConf.get(configOption),
val);
rssConf.set(configOption, val);
+ changedProperties.put(configOption.key(), val);
}
- } else {
+ } else if (rssConf.isSet(configOption.key())) {
rssConf.remove(configOption.key());
+ changedProperties.put(configOption.key(), rssConf.get(configOption));
}
}
+ if (!changedProperties.isEmpty()) {
+ ReconfigurableRegistry.update(rssConf,
Collections.unmodifiableMap(changedProperties));
+ }
}
private RssConf getConfRef() {
diff --git
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
new file mode 100644
index 000000000..7dee23eca
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.config.RssConf;
+
+public class ReconfigurableRegistry {
+ private static final Logger LOG =
LoggerFactory.getLogger(ReconfigurableRegistry.class);
+
+ private static final HashMap<Set<String>, List<ReconfigureListener>>
LISTENER_MAP =
+ new HashMap<>();
+
+ // prevent instantiation
+ private ReconfigurableRegistry() {}
+
+ /**
+ * Add a listener which listens to all properties.
+ *
+ * @param listener the given property listener
+ */
+ public static synchronized void register(ReconfigureListener listener) {
+ register(Collections.emptySet(), listener);
+ }
+
+ /**
+ * Add a listener which listens to the given keys.
+ *
+ * @param key the given key
+ * @param listener the given property listener
+ */
+ public static synchronized void register(String key, ReconfigureListener
listener) {
+ register(Sets.newHashSet(key), listener);
+ }
+
+ /**
+ * Add a listener which listens to the given keys.
+ *
+ * @param keys the given keys
+ * @param listener the given property listener
+ */
+ public static synchronized void register(Set<String> keys,
ReconfigureListener listener) {
+ List listenerList = LISTENER_MAP.computeIfAbsent(keys, k -> new
ArrayList<>());
+ listenerList.add(listener);
+ }
+
+ /**
+ * Remove all listeners related to the given key.
+ *
+ * @param key the given key
+ * @return true if the listeners are removed, otherwise false
+ */
+ public static synchronized boolean unregister(String key) {
+ return unregister(Sets.newHashSet(key));
+ }
+
+ /**
+ * Remove all listeners related to the given keys.
+ *
+ * @param keys the given keys
+ * @return true if the listeners are removed, otherwise false
+ */
+ public static synchronized boolean unregister(Set<String> keys) {
+ return LISTENER_MAP.remove(keys) != null;
+ }
+
+ /**
+ * Remove the listener related to the given keys.
+ *
+ * @param key the given key
+ * @param listener the given listener
+ * @return true if the listeners are removed, otherwise false
+ */
+ public static synchronized boolean unregister(String key,
ReconfigureListener listener) {
+ return unregister(Sets.newHashSet(key), listener);
+ }
+
+ /**
+ * Remove the listener related to the given keys.
+ *
+ * @param keys the given keys
+ * @param listener the given listener
+ * @return true if the listeners are removed, otherwise false
+ */
+ public static synchronized boolean unregister(Set<String> keys,
ReconfigureListener listener) {
+ List<ReconfigureListener> listenerList = LISTENER_MAP.get(keys);
+ if (listenerList == null) {
+ return false;
+ }
+ boolean removed = listenerList.remove(listener);
+ if (listenerList.isEmpty()) {
+ LISTENER_MAP.remove(keys);
+ }
+ return removed;
+ }
+
+ /**
+ * Remove the listener from all keys listeners first, if the listener is not
in any keys, will
+ * scan all the listener maps to remove the listener.
+ *
+ * @param listener the given listener
+ * @return true if the listeners are removed, otherwise false
+ */
+ public static synchronized boolean unregister(ReconfigureListener listener) {
+ boolean removed = unregister(Collections.emptySet(), listener);
+ if (!removed) {
+ for (Map.Entry<Set<String>, List<ReconfigureListener>> entry :
LISTENER_MAP.entrySet()) {
+ removed = unregister(entry.getKey(), listener);
+ if (removed) {
+ break;
+ }
+ }
+ }
+ return removed;
+ }
+
+ @VisibleForTesting
+ public static int getSize() {
+ return LISTENER_MAP.size();
+ }
+
+ /**
+ * When the property was reconfigured, this function will be invoked. This
property listeners will
+ * be notified.
+ *
+ * @param conf the rss conf
+ * @param changedProperties the changed properties
+ */
+ public static synchronized void update(RssConf conf, Map<String, Object>
changedProperties) {
+ for (Map.Entry<Set<String>, List<ReconfigureListener>> entry :
LISTENER_MAP.entrySet()) {
+ // check if the keys is empty, if empty, it means all keys are listened.
+ Set<String> intersection =
+ entry.getKey().isEmpty()
+ ? changedProperties.keySet()
+ : Sets.intersection(entry.getKey(), changedProperties.keySet());
+ if (!intersection.isEmpty()) {
+ Map<String, Object> filteredMap =
+ Maps.filterKeys(changedProperties, intersection::contains);
+ for (ReconfigureListener listener : entry.getValue()) {
+ try {
+ listener.update(conf, filteredMap);
+ } catch (Throwable e) {
+ LOG.warn("Exception while updating config for {}",
changedProperties, e);
+ }
+ }
+ }
+ }
+ }
+
+ public interface ReconfigureListener {
+ /**
+ * When the property changed, this function will be invoked.
+ *
+ * @param conf the rss conf
+ * @param changedProperties the changed properties
+ */
+ void update(RssConf conf, Map<String, Object> changedProperties);
+ }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
index 74d1c2bdb..c2003bef9 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
@@ -684,6 +684,10 @@ public class RssConf implements Cloneable {
this.settings.remove(key);
}
+ public boolean isSet(String key) {
+ return this.settings.containsKey(key);
+ }
+
public Map<String, Object> getPropsWithPrefix(String confPrefix) {
Map<String, Object> configMap = new HashMap<>();
for (Map.Entry<String, Object> entry : settings.entrySet()) {
diff --git
a/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
new file mode 100644
index 000000000..b2a17d32c
--- /dev/null
+++
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests enum type {@link ReconfigurableRegistry}. */
+public class ReconfigurableRegistryTest {
+
+ @Test
+ public void testUpdate() {
+ ReconfigurableBad bad = new ReconfigurableBad();
+ ReconfigurableGood good0 = new ReconfigurableGood();
+ ReconfigurableGood good1 = new ReconfigurableGood();
+ try {
+ ReconfigurableRegistry.register(good0);
+ ReconfigurableRegistry.register(bad);
+ ReconfigurableRegistry.register(good1);
+ ReconfigurableRegistry.update(null, Collections.singletonMap("key",
"value"));
+ assertEquals(1, good0.mInvokeCount);
+ assertEquals(1, good1.mInvokeCount);
+ ReconfigurableRegistry.update(null, Collections.singletonMap("key",
"value"));
+ assertEquals(2, good0.mInvokeCount);
+ assertEquals(2, good1.mInvokeCount);
+ // remove bad guy
+ ReconfigurableRegistry.unregister(bad);
+ ReconfigurableRegistry.update(null, Collections.singletonMap("key",
"value"));
+ assertEquals(3, good0.mInvokeCount);
+ assertEquals(3, good1.mInvokeCount);
+ ReconfigurableRegistry.update(null, Collections.singletonMap("key",
"value"));
+ assertEquals(4, good0.mInvokeCount);
+ assertEquals(4, good1.mInvokeCount);
+ } finally {
+ assertTrue(!ReconfigurableRegistry.unregister(bad));
+ assertTrue(ReconfigurableRegistry.unregister(good0));
+ assertTrue(ReconfigurableRegistry.unregister(good1));
+ assertEquals(0, ReconfigurableRegistry.getSize());
+ }
+ }
+
+ @Test
+ public void testUpdateSpecificKey() {
+ ReconfigurableBad bad = new ReconfigurableBad();
+ ReconfigurableGood good0 = new ReconfigurableGood();
+ ReconfigurableGood good1 = new ReconfigurableGood();
+ ReconfigurableGood good1Follow = new ReconfigurableGood();
+ ReconfigurableGood goodAny = new ReconfigurableGood();
+ ReconfigurableGood good2 = new ReconfigurableGood();
+ ReconfigurableGood good01 = new ReconfigurableGood();
+ try {
+ ReconfigurableRegistry.register("key0", good0);
+ ReconfigurableRegistry.register(bad);
+ ReconfigurableRegistry.register("key1", good1);
+ ReconfigurableRegistry.register("key1", good1Follow);
+ ReconfigurableRegistry.register(goodAny);
+ ReconfigurableRegistry.register("key2", good2);
+ ReconfigurableRegistry.register(Sets.newHashSet("key0", "key1"), good01);
+ ReconfigurableRegistry.update(null, Collections.singletonMap("key",
"value"));
+
+ assertEquals(0, good0.mInvokeCount);
+ assertEquals(0, good1.mInvokeCount);
+ assertEquals(0, good1Follow.mInvokeCount);
+ assertEquals(1, goodAny.mInvokeCount);
+ assertEquals(0, good2.mInvokeCount);
+ assertEquals(0, good01.mInvokeCount);
+
+ ReconfigurableRegistry.update(null, Collections.singletonMap("key1",
"value1"));
+
+ assertEquals(0, good0.mInvokeCount);
+ assertEquals(1, good1.mInvokeCount);
+ assertEquals(1, good1Follow.mInvokeCount);
+ assertEquals(2, goodAny.mInvokeCount);
+ assertEquals(0, good2.mInvokeCount);
+ assertEquals(1, good01.mInvokeCount);
+
+ Map<String, Object> changedProperties = new HashMap<>();
+ changedProperties.put("key0", "value0");
+ changedProperties.put("key1", "value1");
+ changedProperties.put("key2", "value2");
+ ReconfigurableRegistry.update(null, changedProperties);
+
+ assertEquals(1, good0.mInvokeCount);
+ assertEquals(2, good1.mInvokeCount);
+ assertEquals(2, good1Follow.mInvokeCount);
+ assertEquals(5, goodAny.mInvokeCount);
+ assertEquals(1, good2.mInvokeCount);
+ assertEquals(3, good01.mInvokeCount);
+ } finally {
+ assertTrue(ReconfigurableRegistry.unregister(bad));
+ assertTrue(ReconfigurableRegistry.unregister(Sets.newHashSet("key0"),
good0));
+ assertTrue(ReconfigurableRegistry.unregister(Sets.newHashSet("key1")));
+ assertTrue(ReconfigurableRegistry.unregister(goodAny));
+ assertTrue(ReconfigurableRegistry.unregister("key2", good2));
+ assertTrue(ReconfigurableRegistry.unregister(Sets.newHashSet("key0",
"key1"), good01));
+ assertEquals(0, ReconfigurableRegistry.getSize());
+ }
+ }
+
+ class ReconfigurableBad implements
ReconfigurableRegistry.ReconfigureListener {
+ @Override
+ public void update(RssConf conf, Map<String, Object> changedProperties) {
+ throw new RuntimeException("I am bad guy");
+ }
+ }
+
+ class ReconfigurableGood implements
ReconfigurableRegistry.ReconfigureListener {
+ int mInvokeCount = 0;
+
+ @Override
+ public void update(RssConf rssConf, Map<String, Object> changedProperties)
{
+ mInvokeCount += changedProperties.size();
+ }
+ }
+}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 9f7c5cdd4..d0eecc4a7 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -17,7 +17,6 @@
package org.apache.uniffle.coordinator;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -34,6 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ReconfigurableRegistry;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.audit.RpcAuditContext;
@@ -78,23 +78,40 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
private static final Logger AUDIT_LOGGER =
LoggerFactory.getLogger("COORDINATOR_RPC_AUDIT_LOG");
private final CoordinatorServer coordinatorServer;
- private final boolean isRpcAuditLogEnabled;
- private final List<String> rpcAuditExcludeOpList;
+ private boolean isRpcAuditLogEnabled;
+ private List<String> rpcAuditExcludeOpList;
public CoordinatorGrpcService(CoordinatorServer coordinatorServer) {
this.coordinatorServer = coordinatorServer;
isRpcAuditLogEnabled =
coordinatorServer
.getCoordinatorConf()
- .getBoolean(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED);
- if (isRpcAuditLogEnabled) {
- rpcAuditExcludeOpList =
- coordinatorServer
- .getCoordinatorConf()
- .get(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST);
- } else {
- rpcAuditExcludeOpList = Collections.emptyList();
- }
+
.getReconfigurableConf(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED)
+ .get();
+ rpcAuditExcludeOpList =
+ coordinatorServer
+ .getCoordinatorConf()
+
.getReconfigurableConf(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST)
+ .get();
+ ReconfigurableRegistry.register(
+ Sets.newHashSet(
+ CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED.key(),
+ CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST.key()),
+ (conf, changedProperties) -> {
+ if (changedProperties == null || conf == null) {
+ return;
+ }
+ if (changedProperties.containsKey(
+ CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED.key())) {
+ isRpcAuditLogEnabled =
+
conf.getBoolean(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED);
+ }
+ if (changedProperties.containsKey(
+ CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST.key())) {
+ rpcAuditExcludeOpList =
+
conf.get(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST);
+ }
+ });
}
@Override
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index c08c358c0..625584d77 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -70,7 +70,7 @@ public class ShuffleFlushManager {
private final StorageManager storageManager;
private final long pendingEventTimeoutSec;
private FlushEventHandler eventHandler;
- private final boolean isStorageAuditLogEnabled;
+ private boolean isStorageAuditLogEnabled;
public ShuffleFlushManager(
ShuffleServerConf shuffleServerConf,
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index bcb3188c1..a2b4d47d6 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.server;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -42,6 +41,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ReconfigurableRegistry;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
@@ -108,23 +108,38 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
private static final Logger AUDIT_LOGGER =
LoggerFactory.getLogger("SHUFFLE_SERVER_RPC_AUDIT_LOG");
private final ShuffleServer shuffleServer;
- private final boolean isRpcAuditLogEnabled;
- private final List<String> rpcAuditExcludeOpList;
+ private boolean isRpcAuditLogEnabled;
+ private List<String> rpcAuditExcludeOpList;
public ShuffleServerGrpcService(ShuffleServer shuffleServer) {
this.shuffleServer = shuffleServer;
isRpcAuditLogEnabled =
shuffleServer
.getShuffleServerConf()
- .getBoolean(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED);
- if (isRpcAuditLogEnabled) {
- rpcAuditExcludeOpList =
- shuffleServer
- .getShuffleServerConf()
- .get(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST);
- } else {
- rpcAuditExcludeOpList = Collections.emptyList();
- }
+
.getReconfigurableConf(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED)
+ .get();
+ rpcAuditExcludeOpList =
+ shuffleServer
+ .getShuffleServerConf()
+
.getReconfigurableConf(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST)
+ .get();
+ ReconfigurableRegistry.register(
+ Sets.newHashSet(
+ ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key(),
+ ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST.key()),
+ (conf, changedProperties) -> {
+ if (changedProperties == null) {
+ return;
+ }
+ if
(changedProperties.containsKey(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key()))
{
+ isRpcAuditLogEnabled =
conf.getBoolean(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED);
+ }
+ if (changedProperties.containsKey(
+ ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST.key())) {
+ rpcAuditExcludeOpList =
+
conf.get(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST);
+ }
+ });
}
@Override
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 30fc44451..e40bf4d87 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -19,11 +19,11 @@ package org.apache.uniffle.server.netty;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ReconfigurableRegistry;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
@@ -79,23 +80,38 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
LoggerFactory.getLogger("SHUFFLE_SERVER_RPC_AUDIT_LOG");
private static final int RPC_TIMEOUT = 60000;
private final ShuffleServer shuffleServer;
- private final boolean isRpcAuditLogEnabled;
- private final List<String> rpcAuditExcludeOpList;
+ private boolean isRpcAuditLogEnabled;
+ private List<String> rpcAuditExcludeOpList;
public ShuffleServerNettyHandler(ShuffleServer shuffleServer) {
this.shuffleServer = shuffleServer;
isRpcAuditLogEnabled =
shuffleServer
.getShuffleServerConf()
- .getBoolean(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED);
- if (isRpcAuditLogEnabled) {
- rpcAuditExcludeOpList =
- shuffleServer
- .getShuffleServerConf()
- .get(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST);
- } else {
- rpcAuditExcludeOpList = Collections.emptyList();
- }
+
.getReconfigurableConf(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED)
+ .get();
+ rpcAuditExcludeOpList =
+ shuffleServer
+ .getShuffleServerConf()
+
.getReconfigurableConf(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST)
+ .get();
+ ReconfigurableRegistry.register(
+ Sets.newHashSet(
+ ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key(),
+ ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST.key()),
+ (conf, changedProperties) -> {
+ if (changedProperties == null) {
+ return;
+ }
+ if
(changedProperties.containsKey(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key()))
{
+ isRpcAuditLogEnabled =
conf.getBoolean(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED);
+ }
+ if (changedProperties.containsKey(
+ ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST.key())) {
+ rpcAuditExcludeOpList =
+
conf.get(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST);
+ }
+ });
}
@Override
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 6be2af740..d9bc2c51b 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -48,6 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.AuditType;
+import org.apache.uniffle.common.ReconfigurableRegistry;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.UnionKey;
import org.apache.uniffle.common.exception.RssException;
@@ -90,7 +91,7 @@ public class LocalStorageManager extends SingleStorageManager
{
private final ConcurrentSkipListMap<String, LocalStorage>
sortedPartitionsOfStorageMap;
private final List<StorageMediaProvider> typeProviders =
Lists.newArrayList();
- private final boolean isStorageAuditLogEnabled;
+ private boolean isStorageAuditLogEnabled;
@VisibleForTesting
LocalStorageManager(ShuffleServerConf conf) {
@@ -175,7 +176,20 @@ public class LocalStorageManager extends
SingleStorageManager {
StringUtils.join(
localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList())));
this.checker = new LocalStorageChecker(conf, localStorages);
- isStorageAuditLogEnabled =
conf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
+ isStorageAuditLogEnabled =
+
conf.getReconfigurableConf(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED).get();
+ ReconfigurableRegistry.register(
+ ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED.toString(),
+ (rssConf, changedProperties) -> {
+ if (changedProperties == null || rssConf == null) {
+ return;
+ }
+ if (changedProperties.containsKey(
+ ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED.key())) {
+ isStorageAuditLogEnabled =
+
rssConf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
+ }
+ });
}
private StorageMedia getStorageTypeForBasePath(String basePath) {
@@ -435,4 +449,10 @@ public class LocalStorageManager extends
SingleStorageManager {
public Map<String, LocalStorage> getSortedPartitionsOfStorageMap() {
return sortedPartitionsOfStorageMap;
}
+
+ @Override
+ public void stop() {
+ super.stop();
+
ReconfigurableRegistry.unregister(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED.key());
+ }
}