This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 78fe934ef [#2158][FOLLOWUP] improvement(server)(coordinator): Support 
notify reconfig listeners from rest api (#2170)
78fe934ef is described below

commit 78fe934ef7c96e7135a1b3087a5d00567d268dcb
Author: maobaolong <[email protected]>
AuthorDate: Sat Oct 12 22:09:25 2024 +0800

    [#2158][FOLLOWUP] improvement(server)(coordinator): Support notify reconfig 
listeners from rest api (#2170)
    
    ### What changes were proposed in this pull request?
    
    - Refactor `ReconfigurableRegistry#update` to only notify the key of 
changed properties.
    - Support update multiply keys within a single rest request.
    - Support notify the listener by  rest api `/confOps/update`.
    
    ### Why are the changes needed?
    
    Fix: #2158
    
    ### Does this PR introduce _any_ user-facing change?
    
    Support rest api trigger update to reconfig listener
    
    ### How was this patch tested?
    
    - curl command line
    ```Console
    ➜  ~ curl -X POST http://localhost:19978/api/shuffleServer/confOps/update \
    -H "Content-Type: application/json" \
    -d '{"update":{"rss.server.rpc.audit.log.enabled": "true", 
"rss.server.rpc.audit.log.excludeList": ""}}'
    temporarily effective until restart: Update successfully%
    ```
    
    - server.log
    ```
    [2024-10-12 09:37:59.480] [Jetty-17] [INFO] ConfOpsResource - Dynamic 
updating ConfVO{update={rss.server.rpc.audit.log.enabled=true, 
rss.server.rpc.audit.log.excludeList=}, delete=[]}
    
    ```
---
 .../uniffle/common/ReconfigurableConfManager.java  | 12 +++---
 .../uniffle/common/ReconfigurableRegistry.java     | 19 +++++----
 .../common/web/resource/ConfOpsResource.java       | 37 +++++++++--------
 .../apache/uniffle/common/web/resource/ConfVO.java | 48 ++++++++++++++++++++++
 .../uniffle/common/ReconfigurableRegistryTest.java | 34 ++++++++-------
 .../coordinator/CoordinatorGrpcService.java        |  5 +--
 .../uniffle/server/ShuffleServerGrpcService.java   |  4 +-
 .../server/netty/ShuffleServerNettyHandler.java    |  4 +-
 .../server/storage/LocalStorageManager.java        |  2 +-
 .../server/storage/LocalStorageManagerTest.java    |  5 +--
 10 files changed, 115 insertions(+), 55 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java 
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
index cd750b0aa..8e9fb99b3 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
@@ -20,11 +20,11 @@ package org.apache.uniffle.common;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -110,7 +110,7 @@ public class ReconfigurableConfManager<T> {
     if (latestConf == null) {
       return;
     }
-    Map<String, Object> changedProperties = new HashMap<>();
+    Set<String> changedProperties = new HashSet<>();
     for (ConfigOption<T> configOption : updateConfOptions) {
       Optional<T> valOptional = latestConf.getOptional(configOption);
       if (valOptional.isPresent()) {
@@ -122,15 +122,15 @@ public class ReconfigurableConfManager<T> {
               rssConf.get(configOption),
               val);
           rssConf.set(configOption, val);
-          changedProperties.put(configOption.key(), val);
+          changedProperties.add(configOption.key());
         }
       } else if (rssConf.isSet(configOption.key())) {
         rssConf.remove(configOption.key());
-        changedProperties.put(configOption.key(), rssConf.get(configOption));
+        changedProperties.add(configOption.key());
       }
     }
     if (!changedProperties.isEmpty()) {
-      ReconfigurableRegistry.update(rssConf, 
Collections.unmodifiableMap(changedProperties));
+      ReconfigurableRegistry.update(rssConf, 
Collections.unmodifiableSet(changedProperties));
     }
   }
 
diff --git 
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java 
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
index 7dee23eca..d15506c8f 100644
--- a/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
+++ b/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -146,6 +145,11 @@ public class ReconfigurableRegistry {
     return LISTENER_MAP.size();
   }
 
+  @VisibleForTesting
+  public static void clear() {
+    LISTENER_MAP.clear();
+  }
+
   /**
    * When the property was reconfigured, this function will be invoked. This 
property listeners will
    * be notified.
@@ -153,19 +157,18 @@ public class ReconfigurableRegistry {
    * @param conf the rss conf
    * @param changedProperties the changed properties
    */
-  public static synchronized void update(RssConf conf, Map<String, Object> 
changedProperties) {
+  public static synchronized void update(RssConf conf, Set<String> 
changedProperties) {
     for (Map.Entry<Set<String>, List<ReconfigureListener>> entry : 
LISTENER_MAP.entrySet()) {
       // check if the keys is empty, if empty, it means all keys are listened.
       Set<String> intersection =
           entry.getKey().isEmpty()
-              ? changedProperties.keySet()
-              : Sets.intersection(entry.getKey(), changedProperties.keySet());
+              ? changedProperties
+              : Sets.intersection(entry.getKey(), changedProperties);
       if (!intersection.isEmpty()) {
-        Map<String, Object> filteredMap =
-            Maps.filterKeys(changedProperties, intersection::contains);
+        Set<String> filteredSet = Sets.filter(changedProperties, 
intersection::contains);
         for (ReconfigureListener listener : entry.getValue()) {
           try {
-            listener.update(conf, filteredMap);
+            listener.update(conf, filteredSet);
           } catch (Throwable e) {
             LOG.warn("Exception while updating config for {}", 
changedProperties, e);
           }
@@ -181,6 +184,6 @@ public class ReconfigurableRegistry {
      * @param conf the rss conf
      * @param changedProperties the changed properties
      */
-    void update(RssConf conf, Map<String, Object> changedProperties);
+    void update(RssConf conf, Set<String> changedProperties);
   }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/web/resource/ConfOpsResource.java
 
b/common/src/main/java/org/apache/uniffle/common/web/resource/ConfOpsResource.java
index 0f8e101bb..ea117aa75 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/web/resource/ConfOpsResource.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/web/resource/ConfOpsResource.java
@@ -17,17 +17,20 @@
 
 package org.apache.uniffle.common.web.resource;
 
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import javax.servlet.ServletContext;
 
-import org.apache.hbase.thirdparty.javax.ws.rs.DefaultValue;
-import org.apache.hbase.thirdparty.javax.ws.rs.FormParam;
+import org.apache.hbase.thirdparty.javax.ws.rs.Consumes;
 import org.apache.hbase.thirdparty.javax.ws.rs.POST;
 import org.apache.hbase.thirdparty.javax.ws.rs.Path;
 import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
-import org.apache.logging.log4j.util.Strings;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.ReconfigurableRegistry;
 import org.apache.uniffle.common.config.RssConf;
 
 @Path("/confOps")
@@ -41,24 +44,26 @@ public class ConfOpsResource {
   @Authorization
   @POST
   @Path("/update")
-  public String update(
-      @FormParam("key") String key,
-      @FormParam("value") String value,
-      @FormParam("delete") @DefaultValue("false") boolean delete) {
-    LOG.info("Dynamic updating {} to {}, delete={}", key, value, delete);
-    if (Strings.isNotEmpty(key)) {
+  @Consumes(MediaType.APPLICATION_JSON)
+  public String update(ConfVO updateConfigs) {
+    LOG.info("Dynamic updating {}", updateConfigs);
+    String ret = "Nothing changed";
+    if (updateConfigs != null) {
       RssConf conf = (RssConf) 
servletContext.getAttribute(SERVLET_CONTEXT_ATTR_CONF);
       if (conf != null) {
-        if (delete) {
+        Set<String> changedProperties = new HashSet<>();
+        for (Map.Entry<String, String> entry : 
updateConfigs.getUpdate().entrySet()) {
+          conf.setString(entry.getKey(), entry.getValue());
+          changedProperties.add(entry.getKey());
+        }
+        for (String key : updateConfigs.getDelete()) {
           conf.remove(key);
-          return String.format("Remove(%s) key: %s", WARNING_MSG, key);
-        } else {
-          String oldValue = conf.getString(key, null);
-          conf.setString(key, value);
-          return String.format("Set(%s) key: %s from %s to %s", WARNING_MSG, 
key, oldValue, value);
+          changedProperties.add(key);
         }
+        ReconfigurableRegistry.update(conf, changedProperties);
+        ret = WARNING_MSG + ": Update successfully";
       }
     }
-    return "Nothing changed";
+    return ret;
   }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/web/resource/ConfVO.java 
b/common/src/main/java/org/apache/uniffle/common/web/resource/ConfVO.java
new file mode 100644
index 000000000..069c60818
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/web/resource/ConfVO.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.web.resource;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class ConfVO {
+  private Map<String, String> update = Collections.emptyMap();
+  private List<String> delete = Collections.emptyList();
+
+  public List<String> getDelete() {
+    return delete;
+  }
+
+  public void setDelete(List<String> delete) {
+    this.delete = delete;
+  }
+
+  public Map<String, String> getUpdate() {
+    return update;
+  }
+
+  public void setUpdate(Map<String, String> update) {
+    this.update = update;
+  }
+
+  @Override
+  public String toString() {
+    return "ConfVO{" + "update=" + update + ", delete=" + delete + '}';
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
 
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
index b2a17d32c..3559d815a 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
@@ -18,10 +18,11 @@
 package org.apache.uniffle.common;
 
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
 
 import com.google.common.collect.Sets;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.config.RssConf;
@@ -32,6 +33,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /** Tests enum type {@link ReconfigurableRegistry}. */
 public class ReconfigurableRegistryTest {
 
+  @BeforeEach
+  public void before() {
+    ReconfigurableRegistry.clear();
+  }
+
   @Test
   public void testUpdate() {
     ReconfigurableBad bad = new ReconfigurableBad();
@@ -41,18 +47,18 @@ public class ReconfigurableRegistryTest {
       ReconfigurableRegistry.register(good0);
       ReconfigurableRegistry.register(bad);
       ReconfigurableRegistry.register(good1);
-      ReconfigurableRegistry.update(null, Collections.singletonMap("key", 
"value"));
+      ReconfigurableRegistry.update(null, Collections.singleton("key"));
       assertEquals(1, good0.mInvokeCount);
       assertEquals(1, good1.mInvokeCount);
-      ReconfigurableRegistry.update(null, Collections.singletonMap("key", 
"value"));
+      ReconfigurableRegistry.update(null, Collections.singleton("key"));
       assertEquals(2, good0.mInvokeCount);
       assertEquals(2, good1.mInvokeCount);
       // remove bad guy
       ReconfigurableRegistry.unregister(bad);
-      ReconfigurableRegistry.update(null, Collections.singletonMap("key", 
"value"));
+      ReconfigurableRegistry.update(null, Collections.singleton("key"));
       assertEquals(3, good0.mInvokeCount);
       assertEquals(3, good1.mInvokeCount);
-      ReconfigurableRegistry.update(null, Collections.singletonMap("key", 
"value"));
+      ReconfigurableRegistry.update(null, Collections.singleton("key"));
       assertEquals(4, good0.mInvokeCount);
       assertEquals(4, good1.mInvokeCount);
     } finally {
@@ -80,7 +86,7 @@ public class ReconfigurableRegistryTest {
       ReconfigurableRegistry.register(goodAny);
       ReconfigurableRegistry.register("key2", good2);
       ReconfigurableRegistry.register(Sets.newHashSet("key0", "key1"), good01);
-      ReconfigurableRegistry.update(null, Collections.singletonMap("key", 
"value"));
+      ReconfigurableRegistry.update(null, Collections.singleton("key"));
 
       assertEquals(0, good0.mInvokeCount);
       assertEquals(0, good1.mInvokeCount);
@@ -89,7 +95,7 @@ public class ReconfigurableRegistryTest {
       assertEquals(0, good2.mInvokeCount);
       assertEquals(0, good01.mInvokeCount);
 
-      ReconfigurableRegistry.update(null, Collections.singletonMap("key1", 
"value1"));
+      ReconfigurableRegistry.update(null, Collections.singleton("key1"));
 
       assertEquals(0, good0.mInvokeCount);
       assertEquals(1, good1.mInvokeCount);
@@ -98,10 +104,10 @@ public class ReconfigurableRegistryTest {
       assertEquals(0, good2.mInvokeCount);
       assertEquals(1, good01.mInvokeCount);
 
-      Map<String, Object> changedProperties = new HashMap<>();
-      changedProperties.put("key0", "value0");
-      changedProperties.put("key1", "value1");
-      changedProperties.put("key2", "value2");
+      Set<String> changedProperties = new HashSet<>();
+      changedProperties.add("key0");
+      changedProperties.add("key1");
+      changedProperties.add("key2");
       ReconfigurableRegistry.update(null, changedProperties);
 
       assertEquals(1, good0.mInvokeCount);
@@ -123,7 +129,7 @@ public class ReconfigurableRegistryTest {
 
   class ReconfigurableBad implements 
ReconfigurableRegistry.ReconfigureListener {
     @Override
-    public void update(RssConf conf, Map<String, Object> changedProperties) {
+    public void update(RssConf conf, Set<String> changedProperties) {
       throw new RuntimeException("I am bad guy");
     }
   }
@@ -132,7 +138,7 @@ public class ReconfigurableRegistryTest {
     int mInvokeCount = 0;
 
     @Override
-    public void update(RssConf rssConf, Map<String, Object> changedProperties) 
{
+    public void update(RssConf rssConf, Set<String> changedProperties) {
       mInvokeCount += changedProperties.size();
     }
   }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index d0eecc4a7..33e08c474 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -101,12 +101,11 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
           if (changedProperties == null || conf == null) {
             return;
           }
-          if (changedProperties.containsKey(
-              CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED.key())) {
+          if 
(changedProperties.contains(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED.key()))
 {
             isRpcAuditLogEnabled =
                 
conf.getBoolean(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED);
           }
-          if (changedProperties.containsKey(
+          if (changedProperties.contains(
               CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST.key())) {
             rpcAuditExcludeOpList =
                 
conf.get(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index d43d1e71f..8ed1fae73 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -130,10 +130,10 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
           if (changedProperties == null) {
             return;
           }
-          if 
(changedProperties.containsKey(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key()))
 {
+          if 
(changedProperties.contains(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key()))
 {
             isRpcAuditLogEnabled = 
conf.getBoolean(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED);
           }
-          if (changedProperties.containsKey(
+          if (changedProperties.contains(
               ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST.key())) {
             rpcAuditExcludeOpList =
                 
conf.get(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 3c7bb736f..b352fdae9 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -102,10 +102,10 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           if (changedProperties == null) {
             return;
           }
-          if 
(changedProperties.containsKey(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key()))
 {
+          if 
(changedProperties.contains(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key()))
 {
             isRpcAuditLogEnabled = 
conf.getBoolean(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED);
           }
-          if (changedProperties.containsKey(
+          if (changedProperties.contains(
               ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST.key())) {
             rpcAuditExcludeOpList =
                 
conf.get(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index d9bc2c51b..ab5824896 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -184,7 +184,7 @@ public class LocalStorageManager extends 
SingleStorageManager {
           if (changedProperties == null || rssConf == null) {
             return;
           }
-          if (changedProperties.containsKey(
+          if (changedProperties.contains(
               ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED.key())) {
             isStorageAuditLogEnabled =
                 
rssConf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
diff --git 
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
index 63c5c12dd..374c7a14f 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
@@ -53,7 +53,6 @@ import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -366,8 +365,6 @@ public class LocalStorageManagerTest {
     shuffleTaskInfos.put("app1", new ShuffleTaskInfo("app1"));
     shuffleTaskInfos.put("app2", new ShuffleTaskInfo("app2"));
     
localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet);
-    TestLoggerExtension testLogger = 
TestLoggerExtension.getTestLogger(context);
-    assertFalse(testLogger.wasLogged("app"));
 
     // test race condition case, app 3 is new app
     shuffleTaskInfos.put("3", new ShuffleTaskInfo("app3"));
@@ -375,6 +372,8 @@ public class LocalStorageManagerTest {
     
when(mockLocalStorage.getAppIds()).thenReturn(Collections.singleton("app3"));
     storages.add(mockLocalStorage);
     
localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet);
+
+    TestLoggerExtension testLogger = 
TestLoggerExtension.getTestLogger(context);
     assertTrue(testLogger.wasLogged("Delete shuffle data for 
appId\\[app3\\]"));
     System.out.println();
   }

Reply via email to