This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5dd4eefa7 [#1711] feat(server): Introduce the reconfigurable conf
(#1712)
5dd4eefa7 is described below
commit 5dd4eefa7664fd7dd2bd433c48dde2b9f6d4e35f
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed May 22 15:55:22 2024 +0800
[#1711] feat(server): Introduce the reconfigurable conf (#1712)
### What changes were proposed in this pull request?
Introduce the reconfigurable conf for server or other componts.
### Why are the changes needed?
Fix: #1711
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
---
.../uniffle/common/ReconfigurableConfManager.java | 190 +++++++++++++++++++++
.../common/ReconfigurableConfManagerTest.java | 84 +++++++++
.../org/apache/uniffle/server/ShuffleServer.java | 3 +
.../server/buffer/ShuffleBufferManager.java | 13 +-
4 files changed, 284 insertions(+), 6 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
new file mode 100644
index 000000000..7365a0ef6
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/ReconfigurableConfManager.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.config.ConfigOption;
+import org.apache.uniffle.common.config.ConfigUtils;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+import static
org.apache.uniffle.common.config.RssBaseConf.RSS_RECONFIGURE_INTERVAL_SEC;
+
+public class ReconfigurableConfManager<T> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ReconfigurableConfManager.class);
+
+ private static ReconfigurableConfManager reconfigurableConfManager;
+
+ private RssConf rssConf;
+ private ScheduledExecutorService scheduledThreadPoolExecutor;
+ private List<ConfigOption<T>> updateConfOptions;
+
+ private long latestModificationTimestamp;
+
+ private ReconfigurableConfManager(RssConf rssConf, String rssConfFilePath,
Class confCls) {
+ Supplier<RssConf> confSupplier = getConfFromFile(rssConfFilePath, confCls);
+ initialize(rssConf, confSupplier);
+ }
+
+ private ReconfigurableConfManager(RssConf rssConf, Supplier<RssConf>
confSupplier) {
+ initialize(rssConf, confSupplier);
+ }
+
+ private void initialize(RssConf rssConf, Supplier<RssConf> confSupplier) {
+ this.rssConf = new RssConf(rssConf);
+ if (confSupplier != null) {
+ this.updateConfOptions = new ArrayList<>();
+ this.scheduledThreadPoolExecutor =
+
ThreadUtils.getDaemonSingleThreadScheduledExecutor("Refresh-rss-conf");
+ LOGGER.info("Starting scheduled reconfigurable conf checker...");
+ scheduledThreadPoolExecutor.scheduleAtFixedRate(
+ () -> {
+ try {
+ RssConf latestConf = confSupplier.get();
+ update(latestConf);
+ } catch (Exception e) {
+ LOGGER.error("Errors on refreshing the rss conf.", e);
+ }
+ },
+ 1,
+ rssConf.get(RSS_RECONFIGURE_INTERVAL_SEC),
+ TimeUnit.SECONDS);
+ }
+ }
+
+ private Supplier<RssConf> getConfFromFile(String rssConfFilePath, Class
confCls) {
+ return () -> {
+ File confFile = new File(rssConfFilePath);
+ if (confFile.exists() && confFile.isFile()) {
+ long lastModified = confFile.lastModified();
+ if (lastModified > latestModificationTimestamp) {
+ latestModificationTimestamp = lastModified;
+ RssBaseConf conf = new RssBaseConf();
+ conf.loadConfFromFile(rssConfFilePath,
ConfigUtils.getAllConfigOptions(confCls));
+ return conf;
+ }
+ }
+ LOGGER.info("Rss conf file:{} is invalid. Ignore updating.",
rssConfFilePath);
+ return null;
+ };
+ }
+
+ private void update(RssConf latestConf) {
+ if (latestConf == null) {
+ return;
+ }
+ for (ConfigOption<T> configOption : updateConfOptions) {
+ T val = latestConf.get(configOption);
+ if (!Objects.equals(val, rssConf.get(configOption))) {
+ LOGGER.info(
+ "Update the config option: {} from {} -> {}",
+ configOption.key(),
+ val,
+ rssConf.get(configOption));
+ rssConf.set(configOption, val);
+ }
+ }
+ }
+
+ private RssConf getConfRef() {
+ return rssConf;
+ }
+
+ private void registerInternal(ConfigOption<T> configOption) {
+ this.updateConfOptions.add(configOption);
+ }
+
+ public static void init(RssConf rssConf, String rssConfFilePath) {
+ ReconfigurableConfManager manager =
+ new ReconfigurableConfManager(rssConf, rssConfFilePath,
rssConf.getClass());
+ reconfigurableConfManager = manager;
+ }
+
+ @VisibleForTesting
+ protected static void initForTest(RssConf rssConf, Supplier<RssConf>
confSupplier) {
+ ReconfigurableConfManager manager = new ReconfigurableConfManager(rssConf,
confSupplier);
+ reconfigurableConfManager = manager;
+ }
+
+ public static <T> Reconfigurable<T> register(RssConf conf, ConfigOption<T>
configOption) {
+ if (reconfigurableConfManager == null) {
+ LOGGER.warn(
+ "{} is not initialized. The conf of [{}] will not be updated.",
+ ReconfigurableConfManager.class.getSimpleName(),
+ configOption.key());
+ return new FixedReconfigurable<>(conf, configOption);
+ }
+
+ reconfigurableConfManager.registerInternal(configOption);
+ Reconfigurable<T> reconfigurable =
+ new Reconfigurable<T>(reconfigurableConfManager, configOption);
+ return reconfigurable;
+ }
+
+ public static class FixedReconfigurable<T> extends Reconfigurable<T> {
+ RssConf conf;
+ ConfigOption<T> option;
+
+ FixedReconfigurable(RssConf conf, ConfigOption<T> option) {
+ this.conf = conf;
+ this.option = option;
+ }
+
+ @Override
+ public T get() {
+ return conf.get(option);
+ }
+
+ @Override
+ public long getSizeAsBytes() {
+ return conf.getSizeAsBytes((ConfigOption<Long>) option);
+ }
+ }
+
+ public static class Reconfigurable<T> {
+ ReconfigurableConfManager reconfigurableConfManager;
+ ConfigOption<T> option;
+
+ Reconfigurable() {}
+
+ Reconfigurable(ReconfigurableConfManager reconfigurableConfManager,
ConfigOption<T> option) {
+ this.reconfigurableConfManager = reconfigurableConfManager;
+ this.option = option;
+ }
+
+ public T get() {
+ return reconfigurableConfManager.getConfRef().get(option);
+ }
+
+ public long getSizeAsBytes() {
+ return
reconfigurableConfManager.getConfRef().getSizeAsBytes((ConfigOption<Long>)
option);
+ }
+ }
+}
diff --git
a/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
new file mode 100644
index 000000000..ff9363f33
--- /dev/null
+++
b/common/src/test/java/org/apache/uniffle/common/ReconfigurableConfManagerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.uniffle.common.config.RssBaseConf.JETTY_HTTP_PORT;
+import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
+import static
org.apache.uniffle.common.config.RssBaseConf.RSS_RECONFIGURE_INTERVAL_SEC;
+import static
org.apache.uniffle.common.config.RssBaseConf.RSS_STORAGE_BASE_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ReconfigurableConfManagerTest {
+
+ @Test
+ public void test() throws InterruptedException {
+ AtomicInteger i = new AtomicInteger(0);
+ Supplier<RssConf> supplier =
+ () -> {
+ if (i.getAndIncrement() <= 1) {
+ return new RssConf();
+ }
+ RssConf conf = new RssConf();
+ conf.set(JETTY_HTTP_PORT, 100);
+ conf.set(RPC_SERVER_PORT, 200);
+ conf.set(RSS_STORAGE_BASE_PATH, Arrays.asList("/d1"));
+ return conf;
+ };
+
+ RssConf base = new RssConf();
+ base.set(RSS_RECONFIGURE_INTERVAL_SEC, 1L);
+ ReconfigurableConfManager.initForTest(base, supplier);
+
+ ReconfigurableConfManager.Reconfigurable<Integer> portReconfigurable =
+ ReconfigurableConfManager.register(base, JETTY_HTTP_PORT);
+ ReconfigurableConfManager.Reconfigurable<Integer> rpcReconfigurable =
+ ReconfigurableConfManager.register(base, RPC_SERVER_PORT);
+ ReconfigurableConfManager.Reconfigurable<List<String>> typeReconfigurable =
+ ReconfigurableConfManager.register(base, RSS_STORAGE_BASE_PATH);
+ assertEquals(19998, portReconfigurable.get());
+ assertEquals(19999, rpcReconfigurable.get());
+ assertNull(typeReconfigurable.get());
+
+ Awaitility.await()
+ .timeout(5, TimeUnit.SECONDS)
+ .until(() -> portReconfigurable.get().equals(100));
+ assertEquals(200, rpcReconfigurable.get());
+ assertEquals(Arrays.asList("/d1"), typeReconfigurable.get());
+ }
+
+ @Test
+ public void testWithoutInitialization() {
+ RssConf base = new RssConf();
+ base.set(JETTY_HTTP_PORT, 100);
+ ReconfigurableConfManager.Reconfigurable<Integer> portReconfigurable =
+ ReconfigurableConfManager.register(base, JETTY_HTTP_PORT);
+ assertEquals(100, portReconfigurable.get());
+ }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index ae37393ef..de874816a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import org.apache.uniffle.common.Arguments;
+import org.apache.uniffle.common.ReconfigurableConfManager;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.InvalidRequestException;
@@ -122,6 +123,8 @@ public class ShuffleServer {
LOG.info("Start to init shuffle server using config {}", configFile);
ShuffleServerConf shuffleServerConf = new ShuffleServerConf(configFile);
+ ReconfigurableConfManager.init(shuffleServerConf, configFile);
+
final ShuffleServer shuffleServer = new ShuffleServer(shuffleServerConf);
shuffleServer.start();
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index fd77a04cf..79d619fcd 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -39,6 +39,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.ReconfigurableConfManager;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.rpc.StatusCode;
@@ -69,7 +70,7 @@ public class ShuffleBufferManager {
// reduce small I/Os to persistent storage, especially for local HDDs.
private long shuffleFlushThreshold;
// Huge partition vars
- private long hugePartitionSizeThreshold;
+ private ReconfigurableConfManager.Reconfigurable<Long>
hugePartitionSizeThresholdRef;
private long hugePartitionMemoryLimitSize;
protected long bufferSize = 0;
@@ -126,8 +127,8 @@ public class ShuffleBufferManager {
conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD);
this.shuffleFlushThreshold =
conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
- this.hugePartitionSizeThreshold =
- conf.getSizeAsBytes(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
+ this.hugePartitionSizeThresholdRef =
+ ReconfigurableConfManager.register(conf,
ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
this.hugePartitionMemoryLimitSize =
Math.round(
capacity *
conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO));
@@ -702,16 +703,16 @@ public class ShuffleBufferManager {
boolean isHugePartition(String appId, int shuffleId, int partitionId) {
return shuffleTaskManager != null
&& shuffleTaskManager.getPartitionDataSize(appId, shuffleId,
partitionId)
- > hugePartitionSizeThreshold;
+ > hugePartitionSizeThresholdRef.getSizeAsBytes();
}
public boolean isHugePartition(long usedPartitionDataSize) {
- return usedPartitionDataSize > hugePartitionSizeThreshold;
+ return usedPartitionDataSize >
hugePartitionSizeThresholdRef.getSizeAsBytes();
}
public boolean limitHugePartition(
String appId, int shuffleId, int partitionId, long
usedPartitionDataSize) {
- if (usedPartitionDataSize > hugePartitionSizeThreshold) {
+ if (usedPartitionDataSize >
hugePartitionSizeThresholdRef.getSizeAsBytes()) {
long memoryUsed = getShuffleBufferEntry(appId, shuffleId,
partitionId).getValue().getSize();
if (memoryUsed > hugePartitionMemoryLimitSize) {
LOG.warn(