This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e4f7ea8e0 [CELEBORN-1264] ConfigService supports TENANT_USER config 
level
e4f7ea8e0 is described below

commit e4f7ea8e01c9f8460254d3d13aee5a920a3822ab
Author: Angerszhuuuu <[email protected]>
AuthorDate: Sun Feb 18 16:22:22 2024 +0800

    [CELEBORN-1264] ConfigService supports TENANT_USER config level
    
    ### What changes were proposed in this pull request?
     ConfigService support user level config
    
    ### Why are the changes needed?
    Support more case of config, later can integrate with quota manager
    
    ### Does this PR introduce _any_ user-facing change?
    With this pr, user's setting form config service will have three level
    
    - User
    - Tenant
    - System
    
    User identifier is construct by username and tenantId,
    If there is no specify setting for username, will fallback to tenant level 
setting, if tenant level setting also not set, fallback to system setting
    
    ### How was this patch tested?
    Added UT
    
    Closes #2285 from AngersZhuuuu/CELEBORN-1264.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Angerszhuuuu <[email protected]>
---
 .../service/config/BaseConfigServiceImpl.java      |  9 +++
 .../server/common/service/config/ConfigLevel.java  |  1 +
 .../common/service/config/ConfigService.java       | 11 ++++
 .../common/service/config/DbConfigServiceImpl.java | 10 +++
 .../common/service/config/FsConfigServiceImpl.java | 47 ++++++++++---
 .../server/common/service/config/TenantConfig.java |  6 +-
 .../common/service/model/ClusterTenantConfig.java  | 14 ++--
 .../common/service/store/IServiceManager.java      |  2 +
 .../service/store/db/DbServiceManagerImpl.java     | 28 ++++++++
 .../store/db/mapper/ClusterTenantConfigMapper.java |  2 +-
 .../common/service/config/ConfigServiceSuiteJ.java | 77 ++++++++++++++++++++--
 .../test/resources/celeborn-0.5.0-h2-ut-data.sql   | 14 +++-
 service/src/test/resources/celeborn-0.5.0-h2.sql   |  4 +-
 service/src/test/resources/dynamicConfig.yaml      | 17 +++++
 14 files changed, 217 insertions(+), 25 deletions(-)

diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java
index 5e7c27009..93e013220 100644
--- 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,9 @@ public abstract class BaseConfigServiceImpl implements 
ConfigService {
   protected final AtomicReference<Map<String, TenantConfig>> 
tenantConfigAtomicReference =
       new AtomicReference<>(new HashMap<>());
 
+  protected final AtomicReference<Map<Pair<String, String>, TenantConfig>>
+      tenantUserConfigAtomicReference = new AtomicReference<>(new HashMap<>());
+
   private final ScheduledExecutorService configRefreshService =
       
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-config-refresher");
 
@@ -81,6 +85,11 @@ public abstract class BaseConfigServiceImpl implements 
ConfigService {
     return tenantConfigAtomicReference.get().get(tenantId);
   }
 
+  @Override
+  public TenantConfig getRawTenantUserConfig(String tenantId, String userId) {
+    return tenantUserConfigAtomicReference.get().get(Pair.of(tenantId, 
userId));
+  }
+
   @Override
   public void shutdown() {
     ThreadUtils.shutdown(configRefreshService);
diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigLevel.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigLevel.java
index 121821461..7fcd510e6 100644
--- 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigLevel.java
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigLevel.java
@@ -20,4 +20,5 @@ package org.apache.celeborn.server.common.service.config;
 public enum ConfigLevel {
   SYSTEM,
   TENANT,
+  TENANT_USER
 }
diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java
index fc918b0a5..1f141f551 100644
--- 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java
@@ -38,6 +38,17 @@ public interface ConfigService {
     }
   }
 
+  TenantConfig getRawTenantUserConfig(String tenantId, String userId);
+
+  default DynamicConfig getTenantUserConfig(String tenantId, String userId) {
+    TenantConfig tenantConfig = getRawTenantUserConfig(tenantId, userId);
+    if (tenantConfig == null) {
+      return getTenantConfigFromCache(tenantId);
+    } else {
+      return tenantConfig;
+    }
+  }
+
   void refreshAllCache() throws IOException;
 
   void shutdown();
diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/DbConfigServiceImpl.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/DbConfigServiceImpl.java
index 6217545b5..22bf79532 100644
--- 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/DbConfigServiceImpl.java
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/DbConfigServiceImpl.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.server.common.service.store.IServiceManager;
 import org.apache.celeborn.server.common.service.store.db.DbServiceManagerImpl;
@@ -50,5 +52,13 @@ public class DbConfigServiceImpl extends 
BaseConfigServiceImpl implements Config
         allTenantConfigs.stream()
             .collect(Collectors.toMap(TenantConfig::getTenantId, 
Function.identity()));
     tenantConfigAtomicReference.set(tenantConfigMap);
+    List<TenantConfig> allTenantUserConfigs = 
iServiceManager.getAllTenantUserConfigs();
+    Map<Pair<String, String>, TenantConfig> tenantUserConfigMap =
+        allTenantUserConfigs.stream()
+            .collect(
+                Collectors.toMap(
+                    tenantConfig -> Pair.of(tenantConfig.getTenantId(), 
tenantConfig.getName()),
+                    Function.identity()));
+    tenantUserConfigAtomicReference.set(tenantUserConfigMap);
   }
 }
diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java
index decf061af..df3e80709 100644
--- 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -35,6 +36,8 @@ import org.apache.celeborn.common.CelebornConf;
 public class FsConfigServiceImpl extends BaseConfigServiceImpl implements 
ConfigService {
   private static final Logger LOG = 
LoggerFactory.getLogger(FsConfigServiceImpl.class);
   private static final String CONF_TENANT_ID = "tenantId";
+  private static final String CONF_TENANT_USERS = "users";
+  private static final String CONF_TENANT_NAME = "name";
   private static final String CONF_LEVEL = "level";
   private static final String CONF_CONFIG = "config";
 
@@ -51,21 +54,40 @@ public class FsConfigServiceImpl extends 
BaseConfigServiceImpl implements Config
 
     SystemConfig systemConfig = null;
     Map<String, TenantConfig> tenantConfs = new HashMap<>();
+    Map<Pair<String, String>, TenantConfig> tenantUserConfs = new HashMap<>();
     try (FileInputStream fileInputStream = new 
FileInputStream(configurationFile)) {
       Yaml yaml = new Yaml();
       List<Map<String, Object>> dynamicConfigs = yaml.load(fileInputStream);
       for (Map<String, Object> settings : dynamicConfigs) {
-        String tenantId = (String) settings.get(CONF_TENANT_ID);
         String level = (String) settings.get(CONF_LEVEL);
-        Map<String, String> config =
-            ((Map<String, Object>) settings.get(CONF_CONFIG))
-                .entrySet().stream()
-                    .collect(Collectors.toMap(Map.Entry::getKey, a -> 
a.getValue().toString()));
         if (ConfigLevel.TENANT.name().equals(level)) {
-          TenantConfig tenantConfig = new TenantConfig(this, tenantId, null, 
config);
-          tenantConfs.put(tenantId, tenantConfig);
+          if (settings.containsKey(CONF_TENANT_ID)) {
+            String tenantId = (String) settings.get(CONF_TENANT_ID);
+            if (settings.containsKey(CONF_CONFIG)) {
+              Map<String, String> config = extractConfig(settings);
+              TenantConfig tenantConfig = new TenantConfig(this, tenantId, 
null, config);
+              tenantConfs.put(tenantId, tenantConfig);
+            }
+            if (settings.containsKey(CONF_TENANT_USERS)) {
+              List<Map<String, Object>> users =
+                  (List<Map<String, Object>>) settings.get(CONF_TENANT_USERS);
+              for (Map<String, Object> userSetting : users) {
+                if (userSetting.containsKey(CONF_TENANT_NAME)
+                    && userSetting.containsKey(CONF_CONFIG)) {
+                  String name = (String) userSetting.get(CONF_TENANT_NAME);
+                  Map<String, String> userConfig = extractConfig(userSetting);
+                  TenantConfig tenantUserConfig =
+                      new TenantConfig(this, tenantId, name, userConfig);
+                  tenantUserConfs.put(Pair.of(tenantId, name), 
tenantUserConfig);
+                }
+              }
+            }
+          }
         } else {
-          systemConfig = new SystemConfig(celebornConf, config);
+          if (settings.containsKey(CONF_CONFIG)) {
+            Map<String, String> config = extractConfig(settings);
+            systemConfig = new SystemConfig(celebornConf, config);
+          }
         }
       }
     } catch (Exception e) {
@@ -73,12 +95,21 @@ public class FsConfigServiceImpl extends 
BaseConfigServiceImpl implements Config
       return;
     }
 
+    tenantUserConfigAtomicReference.set(tenantUserConfs);
     tenantConfigAtomicReference.set(tenantConfs);
     if (systemConfig != null) {
       systemConfigAtomicReference.set(systemConfig);
     }
   }
 
