This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 414d5cf51 [#1711] feat(server): Make the health checker execution
timeout reconfigurable (#1754)
414d5cf51 is described below
commit 414d5cf51848b67b0a0e0fc23df19d67a5741f1b
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Jun 3 11:30:39 2024 +0800
[#1711] feat(server): Make the health checker execution timeout
reconfigurable (#1754)
### What changes were proposed in this pull request?
1. Make the health checker execution timeout reconfigurable
2. Introduce `rssConf.getReconfigurableConf` as the entry point for obtaining
reconfigurable conf values.
### Why are the changes needed?
For #1711
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
---
.../java/org/apache/uniffle/common/ReconfigurableConfManager.java | 4 ++++
.../src/main/java/org/apache/uniffle/common/config/RssConf.java | 6 ++++++
.../org/apache/uniffle/common/ReconfigurableConfManagerTest.java | 8 ++++----
.../java/org/apache/uniffle/coordinator/SimpleClusterManager.java | 2 +-
.../coordinator/access/checker/AccessClusterLoadChecker.java | 2 +-
.../apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java | 3 +--
.../main/java/org/apache/uniffle/server/LocalStorageChecker.java | 7 ++++---
.../org/apache/uniffle/server/buffer/ShuffleBufferManager.java | 2 +-
8 files changed, 22 insertions(+), 12 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
index 9a4cdd6b2..3a69cd53e 100644
---
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
+++
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
@@ -138,6 +138,10 @@ public class ReconfigurableConfManager<T> {
reconfigurableConfManager = manager;
}
+ /**
+ * This should not be invoked directly when getting the reconfigurable conf. Please use
+ * `rssConf.getReconfigurableConf(configOption)` instead.
+ */
public static <T> Reconfigurable<T> register(RssConf conf, ConfigOption<T>
configOption) {
if (reconfigurableConfManager == null) {
LOGGER.warn(
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
index ac5424e01..21808f420 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
+import org.apache.uniffle.common.ReconfigurableConfManager;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.UnitConverter;
@@ -427,6 +428,11 @@ public class RssConf implements Cloneable {
return getOptional(configOption).orElseGet(configOption::defaultValue);
}
+ public <T> ReconfigurableConfManager.Reconfigurable<T> getReconfigurableConf(
+ ConfigOption<T> configOption) {
+ return ReconfigurableConfManager.register(this, configOption);
+ }
+
/**
* Adds the given key/value pair to the configuration object.
*
diff --git
a/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
index ff9363f33..a43a63ed1 100644
---
a/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
@@ -57,11 +57,11 @@ public class ReconfigurableConfManagerTest {
ReconfigurableConfManager.initForTest(base, supplier);
ReconfigurableConfManager.Reconfigurable<Integer> portReconfigurable =
- ReconfigurableConfManager.register(base, JETTY_HTTP_PORT);
+ base.getReconfigurableConf(JETTY_HTTP_PORT);
ReconfigurableConfManager.Reconfigurable<Integer> rpcReconfigurable =
- ReconfigurableConfManager.register(base, RPC_SERVER_PORT);
+ base.getReconfigurableConf(RPC_SERVER_PORT);
ReconfigurableConfManager.Reconfigurable<List<String>> typeReconfigurable =
- ReconfigurableConfManager.register(base, RSS_STORAGE_BASE_PATH);
+ base.getReconfigurableConf(RSS_STORAGE_BASE_PATH);
assertEquals(19998, portReconfigurable.get());
assertEquals(19999, rpcReconfigurable.get());
assertNull(typeReconfigurable.get());
@@ -78,7 +78,7 @@ public class ReconfigurableConfManagerTest {
RssConf base = new RssConf();
base.set(JETTY_HTTP_PORT, 100);
ReconfigurableConfManager.Reconfigurable<Integer> portReconfigurable =
- ReconfigurableConfManager.register(base, JETTY_HTTP_PORT);
+ base.getReconfigurableConf(JETTY_HTTP_PORT);
assertEquals(100, portReconfigurable.get());
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 35af94e33..28a9a1189 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -87,7 +87,7 @@ public class SimpleClusterManager implements ClusterManager {
public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf)
throws Exception {
this.shuffleNodesMax =
- ReconfigurableConfManager.register(conf,
CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+
conf.getReconfigurableConf(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
this.heartbeatTimeout =
conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
// the thread for checking if shuffle server report heartbeat in time
scheduledExecutorService =
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
index 971f2d380..7511bfb97 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
@@ -61,7 +61,7 @@ public class AccessClusterLoadChecker extends
AbstractAccessChecker {
this.availableServerNumThreshold =
conf.getInteger(CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_SERVER_NUM_THRESHOLD,
-1);
this.defaultRequiredShuffleServerNumber =
- ReconfigurableConfManager.register(conf,
CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+
conf.getReconfigurableConf(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
}
@Override
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
index 09140d143..4eb10caa0 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
@@ -99,8 +99,7 @@ public class CoordinatorReconfigureNodeMaxTest extends
CoordinatorTestBase {
public void testReconfigureNodeMax() throws Exception {
// case1: check the initial node max val = 10
ReconfigurableConfManager.Reconfigurable<Integer> nodeMax =
- ReconfigurableConfManager.register(
- new RssConf(), CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+ new
RssConf().getReconfigurableConf(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
assertEquals(DEFAULT_SHUFFLE_NODES_MAX, nodeMax.get());
ShuffleWriteClientImpl shuffleWriteClient =
diff --git
a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index c15f2e038..10b12e74c 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -41,6 +41,7 @@ import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.ReconfigurableConfManager;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.future.CompletableFutureExtension;
import org.apache.uniffle.common.util.RssUtils;
@@ -60,7 +61,7 @@ public class LocalStorageChecker extends Checker {
protected List<StorageInfo> storageInfos = Lists.newArrayList();
private boolean isHealthy = true;
private ExecutorService workers;
- private final long diskCheckerExecutionTimeoutMs;
+ private ReconfigurableConfManager.Reconfigurable<Long>
diskCheckerExecutionTimeoutMs;
public LocalStorageChecker(ShuffleServerConf conf, List<LocalStorage>
storages) {
super(conf);
@@ -85,7 +86,7 @@ public class LocalStorageChecker extends Checker {
conf.getDouble(ShuffleServerConf.HEALTH_MIN_STORAGE_PERCENTAGE);
this.diskCheckerExecutionTimeoutMs =
-
conf.getLong(ShuffleServerConf.HEALTH_CHECKER_LOCAL_STORAGE_EXECUTE_TIMEOUT);
+
conf.getReconfigurableConf(ShuffleServerConf.HEALTH_CHECKER_LOCAL_STORAGE_EXECUTE_TIMEOUT);
this.workers = Executors.newFixedThreadPool(basePaths.size());
}
@@ -136,7 +137,7 @@ public class LocalStorageChecker extends Checker {
futureMap.put(
storageInfo,
CompletableFutureExtension.orTimeout(
- storageCheckFuture, diskCheckerExecutionTimeoutMs,
TimeUnit.MILLISECONDS));
+ storageCheckFuture, diskCheckerExecutionTimeoutMs.get(),
TimeUnit.MILLISECONDS));
}
for (Map.Entry<StorageInfo, CompletableFuture<Void>> entry :
futureMap.entrySet()) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 257af9319..fd78d898d 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -128,7 +128,7 @@ public class ShuffleBufferManager {
this.shuffleFlushThreshold =
conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
this.hugePartitionSizeThresholdRef =
- ReconfigurableConfManager.register(conf,
ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
+
conf.getReconfigurableConf(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
this.hugePartitionMemoryLimitSize =
Math.round(
capacity *
conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO));