This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f1c14dae [#2158] feat(common): Introduce reconfigure listener for 
server/coordinator conf (#2159)
1f1c14dae is described below

commit 1f1c14dae685a442d0d698f1a501d70d0528db08
Author: maobaolong <[email protected]>
AuthorDate: Sun Oct 6 10:43:12 2024 +0800

    [#2158] feat(common): Introduce reconfigure listener for server/coordinator 
conf (#2159)
    
    ### What changes were proposed in this pull request?
    
    Introduce the ReconfigurableRegistry and ReconfigureListener to support 
updating changed config values dynamically.
    
    ### Why are the changes needed?
    
    For: #2158
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    new UT.
---
 .../uniffle/common/ReconfigurableConfManager.java  |  11 +-
 .../uniffle/common/ReconfigurableRegistry.java     | 186 +++++++++++++++++++++
 .../org/apache/uniffle/common/config/RssConf.java  |   4 +
 .../uniffle/common/ReconfigurableRegistryTest.java | 139 +++++++++++++++
 .../coordinator/CoordinatorGrpcService.java        |  41 +++--
 .../apache/uniffle/server/ShuffleFlushManager.java |   2 +-
 .../uniffle/server/ShuffleServerGrpcService.java   |  39 +++--
 .../server/netty/ShuffleServerNettyHandler.java    |  40 +++--
 .../server/storage/LocalStorageManager.java        |  24 ++-
 9 files changed, 446 insertions(+), 40 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java 
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
index 1c9219ebd..cd750b0aa 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
@@ -19,7 +19,10 @@ package org.apache.uniffle.common;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
@@ -107,6 +110,7 @@ public class ReconfigurableConfManager<T> {
     if (latestConf == null) {
       return;
     }
+    Map<String, Object> changedProperties = new HashMap<>();
     for (ConfigOption<T> configOption : updateConfOptions) {
       Optional<T> valOptional = latestConf.getOptional(configOption);
       if (valOptional.isPresent()) {
@@ -118,11 +122,16 @@ public class ReconfigurableConfManager<T> {
               rssConf.get(configOption),
               val);
           rssConf.set(configOption, val);
+          changedProperties.put(configOption.key(), val);
         }
-      } else {
+      } else if (rssConf.isSet(configOption.key())) {
         rssConf.remove(configOption.key());
+        changedProperties.put(configOption.key(), rssConf.get(configOption));
       }
     }
+    if (!changedProperties.isEmpty()) {
+      ReconfigurableRegistry.update(rssConf, 
Collections.unmodifiableMap(changedProperties));
+    }
   }
 
   private RssConf getConfRef() {
diff --git 
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java 
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
new file mode 100644
index 000000000..7dee23eca
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.config.RssConf;
+
+/** Static registry that notifies {@link ReconfigureListener}s about changed config keys. */
+public class ReconfigurableRegistry {
+  private static final Logger LOG = LoggerFactory.getLogger(ReconfigurableRegistry.class);
+
+  private static final Map<Set<String>, List<ReconfigureListener>> LISTENER_MAP = new HashMap<>();
+
+  // prevent instantiation
+  private ReconfigurableRegistry() {}
+
+  /**
+   * Add a listener which listens to all properties.
+   *
+   * @param listener the given property listener
+   */
+  public static synchronized void register(ReconfigureListener listener) {
+    register(Collections.emptySet(), listener);
+  }
+
+  /**
+   * Add a listener which listens to the given key.
+   *
+   * @param key the given key
+   * @param listener the given property listener
+   */
+  public static synchronized void register(String key, ReconfigureListener listener) {
+    register(Sets.newHashSet(key), listener);
+  }
+
+  /**
+   * Add a listener which listens to the given keys.
+   *
+   * @param keys the given keys; an empty set means all keys
+   * @param listener the given property listener
+   */
+  public static synchronized void register(Set<String> keys, ReconfigureListener listener) {
+    // Key on a private copy so a caller mutating its set later cannot corrupt the map.
+    LISTENER_MAP.computeIfAbsent(Sets.newHashSet(keys), k -> new ArrayList<>()).add(listener);
+  }
+
+  /**
+   * Remove all listeners registered for exactly the given key.
+   *
+   * @param key the given key
+   * @return true if the listeners are removed, otherwise false
+   */
+  public static synchronized boolean unregister(String key) {
+    return unregister(Sets.newHashSet(key));
+  }
+
+  /**
+   * Remove all listeners registered for exactly the given key set.
+   *
+   * @param keys the given keys
+   * @return true if the listeners are removed, otherwise false
+   */
+  public static synchronized boolean unregister(Set<String> keys) {
+    return LISTENER_MAP.remove(keys) != null;
+  }
+
+  /**
+   * Remove the listener registered for the given key.
+   *
+   * @param key the given key
+   * @param listener the given listener
+   * @return true if the listener was removed, otherwise false
+   */
+  public static synchronized boolean unregister(String key, ReconfigureListener listener) {
+    return unregister(Sets.newHashSet(key), listener);
+  }
+
+  /**
+   * Remove the listener registered for the given key set.
+   *
+   * @param keys the given keys
+   * @param listener the given listener
+   * @return true if the listener was removed, otherwise false
+   */
+  public static synchronized boolean unregister(Set<String> keys, ReconfigureListener listener) {
+    List<ReconfigureListener> listenerList = LISTENER_MAP.get(keys);
+    if (listenerList == null) {
+      return false;
+    }
+    boolean removed = listenerList.remove(listener);
+    if (listenerList.isEmpty()) {
+      LISTENER_MAP.remove(keys);
+    }
+    return removed;
+  }
+
+  /**
+   * Remove the listener from the all-keys registrations first; if it is not found there, scan the
+   * other key sets and remove it from the first one (in map iteration order) that contains it.
+   *
+   * @param listener the given listener
+   * @return true if the listener was removed, otherwise false
+   */
+  public static synchronized boolean unregister(ReconfigureListener listener) {
+    boolean removed = unregister(Collections.emptySet(), listener);
+    if (!removed) {
+      for (Map.Entry<Set<String>, List<ReconfigureListener>> entry : LISTENER_MAP.entrySet()) {
+        removed = unregister(entry.getKey(), listener);
+        if (removed) {
+          break; // the removal may have mutated the map, so stop iterating
+        }
+      }
+    }
+    return removed;
+  }
+
+  @VisibleForTesting
+  public static synchronized int getSize() {
+    return LISTENER_MAP.size();
+  }
+
+  /**
+   * Invoked when properties were reconfigured. Every listener registered for all keys, or for at
+   * least one of the changed keys, is notified with the changed properties it listens to.
+   *
+   * @param conf the rss conf
+   * @param changedProperties the changed properties
+   */
+  public static synchronized void update(RssConf conf, Map<String, Object> changedProperties) {
+    for (Map.Entry<Set<String>, List<ReconfigureListener>> entry : LISTENER_MAP.entrySet()) {
+      // an empty key set means the listener listens to all keys.
+      Set<String> intersection =
+          entry.getKey().isEmpty()
+              ? changedProperties.keySet()
+              : Sets.intersection(entry.getKey(), changedProperties.keySet());
+      if (!intersection.isEmpty()) {
+        Map<String, Object> filteredMap =
+            Maps.filterKeys(changedProperties, intersection::contains);
+        for (ReconfigureListener listener : entry.getValue()) {
+          try {
+            listener.update(conf, filteredMap);
+          } catch (Throwable e) {
+            LOG.warn("Exception while updating config for {}", changedProperties, e);
+          }
+        }
+      }
+    }
+  }
+
+  public interface ReconfigureListener {
+    /**
+     * When the property changed, this function will be invoked.
+     *
+     * @param conf the rss conf
+     * @param changedProperties the changed properties
+     */
+    void update(RssConf conf, Map<String, Object> changedProperties);
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
index 74d1c2bdb..c2003bef9 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
@@ -684,6 +684,10 @@ public class RssConf implements Cloneable {
     this.settings.remove(key);
   }
 
+  public boolean isSet(String key) {
+    return this.settings.containsKey(key);
+  }
+
   public Map<String, Object> getPropsWithPrefix(String confPrefix) {
     Map<String, Object> configMap = new HashMap<>();
     for (Map.Entry<String, Object> entry : settings.entrySet()) {
diff --git 
a/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
 
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
new file mode 100644
index 000000000..b2a17d32c
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link ReconfigurableRegistry}. */
+public class ReconfigurableRegistryTest {
+
+  @Test
+  public void testUpdate() {
+    ReconfigurableBad bad = new ReconfigurableBad();
+    ReconfigurableGood good0 = new ReconfigurableGood();
+    ReconfigurableGood good1 = new ReconfigurableGood();
+    try {
+      ReconfigurableRegistry.register(good0);
+      ReconfigurableRegistry.register(bad);
+      ReconfigurableRegistry.register(good1);
+      ReconfigurableRegistry.update(null, Collections.singletonMap("key", "value"));
+      assertEquals(1, good0.mInvokeCount);
+      assertEquals(1, good1.mInvokeCount);
+      ReconfigurableRegistry.update(null, Collections.singletonMap("key", "value"));
+      assertEquals(2, good0.mInvokeCount);
+      assertEquals(2, good1.mInvokeCount);
+      // remove bad guy
+      ReconfigurableRegistry.unregister(bad);
+      ReconfigurableRegistry.update(null, Collections.singletonMap("key", "value"));
+      assertEquals(3, good0.mInvokeCount);
+      assertEquals(3, good1.mInvokeCount);
+      ReconfigurableRegistry.update(null, Collections.singletonMap("key", "value"));
+      assertEquals(4, good0.mInvokeCount);
+      assertEquals(4, good1.mInvokeCount);
+    } finally {
+      assertTrue(!ReconfigurableRegistry.unregister(bad));
+      assertTrue(ReconfigurableRegistry.unregister(good0));
+      assertTrue(ReconfigurableRegistry.unregister(good1));
+      assertEquals(0, ReconfigurableRegistry.getSize());
+    }
+  }
+
+  @Test
+  public void testUpdateSpecificKey() {
+    ReconfigurableBad bad = new ReconfigurableBad();
+    ReconfigurableGood good0 = new ReconfigurableGood();
+    ReconfigurableGood good1 = new ReconfigurableGood();
+    ReconfigurableGood good1Follow = new ReconfigurableGood();
+    ReconfigurableGood goodAny = new ReconfigurableGood();
+    ReconfigurableGood good2 = new ReconfigurableGood();
+    ReconfigurableGood good01 = new ReconfigurableGood();
+    try {
+      ReconfigurableRegistry.register("key0", good0);
+      ReconfigurableRegistry.register(bad);
+      ReconfigurableRegistry.register("key1", good1);
+      ReconfigurableRegistry.register("key1", good1Follow);
+      ReconfigurableRegistry.register(goodAny);
+      ReconfigurableRegistry.register("key2", good2);
+      ReconfigurableRegistry.register(Sets.newHashSet("key0", "key1"), good01);
+      ReconfigurableRegistry.update(null, Collections.singletonMap("key", "value"));
+
+      assertEquals(0, good0.mInvokeCount);
+      assertEquals(0, good1.mInvokeCount);
+      assertEquals(0, good1Follow.mInvokeCount);
+      assertEquals(1, goodAny.mInvokeCount);
+      assertEquals(0, good2.mInvokeCount);
+      assertEquals(0, good01.mInvokeCount);
+
+      ReconfigurableRegistry.update(null, Collections.singletonMap("key1", "value1"));
+
+      assertEquals(0, good0.mInvokeCount);
+      assertEquals(1, good1.mInvokeCount);
+      assertEquals(1, good1Follow.mInvokeCount);
+      assertEquals(2, goodAny.mInvokeCount);
+      assertEquals(0, good2.mInvokeCount);
+      assertEquals(1, good01.mInvokeCount);
+
+      Map<String, Object> changedProperties = new HashMap<>();
+      changedProperties.put("key0", "value0");
+      changedProperties.put("key1", "value1");
+      changedProperties.put("key2", "value2");
+      ReconfigurableRegistry.update(null, changedProperties);
+
+      assertEquals(1, good0.mInvokeCount);
+      assertEquals(2, good1.mInvokeCount);
+      assertEquals(2, good1Follow.mInvokeCount);
+      assertEquals(5, goodAny.mInvokeCount);
+      assertEquals(1, good2.mInvokeCount);
+      assertEquals(3, good01.mInvokeCount);
+    } finally {
+      assertTrue(ReconfigurableRegistry.unregister(bad));
+      assertTrue(ReconfigurableRegistry.unregister(Sets.newHashSet("key0"), good0));
+      assertTrue(ReconfigurableRegistry.unregister(Sets.newHashSet("key1")));
+      assertTrue(ReconfigurableRegistry.unregister(goodAny));
+      assertTrue(ReconfigurableRegistry.unregister("key2", good2));
+      assertTrue(ReconfigurableRegistry.unregister(Sets.newHashSet("key0", "key1"), good01));
+      assertEquals(0, ReconfigurableRegistry.getSize());
+    }
+  }
+
+  static class ReconfigurableBad implements ReconfigurableRegistry.ReconfigureListener {
+    @Override
+    public void update(RssConf conf, Map<String, Object> changedProperties) {
+      throw new RuntimeException("I am bad guy");
+    }
+  }
+
+  static class ReconfigurableGood implements ReconfigurableRegistry.ReconfigureListener {
+    int mInvokeCount = 0;
+
+    @Override
+    public void update(RssConf rssConf, Map<String, Object> changedProperties) {
+      mInvokeCount += changedProperties.size();
+    }
+  }
+}
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 9f7c5cdd4..d0eecc4a7 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -17,7 +17,6 @@
 
 package org.apache.uniffle.coordinator;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ReconfigurableRegistry;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.audit.RpcAuditContext;
@@ -78,23 +78,40 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   private static final Logger AUDIT_LOGGER = 
LoggerFactory.getLogger("COORDINATOR_RPC_AUDIT_LOG");
 
   private final CoordinatorServer coordinatorServer;
-  private final boolean isRpcAuditLogEnabled;
-  private final List<String> rpcAuditExcludeOpList;
+  private boolean isRpcAuditLogEnabled;
+  private List<String> rpcAuditExcludeOpList;
 
   public CoordinatorGrpcService(CoordinatorServer coordinatorServer) {
     this.coordinatorServer = coordinatorServer;
     isRpcAuditLogEnabled =
         coordinatorServer
             .getCoordinatorConf()
-            .getBoolean(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED);
-    if (isRpcAuditLogEnabled) {
-      rpcAuditExcludeOpList =
-          coordinatorServer
-              .getCoordinatorConf()
-              .get(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST);
-    } else {
-      rpcAuditExcludeOpList = Collections.emptyList();
-    }
+            
.getReconfigurableConf(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED)
+            .get();
+    rpcAuditExcludeOpList =
+        coordinatorServer
+            .getCoordinatorConf()
+            
.getReconfigurableConf(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST)
+            .get();
+    ReconfigurableRegistry.register(
+        Sets.newHashSet(
+            CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED.key(),
+            CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST.key()),
+        (conf, changedProperties) -> {
+          if (changedProperties == null || conf == null) {
+            return;
+          }
+          if (changedProperties.containsKey(
+              CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED.key())) {
+            isRpcAuditLogEnabled =
+                
conf.getBoolean(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED);
+          }
+          if (changedProperties.containsKey(
+              CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST.key())) {
+            rpcAuditExcludeOpList =
+                
conf.get(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST);
+          }
+        });
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index c08c358c0..625584d77 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -70,7 +70,7 @@ public class ShuffleFlushManager {
   private final StorageManager storageManager;
   private final long pendingEventTimeoutSec;
   private FlushEventHandler eventHandler;
-  private final boolean isStorageAuditLogEnabled;
+  private boolean isStorageAuditLogEnabled;
 
   public ShuffleFlushManager(
       ShuffleServerConf shuffleServerConf,
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index bcb3188c1..a2b4d47d6 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.server;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -42,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ReconfigurableRegistry;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
@@ -108,23 +108,38 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   private static final Logger AUDIT_LOGGER =
       LoggerFactory.getLogger("SHUFFLE_SERVER_RPC_AUDIT_LOG");
   private final ShuffleServer shuffleServer;
-  private final boolean isRpcAuditLogEnabled;
-  private final List<String> rpcAuditExcludeOpList;
+  private boolean isRpcAuditLogEnabled;
+  private List<String> rpcAuditExcludeOpList;
 
   public ShuffleServerGrpcService(ShuffleServer shuffleServer) {
     this.shuffleServer = shuffleServer;
     isRpcAuditLogEnabled =
         shuffleServer
             .getShuffleServerConf()
-            .getBoolean(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED);
-    if (isRpcAuditLogEnabled) {
-      rpcAuditExcludeOpList =
-          shuffleServer
-              .getShuffleServerConf()
-              .get(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST);
-    } else {
-      rpcAuditExcludeOpList = Collections.emptyList();
-    }
+            
.getReconfigurableConf(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED)
+            .get();
+    rpcAuditExcludeOpList =
+        shuffleServer
+            .getShuffleServerConf()
+            
.getReconfigurableConf(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST)
+            .get();
+    ReconfigurableRegistry.register(
+        Sets.newHashSet(
+            ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key(),
+            ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST.key()),
+        (conf, changedProperties) -> {
+          if (changedProperties == null) {
+            return;
+          }
+          if 
(changedProperties.containsKey(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key()))
 {
+            isRpcAuditLogEnabled = 
conf.getBoolean(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED);
+          }
+          if (changedProperties.containsKey(
+              ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST.key())) {
+            rpcAuditExcludeOpList =
+                
conf.get(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST);
+          }
+        });
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 30fc44451..e40bf4d87 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -19,11 +19,11 @@ package org.apache.uniffle.server.netty;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ReconfigurableRegistry;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleIndexResult;
@@ -79,23 +80,38 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       LoggerFactory.getLogger("SHUFFLE_SERVER_RPC_AUDIT_LOG");
   private static final int RPC_TIMEOUT = 60000;
   private final ShuffleServer shuffleServer;
-  private final boolean isRpcAuditLogEnabled;
-  private final List<String> rpcAuditExcludeOpList;
+  private boolean isRpcAuditLogEnabled;
+  private List<String> rpcAuditExcludeOpList;
 
   public ShuffleServerNettyHandler(ShuffleServer shuffleServer) {
     this.shuffleServer = shuffleServer;
     isRpcAuditLogEnabled =
         shuffleServer
             .getShuffleServerConf()
-            .getBoolean(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED);
-    if (isRpcAuditLogEnabled) {
-      rpcAuditExcludeOpList =
-          shuffleServer
-              .getShuffleServerConf()
-              .get(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST);
-    } else {
-      rpcAuditExcludeOpList = Collections.emptyList();
-    }
+            
.getReconfigurableConf(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED)
+            .get();
+    rpcAuditExcludeOpList =
+        shuffleServer
+            .getShuffleServerConf()
+            
.getReconfigurableConf(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST)
+            .get();
+    ReconfigurableRegistry.register(
+        Sets.newHashSet(
+            ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key(),
+            ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST.key()),
+        (conf, changedProperties) -> {
+          if (changedProperties == null) {
+            return;
+          }
+          if 
(changedProperties.containsKey(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key()))
 {
+            isRpcAuditLogEnabled = 
conf.getBoolean(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED);
+          }
+          if (changedProperties.containsKey(
+              ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST.key())) {
+            rpcAuditExcludeOpList =
+                
conf.get(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST);
+          }
+        });
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 6be2af740..d9bc2c51b 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -48,6 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.AuditType;
+import org.apache.uniffle.common.ReconfigurableRegistry;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.UnionKey;
 import org.apache.uniffle.common.exception.RssException;
@@ -90,7 +91,7 @@ public class LocalStorageManager extends SingleStorageManager 
{
   private final ConcurrentSkipListMap<String, LocalStorage> 
sortedPartitionsOfStorageMap;
   private final List<StorageMediaProvider> typeProviders = 
Lists.newArrayList();
 
-  private final boolean isStorageAuditLogEnabled;
+  private boolean isStorageAuditLogEnabled;
 
   @VisibleForTesting
   LocalStorageManager(ShuffleServerConf conf) {
@@ -175,7 +176,20 @@ public class LocalStorageManager extends 
SingleStorageManager {
         StringUtils.join(
             
localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList())));
     this.checker = new LocalStorageChecker(conf, localStorages);
-    isStorageAuditLogEnabled = 
conf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
+    isStorageAuditLogEnabled =
+        conf.getReconfigurableConf(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED).get();
+    ReconfigurableRegistry.register(
+        ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED.key(), // key also used by stop()
+        (rssConf, changedProperties) -> {
+          if (changedProperties == null || rssConf == null) {
+            return;
+          }
+          if (changedProperties.containsKey(
+              ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED.key())) {
+            isStorageAuditLogEnabled =
+                rssConf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
+          }
+        });
   }
 
   private StorageMedia getStorageTypeForBasePath(String basePath) {
@@ -435,4 +449,10 @@ public class LocalStorageManager extends 
SingleStorageManager {
   public Map<String, LocalStorage> getSortedPartitionsOfStorageMap() {
     return sortedPartitionsOfStorageMap;
   }
+
+  @Override
+  public void stop() {
+    super.stop(); // then drop every listener registered for the storage audit-log key
+    
ReconfigurableRegistry.unregister(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED.key());
+  }
 }

Reply via email to