+  private Map<String, String> extractConfig(Map<String, Object> setting) {
+    Map<String, String> config =
+        ((Map<String, Object>) setting.get(CONF_CONFIG))
+            .entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, a -> 
a.getValue().toString()));
+    return config;
+  }
+
   private File getConfigurationFile(Map<String, String> env) {
     if (!this.celebornConf.quotaConfigurationPath().isEmpty()) {
       return new File(this.celebornConf.quotaConfigurationPath().get());
diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/TenantConfig.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/TenantConfig.java
index bea2f5960..070c7e4d5 100644
--- 
a/service/src/main/java/org/apache/celeborn/server/common/service/config/TenantConfig.java
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/config/TenantConfig.java
@@ -56,6 +56,10 @@ public class TenantConfig extends DynamicConfig {
 
   @Override
   public DynamicConfig getParentLevelConfig() {
-    return configService.getSystemConfigFromCache();
+    if (name == null) {
+      return configService.getSystemConfigFromCache();
+    } else {
+      return configService.getTenantConfigFromCache(tenantId);
+    }
   }
 }
diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/model/ClusterTenantConfig.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/model/ClusterTenantConfig.java
index b3c7d068d..ebb2e9977 100644
--- 
a/service/src/main/java/org/apache/celeborn/server/common/service/model/ClusterTenantConfig.java
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/model/ClusterTenantConfig.java
@@ -28,7 +28,7 @@ public class ClusterTenantConfig {
   private Integer clusterId;
   private String tenantId;
   private String level;
-  private String user;
+  private String name;
   private String configKey;
   private String configValue;
   private String type;
@@ -67,12 +67,12 @@ public class ClusterTenantConfig {
     this.level = level;
   }
 
-  public String getUser() {
-    return StringUtils.isBlank(user) ? null : user;
+  public String getName() {
+    return StringUtils.isBlank(name) ? null : name;
   }
 
-  public void setUser(String user) {
-    this.user = user;
+  public void setName(String name) {
+    this.name = name;
   }
 
   public String getConfigKey() {
@@ -116,7 +116,7 @@ public class ClusterTenantConfig {
   }
 
   public Pair getTenantInfo() {
-    return Pair.of(tenantId, user);
+    return Pair.of(tenantId, name);
   }
 
   @Override
@@ -126,7 +126,7 @@ public class ClusterTenantConfig {
     sb.append(", clusterId=").append(clusterId);
     sb.append(", tenantId='").append(tenantId).append('\'');
     sb.append(", level='").append(level).append('\'');
-    sb.append(", user='").append(user).append('\'');
+    sb.append(", user='").append(name).append('\'');
     sb.append(", configKey='").append(configKey).append('\'');
     sb.append(", configValue='").append(configValue).append('\'');
     sb.append(", type='").append(type).append('\'');
diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/store/IServiceManager.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/store/IServiceManager.java
index c2a8a9d0e..7889297d9 100644
--- 
a/service/src/main/java/org/apache/celeborn/server/common/service/store/IServiceManager.java
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/store/IServiceManager.java
@@ -31,5 +31,7 @@ public interface IServiceManager {
 
   List<TenantConfig> getAllTenantConfigs();
 
+  List<TenantConfig> getAllTenantUserConfigs();
+
   SystemConfig getSystemConfig();
 }
diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/store/db/DbServiceManagerImpl.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/store/db/DbServiceManagerImpl.java
index e957cfed5..49a65ef7a 100644
--- 
a/service/src/main/java/org/apache/celeborn/server/common/service/store/db/DbServiceManagerImpl.java
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/store/db/DbServiceManagerImpl.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.ibatis.session.SqlSession;
 import org.apache.ibatis.session.SqlSessionFactory;
 import org.slf4j.Logger;
@@ -113,6 +114,33 @@ public class DbServiceManagerImpl implements 
IServiceManager {
     }
   }
 
+  @Override
+  public List<TenantConfig> getAllTenantUserConfigs() {
+    try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
+      ClusterTenantConfigMapper mapper = 
sqlSession.getMapper(ClusterTenantConfigMapper.class);
+      int totalNum = mapper.getClusterTenantConfigsNum(clusterId, 
ConfigLevel.TENANT_USER.name());
+      int offset = 0;
+      List<ClusterTenantConfig> clusterAllTenantConfigs = new ArrayList<>();
+      while (offset < totalNum) {
+        List<ClusterTenantConfig> clusterTenantConfigs =
+            mapper.getClusterTenantConfigs(
+                clusterId, ConfigLevel.TENANT_USER.name(), offset, pageSize);
+        clusterAllTenantConfigs.addAll(clusterTenantConfigs);
+        offset = offset + pageSize;
+      }
+
+      Map<Pair<String, String>, List<ClusterTenantConfig>> tenantConfigMaps =
+          clusterAllTenantConfigs.stream()
+              
.collect(Collectors.groupingBy(ClusterTenantConfig::getTenantInfo));
+      return tenantConfigMaps.entrySet().stream()
+          .map(
+              t ->
+                  new TenantConfig(
+                      configService, t.getKey().getKey(), 
t.getKey().getValue(), t.getValue()))
+          .collect(Collectors.toList());
+    }
+  }
+
   @Override
   public SystemConfig getSystemConfig() {
     try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/store/db/mapper/ClusterTenantConfigMapper.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/store/db/mapper/ClusterTenantConfigMapper.java
index af4539f66..b67c2c233 100644
--- 
a/service/src/main/java/org/apache/celeborn/server/common/service/store/db/mapper/ClusterTenantConfigMapper.java
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/store/db/mapper/ClusterTenantConfigMapper.java
@@ -27,7 +27,7 @@ import 
org.apache.celeborn.server.common.service.model.ClusterTenantConfig;
 public interface ClusterTenantConfigMapper {
 
   @Select(
-      "SELECT id, cluster_id, tenant_id, level, user, config_key, 
config_value, type, gmt_create, gmt_modify "
+      "SELECT id, cluster_id, tenant_id, level, name, config_key, 
config_value, type, gmt_create, gmt_modify "
           + "FROM celeborn_cluster_tenant_config WHERE cluster_id = 
#{clusterId} AND level=#{level} LIMIT #{offset}, #{pageSize}")
   List<ClusterTenantConfig> getClusterTenantConfigs(
       @Param("clusterId") int clusterId,
diff --git 
a/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java
 
b/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java
index 03cdc9c30..089c70514 100644
--- 
a/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java
+++ 
b/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java
@@ -44,7 +44,9 @@ public class ConfigServiceSuiteJ {
         CelebornConf.DYNAMIC_CONFIG_STORE_DB_HIKARI_DRIVER_CLASS_NAME(), 
"org.h2.Driver");
     
celebornConf.set(CelebornConf.DYNAMIC_CONFIG_STORE_DB_HIKARI_MAXIMUM_POOL_SIZE(),
 "1");
     configService = new DbConfigServiceImpl(celebornConf);
-    verifyConfig(configService);
+    verifySystemConfig(configService);
+    verifyTenantConfig(configService);
+    verifyTenantUserConfig(configService);
 
     SqlSessionFactory sqlSessionFactory = DBSessionFactory.get(celebornConf);
     try (SqlSession sqlSession = sqlSessionFactory.openSession(true)) {
@@ -68,7 +70,9 @@ public class ConfigServiceSuiteJ {
     celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file);
     celebornConf.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_INTERVAL(), 5L);
     configService = new FsConfigServiceImpl(celebornConf);
-    verifyConfig(configService);
+    verifySystemConfig(configService);
+    verifyTenantConfig(configService);
+    verifyTenantUserConfig(configService);
     // change -> refresh config
     file = getClass().getResource("/dynamicConfig_2.yaml").getFile();
     celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file);
@@ -84,7 +88,7 @@ public class ConfigServiceSuiteJ {
     }
   }
 
-  public void verifyConfig(ConfigService configService) {
+  public void verifySystemConfig(ConfigService configService) {
     // ------------- Verify SystemConfig ----------------- //
     SystemConfig systemConfig = configService.getSystemConfigFromCache();
     // verify systemConfig's bytesConf -- use systemConfig
@@ -135,11 +139,13 @@ public class ConfigServiceSuiteJ {
     Integer intConfValue =
         systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, 
ConfigType.STRING);
     Assert.assertEquals(intConfValue.intValue(), 10);
+  }
 
+  public void verifyTenantConfig(ConfigService configService) {
     // ------------- Verify TenantConfig ----------------- //
     DynamicConfig tenantConfig = 
configService.getTenantConfigFromCache("tenant_id");
     // verify tenantConfig's bytesConf -- use tenantConf
-    value =
+    Long value =
         tenantConfig.getValue(
             CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
             CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(),
@@ -201,6 +207,69 @@ public class ConfigServiceSuiteJ {
     Assert.assertEquals(withDefaultValue.longValue(), 10);
   }
 
+  public void verifyTenantUserConfig(ConfigService configService) {
+    // ------------- Verify UserConfig ----------------- //
+    DynamicConfig userConfig = configService.getTenantUserConfig("tenant_id1", 
"Jerry");
+    // verify userConfig's bytesConf -- use userConf
+    Long value =
+        userConfig.getValue(
+            CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
+            CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(),
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertEquals(value.longValue(), 1024);
+
+    // verify userConfig's bytesConf -- defer to tenantConf
+    value =
+        userConfig.getValue(
+            CelebornConf.CLIENT_PUSH_QUEUE_CAPACITY().key(),
+            CelebornConf.CLIENT_PUSH_QUEUE_CAPACITY(),
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertEquals(value.longValue(), 1024);
+
+    // verify userConfig's bytesConf -- defer to systemConf
+    value =
+        userConfig.getValue(
+            CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(),
+            CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(),
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertEquals(value.longValue(), 1024000);
+
+    // verify userConfig's bytesConf -- defer to celebornConf
+    value =
+        userConfig.getValue(
+            CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(),
+            CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(),
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertEquals(value.longValue(), 1073741824);
+
+    // verify userConfig's bytesConf with none
+    value =
+        userConfig.getValue(
+            "celeborn.client.push.buffer.initial.size.only.none",
+            null,
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertNull(value);
+
+    DynamicConfig userConfigNone = 
configService.getTenantUserConfig("tenant_id", "non_exist");
+    // verify userConfig's bytesConf -- defer to tenantConf
+    value =
+        userConfigNone.getValue(
+            CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(),
+            CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(),
+            Long.TYPE,
+            ConfigType.BYTES);
+    Assert.assertEquals(value.longValue(), 1024000);
+
+    Long withDefaultValue =
+        userConfigNone.getWithDefaultValue("none", 10L, Long.TYPE, 
ConfigType.STRING);
+    Assert.assertEquals(withDefaultValue.longValue(), 10);
+  }
+
   public void verifyConfigChanged(ConfigService configService) {
 
     SystemConfig systemConfig = configService.getSystemConfigFromCache();
diff --git a/service/src/test/resources/celeborn-0.5.0-h2-ut-data.sql 
b/service/src/test/resources/celeborn-0.5.0-h2-ut-data.sql
index 7e0b9b813..d6dc77f87 100644
--- a/service/src/test/resources/celeborn-0.5.0-h2-ut-data.sql
+++ b/service/src/test/resources/celeborn-0.5.0-h2-ut-data.sql
@@ -27,11 +27,21 @@ VALUES
     ( 6, 1, 'celeborn.test.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 
22:08:30', '2023-08-26 22:08:30' ),
     ( 7, 1, 'celeborn.test.enabled.only', 'false', 'QUOTA', '2023-08-26 
22:08:30', '2023-08-26 22:08:30' ),
     ( 8, 1, 'celeborn.test.int.only', '10', 'QUOTA', '2023-08-26 22:08:30', 
'2023-08-26 22:08:30' );
-INSERT INTO `celeborn_cluster_tenant_config` ( `id`, `cluster_id`, 
`tenant_id`, `level`, `user`, `config_key`, `config_value`, `type`, 
`gmt_create`, `gmt_modify` )
+INSERT INTO `celeborn_cluster_tenant_config` ( `id`, `cluster_id`, 
`tenant_id`, `level`, `name`, `config_key`, `config_value`, `type`, 
`gmt_create`, `gmt_modify` )
 VALUES
     ( 1, 1, 'tenant_id', 'TENANT', '', 
'celeborn.client.push.buffer.initial.size', '10240', 'QUOTA', '2023-08-26 
22:08:30', '2023-08-26 22:08:30' ),
     ( 2, 1, 'tenant_id', 'TENANT', '', 
'celeborn.client.push.buffer.initial.size.only', '102400', 'QUOTA', '2023-08-26 
22:08:30', '2023-08-26 22:08:30' ),
     ( 3, 1, 'tenant_id', 'TENANT', '', 
'celeborn.worker.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 
22:08:30', '2023-08-26 22:08:30' ),
     ( 4, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.timeoutMs.only', 
'100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
     ( 5, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.enabled.only', 
'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
-    ( 6, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.int.only', 
'100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' );
+    ( 6, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.int.only', 
'100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
+    ( 7, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.queue.capacity', 
'1024', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
+    ( 8, 1, 'tenant_id1', 'TENANT', '', 
'celeborn.client.push.buffer.initial.size', '10240', 'QUOTA', '2023-08-26 
22:08:30', '2023-08-26 22:08:30' ),
+    ( 9, 1, 'tenant_id1', 'TENANT', '', 
'celeborn.client.push.buffer.initial.size.only', '102400', 'QUOTA', '2023-08-26 
22:08:30', '2023-08-26 22:08:30' ),
+    ( 10, 1, 'tenant_id1', 'TENANT', '', 
'celeborn.worker.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 
22:08:30', '2023-08-26 22:08:30' ),
+    ( 11, 1, 'tenant_id1', 'TENANT', '', 
'celeborn.test.tenant.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', 
'2023-08-26 22:08:30' ),
+    ( 12, 1, 'tenant_id1', 'TENANT', '', 'celeborn.test.tenant.enabled.only', 
'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
+    ( 13, 1, 'tenant_id1', 'TENANT', '', 'celeborn.test.tenant.int.only', 
'100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
+    ( 14, 1, 'tenant_id1', 'TENANT', '', 
'celeborn.client.push.queue.capacity', '1024', 'QUOTA', '2023-08-26 22:08:30', 
'2023-08-26 22:08:30' ),
+    ( 15, 1, 'tenant_id1', 'TENANT_USER', 'Jerry', 
'celeborn.client.push.buffer.initial.size', '1k', 'QUOTA', '2023-08-26 
22:08:30', '2023-08-26 22:08:30' ),
+    ( 16, 1, 'tenant_id1', 'TENANT_USER', 'Jerry', 
'celeborn.client.push.buffer.initial.size.user.only', '512k', 'QUOTA', 
'2023-08-26 22:08:30', '2023-08-26 22:08:30' );
diff --git a/service/src/test/resources/celeborn-0.5.0-h2.sql 
b/service/src/test/resources/celeborn-0.5.0-h2.sql
index b57de8028..0a912eae7 100644
--- a/service/src/test/resources/celeborn-0.5.0-h2.sql
+++ b/service/src/test/resources/celeborn-0.5.0-h2.sql
@@ -53,12 +53,12 @@ CREATE TABLE IF NOT EXISTS celeborn_cluster_tenant_config
     cluster_id   int          NOT NULL,
     tenant_id    varchar(255) NOT NULL,
     level        varchar(255) NOT NULL COMMENT 'config level, valid level is 
TENANT,USER',
-    `user`       varchar(255) DEFAULT NULL COMMENT 'tenant sub user',
+    name         varchar(255) NOT NULL COMMENT 'tenant sub user',
     config_key   varchar(255) NOT NULL,
     config_value varchar(255) NOT NULL,
     type         varchar(255) DEFAULT NULL COMMENT 'conf categories, such as 
quota',
     gmt_create   timestamp NOT NULL,
     gmt_modify   timestamp NOT NULL,
     PRIMARY KEY (id),
-    UNIQUE KEY `index_unique_tenant_config_key` (`cluster_id`, `tenant_id`, 
`user`, `config_key`)
+    UNIQUE KEY `index_unique_tenant_config_key` (`cluster_id`, `tenant_id`, 
`name`, `config_key`)
 );
diff --git a/service/src/test/resources/dynamicConfig.yaml 
b/service/src/test/resources/dynamicConfig.yaml
index 22d17ee62..ea759a4f3 100644
--- a/service/src/test/resources/dynamicConfig.yaml
+++ b/service/src/test/resources/dynamicConfig.yaml
@@ -34,3 +34,20 @@
      celeborn.test.tenant.enabled.only: false
      celeborn.test.tenant.int.only: 10
 
+-  tenantId: tenant_id1
+   level: TENANT
+   config:
+     celeborn.client.push.buffer.initial.size: 10k
+     celeborn.client.push.buffer.initial.size.only: 100k
+     celeborn.worker.fetch.heartbeat.enabled: false
+     celeborn.test.tenant.timeoutMs.only: 100s
+     celeborn.test.tenant.enabled.only: false
+     celeborn.test.tenant.int.only: 10
+     celeborn.client.push.queue.capacity: 1024
+   users:
+     - name: Jerry
+       config:
+         celeborn.client.push.buffer.initial.size: 1k
+         celeborn.client.push.buffer.initial.size.user.only: 512k
+
+

Reply via email to