This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 78fe934ef [#2158][FOLLOWUP] improvement(server)(coordinator): Support
notify reconfig listeners from rest api (#2170)
78fe934ef is described below
commit 78fe934ef7c96e7135a1b3087a5d00567d268dcb
Author: maobaolong <[email protected]>
AuthorDate: Sat Oct 12 22:09:25 2024 +0800
[#2158][FOLLOWUP] improvement(server)(coordinator): Support notify reconfig
listeners from rest api (#2170)
### What changes were proposed in this pull request?
- Refactor `ReconfigurableRegistry#update` to only notify the key of
changed properties.
- Support update multiply keys within a single rest request.
- Support notify the listener by rest api `/confOps/update`.
### Why are the changes needed?
Fix: #2158
### Does this PR introduce _any_ user-facing change?
Support rest api trigger update to reconfig listener
### How was this patch tested?
- curl command line
```Console
➜ ~ curl -X POST http://localhost:19978/api/shuffleServer/confOps/update \
-H "Content-Type: application/json" \
-d '{"update":{"rss.server.rpc.audit.log.enabled": "true",
"rss.server.rpc.audit.log.excludeList": ""}}'
temporarily effective until restart: Update successfully%
```
- server.log
```
[2024-10-12 09:37:59.480] [Jetty-17] [INFO] ConfOpsResource - Dynamic
updating ConfVO{update={rss.server.rpc.audit.log.enabled=true,
rss.server.rpc.audit.log.excludeList=}, delete=[]}
```
---
.../uniffle/common/ReconfigurableConfManager.java | 12 +++---
.../uniffle/common/ReconfigurableRegistry.java | 19 +++++----
.../common/web/resource/ConfOpsResource.java | 37 +++++++++--------
.../apache/uniffle/common/web/resource/ConfVO.java | 48 ++++++++++++++++++++++
.../uniffle/common/ReconfigurableRegistryTest.java | 34 ++++++++-------
.../coordinator/CoordinatorGrpcService.java | 5 +--
.../uniffle/server/ShuffleServerGrpcService.java | 4 +-
.../server/netty/ShuffleServerNettyHandler.java | 4 +-
.../server/storage/LocalStorageManager.java | 2 +-
.../server/storage/LocalStorageManagerTest.java | 5 +--
10 files changed, 115 insertions(+), 55 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
index cd750b0aa..8e9fb99b3 100644
---
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
+++
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
@@ -20,11 +20,11 @@ package org.apache.uniffle.common;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -110,7 +110,7 @@ public class ReconfigurableConfManager<T> {
if (latestConf == null) {
return;
}
- Map<String, Object> changedProperties = new HashMap<>();
+ Set<String> changedProperties = new HashSet<>();
for (ConfigOption<T> configOption : updateConfOptions) {
Optional<T> valOptional = latestConf.getOptional(configOption);
if (valOptional.isPresent()) {
@@ -122,15 +122,15 @@ public class ReconfigurableConfManager<T> {
rssConf.get(configOption),
val);
rssConf.set(configOption, val);
- changedProperties.put(configOption.key(), val);
+ changedProperties.add(configOption.key());
}
} else if (rssConf.isSet(configOption.key())) {
rssConf.remove(configOption.key());
- changedProperties.put(configOption.key(), rssConf.get(configOption));
+ changedProperties.add(configOption.key());
}
}
if (!changedProperties.isEmpty()) {
- ReconfigurableRegistry.update(rssConf,
Collections.unmodifiableMap(changedProperties));
+ ReconfigurableRegistry.update(rssConf,
Collections.unmodifiableSet(changedProperties));
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
index 7dee23eca..d15506c8f 100644
--- a/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
+++ b/common/src/main/java/org/apache/uniffle/common/ReconfigurableRegistry.java
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -146,6 +145,11 @@ public class ReconfigurableRegistry {
return LISTENER_MAP.size();
}
+ @VisibleForTesting
+ public static void clear() {
+ LISTENER_MAP.clear();
+ }
+
/**
* When the property was reconfigured, this function will be invoked. This
property listeners will
* be notified.
@@ -153,19 +157,18 @@ public class ReconfigurableRegistry {
* @param conf the rss conf
* @param changedProperties the changed properties
*/
- public static synchronized void update(RssConf conf, Map<String, Object>
changedProperties) {
+ public static synchronized void update(RssConf conf, Set<String>
changedProperties) {
for (Map.Entry<Set<String>, List<ReconfigureListener>> entry :
LISTENER_MAP.entrySet()) {
// check if the keys is empty, if empty, it means all keys are listened.
Set<String> intersection =
entry.getKey().isEmpty()
- ? changedProperties.keySet()
- : Sets.intersection(entry.getKey(), changedProperties.keySet());
+ ? changedProperties
+ : Sets.intersection(entry.getKey(), changedProperties);
if (!intersection.isEmpty()) {
- Map<String, Object> filteredMap =
- Maps.filterKeys(changedProperties, intersection::contains);
+ Set<String> filteredSet = Sets.filter(changedProperties,
intersection::contains);
for (ReconfigureListener listener : entry.getValue()) {
try {
- listener.update(conf, filteredMap);
+ listener.update(conf, filteredSet);
} catch (Throwable e) {
LOG.warn("Exception while updating config for {}",
changedProperties, e);
}
@@ -181,6 +184,6 @@ public class ReconfigurableRegistry {
* @param conf the rss conf
* @param changedProperties the changed properties
*/
- void update(RssConf conf, Map<String, Object> changedProperties);
+ void update(RssConf conf, Set<String> changedProperties);
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/web/resource/ConfOpsResource.java
b/common/src/main/java/org/apache/uniffle/common/web/resource/ConfOpsResource.java
index 0f8e101bb..ea117aa75 100644
---
a/common/src/main/java/org/apache/uniffle/common/web/resource/ConfOpsResource.java
+++
b/common/src/main/java/org/apache/uniffle/common/web/resource/ConfOpsResource.java
@@ -17,17 +17,20 @@
package org.apache.uniffle.common.web.resource;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import javax.servlet.ServletContext;
-import org.apache.hbase.thirdparty.javax.ws.rs.DefaultValue;
-import org.apache.hbase.thirdparty.javax.ws.rs.FormParam;
+import org.apache.hbase.thirdparty.javax.ws.rs.Consumes;
import org.apache.hbase.thirdparty.javax.ws.rs.POST;
import org.apache.hbase.thirdparty.javax.ws.rs.Path;
import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
-import org.apache.logging.log4j.util.Strings;
+import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.ReconfigurableRegistry;
import org.apache.uniffle.common.config.RssConf;
@Path("/confOps")
@@ -41,24 +44,26 @@ public class ConfOpsResource {
@Authorization
@POST
@Path("/update")
- public String update(
- @FormParam("key") String key,
- @FormParam("value") String value,
- @FormParam("delete") @DefaultValue("false") boolean delete) {
- LOG.info("Dynamic updating {} to {}, delete={}", key, value, delete);
- if (Strings.isNotEmpty(key)) {
+ @Consumes(MediaType.APPLICATION_JSON)
+ public String update(ConfVO updateConfigs) {
+ LOG.info("Dynamic updating {}", updateConfigs);
+ String ret = "Nothing changed";
+ if (updateConfigs != null) {
RssConf conf = (RssConf)
servletContext.getAttribute(SERVLET_CONTEXT_ATTR_CONF);
if (conf != null) {
- if (delete) {
+ Set<String> changedProperties = new HashSet<>();
+ for (Map.Entry<String, String> entry :
updateConfigs.getUpdate().entrySet()) {
+ conf.setString(entry.getKey(), entry.getValue());
+ changedProperties.add(entry.getKey());
+ }
+ for (String key : updateConfigs.getDelete()) {
conf.remove(key);
- return String.format("Remove(%s) key: %s", WARNING_MSG, key);
- } else {
- String oldValue = conf.getString(key, null);
- conf.setString(key, value);
- return String.format("Set(%s) key: %s from %s to %s", WARNING_MSG,
key, oldValue, value);
+ changedProperties.add(key);
}
+ ReconfigurableRegistry.update(conf, changedProperties);
+ ret = WARNING_MSG + ": Update successfully";
}
}
- return "Nothing changed";
+ return ret;
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/web/resource/ConfVO.java
b/common/src/main/java/org/apache/uniffle/common/web/resource/ConfVO.java
new file mode 100644
index 000000000..069c60818
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/web/resource/ConfVO.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.web.resource;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class ConfVO {
+ private Map<String, String> update = Collections.emptyMap();
+ private List<String> delete = Collections.emptyList();
+
+ public List<String> getDelete() {
+ return delete;
+ }
+
+ public void setDelete(List<String> delete) {
+ this.delete = delete;
+ }
+
+ public Map<String, String> getUpdate() {
+ return update;
+ }
+
+ public void setUpdate(Map<String, String> update) {
+ this.update = update;
+ }
+
+ @Override
+ public String toString() {
+ return "ConfVO{" + "update=" + update + ", delete=" + delete + '}';
+ }
+}
diff --git
a/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
index b2a17d32c..3559d815a 100644
---
a/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableRegistryTest.java
@@ -18,10 +18,11 @@
package org.apache.uniffle.common;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
import com.google.common.collect.Sets;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.config.RssConf;
@@ -32,6 +33,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests enum type {@link ReconfigurableRegistry}. */
public class ReconfigurableRegistryTest {
+ @BeforeEach
+ public void before() {
+ ReconfigurableRegistry.clear();
+ }
+
@Test
public void testUpdate() {
ReconfigurableBad bad = new ReconfigurableBad();
@@ -41,18 +47,18 @@ public class ReconfigurableRegistryTest {
ReconfigurableRegistry.register(good0);
ReconfigurableRegistry.register(bad);
ReconfigurableRegistry.register(good1);
- ReconfigurableRegistry.update(null, Collections.singletonMap("key",
"value"));
+ ReconfigurableRegistry.update(null, Collections.singleton("key"));
assertEquals(1, good0.mInvokeCount);
assertEquals(1, good1.mInvokeCount);
- ReconfigurableRegistry.update(null, Collections.singletonMap("key",
"value"));
+ ReconfigurableRegistry.update(null, Collections.singleton("key"));
assertEquals(2, good0.mInvokeCount);
assertEquals(2, good1.mInvokeCount);
// remove bad guy
ReconfigurableRegistry.unregister(bad);
- ReconfigurableRegistry.update(null, Collections.singletonMap("key",
"value"));
+ ReconfigurableRegistry.update(null, Collections.singleton("key"));
assertEquals(3, good0.mInvokeCount);
assertEquals(3, good1.mInvokeCount);
- ReconfigurableRegistry.update(null, Collections.singletonMap("key",
"value"));
+ ReconfigurableRegistry.update(null, Collections.singleton("key"));
assertEquals(4, good0.mInvokeCount);
assertEquals(4, good1.mInvokeCount);
} finally {
@@ -80,7 +86,7 @@ public class ReconfigurableRegistryTest {
ReconfigurableRegistry.register(goodAny);
ReconfigurableRegistry.register("key2", good2);
ReconfigurableRegistry.register(Sets.newHashSet("key0", "key1"), good01);
- ReconfigurableRegistry.update(null, Collections.singletonMap("key",
"value"));
+ ReconfigurableRegistry.update(null, Collections.singleton("key"));
assertEquals(0, good0.mInvokeCount);
assertEquals(0, good1.mInvokeCount);
@@ -89,7 +95,7 @@ public class ReconfigurableRegistryTest {
assertEquals(0, good2.mInvokeCount);
assertEquals(0, good01.mInvokeCount);
- ReconfigurableRegistry.update(null, Collections.singletonMap("key1",
"value1"));
+ ReconfigurableRegistry.update(null, Collections.singleton("key1"));
assertEquals(0, good0.mInvokeCount);
assertEquals(1, good1.mInvokeCount);
@@ -98,10 +104,10 @@ public class ReconfigurableRegistryTest {
assertEquals(0, good2.mInvokeCount);
assertEquals(1, good01.mInvokeCount);
- Map<String, Object> changedProperties = new HashMap<>();
- changedProperties.put("key0", "value0");
- changedProperties.put("key1", "value1");
- changedProperties.put("key2", "value2");
+ Set<String> changedProperties = new HashSet<>();
+ changedProperties.add("key0");
+ changedProperties.add("key1");
+ changedProperties.add("key2");
ReconfigurableRegistry.update(null, changedProperties);
assertEquals(1, good0.mInvokeCount);
@@ -123,7 +129,7 @@ public class ReconfigurableRegistryTest {
class ReconfigurableBad implements
ReconfigurableRegistry.ReconfigureListener {
@Override
- public void update(RssConf conf, Map<String, Object> changedProperties) {
+ public void update(RssConf conf, Set<String> changedProperties) {
throw new RuntimeException("I am bad guy");
}
}
@@ -132,7 +138,7 @@ public class ReconfigurableRegistryTest {
int mInvokeCount = 0;
@Override
- public void update(RssConf rssConf, Map<String, Object> changedProperties)
{
+ public void update(RssConf rssConf, Set<String> changedProperties) {
mInvokeCount += changedProperties.size();
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index d0eecc4a7..33e08c474 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -101,12 +101,11 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
if (changedProperties == null || conf == null) {
return;
}
- if (changedProperties.containsKey(
- CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED.key())) {
+ if
(changedProperties.contains(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED.key()))
{
isRpcAuditLogEnabled =
conf.getBoolean(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED);
}
- if (changedProperties.containsKey(
+ if (changedProperties.contains(
CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST.key())) {
rpcAuditExcludeOpList =
conf.get(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_EXCLUDE_LIST);
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index d43d1e71f..8ed1fae73 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -130,10 +130,10 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
if (changedProperties == null) {
return;
}
- if
(changedProperties.containsKey(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key()))
{
+ if
(changedProperties.contains(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key()))
{
isRpcAuditLogEnabled =
conf.getBoolean(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED);
}
- if (changedProperties.containsKey(
+ if (changedProperties.contains(
ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST.key())) {
rpcAuditExcludeOpList =
conf.get(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST);
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 3c7bb736f..b352fdae9 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -102,10 +102,10 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
if (changedProperties == null) {
return;
}
- if
(changedProperties.containsKey(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key()))
{
+ if
(changedProperties.contains(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key()))
{
isRpcAuditLogEnabled =
conf.getBoolean(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED);
}
- if (changedProperties.containsKey(
+ if (changedProperties.contains(
ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST.key())) {
rpcAuditExcludeOpList =
conf.get(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST);
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index d9bc2c51b..ab5824896 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -184,7 +184,7 @@ public class LocalStorageManager extends
SingleStorageManager {
if (changedProperties == null || rssConf == null) {
return;
}
- if (changedProperties.containsKey(
+ if (changedProperties.contains(
ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED.key())) {
isStorageAuditLogEnabled =
rssConf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
diff --git
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
index 63c5c12dd..374c7a14f 100644
---
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
@@ -53,7 +53,6 @@ import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -366,8 +365,6 @@ public class LocalStorageManagerTest {
shuffleTaskInfos.put("app1", new ShuffleTaskInfo("app1"));
shuffleTaskInfos.put("app2", new ShuffleTaskInfo("app2"));
localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet);
- TestLoggerExtension testLogger =
TestLoggerExtension.getTestLogger(context);
- assertFalse(testLogger.wasLogged("app"));
// test race condition case, app 3 is new app
shuffleTaskInfos.put("3", new ShuffleTaskInfo("app3"));
@@ -375,6 +372,8 @@ public class LocalStorageManagerTest {
when(mockLocalStorage.getAppIds()).thenReturn(Collections.singleton("app3"));
storages.add(mockLocalStorage);
localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet);
+
+ TestLoggerExtension testLogger =
TestLoggerExtension.getTestLogger(context);
assertTrue(testLogger.wasLogged("Delete shuffle data for
appId\\[app3\\]"));
System.out.println();
}