This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 414d5cf51 [#1711] feat(server): Make the health checker execution 
timeout reconfigurable (#1754)
414d5cf51 is described below

commit 414d5cf51848b67b0a0e0fc23df19d67a5741f1b
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Jun 3 11:30:39 2024 +0800

    [#1711] feat(server): Make the health checker execution timeout 
reconfigurable (#1754)
    
    ### What changes were proposed in this pull request?
    
    1. Make the health checker execution timeout reconfigurable
    2. Introduce `rssConf.getReconfigurableConf` as the preferred way to obtain a 
reconfigurable conf.
    
    ### Why are the changes needed?
    
    For #1711
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
---
 .../java/org/apache/uniffle/common/ReconfigurableConfManager.java | 4 ++++
 .../src/main/java/org/apache/uniffle/common/config/RssConf.java   | 6 ++++++
 .../org/apache/uniffle/common/ReconfigurableConfManagerTest.java  | 8 ++++----
 .../java/org/apache/uniffle/coordinator/SimpleClusterManager.java | 2 +-
 .../coordinator/access/checker/AccessClusterLoadChecker.java      | 2 +-
 .../apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java    | 3 +--
 .../main/java/org/apache/uniffle/server/LocalStorageChecker.java  | 7 ++++---
 .../org/apache/uniffle/server/buffer/ShuffleBufferManager.java    | 2 +-
 8 files changed, 22 insertions(+), 12 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java 
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
index 9a4cdd6b2..3a69cd53e 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
@@ -138,6 +138,10 @@ public class ReconfigurableConfManager<T> {
     reconfigurableConfManager = manager;
   }
 
+  /**
+   * This should not be invoked directly when getting the reconfigurable conf. 
Please use
+   * `rssConf.getReconfigurableConf(configOption)` instead.
+   */
   public static <T> Reconfigurable<T> register(RssConf conf, ConfigOption<T> 
configOption) {
     if (reconfigurableConfManager == null) {
       LOGGER.warn(
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
index ac5424e01..21808f420 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 
+import org.apache.uniffle.common.ReconfigurableConfManager;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.UnitConverter;
 
@@ -427,6 +428,11 @@ public class RssConf implements Cloneable {
     return getOptional(configOption).orElseGet(configOption::defaultValue);
   }
 
+  public <T> ReconfigurableConfManager.Reconfigurable<T> getReconfigurableConf(
+      ConfigOption<T> configOption) {
+    return ReconfigurableConfManager.register(this, configOption);
+  }
+
   /**
    * Adds the given key/value pair to the configuration object.
    *
diff --git 
a/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
 
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
index ff9363f33..a43a63ed1 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
@@ -57,11 +57,11 @@ public class ReconfigurableConfManagerTest {
     ReconfigurableConfManager.initForTest(base, supplier);
 
     ReconfigurableConfManager.Reconfigurable<Integer> portReconfigurable =
-        ReconfigurableConfManager.register(base, JETTY_HTTP_PORT);
+        base.getReconfigurableConf(JETTY_HTTP_PORT);
     ReconfigurableConfManager.Reconfigurable<Integer> rpcReconfigurable =
-        ReconfigurableConfManager.register(base, RPC_SERVER_PORT);
+        base.getReconfigurableConf(RPC_SERVER_PORT);
     ReconfigurableConfManager.Reconfigurable<List<String>> typeReconfigurable =
-        ReconfigurableConfManager.register(base, RSS_STORAGE_BASE_PATH);
+        base.getReconfigurableConf(RSS_STORAGE_BASE_PATH);
     assertEquals(19998, portReconfigurable.get());
     assertEquals(19999, rpcReconfigurable.get());
     assertNull(typeReconfigurable.get());
@@ -78,7 +78,7 @@ public class ReconfigurableConfManagerTest {
     RssConf base = new RssConf();
     base.set(JETTY_HTTP_PORT, 100);
     ReconfigurableConfManager.Reconfigurable<Integer> portReconfigurable =
-        ReconfigurableConfManager.register(base, JETTY_HTTP_PORT);
+        base.getReconfigurableConf(JETTY_HTTP_PORT);
     assertEquals(100, portReconfigurable.get());
   }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 35af94e33..28a9a1189 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -87,7 +87,7 @@ public class SimpleClusterManager implements ClusterManager {
 
   public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) 
throws Exception {
     this.shuffleNodesMax =
-        ReconfigurableConfManager.register(conf, 
CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+        
conf.getReconfigurableConf(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
     this.heartbeatTimeout = 
conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
     // the thread for checking if shuffle server report heartbeat in time
     scheduledExecutorService =
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
index 971f2d380..7511bfb97 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
@@ -61,7 +61,7 @@ public class AccessClusterLoadChecker extends 
AbstractAccessChecker {
     this.availableServerNumThreshold =
         
conf.getInteger(CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_SERVER_NUM_THRESHOLD,
 -1);
     this.defaultRequiredShuffleServerNumber =
-        ReconfigurableConfManager.register(conf, 
CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+        
conf.getReconfigurableConf(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
   }
 
   @Override
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
index 09140d143..4eb10caa0 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
@@ -99,8 +99,7 @@ public class CoordinatorReconfigureNodeMaxTest extends 
CoordinatorTestBase {
   public void testReconfigureNodeMax() throws Exception {
     // case1: check the initial node max val = 10
     ReconfigurableConfManager.Reconfigurable<Integer> nodeMax =
-        ReconfigurableConfManager.register(
-            new RssConf(), CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+        new 
RssConf().getReconfigurableConf(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
     assertEquals(DEFAULT_SHUFFLE_NODES_MAX, nodeMax.get());
 
     ShuffleWriteClientImpl shuffleWriteClient =
diff --git 
a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java 
b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index c15f2e038..10b12e74c 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -41,6 +41,7 @@ import org.apache.commons.lang3.RandomUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.ReconfigurableConfManager;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.future.CompletableFutureExtension;
 import org.apache.uniffle.common.util.RssUtils;
@@ -60,7 +61,7 @@ public class LocalStorageChecker extends Checker {
   protected List<StorageInfo> storageInfos = Lists.newArrayList();
   private boolean isHealthy = true;
   private ExecutorService workers;
-  private final long diskCheckerExecutionTimeoutMs;
+  private ReconfigurableConfManager.Reconfigurable<Long> 
diskCheckerExecutionTimeoutMs;
 
   public LocalStorageChecker(ShuffleServerConf conf, List<LocalStorage> 
storages) {
     super(conf);
@@ -85,7 +86,7 @@ public class LocalStorageChecker extends Checker {
         conf.getDouble(ShuffleServerConf.HEALTH_MIN_STORAGE_PERCENTAGE);
 
     this.diskCheckerExecutionTimeoutMs =
-        
conf.getLong(ShuffleServerConf.HEALTH_CHECKER_LOCAL_STORAGE_EXECUTE_TIMEOUT);
+        
conf.getReconfigurableConf(ShuffleServerConf.HEALTH_CHECKER_LOCAL_STORAGE_EXECUTE_TIMEOUT);
     this.workers = Executors.newFixedThreadPool(basePaths.size());
   }
 
@@ -136,7 +137,7 @@ public class LocalStorageChecker extends Checker {
       futureMap.put(
           storageInfo,
           CompletableFutureExtension.orTimeout(
-              storageCheckFuture, diskCheckerExecutionTimeoutMs, 
TimeUnit.MILLISECONDS));
+              storageCheckFuture, diskCheckerExecutionTimeoutMs.get(), 
TimeUnit.MILLISECONDS));
     }
 
     for (Map.Entry<StorageInfo, CompletableFuture<Void>> entry : 
futureMap.entrySet()) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 257af9319..fd78d898d 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -128,7 +128,7 @@ public class ShuffleBufferManager {
     this.shuffleFlushThreshold =
         conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
     this.hugePartitionSizeThresholdRef =
-        ReconfigurableConfManager.register(conf, 
ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
+        
conf.getReconfigurableConf(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
     this.hugePartitionMemoryLimitSize =
         Math.round(
             capacity * 
conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO));

Reply via email to