This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dd4eefa7 [#1711] feat(server): Introduce the reconfigurable conf 
(#1712)
5dd4eefa7 is described below

commit 5dd4eefa7664fd7dd2bd433c48dde2b9f6d4e35f
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed May 22 15:55:22 2024 +0800

    [#1711] feat(server): Introduce the reconfigurable conf (#1712)
    
    ### What changes were proposed in this pull request?
    
    Introduce the reconfigurable conf for server or other componts.
    
    ### Why are the changes needed?
    
    Fix: #1711
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests.
---
 .../uniffle/common/ReconfigurableConfManager.java  | 190 +++++++++++++++++++++
 .../common/ReconfigurableConfManagerTest.java      |  84 +++++++++
 .../org/apache/uniffle/server/ShuffleServer.java   |   3 +
 .../server/buffer/ShuffleBufferManager.java        |  13 +-
 4 files changed, 284 insertions(+), 6 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java 
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
new file mode 100644
index 000000000..7365a0ef6
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.config.ConfigOption;
+import org.apache.uniffle.common.config.ConfigUtils;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+import static 
org.apache.uniffle.common.config.RssBaseConf.RSS_RECONFIGURE_INTERVAL_SEC;
+
+public class ReconfigurableConfManager<T> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ReconfigurableConfManager.class);
+
+  private static ReconfigurableConfManager reconfigurableConfManager;
+
+  private RssConf rssConf;
+  private ScheduledExecutorService scheduledThreadPoolExecutor;
+  private List<ConfigOption<T>> updateConfOptions;
+
+  private long latestModificationTimestamp;
+
+  private ReconfigurableConfManager(RssConf rssConf, String rssConfFilePath, 
Class confCls) {
+    Supplier<RssConf> confSupplier = getConfFromFile(rssConfFilePath, confCls);
+    initialize(rssConf, confSupplier);
+  }
+
+  private ReconfigurableConfManager(RssConf rssConf, Supplier<RssConf> 
confSupplier) {
+    initialize(rssConf, confSupplier);
+  }
+
+  private void initialize(RssConf rssConf, Supplier<RssConf> confSupplier) {
+    this.rssConf = new RssConf(rssConf);
+    if (confSupplier != null) {
+      this.updateConfOptions = new ArrayList<>();
+      this.scheduledThreadPoolExecutor =
+          
ThreadUtils.getDaemonSingleThreadScheduledExecutor("Refresh-rss-conf");
+      LOGGER.info("Starting scheduled reconfigurable conf checker...");
+      scheduledThreadPoolExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              RssConf latestConf = confSupplier.get();
+              update(latestConf);
+            } catch (Exception e) {
+              LOGGER.error("Errors on refreshing the rss conf.", e);
+            }
+          },
+          1,
+          rssConf.get(RSS_RECONFIGURE_INTERVAL_SEC),
+          TimeUnit.SECONDS);
+    }
+  }
+
+  private Supplier<RssConf> getConfFromFile(String rssConfFilePath, Class 
confCls) {
+    return () -> {
+      File confFile = new File(rssConfFilePath);
+      if (confFile.exists() && confFile.isFile()) {
+        long lastModified = confFile.lastModified();
+        if (lastModified > latestModificationTimestamp) {
+          latestModificationTimestamp = lastModified;
+          RssBaseConf conf = new RssBaseConf();
+          conf.loadConfFromFile(rssConfFilePath, 
ConfigUtils.getAllConfigOptions(confCls));
+          return conf;
+        }
+      }
+      LOGGER.info("Rss conf file:{} is invalid. Ignore updating.", 
rssConfFilePath);
+      return null;
+    };
+  }
+
+  private void update(RssConf latestConf) {
+    if (latestConf == null) {
+      return;
+    }
+    for (ConfigOption<T> configOption : updateConfOptions) {
+      T val = latestConf.get(configOption);
+      if (!Objects.equals(val, rssConf.get(configOption))) {
+        LOGGER.info(
+            "Update the config option: {} from {} -> {}",
+            configOption.key(),
+            val,
+            rssConf.get(configOption));
+        rssConf.set(configOption, val);
+      }
+    }
+  }
+
+  private RssConf getConfRef() {
+    return rssConf;
+  }
+
+  private void registerInternal(ConfigOption<T> configOption) {
+    this.updateConfOptions.add(configOption);
+  }
+
+  public static void init(RssConf rssConf, String rssConfFilePath) {
+    ReconfigurableConfManager manager =
+        new ReconfigurableConfManager(rssConf, rssConfFilePath, 
rssConf.getClass());
+    reconfigurableConfManager = manager;
+  }
+
+  @VisibleForTesting
+  protected static void initForTest(RssConf rssConf, Supplier<RssConf> 
confSupplier) {
+    ReconfigurableConfManager manager = new ReconfigurableConfManager(rssConf, 
confSupplier);
+    reconfigurableConfManager = manager;
+  }
+
+  public static <T> Reconfigurable<T> register(RssConf conf, ConfigOption<T> 
configOption) {
+    if (reconfigurableConfManager == null) {
+      LOGGER.warn(
+          "{} is not initialized. The conf of [{}] will not be updated.",
+          ReconfigurableConfManager.class.getSimpleName(),
+          configOption.key());
+      return new FixedReconfigurable<>(conf, configOption);
+    }
+
+    reconfigurableConfManager.registerInternal(configOption);
+    Reconfigurable<T> reconfigurable =
+        new Reconfigurable<T>(reconfigurableConfManager, configOption);
+    return reconfigurable;
+  }
+
+  public static class FixedReconfigurable<T> extends Reconfigurable<T> {
+    RssConf conf;
+    ConfigOption<T> option;
+
+    FixedReconfigurable(RssConf conf, ConfigOption<T> option) {
+      this.conf = conf;
+      this.option = option;
+    }
+
+    @Override
+    public T get() {
+      return conf.get(option);
+    }
+
+    @Override
+    public long getSizeAsBytes() {
+      return conf.getSizeAsBytes((ConfigOption<Long>) option);
+    }
+  }
+
+  public static class Reconfigurable<T> {
+    ReconfigurableConfManager reconfigurableConfManager;
+    ConfigOption<T> option;
+
+    Reconfigurable() {}
+
+    Reconfigurable(ReconfigurableConfManager reconfigurableConfManager, 
ConfigOption<T> option) {
+      this.reconfigurableConfManager = reconfigurableConfManager;
+      this.option = option;
+    }
+
+    public T get() {
+      return reconfigurableConfManager.getConfRef().get(option);
+    }
+
+    public long getSizeAsBytes() {
+      return 
reconfigurableConfManager.getConfRef().getSizeAsBytes((ConfigOption<Long>) 
option);
+    }
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
 
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
new file mode 100644
index 000000000..ff9363f33
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.uniffle.common.config.RssBaseConf.JETTY_HTTP_PORT;
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
+import static 
org.apache.uniffle.common.config.RssBaseConf.RSS_RECONFIGURE_INTERVAL_SEC;
+import static 
org.apache.uniffle.common.config.RssBaseConf.RSS_STORAGE_BASE_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ReconfigurableConfManagerTest {
+
+  @Test
+  public void test() throws InterruptedException {
+    AtomicInteger i = new AtomicInteger(0);
+    Supplier<RssConf> supplier =
+        () -> {
+          if (i.getAndIncrement() <= 1) {
+            return new RssConf();
+          }
+          RssConf conf = new RssConf();
+          conf.set(JETTY_HTTP_PORT, 100);
+          conf.set(RPC_SERVER_PORT, 200);
+          conf.set(RSS_STORAGE_BASE_PATH, Arrays.asList("/d1"));
+          return conf;
+        };
+
+    RssConf base = new RssConf();
+    base.set(RSS_RECONFIGURE_INTERVAL_SEC, 1L);
+    ReconfigurableConfManager.initForTest(base, supplier);
+
+    ReconfigurableConfManager.Reconfigurable<Integer> portReconfigurable =
+        ReconfigurableConfManager.register(base, JETTY_HTTP_PORT);
+    ReconfigurableConfManager.Reconfigurable<Integer> rpcReconfigurable =
+        ReconfigurableConfManager.register(base, RPC_SERVER_PORT);
+    ReconfigurableConfManager.Reconfigurable<List<String>> typeReconfigurable =
+        ReconfigurableConfManager.register(base, RSS_STORAGE_BASE_PATH);
+    assertEquals(19998, portReconfigurable.get());
+    assertEquals(19999, rpcReconfigurable.get());
+    assertNull(typeReconfigurable.get());
+
+    Awaitility.await()
+        .timeout(5, TimeUnit.SECONDS)
+        .until(() -> portReconfigurable.get().equals(100));
+    assertEquals(200, rpcReconfigurable.get());
+    assertEquals(Arrays.asList("/d1"), typeReconfigurable.get());
+  }
+
+  @Test
+  public void testWithoutInitialization() {
+    RssConf base = new RssConf();
+    base.set(JETTY_HTTP_PORT, 100);
+    ReconfigurableConfManager.Reconfigurable<Integer> portReconfigurable =
+        ReconfigurableConfManager.register(base, JETTY_HTTP_PORT);
+    assertEquals(100, portReconfigurable.get());
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index ae37393ef..de874816a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import picocli.CommandLine;
 
 import org.apache.uniffle.common.Arguments;
+import org.apache.uniffle.common.ReconfigurableConfManager;
 import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.exception.InvalidRequestException;
@@ -122,6 +123,8 @@ public class ShuffleServer {
     LOG.info("Start to init shuffle server using config {}", configFile);
 
     ShuffleServerConf shuffleServerConf = new ShuffleServerConf(configFile);
+    ReconfigurableConfManager.init(shuffleServerConf, configFile);
+
     final ShuffleServer shuffleServer = new ShuffleServer(shuffleServerConf);
     shuffleServer.start();
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index fd77a04cf..79d619fcd 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -39,6 +39,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.ReconfigurableConfManager;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.rpc.StatusCode;
@@ -69,7 +70,7 @@ public class ShuffleBufferManager {
   // reduce small I/Os to persistent storage, especially for local HDDs.
   private long shuffleFlushThreshold;
   // Huge partition vars
-  private long hugePartitionSizeThreshold;
+  private ReconfigurableConfManager.Reconfigurable<Long> 
hugePartitionSizeThresholdRef;
   private long hugePartitionMemoryLimitSize;
 
   protected long bufferSize = 0;
@@ -126,8 +127,8 @@ public class ShuffleBufferManager {
         conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD);
     this.shuffleFlushThreshold =
         conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
-    this.hugePartitionSizeThreshold =
-        conf.getSizeAsBytes(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
+    this.hugePartitionSizeThresholdRef =
+        ReconfigurableConfManager.register(conf, 
ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
     this.hugePartitionMemoryLimitSize =
         Math.round(
             capacity * 
conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO));
@@ -702,16 +703,16 @@ public class ShuffleBufferManager {
   boolean isHugePartition(String appId, int shuffleId, int partitionId) {
     return shuffleTaskManager != null
         && shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 
partitionId)
-            > hugePartitionSizeThreshold;
+            > hugePartitionSizeThresholdRef.getSizeAsBytes();
   }
 
   public boolean isHugePartition(long usedPartitionDataSize) {
-    return usedPartitionDataSize > hugePartitionSizeThreshold;
+    return usedPartitionDataSize > 
hugePartitionSizeThresholdRef.getSizeAsBytes();
   }
 
   public boolean limitHugePartition(
       String appId, int shuffleId, int partitionId, long 
usedPartitionDataSize) {
-    if (usedPartitionDataSize > hugePartitionSizeThreshold) {
+    if (usedPartitionDataSize > 
hugePartitionSizeThresholdRef.getSizeAsBytes()) {
       long memoryUsed = getShuffleBufferEntry(appId, shuffleId, 
partitionId).getValue().getSize();
       if (memoryUsed > hugePartitionMemoryLimitSize) {
         LOG.warn(

Reply via email to