This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ad57c8b91 [CELEBORN-1052] Introduce dynamic ConfigService at
SystemLevel and TenantLevel
ad57c8b91 is described below
commit ad57c8b91ed8cdbaf9d7371f0d93ad75e861f14f
Author: Shuang <[email protected]>
AuthorDate: Mon Nov 27 12:17:05 2023 +0800
[CELEBORN-1052] Introduce dynamic ConfigService at SystemLevel and
TenantLevel
### What changes were proposed in this pull request?
This PR introduces a dynamic ConfigService at SystemLevel and TenantLevel.
Dynamic configuration is a type of configuration that can be changed at runtime
as needed. It can be used at the system level or the tenant level. When applying
dynamic configuration, the priority order is as follows: tenant level overrides
system level, which in turn overrides static configuration (CelebornConf). This
means that if a configuration is defined at the tenant level, it will be used
instead of the system level or static configuration (CelebornConf). If the
tenant-level configuration is missing, the system-level configuration will be
used. If the system-level configuration is also missing, CelebornConf will be
used as the default value.
There are several other tasks related to this feature that will be
implemented in the future.
- [ ] [Add isDynamic property for
CelebornConf](https://issues.apache.org/jira/browse/CELEBORN-1051)
- [ ] [Support DB based
Configserver](https://issues.apache.org/jira/browse/CELEBORN-1054)
- [ ] [Add restAPI for configuration
management](https://issues.apache.org/jira/browse/CELEBORN-1056)
### Why are the changes needed?
The current configuration of the server (CelebornConf) is static: when the
configuration is changed, the service needs to be restarted. This PR introduces
a dynamic configuration solution that the server side can use as needed.
Tenant-level configuration (such as tenant-level dynamic quota control) is
expected to be supported in the future, so this PR also supports dynamic
tenant-level configuration, a [...]
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes #2100 from RexXiong/CELEBORN-1052.
Authored-by: Shuang <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 20 ++++
conf/dynamicConfig.yaml.template | 27 +++++
docs/configuration/master.md | 2 +
docs/configuration/worker.md | 2 +
.../server/common/service/config/ConfigLevel.java | 23 ++++
.../common/service/config/ConfigService.java | 39 +++++++
.../common/service/config/DynamicConfig.java | 117 +++++++++++++++++++
.../config/DynamicConfigServiceFactory.java | 32 ++++++
.../common/service/config/FsConfigServiceImpl.java | 127 +++++++++++++++++++++
.../server/common/service/config/SystemConfig.java | 52 +++++++++
.../server/common/service/config/TenantConfig.java | 40 +++++++
service/src/test/resources/dynamicConfig.yaml | 36 ++++++
service/src/test/resources/dynamicConfig_2.yaml | 20 ++++
.../common/service/config/ConfigServiceSuiteJ.java | 123 ++++++++++++++++++++
14 files changed, 660 insertions(+)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 5b170eb30..247727df2 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -366,6 +366,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
}
}
+ def dynamicConfigStoreBackend: String = get(DYNAMIC_CONFIG_STORE_BACKEND) // "FS" or "NONE"
+ def dynamicConfigRefreshTime: Long = get(DYNAMIC_CONFIG_REFRESH_TIME) // refresh interval in milliseconds
+
// //////////////////////////////////////////////////////
// Network //
// //////////////////////////////////////////////////////
@@ -4062,4 +4065,21 @@ object CelebornConf extends Logging {
.doc("Kerberos keytab file path for HDFS storage connection.")
.stringConf
.createOptional
+
+ val DYNAMIC_CONFIG_STORE_BACKEND: ConfigEntry[String] = // selects where dynamic config is stored
+ buildConf("celeborn.dynamicConfig.store.backend")
+ .categories("master", "worker")
+ .doc("Store backend for dynamic config, NONE means disabling dynamic
config store")
+ .version("0.4.0")
+ .stringConf
+ .checkValues(Set("FS", "NONE")) // presumably rejects any value outside this set - TODO confirm
+ .createWithDefault("NONE") // dynamic config store is disabled unless explicitly configured
+
+ val DYNAMIC_CONFIG_REFRESH_TIME: ConfigEntry[Long] = // period between dynamic config reloads
+ buildConf("celeborn.dynamicConfig.refresh.time")
+ .categories("master", "worker")
+ .version("0.4.0")
+ .doc("The time interval for refreshing the corresponding dynamic config
periodically")
+ .timeConf(TimeUnit.MILLISECONDS) // FsConfigServiceImpl schedules with this value as milliseconds
+ .createWithDefaultString("120s")
}
diff --git a/conf/dynamicConfig.yaml.template b/conf/dynamicConfig.yaml.template
new file mode 100644
index 000000000..87789754b
--- /dev/null
+++ b/conf/dynamicConfig.yaml.template
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Entries are a YAML list. Each entry has a `level` (SYSTEM or TENANT) and a
+# `config` map of key/value overrides; TENANT entries also carry a `tenantId`.
+- level: SYSTEM
+  config:
+    celeborn.worker.directMemoryRatioToPauseReceive: 0.75
+
+
+# Use an explicit empty map when a tenant has no overrides yet: a bare `config:`
+# is loaded as null rather than as an empty map.
+- tenantId: tenant_id
+  level: TENANT
+  config: {}
+
+
+
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index c21a76edd..025e9c15b 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -19,6 +19,8 @@ license: |
<!--begin-include-->
| Key | Default | Description | Since |
| --- | ------- | ----------- | ----- |
+| celeborn.dynamicConfig.refresh.time | 120s | The time interval for
refreshing the corresponding dynamic config periodically | 0.4.0 |
+| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic
config, NONE means disabling dynamic config store | 0.4.0 |
| celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial
partition size for estimation, it will change according to runtime stats. |
0.3.0 |
| celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial
delay time before start updating partition size for estimation. | 0.3.0 |
| celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of
updating partition size for estimation. | 0.3.0 |
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 1570d1249..a40cc9c54 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -19,6 +19,8 @@ license: |
<!--begin-include-->
| Key | Default | Description | Since |
| --- | ------- | ----------- | ----- |
+| celeborn.dynamicConfig.refresh.time | 120s | The time interval for
refreshing the corresponding dynamic config periodically | 0.4.0 |
+| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic
config, NONE means disabling dynamic config store | 0.4.0 |
| celeborn.master.endpoints | <localhost>:9097 | Endpoints of master
nodes for celeborn client to connect, allowed pattern is:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used. | 0.2.0 |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size
smaller than this configuration of partition size for estimation. | 0.3.0 |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged
shuffle data. For example, if a reducer's shuffle data is 128M and the data
will need 16 fetch chunk requests to fetch. | 0.2.0 |
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigLevel.java
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigLevel.java
new file mode 100644
index 000000000..121821461
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigLevel.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.service.config;
+
+public enum ConfigLevel { // scope of one entry in the dynamic config file (its `level` field)
+ SYSTEM, // entry is loaded as the SystemConfig
+ TENANT, // entry is loaded as a TenantConfig keyed by its tenantId
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigService.java
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigService.java
new file mode 100644
index 000000000..362d2b71e
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.service.config;
+
+public interface ConfigService {
+
+ SystemConfig getSystemConfig();
+
+ TenantConfig getRawTenantConfig(String tenantId);
+
+ default DynamicConfig getTenantConfig(String tenantId) {
+ TenantConfig tenantConfig = getRawTenantConfig(tenantId);
+ if (tenantConfig == null || tenantConfig.getConfigs().isEmpty()) {
+ return getSystemConfig();
+ } else {
+ return tenantConfig;
+ }
+ }
+
+ void refreshAllCache();
+
+ void shutdown();
+
+}
\ No newline at end of file
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfig.java
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfig.java
new file mode 100644
index 000000000..9a051eea7
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfig.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.service.config;
+
+import org.apache.celeborn.common.internal.config.ConfigEntry;
+import org.apache.celeborn.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Dynamic configuration is a type of configuration that can be changed at
runtime as needed. It can be used at system level/tenant level.
+ * When applying dynamic configuration, the priority order is as follows:
tenant level overrides system level,
+ * which in turn overrides static configuration(CelebornConf). This means that
if a configuration is defined at the tenant level,
+ * it will be used instead of the system level or static
configuration(CelebornConf). If the tenant-level configuration is missing,
+ * the system-level configuration will be used. If the system-level
configuration is also missing, CelebornConf
+ * will be used as the default value.
+ */
+public abstract class DynamicConfig {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicConfig.class);
+
+  /** Raw key/value pairs defined at this level; values are parsed on every lookup. */
+  protected Map<String, String> configs = new HashMap<>();
+
+  /**
+   * Returns the level that {@link #getValue} falls back to when this level has no usable value,
+   * or {@code null} when there is no further level.
+   */
+  public abstract DynamicConfig getParentLevelConfig();
+
+  /**
+   * Looks up {@code configKey} at this level only; parent levels are not consulted.
+   *
+   * @param configKey key to look up
+   * @param defaultValue value returned when the key is absent or its value cannot be parsed
+   * @param finalType type the raw string is converted to (see {@link #convert})
+   * @param configType how the raw string is interpreted before conversion
+   * @return the parsed value, or {@code defaultValue}
+   */
+  public <T> T getWithDefaultValue(
+      String configKey, T defaultValue, Class<T> finalType, ConfigType configType) {
+    T parsed = parseLocal(configKey, finalType, configType);
+    return parsed != null ? parsed : defaultValue;
+  }
+
+  /**
+   * Looks up {@code configKey} at this level and, if it is absent or unparsable, delegates to the
+   * parent level.
+   *
+   * @param configKey key to look up
+   * @param configEntry static entry forwarded unchanged to the parent level; not used here
+   * @param finalType type the raw string is converted to (see {@link #convert})
+   * @param configType how the raw string is interpreted before conversion
+   * @return the parsed value, the parent's answer, or {@code null} when there is no parent
+   */
+  public <T> T getValue(
+      String configKey, ConfigEntry<Object> configEntry, Class<T> finalType, ConfigType configType) {
+    T parsed = parseLocal(configKey, finalType, configType);
+    if (parsed != null) {
+      return parsed;
+    }
+    DynamicConfig parentLevelConfig = getParentLevelConfig();
+    return parentLevelConfig != null
+        ? parentLevelConfig.getValue(configKey, configEntry, finalType, configType)
+        : null;
+  }
+
+  /** Parses the value stored under {@code configKey} at this level; null if absent or invalid. */
+  private <T> T parseLocal(String configKey, Class<T> finalType, ConfigType configType) {
+    String configValue = configs.get(configKey);
+    return configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
+  }
+
+  /**
+   * Converts a raw string to {@code finalType}. BYTES values go through
+   * {@code Utils.byteStringAsBytes} and TIME_MS values through {@code Utils.timeStringAsMs}
+   * first; every other type is converted as is.
+   *
+   * @return the converted value, or {@code null} when {@code configValue} is null or conversion
+   *     throws (a warning is logged in that case)
+   */
+  public <T> T formatValue(
+      String configKey, String configValue, Class<T> finalType, ConfigType configType) {
+    try {
+      if (configValue != null) {
+        if (ConfigType.BYTES == configType) {
+          return convert(finalType, String.valueOf(Utils.byteStringAsBytes(configValue)));
+        } else if (ConfigType.TIME_MS == configType) {
+          return convert(finalType, String.valueOf(Utils.timeStringAsMs(configValue)));
+        } else {
+          return convert(finalType, configValue);
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Config {} value format is not valid, refer to parent if exist", configKey, e);
+    }
+    return null;
+  }
+
+  public Map<String, String> getConfigs() {
+    return configs;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("DynamicConfig{");
+    sb.append("configs=").append(configs);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /** How a raw config string is interpreted before it is converted to the target type. */
+  public enum ConfigType {
+    BYTES,
+    STRING,
+    TIME_MS,
+  }
+
+  /**
+   * Converts {@code value} to the type denoted by {@code clazz}.
+   *
+   * <p>Both the primitive class token (e.g. {@code Long.TYPE}) and the wrapper class (e.g.
+   * {@code Long.class}) are accepted. Without the wrapper check, {@code convert(Long.class, "1")}
+   * would hand back a {@code String} typed as {@code Long} and fail later with a
+   * {@code ClassCastException} at the call site.
+   *
+   * @param clazz target type; any type other than the ones handled below returns {@code value}
+   *     itself
+   * @param value raw string to convert
+   * @return the converted value
+   * @throws NumberFormatException when {@code value} is not valid for a numeric target type
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T convert(Class<T> clazz, String value) {
+    if (Boolean.TYPE == clazz || Boolean.class == clazz) {
+      return (T) Boolean.valueOf(value);
+    } else if (Byte.TYPE == clazz || Byte.class == clazz) {
+      return (T) Byte.valueOf(value);
+    } else if (Short.TYPE == clazz || Short.class == clazz) {
+      return (T) Short.valueOf(value);
+    } else if (Integer.TYPE == clazz || Integer.class == clazz) {
+      return (T) Integer.valueOf(value);
+    } else if (Long.TYPE == clazz || Long.class == clazz) {
+      return (T) Long.valueOf(value);
+    } else if (Float.TYPE == clazz || Float.class == clazz) {
+      return (T) Float.valueOf(value);
+    } else if (Double.TYPE == clazz || Double.class == clazz) {
+      return (T) Double.valueOf(value);
+    }
+    return (T) value;
+  }
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java
new file mode 100644
index 000000000..7346ddc20
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.service.config;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class DynamicConfigServiceFactory {
+
+ public static ConfigService getConfigService(CelebornConf celebornConf) {
+ String configStoreBackend = celebornConf.dynamicConfigStoreBackend();
+ if ("FS".equals(configStoreBackend)) {
+ return new FsConfigServiceImpl(celebornConf);
+ }
+
+ return null;
+ }
+}
\ No newline at end of file
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java
new file mode 100644
index 000000000..7a9b60a88
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.service.config;
+
+import org.apache.celeborn.common.util.ThreadUtils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import org.apache.celeborn.common.CelebornConf;
+
+import scala.concurrent.duration.Duration;
+
+/**
+ * {@link ConfigService} backed by a YAML file on the local file system. The file is read once in
+ * the constructor and then re-read periodically on a daemon scheduler thread.
+ */
+public class FsConfigServiceImpl implements ConfigService {
+  private static final Logger LOG = LoggerFactory.getLogger(FsConfigServiceImpl.class);
+  private static final String CONF_TENANT_ID = "tenantId";
+  private static final String CONF_LEVEL = "level";
+  private static final String CONF_CONFIG = "config";
+
+  private final CelebornConf celebornConf;
+  private final AtomicReference<SystemConfig> systemConfigAtomicReference =
+      new AtomicReference<>();
+  private final AtomicReference<Map<String, TenantConfig>> tenantConfigAtomicReference =
+      new AtomicReference<>(new HashMap<>());
+
+  private final ScheduledExecutorService configRefreshService =
+      ThreadUtils.newDaemonSingleThreadScheduledExecutor("config-refresh-service");
+
+  /**
+   * Loads the configuration file once and schedules periodic reloads.
+   *
+   * @param celebornConf static configuration; supplies the refresh interval, the file location
+   *     and the fallback values used by {@link SystemConfig}
+   */
+  public FsConfigServiceImpl(CelebornConf celebornConf) {
+    this.celebornConf = celebornConf;
+    // Publish an empty system config first so getSystemConfig() never returns null, even when
+    // the file is missing or the first load fails.
+    this.systemConfigAtomicReference.set(new SystemConfig(celebornConf));
+    this.refresh();
+    long dynamicConfigRefreshTime = celebornConf.dynamicConfigRefreshTime();
+    this.configRefreshService.scheduleWithFixedDelay(
+        () -> refresh(),
+        dynamicConfigRefreshTime,
+        dynamicConfigRefreshTime,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Re-reads the configuration file and publishes the result. Nothing changes when the file does
+   * not exist or cannot be parsed, so the last successfully loaded configuration stays active
+   * instead of being replaced by a partial result.
+   */
+  private synchronized void refresh() {
+    File configurationFile = getConfigurationFile(System.getenv());
+    if (!configurationFile.exists()) {
+      return;
+    }
+
+    SystemConfig systemConfig = null;
+    Map<String, TenantConfig> tenantConfs = new HashMap<>();
+    try (FileInputStream fileInputStream = new FileInputStream(configurationFile)) {
+      // NOTE(review): the default Yaml constructor is used here; if this file can ever be
+      // written by an untrusted party, consider loading it with SnakeYAML's SafeConstructor.
+      Yaml yaml = new Yaml();
+      List<Map<String, Object>> dynamicConfigs = yaml.load(fileInputStream);
+      // An empty file is loaded as null; treat it as "no entries".
+      if (dynamicConfigs != null) {
+        for (Map<String, Object> settings : dynamicConfigs) {
+          String tenantId = (String) settings.get(CONF_TENANT_ID);
+          String level = (String) settings.get(CONF_LEVEL);
+          Map<String, String> config = toStringMap(settings.get(CONF_CONFIG));
+          if (ConfigLevel.TENANT.name().equals(level)) {
+            tenantConfs.put(tenantId, new TenantConfig(this, tenantId, config));
+          } else {
+            systemConfig = new SystemConfig(celebornConf, config);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Refresh dynamic config error: {}", e.getMessage(), e);
+      return;
+    }
+
+    tenantConfigAtomicReference.set(tenantConfs);
+    systemConfigAtomicReference.set(
+        systemConfig == null ? new SystemConfig(celebornConf) : systemConfig);
+  }
+
+  /**
+   * Copies a parsed YAML {@code config} section into a string map. A missing section (a bare
+   * {@code config:} is loaded as null) yields an empty map, and entries whose value is null are
+   * skipped.
+   *
+   * @throws ClassCastException when the section is present but is not a mapping
+   */
+  private static Map<String, String> toStringMap(Object rawConfig) {
+    Map<String, String> result = new HashMap<>();
+    if (rawConfig == null) {
+      return result;
+    }
+    for (Map.Entry<?, ?> entry : ((Map<?, ?>) rawConfig).entrySet()) {
+      if (entry.getValue() != null) {
+        result.put(String.valueOf(entry.getKey()), entry.getValue().toString());
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public SystemConfig getSystemConfig() {
+    return systemConfigAtomicReference.get();
+  }
+
+  @Override
+  public TenantConfig getRawTenantConfig(String tenantId) {
+    return tenantConfigAtomicReference.get().get(tenantId);
+  }
+
+  @Override
+  public void refreshAllCache() {
+    this.refresh();
+  }
+
+  @Override
+  public void shutdown() {
+    ThreadUtils.shutdown(configRefreshService, Duration.apply("800ms"));
+  }
+
+  /**
+   * Resolves the configuration file location: the configured path when one is set, otherwise
+   * {@code dynamicConfig.yaml} under {@code $CELEBORN_CONF_DIR}, falling back to
+   * {@code $CELEBORN_HOME/conf} (with {@code .} standing in for an unset {@code CELEBORN_HOME}).
+   */
+  private File getConfigurationFile(Map<String, String> env) {
+    // NOTE(review): the path comes from the quota configuration path setting rather than a
+    // dynamic-config specific one - confirm this reuse is intended.
+    if (!this.celebornConf.quotaConfigurationPath().isEmpty()) {
+      return new File(this.celebornConf.quotaConfigurationPath().get());
+    } else {
+      String dynamicConfPath =
+          Optional.ofNullable(env.get("CELEBORN_CONF_DIR"))
+              .orElse(env.getOrDefault("CELEBORN_HOME", ".") + File.separator + "conf");
+      return new File(dynamicConfPath + File.separator + "dynamicConfig.yaml");
+    }
+  }
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/service/config/SystemConfig.java
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/SystemConfig.java
new file mode 100644
index 000000000..ff73bb7f2
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/SystemConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.service.config;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.internal.config.ConfigEntry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SystemConfig extends DynamicConfig {
+ private CelebornConf celebornConf;
+ public SystemConfig(CelebornConf celebornConf, Map<String, String> configs) {
+ this.celebornConf = celebornConf;
+ this.configs.putAll(configs);
+ }
+
+ public SystemConfig(CelebornConf celebornConf) {
+ this.celebornConf = celebornConf;
+ this.configs = new HashMap<>();
+ }
+
+ @Override
+ public DynamicConfig getParentLevelConfig() {
+ return null;
+ }
+
+ public <T> T getValue(String configKey, ConfigEntry<Object> configEntry,
Class<T> finalType, ConfigType configType) {
+ String configValue = configs.get(configKey);
+ T formatValue = configValue != null ? formatValue(configKey, configValue,
finalType, configType) : null;
+ if (formatValue == null && configEntry != null) {
+ return convert(finalType, celebornConf.get(configEntry).toString());
+ } else {
+ return formatValue;
+ }
+ }
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/service/config/TenantConfig.java
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/TenantConfig.java
new file mode 100644
index 000000000..26198d6e0
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/service/config/TenantConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.service.config;
+
+import java.util.Map;
+
+/**
+ * Tenant-level dynamic configuration. Lookups that miss here continue at the system level
+ * obtained from the owning {@link ConfigService}.
+ */
+public class TenantConfig extends DynamicConfig {
+  private final ConfigService configService;
+  private final String tenantId;
+
+  /**
+   * @param configService service asked for the current system config on every parent lookup
+   * @param tenantId tenant these entries belong to
+   * @param configs entries to copy into this config
+   */
+  public TenantConfig(ConfigService configService, String tenantId, Map<String, String> configs) {
+    this.tenantId = tenantId;
+    this.configService = configService;
+    this.configs.putAll(configs);
+  }
+
+  /** Returns the entries defined for this tenant. */
+  @Override
+  public Map<String, String> getConfigs() {
+    return this.configs;
+  }
+
+  /** Returns the system config currently published by the owning service. */
+  @Override
+  public DynamicConfig getParentLevelConfig() {
+    return this.configService.getSystemConfig();
+  }
+}
diff --git a/service/src/test/resources/dynamicConfig.yaml
b/service/src/test/resources/dynamicConfig.yaml
new file mode 100644
index 000000000..22d17ee62
--- /dev/null
+++ b/service/src/test/resources/dynamicConfig.yaml
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+- level: SYSTEM
+ config:
+ celeborn.client.push.buffer.initial.size: 100k
+ celeborn.client.push.buffer.max.size: 1000k
+ celeborn.worker.fetch.heartbeat.enabled: true
+ celeborn.client.push.buffer.initial.size.only: 10k
+ celeborn.test.timeoutMs.only: 100s
+ celeborn.test.enabled.only: false
+ celeborn.test.int.only: 10
+
+- tenantId: tenant_id
+ level: TENANT
+ config:
+ celeborn.client.push.buffer.initial.size: 10k
+ celeborn.client.push.buffer.initial.size.only: 100k
+ celeborn.worker.fetch.heartbeat.enabled: false
+ celeborn.test.tenant.timeoutMs.only: 100s
+ celeborn.test.tenant.enabled.only: false
+ celeborn.test.tenant.int.only: 10
+
diff --git a/service/src/test/resources/dynamicConfig_2.yaml
b/service/src/test/resources/dynamicConfig_2.yaml
new file mode 100644
index 000000000..57645e0b8
--- /dev/null
+++ b/service/src/test/resources/dynamicConfig_2.yaml
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+- level: SYSTEM
+ config:
+ celeborn.test.int.only: 100
+
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java
b/service/src/test/scala/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java
new file mode 100644
index 000000000..47c41b011
--- /dev/null
+++
b/service/src/test/scala/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.service.config;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.server.common.service.config.ConfigService;
+import org.apache.celeborn.server.common.service.config.DynamicConfig;
+import
org.apache.celeborn.server.common.service.config.DynamicConfig.ConfigType;
+import org.apache.celeborn.server.common.service.config.FsConfigServiceImpl;
+import org.apache.celeborn.server.common.service.config.SystemConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the file-backed {@link ConfigService} implementation ({@link FsConfigServiceImpl}).
+ *
+ * <p>The expected lookup order, as exercised below, is: tenant-level value first, then
+ * system-level value, then the static {@link CelebornConf} default. Keys that exist at no
+ * level and have no config entry resolve to {@code null} (or to the supplied default when
+ * {@code getWithDefaultValue} is used).
+ */
+public class ConfigServiceSuiteJ {
+
+  /**
+   * Loads {@code dynamicConfig.yaml}, verifies every lookup path, then switches the configured
+   * file to {@code dynamicConfig_2.yaml} and checks that an explicit cache refresh picks up the
+   * new system-level values.
+   */
+  @Test
+  public void testFsConfig() {
+    CelebornConf celebornConf = new CelebornConf();
+    String file = getClass().getResource("/dynamicConfig.yaml").getFile();
+    celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file);
+    celebornConf.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_TIME(), 5L);
+    FsConfigServiceImpl fsConfigService = new FsConfigServiceImpl(celebornConf);
+
+    verifyConfig(fsConfigService);
+
+    // change -> refresh config
+    file = getClass().getResource("/dynamicConfig_2.yaml").getFile();
+    celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file);
+
+    fsConfigService.refreshAllCache();
+    SystemConfig systemConfig = fsConfigService.getSystemConfig();
+
+    // verify systemConfig's intConf -- dynamicConfig_2.yaml overrides the value to 100
+    Integer intConfValue =
+        systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, ConfigType.STRING);
+    Assert.assertEquals(100, intConfValue.intValue());
+
+    // verify systemConfig's bytesConf -- defer to celebornConf
+    Long value =
+        systemConfig.getValue(
+            CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(),
+            CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(),
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertEquals(1073741824, value.longValue());
+  }
+
+  /**
+   * Asserts the values that {@code dynamicConfig.yaml} is expected to produce at system level,
+   * at tenant level (for {@code tenant_id} and for an unknown tenant), and through the
+   * default-value accessor.
+   *
+   * @param configService the service under test, already initialised from
+   *     {@code dynamicConfig.yaml}
+   */
+  public void verifyConfig(ConfigService configService) {
+    // ------------- Verify SystemConfig ----------------- //
+    SystemConfig systemConfig = configService.getSystemConfig();
+    // verify systemConfig's bytesConf -- use systemConfig
+    Long value =
+        systemConfig.getValue(
+            CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
+            CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(),
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertEquals(102400, value.longValue());
+
+    // verify systemConfig's bytesConf -- defer to celebornConf
+    value =
+        systemConfig.getValue(
+            CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(),
+            CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(),
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertEquals(1073741824, value.longValue());
+
+    // verify systemConfig's bytesConf only -- use systemConfig
+    value =
+        systemConfig.getValue(
+            "celeborn.client.push.buffer.initial.size.only", null, Long.TYPE, ConfigType.BYTES);
+    Assert.assertEquals(10240, value.longValue());
+
+    // verify systemConfig's bytesConf with none
+    value =
+        systemConfig.getValue(
+            "celeborn.client.push.buffer.initial.size.only.none",
+            null,
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertNull(value);
+
+    // verify systemConfig's timesConf
+    value =
+        systemConfig.getValue("celeborn.test.timeoutMs.only", null, Long.TYPE, ConfigType.TIME_MS);
+    Assert.assertEquals(100000, value.longValue());
+
+    // verify systemConfig's BooleanConf.
+    // This must read the boolean fixture key; the timeout key ("100s") used here previously is
+    // not a boolean, so the assertion never exercised celeborn.test.enabled.only at all.
+    Boolean booleanConfValue =
+        systemConfig.getValue(
+            "celeborn.test.enabled.only", null, Boolean.TYPE, ConfigType.STRING);
+    Assert.assertFalse(booleanConfValue);
+
+    // verify systemConfig's intConf
+    Integer intConfValue =
+        systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, ConfigType.STRING);
+    Assert.assertEquals(10, intConfValue.intValue());
+
+    // ------------- Verify TenantConfig ----------------- //
+    DynamicConfig tenantConfig = configService.getTenantConfig("tenant_id");
+    // verify tenantConfig's bytesConf -- use tenantConf
+    value =
+        tenantConfig.getValue(
+            CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
+            CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(),
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertEquals(10240, value.longValue());
+
+    // verify tenantConfig's bytesConf -- defer to systemConf
+    value =
+        tenantConfig.getValue(
+            CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(),
+            CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(),
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertEquals(1024000, value.longValue());
+
+    // verify tenantConfig's bytesConf -- defer to celebornConf
+    value =
+        tenantConfig.getValue(
+            CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(),
+            CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(),
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertEquals(1073741824, value.longValue());
+
+    // verify tenantConfig's bytesConf only -- use tenantConf
+    value =
+        tenantConfig.getValue(
+            "celeborn.client.push.buffer.initial.size.only", null, Long.TYPE, ConfigType.BYTES);
+    Assert.assertEquals(102400, value.longValue());
+
+    // verify tenantConfig's bytesConf with none
+    value =
+        tenantConfig.getValue(
+            "celeborn.client.push.buffer.initial.size.only.none",
+            null,
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertNull(value);
+
+    // A tenant id with no entry in the file should still resolve through the system level.
+    DynamicConfig tenantConfigNone = configService.getTenantConfig("tenant_id_none");
+    // verify tenantConfig's bytesConf -- defer to systemConf
+    value =
+        tenantConfigNone.getValue(
+            CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(),
+            CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(),
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertEquals(1024000, value.longValue());
+
+    // ------------- Verify with defaultValue ----------------- //
+    // A configured key wins over the supplied default ...
+    value =
+        tenantConfig.getWithDefaultValue(
+            "celeborn.client.push.buffer.initial.size.only", 100L, Long.TYPE, ConfigType.BYTES);
+    Assert.assertEquals(102400, value.longValue());
+
+    // ... while an unknown key falls back to the supplied default.
+    Long withDefaultValue =
+        tenantConfigNone.getWithDefaultValue("none", 10L, Long.TYPE, ConfigType.STRING);
+    Assert.assertEquals(10, withDefaultValue.longValue());
+  }
+}
\ No newline at end of file