This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e8d77a48 [INLONG-8106][DataProxy] Optimize ConfigManager 
implementation ( part one ) (#8107)
6e8d77a48 is described below

commit 6e8d77a487baaeb6754f65a98be3c3c8d274d1ee
Author: Goson Zhang <[email protected]>
AuthorDate: Mon May 29 16:36:12 2023 +0800

    [INLONG-8106][DataProxy] Optimize ConfigManager implementation ( part one ) 
(#8107)
---
 .../dataproxy/channel/FailoverChannelSelector.java |  14 +-
 .../inlong/dataproxy/config/ConfigHolder.java      |   8 +-
 .../inlong/dataproxy/config/ConfigManager.java     | 116 ++++++---
 .../inlong/dataproxy/config/PropertiesHolder.java  | 270 +++++++++++++++++++++
 .../config/holder/BlackListConfigHolder.java       |  38 +++
 .../config/holder/GroupIdNumConfigHolder.java      | 268 ++++++++++++++++++++
 .../config/holder/GroupIdPropertiesHolder.java     |  88 -------
 .../config/holder/MQClusterConfigHolder.java       |   2 +-
 .../config/holder/MxPropertiesHolder.java          |   2 +-
 .../config/holder/PropertiesConfigHolder.java      |   2 +-
 ...sitConfigHolder.java => VisitConfigHolder.java} |  12 +-
 .../config/holder/WeightConfigHolder.java          | 157 ++++++++++++
 .../config/holder/WhiteListConfigHolder.java       |  40 +++
 .../dataproxy/heartbeat/HeartbeatManager.java      |  17 +-
 .../dataproxy/source/ServerMessageHandler.java     |  17 +-
 .../dataproxy/source/SimpleMessageHandler.java     |  17 +-
 .../inlong/dataproxy/source2/BaseSource.java       |  14 +-
 .../inlong/dataproxy/source2/SourceConstants.java  |   3 -
 .../dataproxy/source2/v0msg/CodecBinMsg.java       |  55 ++---
 19 files changed, 916 insertions(+), 224 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
index 5bdb1e8da..921d595ed 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
@@ -103,9 +103,7 @@ public class FailoverChannelSelector extends 
AbstractChannelSelector {
      */
     private List<String> splitChannelName(String channelName) {
         List<String> fileMetricList = new ArrayList<String>();
-        if (StringUtils.isEmpty(channelName)) {
-            LOG.info("channel name is null!");
-        } else {
+        if (StringUtils.isNotBlank(channelName)) {
             fileMetricList = Arrays.asList(channelName.split("\\s+"));
         }
         return fileMetricList;
@@ -145,11 +143,9 @@ public class FailoverChannelSelector extends 
AbstractChannelSelector {
                 this.slaveChannels.add(channel);
             }
         }
-        LOG.info("masters:" + this.masterChannels);
-        LOG.info("orders:" + this.orderChannels);
-        LOG.info("slaves:" + this.slaveChannels);
-        LOG.info("transfers:" + this.transferChannels);
-        LOG.info("agentFileMetrics:" + this.agentFileMetricChannels);
-        LOG.info("slaMetrics:" + this.slaMetricChannels);
+        LOG.info(
+                "Configure channels, masters={}, orders={}, slaves={}, 
transfers={}, agentFileMetrics={}, slaMetrics={}",
+                this.masterChannels, this.orderChannels, this.slaveChannels,
+                this.transferChannels, this.agentFileMetricChannels, 
this.slaMetricChannels);
     }
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
index 7748470f5..d6e533494 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
@@ -79,7 +79,7 @@ public abstract class ConfigHolder {
      *
      * @return - true if configure updated
      */
-    public abstract boolean loadFromFileToHolder();
+    protected abstract boolean loadFromFileToHolder();
 
     /**
      * check updater
@@ -92,7 +92,7 @@ public abstract class ConfigHolder {
             if (configFile != null) {
                 this.lastModifyTime = configFile.lastModified();
             }
-            LOG.info("File {} has changed, reload from local file agent", 
getFileName());
+            LOG.info("File {} has changed, reload from local file", 
this.fileName);
             return loadFromFileToHolder();
         }
         return false;
@@ -125,8 +125,8 @@ public abstract class ConfigHolder {
         if (url != null) {
             this.filePath = url.getPath();
             this.configFile = new File(this.filePath);
-            LOG.info("set file path lastTime: {}, currentTime: {}",
-                    lastModifyTime, configFile.lastModified());
+            LOG.info("Set {} file path, lastTime: {}, currentTime: {}",
+                    fileName, lastModifyTime, configFile.lastModified());
         }
     }
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index e495ef0ad..6307d2bf8 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -20,14 +20,16 @@ package org.apache.inlong.dataproxy.config;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigRequest;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
 import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+import org.apache.inlong.dataproxy.config.holder.BlackListConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.config.holder.GroupIdPropertiesHolder;
-import org.apache.inlong.dataproxy.config.holder.IPVisitConfigHolder;
+import org.apache.inlong.dataproxy.config.holder.GroupIdNumConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.MQClusterConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.MxPropertiesHolder;
 import org.apache.inlong.dataproxy.config.holder.PropertiesConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.SourceReportConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.SourceReportInfo;
+import org.apache.inlong.dataproxy.config.holder.WeightConfigHolder;
+import org.apache.inlong.dataproxy.config.holder.WhiteListConfigHolder;
 import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttrConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -52,6 +54,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -65,15 +68,18 @@ public class ConfigManager {
     public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new 
ArrayList<>();
     private static volatile boolean isInit = false;
     private static ConfigManager instance = null;
+    // node weight configure
+    private final WeightConfigHolder weightConfigHolder = new 
WeightConfigHolder();
+    // black list configure
+    private final BlackListConfigHolder blacklistConfigHolder = new 
BlackListConfigHolder();
+    // whitelist configure
+    private final WhiteListConfigHolder whitelistConfigHolder = new 
WhiteListConfigHolder();
+    // group id num 2 name configure
+    private final GroupIdNumConfigHolder groupIdConfig = new 
GroupIdNumConfigHolder();
 
     private final MQClusterConfigHolder mqClusterConfigHolder = new 
MQClusterConfigHolder("mq_cluster.properties");
     private final PropertiesConfigHolder topicConfig = new 
PropertiesConfigHolder("topics.properties");
     private final MxPropertiesHolder mxConfig = new 
MxPropertiesHolder("mx.properties");
-
-    private final GroupIdPropertiesHolder groupIdConfig = new 
GroupIdPropertiesHolder("groupid_mapping.properties");
-    private final PropertiesConfigHolder weightHolder = new 
PropertiesConfigHolder("weight.properties");
-    private final IPVisitConfigHolder blackListConfig = new 
IPVisitConfigHolder(true, "blacklist.properties");
-    private final IPVisitConfigHolder whiteListConfig = new 
IPVisitConfigHolder(false, "whitelist.properties");
     // source report configure holder
     private final SourceReportConfigHolder sourceReportConfigHolder = new 
SourceReportConfigHolder();
     // mq clusters ready
@@ -101,8 +107,25 @@ public class ConfigManager {
         return instance;
     }
 
-    public Map<String, String> getWeightProperties() {
-        return weightHolder.getHolder();
+    // get node weight configure
+    public double getCpuWeight() {
+        return weightConfigHolder.getCachedCpuWeight();
+    }
+
+    public double getNetInWeight() {
+        return weightConfigHolder.getCachedNetInWeight();
+    }
+
+    public double getNetOutWeight() {
+        return weightConfigHolder.getCachedNetOutWeight();
+    }
+
+    public double getTcpWeight() {
+        return weightConfigHolder.getCachedTcpWeight();
+    }
+
+    public double getCpuThresholdWeight() {
+        return weightConfigHolder.getCachedCpuThreshold();
     }
 
     /**
@@ -134,24 +157,59 @@ public class ConfigManager {
         return updatePropertiesHolder(result, topicConfig, false);
     }
 
-    public Map<String, String> getMxProperties() {
-        return mxConfig.getHolder();
+    // get groupId num 2 name info
+    public boolean isEnableNum2NameTrans(String groupIdNum) {
+        return groupIdConfig.isEnableNum2NameTrans(groupIdNum);
+    }
+
+    public boolean isGroupIdNumConfigEmpty() {
+        return groupIdConfig.isGroupIdNumConfigEmpty();
+    }
+
+    public boolean isStreamIdNumConfigEmpty() {
+        return groupIdConfig.isStreamIdNumConfigEmpty();
+    }
+
+    public String getGroupIdNameByNum(String groupIdNum) {
+        return groupIdConfig.getGroupIdNameByNum(groupIdNum);
+    }
+
+    public String getStreamIdNameByIdNum(String groupIdNum, String 
streamIdNum) {
+        return groupIdConfig.getStreamIdNameByIdNum(groupIdNum, streamIdNum);
+    }
+
+    public ConcurrentHashMap<String, String> getGroupIdNumMap() {
+        return groupIdConfig.getGroupIdNumMap();
     }
 
+    public ConcurrentHashMap<String, ConcurrentHashMap<String, String>> 
getStreamIdNumMap() {
+        return groupIdConfig.getStreamIdNumMap();
+    }
+
+    // get blacklist whitelist configure info
     public void regIPVisitConfigChgCallback(ConfigUpdateCallback callback) {
-        blackListConfig.addUpdateCallback(callback);
-        whiteListConfig.addUpdateCallback(callback);
+        blacklistConfigHolder.addUpdateCallback(callback);
+        whitelistConfigHolder.addUpdateCallback(callback);
     }
 
     public boolean needChkIllegalIP() {
-        return (!blackListConfig.isEmptyConfig()
-                || CommonConfigHolder.getInstance().isEnableWhiteList());
+        return (blacklistConfigHolder.needCheckBlacklist()
+                || whitelistConfigHolder.needCheckWhitelist());
     }
 
     public boolean isIllegalIP(String strRemoteIP) {
         return strRemoteIP == null
-                || blackListConfig.isContain(strRemoteIP)
-                || (CommonConfigHolder.getInstance().isEnableWhiteList() && 
!whiteListConfig.isContain(strRemoteIP));
+                || blacklistConfigHolder.isIllegalIP(strRemoteIP)
+                || whitelistConfigHolder.isIllegalIP(strRemoteIP);
+    }
+
+    // get mx configure info
+    public Map<String, Map<String, String>> getMxPropertiesMaps() {
+        return mxConfig.getMxPropertiesMaps();
+    }
+
+    public Map<String, String> getMxProperties() {
+        return mxConfig.getHolder();
     }
 
     public boolean addMxProperties(Map<String, String> result) {
@@ -260,22 +318,6 @@ public class ConfigManager {
         }
     }
 
-    public Map<String, Map<String, String>> getMxPropertiesMaps() {
-        return mxConfig.getMxPropertiesMaps();
-    }
-
-    public Map<String, String> getGroupIdMappingProperties() {
-        return groupIdConfig.getGroupIdMappingProperties();
-    }
-
-    public Map<String, Map<String, String>> getStreamIdMappingProperties() {
-        return groupIdConfig.getStreamIdMappingProperties();
-    }
-
-    public Map<String, String> getGroupIdEnableMappingProperties() {
-        return groupIdConfig.getGroupIdEnableMappingProperties();
-    }
-
     public PropertiesConfigHolder getTopicConfig() {
         return topicConfig;
     }
@@ -395,11 +437,17 @@ public class ConfigManager {
                     }
 
                     for (DataProxyTopicInfo topic : 
configJson.getData().getTopicList()) {
+                        if (topic == null
+                                || !topic.isValid()
+                                || 
StringUtils.isBlank(topic.getInlongGroupId())
+                                || StringUtils.isBlank(topic.getTopic())) {
+                            continue;
+                        }
                         if (!StringUtils.isEmpty(topic.getM())) {
                             groupIdToMValue.put(topic.getInlongGroupId(), 
topic.getM());
                         }
                         if (!StringUtils.isEmpty(topic.getTopic())) {
-                            groupIdToTopic.put(topic.getInlongGroupId(), 
topic.getTopic());
+                            
groupIdToTopic.put(topic.getInlongGroupId().trim(), topic.getTopic().trim());
                         }
                     }
                     configManager.updateMxProperties(groupIdToMValue);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java
new file mode 100644
index 000000000..4104ce409
--- /dev/null
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * properties to map
+ */
+public abstract class PropertiesHolder extends ConfigHolder {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PropertiesHolder.class);
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    protected final ConcurrentHashMap<String, String> confHolder = new 
ConcurrentHashMap<>();
+
+    public PropertiesHolder(String fileName) {
+        super(fileName);
+    }
+
+    public boolean fullUpdateConfigMap(Map<String, String> newConfigMap) {
+        if (newConfigMap == null || newConfigMap.isEmpty()) {
+            return false;
+        }
+        Map<String, String> filterMap = filterInValidRecords(newConfigMap);
+        if (filterMap.isEmpty()) {
+            LOG.info("Update properties {}, but the records are all illegal 
{}",
+                    getFileName(), newConfigMap);
+            return false;
+        }
+        return compAndStorePropertiesToFile(filterMap);
+    }
+
+    public boolean insertNewConfigMap(Map<String, String> insertConfigMap) {
+        return insertOrRemoveProperties(true, insertConfigMap);
+    }
+
+    public boolean deleteConfigMap(Map<String, String> rmvConfigMap) {
+        return insertOrRemoveProperties(false, rmvConfigMap);
+    }
+
+    protected abstract Map<String, String> filterInValidRecords(Map<String, 
String> configMap);
+
+    protected abstract boolean updateCacheData();
+
+    @Override
+    protected boolean loadFromFileToHolder() {
+        readWriteLock.readLock().lock();
+        try {
+            Map<String, String> loadMap = loadConfigFromFile();
+            if (loadMap == null || loadMap.isEmpty()) {
+                LOG.info("Load changed properties {}, but no records 
configured", getFileName());
+                return false;
+            }
+            // filter blank items
+            Map<String, String> filteredMap = filterInValidRecords(loadMap);
+            if (filteredMap.isEmpty()) {
+                LOG.info("Load changed properties {}, but the records are all 
illegal {}",
+                        getFileName(), loadMap);
+                return false;
+            }
+            // remove records
+            Set<String> rmvKeys = new HashSet<>();
+            for (Map.Entry<String, String> entry : confHolder.entrySet()) {
+                if (entry == null || entry.getKey() == null) {
+                    continue;
+                }
+                if (!filteredMap.containsKey(entry.getKey())) {
+                    rmvKeys.add(entry.getKey());
+                }
+            }
+            for (String tmpKey : rmvKeys) {
+                confHolder.remove(tmpKey);
+            }
+            // insert records
+            Set<String> repKeys = new HashSet<>();
+            for (Map.Entry<String, String> entry : filteredMap.entrySet()) {
+                if (!entry.getValue().equals(confHolder.get(entry.getKey()))) {
+                    confHolder.put(entry.getKey(), entry.getValue());
+                    repKeys.add(entry.getKey());
+                }
+            }
+            if (rmvKeys.isEmpty() && repKeys.isEmpty()) {
+                return false;
+            }
+            // update cache data
+            boolean result = updateCacheData();
+            // output update result
+            LOG.info("Load changed properties {}, loaded config {}, updated 
holder {}, updated cache {}",
+                    getFileName(), loadMap, confHolder, result);
+            return true;
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    private synchronized boolean insertOrRemoveProperties(boolean isInsert,
+            Map<String, String> changeConfigMap) {
+        if (changeConfigMap == null || changeConfigMap.isEmpty()) {
+            return false;
+        }
+        Map<String, String> filteredMap = 
filterInValidRecords(changeConfigMap);
+        if (filteredMap.isEmpty()) {
+            LOG.info("Part {} properties {}, but the records are all illegal 
{}",
+                    (isInsert ? "insert" : "remove"), getFileName(), 
changeConfigMap);
+            return false;
+        }
+        boolean changed = false;
+        Map<String, String> newConfigMap = forkHolder();
+        if (isInsert) {
+            for (Map.Entry<String, String> entry : filteredMap.entrySet()) {
+                String oldValue = newConfigMap.put(entry.getKey(), 
entry.getValue());
+                if (!ObjectUtils.equals(oldValue, entry.getValue())) {
+                    changed = true;
+                }
+            }
+        } else {
+            for (Map.Entry<String, String> entry : filteredMap.entrySet()) {
+                String oldValue = newConfigMap.remove(entry.getKey());
+                if (oldValue != null) {
+                    changed = true;
+                }
+            }
+        }
+        if (!changed) {
+            return false;
+        }
+        return compAndStorePropertiesToFile(newConfigMap);
+    }
+
+    private boolean compAndStorePropertiesToFile(Map<String, String> 
newConfigMap) {
+        if (newConfigMap == null || newConfigMap.isEmpty()) {
+            return false;
+        }
+        boolean changed = false;
+        for (Map.Entry<String, String> entry : newConfigMap.entrySet()) {
+            if (!entry.getValue().equals(confHolder.get(entry.getKey()))) {
+                changed = true;
+                break;
+            }
+        }
+        if (!changed) {
+            for (Map.Entry<String, String> entry : confHolder.entrySet()) {
+                if (entry == null || entry.getKey() == null) {
+                    continue;
+                }
+                if (!newConfigMap.containsKey(entry.getKey())) {
+                    changed = true;
+                    break;
+                }
+            }
+        }
+        if (!changed) {
+            return false;
+        }
+        List<String> lines = new ArrayList<>();
+        for (Map.Entry<String, String> entry : newConfigMap.entrySet()) {
+            lines.add(entry.getKey() + "=" + entry.getValue());
+        }
+        return storeConfigToFile(lines);
+    }
+
+    /**
+     * fork current cached records
+     */
+    private Map<String, String> forkHolder() {
+        Map<String, String> tmpHolder = new HashMap<>();
+        if (confHolder != null) {
+            tmpHolder.putAll(confHolder);
+        }
+        return tmpHolder;
+    }
+
+    /**
+     * load from holder
+     */
+    private boolean storeConfigToFile(List<String> configLines) {
+        boolean isSuccess = false;
+        String filePath = getFilePath();
+        if (StringUtils.isBlank(filePath)) {
+            LOG.error("Error in writing file {} as the file path is null.", 
getFileName());
+        }
+        readWriteLock.writeLock().lock();
+        try {
+            File sourceFile = new File(filePath);
+            File targetFile = new File(getNextBackupFileName());
+            File tmpNewFile = new File(getFileName() + ".tmp");
+
+            if (sourceFile.exists()) {
+                FileUtils.copyFile(sourceFile, targetFile);
+            }
+            FileUtils.writeLines(tmpNewFile, configLines);
+            FileUtils.copyFile(tmpNewFile, sourceFile);
+            tmpNewFile.delete();
+            isSuccess = true;
+            getFileChanged().set(true);
+        } catch (Throwable ex) {
+            LOG.error("Error in writing file {}", getFileName(), ex);
+        } finally {
+            readWriteLock.writeLock().unlock();
+        }
+        return isSuccess;
+    }
+
+    private Map<String, String> loadConfigFromFile() {
+        Map<String, String> result = new HashMap<>();
+        if (StringUtils.isBlank(getFileName())) {
+            LOG.error("Fail to load properties {} as the file name is null.", 
getFileName());
+            return result;
+        }
+        InputStream inStream = null;
+        try {
+            URL url = getClass().getClassLoader().getResource(getFileName());
+            inStream = url != null ? url.openStream() : null;
+            if (inStream == null) {
+                LOG.error("Fail to load properties {} as the input stream is 
null", getFileName());
+                return result;
+            }
+            Properties props = new Properties();
+            props.load(inStream);
+            for (Map.Entry<Object, Object> entry : props.entrySet()) {
+                result.put((String) entry.getKey(), (String) entry.getValue());
+            }
+        } catch (Throwable e) {
+            LOG.error("Fail to load properties {}", getFileName(), e);
+        } finally {
+            if (null != inStream) {
+                try {
+                    inStream.close();
+                } catch (IOException e) {
+                    LOG.error("Fail in inStream.close for file {}", 
getFileName(), e);
+                }
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BlackListConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BlackListConfigHolder.java
new file mode 100644
index 000000000..8554046ab
--- /dev/null
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BlackListConfigHolder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+/**
+ * save black list configure to list
+ */
+public class BlackListConfigHolder extends VisitConfigHolder {
+
+    private static final String blacklistFileName = "blacklist.properties";
+
+    public BlackListConfigHolder() {
+        super(true, blacklistFileName);
+    }
+
+    public boolean needCheckBlacklist() {
+        return !isEmptyConfig();
+    }
+
+    public boolean isIllegalIP(String remoteIP) {
+        return isContain(remoteIP);
+    }
+}
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java
new file mode 100644
index 000000000..f59b651a9
--- /dev/null
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import org.apache.inlong.dataproxy.config.PropertiesHolder;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Save cluster configure info
+ */
+public class GroupIdNumConfigHolder extends PropertiesHolder {
+
+    private static final String groupIdNumConfigFileName = 
"groupid_mapping.properties";
+    private static final String GROUPID_VALUE_SPLITTER = "#";
+    private static final Logger LOG = 
LoggerFactory.getLogger(GroupIdNumConfigHolder.class);
+
+    private ConcurrentHashMap<String, String> groupIdNumMap =
+            new ConcurrentHashMap<>();
+    private ConcurrentHashMap<String, ConcurrentHashMap<String, String>> 
streamIdNumMap =
+            new ConcurrentHashMap<>();
+    private ConcurrentHashMap<String, Boolean> groupIdNumEnableMap = new 
ConcurrentHashMap<>();
+
+    public GroupIdNumConfigHolder() {
+        super(groupIdNumConfigFileName);
+    }
+
+    public boolean isEnableNum2NameTrans(String groupIdNum) {
+        return groupIdNumEnableMap.getOrDefault(groupIdNum, Boolean.FALSE);
+    }
+
+    public String getGroupIdNameByNum(String groupIdNum) {
+        return groupIdNumMap.get(groupIdNum);
+    }
+
+    public String getStreamIdNameByIdNum(String groupIdNum, String 
streamIdNum) {
+        ConcurrentHashMap<String, String> tmpMap = 
streamIdNumMap.get(groupIdNum);
+        if (tmpMap == null) {
+            return null;
+        }
+        return tmpMap.get(streamIdNum);
+    }
+
+    public boolean isGroupIdNumConfigEmpty() {
+        return groupIdNumMap.isEmpty();
+    }
+
+    public boolean isStreamIdNumConfigEmpty() {
+        return streamIdNumMap.isEmpty();
+    }
+
+    public ConcurrentHashMap<String, String> getGroupIdNumMap() {
+        return groupIdNumMap;
+    }
+
+    public ConcurrentHashMap<String, ConcurrentHashMap<String, String>> 
getStreamIdNumMap() {
+        return streamIdNumMap;
+    }
+
+    public ConcurrentHashMap<String, Boolean> getGroupIdNumEnableMap() {
+        return groupIdNumEnableMap;
+    }
+
+    @Override
+    protected Map<String, String> filterInValidRecords(Map<String, String> 
configMap) {
+        Map<String, String> result = new HashMap<>(configMap.size());
+        for (Map.Entry<String, String> entry : configMap.entrySet()) {
+            if (entry == null
+                    || StringUtils.isBlank(entry.getKey())
+                    || !entry.getKey().contains(GROUPID_VALUE_SPLITTER)) {
+                continue;
+            }
+            String[] keyArray = StringUtils.split(entry.getKey(), 
GROUPID_VALUE_SPLITTER);
+            if (keyArray.length != 3) {
+                continue;
+            }
+            if (StringUtils.isBlank(keyArray[0])
+                    || StringUtils.isBlank(keyArray[1])
+                    || StringUtils.isBlank(keyArray[2])) {
+                continue;
+            }
+            if (StringUtils.isNotBlank(entry.getValue())) {
+                try {
+                    MAP_SPLITTER.split(entry.getValue());
+                } catch (Throwable e) {
+                    continue;
+                }
+            }
+            result.put(entry.getKey().trim(), entry.getValue().trim());
+        }
+        return result;
+    }
+
+    @Override
+    protected boolean updateCacheData() {
+        boolean tmpEnable;
+        Map<String, String> tmpValueMap = new HashMap<>();
+        Map<String, String> valueMap = new HashMap<>();
+        Map<String, String> tmpGroupIdNumMap = new HashMap<>();
+        Map<String, Map<String, String>> tmpStreamIdNumMap = new HashMap<>();
+        Map<String, Boolean> tmpGroupIdNumEnableMap = new HashMap<>();
+        // parse configure data
+        for (Map.Entry<String, String> entry : confHolder.entrySet()) {
+            if (entry == null
+                    || StringUtils.isBlank(entry.getKey())
+                    || !entry.getKey().contains(GROUPID_VALUE_SPLITTER)) {
+                continue;
+            }
+            String[] keyArray = StringUtils.split(entry.getKey(), 
GROUPID_VALUE_SPLITTER);
+            if (keyArray.length != 3) {
+                continue;
+            }
+            if (StringUtils.isBlank(keyArray[0])
+                    || StringUtils.isBlank(keyArray[1])
+                    || StringUtils.isBlank(keyArray[2])) {
+                continue;
+            }
+            tmpEnable = Boolean.parseBoolean(keyArray[2].trim());
+            tmpGroupIdNumMap.put(keyArray[0].trim(), keyArray[1].trim());
+            tmpGroupIdNumEnableMap.put(keyArray[0].trim(), tmpEnable);
+            // parse value
+            if (StringUtils.isNotBlank(entry.getValue())) {
+                tmpValueMap.clear();
+                valueMap.clear();
+                try {
+                    tmpValueMap = MAP_SPLITTER.split(entry.getValue());
+                } catch (Throwable e) {
+                    continue;
+                }
+                if (tmpValueMap.isEmpty()) {
+                    continue;
+                }
+                for (Map.Entry<String, String> entry1 : 
tmpValueMap.entrySet()) {
+                    if (entry1 == null
+                            || StringUtils.isBlank(entry1.getKey())
+                            || StringUtils.isBlank(entry1.getValue())) {
+                        continue;
+                    }
+                    valueMap.put(entry1.getKey().trim(), 
entry.getValue().trim());
+                }
+                if (!valueMap.isEmpty()) {
+                    tmpStreamIdNumMap.put(keyArray[0].trim(), valueMap);
+                }
+            }
+        }
+        // update cached groupId num2Name data
+        updateCachedGroupIdNumMap(tmpGroupIdNumMap);
+        // update cached groupId num2Name enable data
+        updateCachedGroupIdNumEnableMap(tmpGroupIdNumEnableMap);
+        // update cached streamId num2Name enable data
+        updateCachedStreamIdNumMap(tmpStreamIdNumMap);
+        return true;
+    }
+
+    private void updateCachedGroupIdNumMap(Map<String, String> 
newGroupIdNumMap) {
+        if (newGroupIdNumMap.isEmpty()) {
+            groupIdNumMap.clear();
+            return;
+        }
+        for (Map.Entry<String, String> entry : newGroupIdNumMap.entrySet()) {
+            if (!entry.getValue().equals(groupIdNumMap.get(entry.getKey()))) {
+                groupIdNumMap.put(entry.getKey(), entry.getValue());
+            }
+        }
+        // remove records
+        Set<String> rmvKeys = new HashSet<>();
+        for (Map.Entry<String, String> entry : groupIdNumMap.entrySet()) {
+            if (!newGroupIdNumMap.containsKey(entry.getKey())) {
+                rmvKeys.add(entry.getKey());
+            }
+        }
+        for (String tmpKey : rmvKeys) {
+            groupIdNumMap.remove(tmpKey);
+        }
+    }
+
+    private void updateCachedGroupIdNumEnableMap(Map<String, Boolean> 
newGroupNumEnableMap) {
+        if (newGroupNumEnableMap.isEmpty()) {
+            groupIdNumEnableMap.clear();
+            return;
+        }
+        for (Map.Entry<String, Boolean> entry : 
newGroupNumEnableMap.entrySet()) {
+            if 
(!entry.getValue().equals(groupIdNumEnableMap.get(entry.getKey()))) {
+                groupIdNumEnableMap.put(entry.getKey(), entry.getValue());
+            }
+        }
+        // remove records
+        Set<String> rmvKeys = new HashSet<>();
+        for (Map.Entry<String, Boolean> entry : 
groupIdNumEnableMap.entrySet()) {
+            if (!newGroupNumEnableMap.containsKey(entry.getKey())) {
+                rmvKeys.add(entry.getKey());
+            }
+        }
+        for (String tmpKey : rmvKeys) {
+            groupIdNumEnableMap.remove(tmpKey);
+        }
+    }
+
+    private void updateCachedStreamIdNumMap(Map<String, Map<String, String>> 
newStreamIdNumMap) {
+        if (newStreamIdNumMap.isEmpty()) {
+            streamIdNumMap.clear();
+            return;
+        }
+        Map<String, String> newDataMap;
+        Set<String> rmvKeys = new HashSet<>();
+        ConcurrentHashMap<String, String> storedMap;
+        // insert cached streamId num2Name data
+        for (Map.Entry<String, Map<String, String>> entry : 
newStreamIdNumMap.entrySet()) {
+            storedMap = streamIdNumMap.get(entry.getKey());
+            if (storedMap == null) {
+                storedMap = new ConcurrentHashMap<>(entry.getValue().size());
+                storedMap.putAll(entry.getValue());
+                streamIdNumMap.put(entry.getKey(), storedMap);
+            } else {
+                newDataMap = entry.getValue();
+                for (Map.Entry<String, String> entry1 : newDataMap.entrySet()) 
{
+                    if 
(!entry1.getValue().equals(storedMap.get(entry.getKey()))) {
+                        storedMap.put(entry1.getKey(), entry1.getValue());
+                    }
+                }
+                for (Map.Entry<String, String> entry1 : storedMap.entrySet()) {
+                    if (!newDataMap.containsKey(entry1.getKey())) {
+                        rmvKeys.add(entry.getKey());
+                    }
+                }
+                for (String tmpKey : rmvKeys) {
+                    storedMap.remove(tmpKey);
+                }
+                if (storedMap.isEmpty()) {
+                    streamIdNumMap.remove(entry.getKey());
+                }
+            }
+        }
+        // remove cached streamId num2Name data
+        rmvKeys.clear();
+        for (Map.Entry<String, ConcurrentHashMap<String, String>> entry : 
streamIdNumMap.entrySet()) {
+            if (!newStreamIdNumMap.containsKey(entry.getKey())) {
+                rmvKeys.add(entry.getKey());
+            }
+        }
+        for (String tmpKey : rmvKeys) {
+            streamIdNumMap.remove(tmpKey);
+        }
+    }
+}
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdPropertiesHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdPropertiesHolder.java
deleted file mode 100644
index d34b19522..000000000
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdPropertiesHolder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.config.holder;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * groupId to m value
- */
-public class GroupIdPropertiesHolder extends PropertiesConfigHolder {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(GroupIdPropertiesHolder.class);
-    private static final String GROUPID_VALUE_SPLITTER = "#";
-
-    private Map<String, String> groupIdMappingProperties =
-            new HashMap<String, String>();
-    private Map<String, Map<String, String>> streamIdMappingProperties =
-            new HashMap<String, Map<String, String>>();
-    private Map<String, String> groupIdEnableMappingProperties =
-            new HashMap<String, String>();
-
-    public GroupIdPropertiesHolder(String fileName) {
-        super(fileName);
-    }
-
-    @Override
-    public boolean loadFromFileToHolder() {
-        super.loadFromFileToHolder();
-        try {
-            Map<String, String> tmpGroupIdMappingProperties =
-                    new HashMap<>();
-            Map<String, Map<String, String>> tmpStreamIdMappingProperties =
-                    new HashMap<>();
-            Map<String, String> tmpGroupIdEnableMappingProperties = new 
HashMap<>();
-            for (Map.Entry<String, String> entry : 
super.getHolder().entrySet()) {
-                String[] sArray = StringUtils.split(entry.getKey(), 
GROUPID_VALUE_SPLITTER);
-                if (sArray.length != 3) {
-                    LOG.warn("invalid groupId key {}", entry.getKey());
-                    continue;
-                }
-                tmpGroupIdMappingProperties.put(sArray[0].trim(), 
sArray[1].trim());
-                tmpGroupIdEnableMappingProperties.put(sArray[0].trim(), 
sArray[2].trim());
-                if (StringUtils.isNotBlank(entry.getValue())) {
-                    tmpStreamIdMappingProperties.put(sArray[0].trim(),
-                            MAP_SPLITTER.split(entry.getValue()));
-                }
-            }
-            groupIdMappingProperties = tmpGroupIdMappingProperties;
-            streamIdMappingProperties = tmpStreamIdMappingProperties;
-            groupIdEnableMappingProperties = tmpGroupIdEnableMappingProperties;
-        } catch (Exception e) {
-            LOG.error("loadConfig error :", e);
-        }
-        return true;
-    }
-
-    public Map<String, String> getGroupIdMappingProperties() {
-        return groupIdMappingProperties;
-    }
-
-    public Map<String, Map<String, String>> getStreamIdMappingProperties() {
-        return streamIdMappingProperties;
-    }
-
-    public Map<String, String> getGroupIdEnableMappingProperties() {
-        return groupIdEnableMappingProperties;
-    }
-}
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
index 75f851fdb..6f75320da 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
@@ -43,7 +43,7 @@ public class MQClusterConfigHolder extends 
PropertiesConfigHolder {
      * load from file
      */
     @Override
-    public boolean loadFromFileToHolder() {
+    protected boolean loadFromFileToHolder() {
         super.loadFromFileToHolder();
         Map<String, String> tmpUrl2token = new HashMap<>();
         for (Map.Entry<String, String> entry : getHolder().entrySet()) {
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java
index c5c65ce7d..74b81999e 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java
@@ -40,7 +40,7 @@ public class MxPropertiesHolder extends 
PropertiesConfigHolder {
      * load m from file
      */
     @Override
-    public boolean loadFromFileToHolder() {
+    protected boolean loadFromFileToHolder() {
         super.loadFromFileToHolder();
         try {
             for (Map.Entry<String, String> entry : getHolder().entrySet()) {
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
index a75683c78..2894f19bf 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
@@ -50,7 +50,7 @@ public class PropertiesConfigHolder extends ConfigHolder {
     }
 
     @Override
-    public boolean loadFromFileToHolder() {
+    protected boolean loadFromFileToHolder() {
         readWriteLock.readLock().lock();
         try {
             Map<String, String> tmpHolder = loadProperties();
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IPVisitConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/VisitConfigHolder.java
similarity index 95%
rename from 
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IPVisitConfigHolder.java
rename to 
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/VisitConfigHolder.java
index 8f73185de..d7a7277b8 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IPVisitConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/VisitConfigHolder.java
@@ -38,27 +38,27 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 /**
  * save to list
  */
-public class IPVisitConfigHolder extends ConfigHolder {
+public class VisitConfigHolder extends ConfigHolder {
 
     private static final int MIN_NETMASK_BITS = 0;
     private static final int MAX_NETMASK_BITS = 32;
     private static final String MASKIP_NETMASK_SEP = "/";
     private static final String IPV4ADDR_TMP =
             
"((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))";
-    private static final Logger LOG = 
LoggerFactory.getLogger(IPVisitConfigHolder.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(VisitConfigHolder.class);
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     private final boolean isBlackList;
     private final ConcurrentHashMap<String, Long> confHolder = new 
ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, Long> ipAddrHolder = new 
ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, Pair<Integer, Integer>> 
ipSegmentHolder = new ConcurrentHashMap<>();
 
-    public IPVisitConfigHolder(boolean isBlackList, String fileName) {
+    public VisitConfigHolder(boolean isBlackList, String fileName) {
         super(fileName);
         this.isBlackList = isBlackList;
     }
 
     @Override
-    public boolean loadFromFileToHolder() {
+    protected boolean loadFromFileToHolder() {
         readWriteLock.writeLock().lock();
         try {
             Map<String, Long> tmpHolder = loadFile();
@@ -90,9 +90,7 @@ public class IPVisitConfigHolder extends ConfigHolder {
             int hostAddrMask;
             tmpKeys.clear();
             for (Map.Entry<String, Long> entry : tmpHolder.entrySet()) {
-                if (entry == null
-                        || entry.getKey() == null
-                        || StringUtils.isBlank(entry.getKey())) {
+                if (entry == null || StringUtils.isBlank(entry.getKey())) {
                     continue;
                 }
                 if (!confHolder.containsKey(entry.getKey())) {
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/WeightConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/WeightConfigHolder.java
new file mode 100644
index 000000000..54248482a
--- /dev/null
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/WeightConfigHolder.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import org.apache.inlong.dataproxy.config.PropertiesHolder;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Save weight configure info
+ */
+public class WeightConfigHolder extends PropertiesHolder {
+
+    private static final String weightConfigFileName = "weight.properties";
+    private static final Logger LOG = 
LoggerFactory.getLogger(WeightConfigHolder.class);
+
+    private static final String KEY_WEIGHT_CPU = "cpuWeight";
+    private static final double VAL_DEF_WEIGHT_CPU = 1;
+    private static final String KEY_WEIGHT_NET_IN = "netinWeight";
+    private static final double VAL_DEF_WEIGHT_NET_IN = 0.5;
+    private static final String KEY_WEIGHT_NET_OUT = "netoutWeight";
+    private static final double VAL_DEF_WEIGHT_NET_OUT = 0.5;
+    private static final String KEY_WEIGHT_TCP = "tcpWeight";
+    private static final double VAL_DEF_WEIGHT_TCP = 0;
+    private static final String KEY_WEIGHT_CPU_THRESHOLD = "cpuThreshold";
+    private static final double VAL_DEF_WEIGHT_CPU_THRESHOLD = 85;
+    // cache configure
+    private final AtomicDouble cachedCpuWeight = new 
AtomicDouble(VAL_DEF_WEIGHT_CPU);
+    private final AtomicDouble cachedNetInWeight = new 
AtomicDouble(VAL_DEF_WEIGHT_NET_IN);
+    private final AtomicDouble cachedNetOutWeight = new 
AtomicDouble(VAL_DEF_WEIGHT_NET_OUT);
+    private final AtomicDouble cachedTcpWeight = new 
AtomicDouble(VAL_DEF_WEIGHT_TCP);
+    private final AtomicDouble cachedCpuThreshold = new 
AtomicDouble(VAL_DEF_WEIGHT_CPU_THRESHOLD);
+
+    public WeightConfigHolder() {
+        super(weightConfigFileName);
+    }
+
+    public double getCachedCpuWeight() {
+        return cachedCpuWeight.get();
+    }
+
+    public double getCachedNetInWeight() {
+        return cachedNetInWeight.get();
+    }
+
+    public double getCachedNetOutWeight() {
+        return cachedNetOutWeight.get();
+    }
+
+    public double getCachedTcpWeight() {
+        return cachedTcpWeight.get();
+    }
+
+    public double getCachedCpuThreshold() {
+        return cachedCpuThreshold.get();
+    }
+
+    @Override
+    protected Map<String, String> filterInValidRecords(Map<String, String> 
configMap) {
+        Map<String, String> filteredMap = new HashMap<>(configMap.size());
+        for (Map.Entry<String, String> entry : configMap.entrySet()) {
+            if (entry == null
+                    || StringUtils.isBlank(entry.getKey())
+                    || StringUtils.isBlank(entry.getValue())) {
+                continue;
+            }
+            try {
+                Double.parseDouble(entry.getValue());
+            } catch (Throwable e) {
+                continue;
+            }
+            filteredMap.put(entry.getKey().trim(), entry.getValue().trim());
+        }
+        return filteredMap;
+    }
+
+    @Override
+    protected boolean updateCacheData() {
+        // get cpu weight
+        double newVal = VAL_DEF_WEIGHT_CPU;
+        String tmpStrVal = confHolder.get(KEY_WEIGHT_CPU);
+        if (StringUtils.isNotBlank(tmpStrVal)) {
+            try {
+                newVal = Double.parseDouble(tmpStrVal);
+            } catch (Throwable e) {
+                //
+            }
+        }
+        cachedCpuWeight.set(newVal);
+        // get net-in weight
+        newVal = VAL_DEF_WEIGHT_NET_IN;
+        tmpStrVal = confHolder.get(KEY_WEIGHT_NET_IN);
+        if (StringUtils.isNotBlank(tmpStrVal)) {
+            try {
+                newVal = Double.parseDouble(tmpStrVal);
+            } catch (Throwable e) {
+                //
+            }
+        }
+        cachedNetInWeight.set(newVal);
+        // get net-out weight
+        newVal = VAL_DEF_WEIGHT_NET_OUT;
+        tmpStrVal = confHolder.get(KEY_WEIGHT_NET_OUT);
+        if (StringUtils.isNotBlank(tmpStrVal)) {
+            try {
+                newVal = Double.parseDouble(tmpStrVal);
+            } catch (Throwable e) {
+                //
+            }
+        }
+        cachedNetOutWeight.set(newVal);
+        // get tcp weight
+        newVal = VAL_DEF_WEIGHT_TCP;
+        tmpStrVal = confHolder.get(KEY_WEIGHT_TCP);
+        if (StringUtils.isNotBlank(tmpStrVal)) {
+            try {
+                newVal = Double.parseDouble(tmpStrVal);
+            } catch (Throwable e) {
+                //
+            }
+        }
+        cachedTcpWeight.set(newVal);
+        // get cpu threshold weight
+        newVal = VAL_DEF_WEIGHT_CPU_THRESHOLD;
+        tmpStrVal = confHolder.get(KEY_WEIGHT_CPU_THRESHOLD);
+        if (StringUtils.isNotBlank(tmpStrVal)) {
+            try {
+                newVal = Double.parseDouble(tmpStrVal);
+            } catch (Throwable e) {
+                //
+            }
+        }
+        cachedCpuThreshold.set(newVal);
+        return true;
+    }
+}
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/WhiteListConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/WhiteListConfigHolder.java
new file mode 100644
index 000000000..f40ac7748
--- /dev/null
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/WhiteListConfigHolder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
+
+/**
+ * save white list configure to list
+ */
+public class WhiteListConfigHolder extends VisitConfigHolder {
+
+    private static final String whitelistFileName = "whitelist.properties";
+
+    public WhiteListConfigHolder() {
+        super(false, whitelistFileName);
+    }
+
+    public boolean needCheckWhitelist() {
+        return CommonConfigHolder.getInstance().isEnableWhiteList();
+    }
+
+    public boolean isIllegalIP(String remoteIP) {
+        return (CommonConfigHolder.getInstance().isEnableWhiteList() && 
!isContain(remoteIP));
+    }
+}
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
index 8d1395f58..26407038e 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
@@ -44,8 +44,8 @@ import org.apache.http.util.EntityUtils;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -153,27 +153,26 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
         
heartbeatMsg.setInCharges(CommonConfigHolder.getInstance().getClusterIncharges());
         
heartbeatMsg.setExtTag(CommonConfigHolder.getInstance().getClusterExtTag());
 
-        Map<String, String> groupIdMappings = 
configManager.getGroupIdMappingProperties();
-        Map<String, Map<String, String>> streamIdMappings = 
configManager.getStreamIdMappingProperties();
-        Map<String, String> groupIdEnableMappings = 
configManager.getGroupIdEnableMappingProperties();
+        ConcurrentHashMap<String, String> groupIdMappings =
+                configManager.getGroupIdNumMap();
+        ConcurrentHashMap<String, ConcurrentHashMap<String, String>> 
streamIdMappings =
+                configManager.getStreamIdNumMap();
         List<GroupHeartbeat> groupHeartbeats = new ArrayList<>();
         for (Entry<String, String> entry : groupIdMappings.entrySet()) {
             String groupIdNum = entry.getKey();
             String groupId = entry.getValue();
             GroupHeartbeat groupHeartbeat = new GroupHeartbeat();
             groupHeartbeat.setInlongGroupId(groupId);
-            String status = groupIdEnableMappings.getOrDefault(groupIdNum, 
"disabled");
-            status = status.equals("TRUE") ? "enabled" : "disabled";
+            String status = configManager.isEnableNum2NameTrans(groupIdNum) ? 
"enabled" : "disabled";
             groupHeartbeat.setStatus(status);
             groupHeartbeats.add(groupHeartbeat);
         }
         heartbeatMsg.setGroupHeartbeats(groupHeartbeats);
 
         List<StreamHeartbeat> streamHeartbeats = new ArrayList<>();
-        for (Entry<String, Map<String, String>> entry : 
streamIdMappings.entrySet()) {
+        for (Entry<String, ConcurrentHashMap<String, String>> entry : 
streamIdMappings.entrySet()) {
             String groupIdNum = entry.getKey();
-            String status = groupIdEnableMappings.getOrDefault(groupIdNum, 
"disabled");
-            status = status.equals("TRUE") ? "enabled" : "disabled";
+            String status = configManager.isEnableNum2NameTrans(groupIdNum) ? 
"enabled" : "disabled";
             String groupId = groupIdMappings.get(groupIdNum);
             for (Entry<String, String> streamEntry : 
entry.getValue().entrySet()) {
                 String streamId = streamEntry.getValue();
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 12678b57c..5b2d725fe 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -359,19 +359,12 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
                 String groupIdNum = 
commonAttrMap.get(AttrConstants.GROUPID_NUM);
                 String streamIdNum = 
commonAttrMap.get(AttrConstants.STREAMID_NUM);
                 // get configured groupId and steamId by numbers
-                if (configManager.getGroupIdMappingProperties() != null
-                        && configManager.getStreamIdMappingProperties() != 
null) {
-                    groupId = 
configManager.getGroupIdMappingProperties().get(groupIdNum);
-                    streamId = 
(configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
-                            ? null
-                            : 
configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+                if (!configManager.isGroupIdNumConfigEmpty()
+                        && !configManager.isStreamIdNumConfigEmpty()) {
+                    groupId = configManager.getGroupIdNameByNum(groupIdNum);
+                    streamId = 
configManager.getStreamIdNameByIdNum(groupIdNum, streamIdNum);
                     if (groupId != null && streamId != null) {
-                        String enableTrans =
-                                
(configManager.getGroupIdEnableMappingProperties() == null)
-                                        ? null
-                                        : 
configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
-                        if (("TRUE".equalsIgnoreCase(enableTrans)
-                                && "TRUE".equalsIgnoreCase(num2name))) {
+                        if ((configManager.isEnableNum2NameTrans(groupIdNum) 
&& "TRUE".equalsIgnoreCase(num2name))) {
                             String extraAttr = "groupId=" + groupId + "&" + 
"streamId=" + streamId;
                             message.setData(newBinMsg(message.getData(), 
extraAttr));
                         }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index b348777c3..4666e617f 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -235,18 +235,13 @@ public class SimpleMessageHandler extends 
ChannelInboundHandlerAdapter {
             String groupIdNum = commonAttrMap.get(AttrConstants.GROUPID_NUM);
             String streamIdNum = commonAttrMap.get(AttrConstants.STREAMID_NUM);
 
-            if (configManager.getGroupIdMappingProperties() != null
-                    && configManager.getStreamIdMappingProperties() != null) {
-                groupId = 
configManager.getGroupIdMappingProperties().get(groupIdNum);
-                streamId = 
(configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
-                        ? null
-                        : 
configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+            if (!configManager.isGroupIdNumConfigEmpty()
+                    && !configManager.isStreamIdNumConfigEmpty()) {
+                groupId = configManager.getGroupIdNameByNum(groupIdNum);
+                streamId = configManager.getStreamIdNameByIdNum(groupIdNum, 
streamIdNum);
                 if (groupId != null && streamId != null) {
-                    String enableTrans = 
(configManager.getGroupIdEnableMappingProperties() == null)
-                            ? null
-                            : 
configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
-                    if (("TRUE".equalsIgnoreCase(enableTrans) && "TRUE"
-                            .equalsIgnoreCase(num2name))) {
+                    if ((configManager.isEnableNum2NameTrans(groupIdNum)
+                            && "TRUE".equalsIgnoreCase(num2name))) {
                         String extraAttr = "groupId=" + groupId + "&" + 
"streamId=" + streamId;
                         message.setData(newBinMsg(message.getData(), 
extraAttr));
                     }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
index 83c9e241e..356561ff2 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
@@ -179,11 +179,9 @@ public abstract class BaseSource
         // get max worker threads
         this.maxWorkerThreads = ConfStringUtils.getIntValue(context,
                 SourceConstants.SRCCXT_MAX_WORKER_THREADS, 
SourceConstants.VAL_DEF_WORKER_THREADS);
-        Preconditions.checkArgument((this.maxWorkerThreads >= 
SourceConstants.VAL_MIN_WORKER_THREADS
-                && this.maxWorkerThreads <= 
SourceConstants.VAL_MAX_WORKER_THREADS),
-                SourceConstants.SRCCXT_MAX_WORKER_THREADS + " must be in ["
-                        + SourceConstants.VAL_MIN_WORKER_THREADS + ", "
-                        + SourceConstants.VAL_MAX_WORKER_THREADS + "]");
+        Preconditions.checkArgument((this.maxWorkerThreads >= 
SourceConstants.VAL_MIN_WORKER_THREADS),
+                SourceConstants.SRCCXT_MAX_WORKER_THREADS + " must be >= "
+                        + SourceConstants.VAL_MIN_WORKER_THREADS);
         // get max read idle time
         this.maxReadIdleTimeMs = ConfStringUtils.getLongValue(context,
                 SourceConstants.SRCCXT_MAX_READ_IDLE_TIME_MS, 
SourceConstants.VAL_DEF_READ_IDLE_TIME_MS);
@@ -204,18 +202,12 @@ public abstract class BaseSource
         Preconditions.checkArgument(this.maxRcvBufferSize >= 
SourceConstants.VAL_MIN_RECEIVE_BUFFER_SIZE,
                 SourceConstants.SRCCXT_RECEIVE_BUFFER_SIZE + " must be >= "
                         + SourceConstants.VAL_MIN_RECEIVE_BUFFER_SIZE);
-        if (this.maxRcvBufferSize > 
SourceConstants.VAL_MAX_RECEIVE_BUFFER_SIZE) {
-            this.maxRcvBufferSize = 
SourceConstants.VAL_MAX_RECEIVE_BUFFER_SIZE;
-        }
         // get max send buffer size
         this.maxSendBufferSize = ConfStringUtils.getIntValue(context,
                 SourceConstants.SRCCXT_SEND_BUFFER_SIZE, 
SourceConstants.VAL_DEF_SEND_BUFFER_SIZE);
         Preconditions.checkArgument(this.maxSendBufferSize >= 
SourceConstants.VAL_MIN_SEND_BUFFER_SIZE,
                 SourceConstants.SRCCXT_SEND_BUFFER_SIZE + " must be >= "
                         + SourceConstants.VAL_MIN_SEND_BUFFER_SIZE);
-        if (this.maxSendBufferSize > SourceConstants.VAL_MAX_SEND_BUFFER_SIZE) 
{
-            this.maxSendBufferSize = SourceConstants.VAL_MAX_SEND_BUFFER_SIZE;
-        }
     }
 
     @Override
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
index 64bcf55eb..26f1fad6c 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
@@ -61,7 +61,6 @@ public class SourceConstants {
     public static final String SRCCXT_MAX_WORKER_THREADS = "max-threads";
     public static final int VAL_DEF_WORKER_THREADS = 
Runtime.getRuntime().availableProcessors();
     public static final int VAL_MIN_WORKER_THREADS = 1;
-    public static final int VAL_MAX_WORKER_THREADS = 
Runtime.getRuntime().availableProcessors() * 2;
     // max connection count
     public static final String SRCCXT_MAX_CONNECTION_CNT = "connections";
     public static final int VAL_DEF_MAX_CONNECTION_CNT = 5000;
@@ -70,12 +69,10 @@ public class SourceConstants {
     public static final String SRCCXT_RECEIVE_BUFFER_SIZE = 
"receiveBufferSize";
     public static final int VAL_DEF_RECEIVE_BUFFER_SIZE = 64 * 1024;
     public static final int VAL_MIN_RECEIVE_BUFFER_SIZE = 0;
-    public static final int VAL_MAX_RECEIVE_BUFFER_SIZE = 100 * 1024 * 1024;
     // max send buffer size
     public static final String SRCCXT_SEND_BUFFER_SIZE = "sendBufferSize";
     public static final int VAL_DEF_SEND_BUFFER_SIZE = 64 * 1024;
     public static final int VAL_MIN_SEND_BUFFER_SIZE = 0;
-    public static final int VAL_MAX_SEND_BUFFER_SIZE = 100 * 1024 * 1024;
     // tcp parameter no delay
     public static final String SRCCXT_TCP_NO_DELAY = "tcpNoDelay";
     public static final boolean VAL_DEF_TCP_NO_DELAY = true;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
index 58bed5f78..b45581755 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
@@ -265,17 +265,17 @@ public class CodecBinMsg extends AbsV0MsgCodec {
             String confGroupId;
             String confStreamId;
             String strGroupIdNum = String.valueOf(this.groupIdNum);
-            if (configManager.getGroupIdMappingProperties() == null) {
-                source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
-                this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
-                this.errMsg = "GroupId-Mapping configuration is null";
-                return false;
-            }
-            confGroupId = 
configManager.getGroupIdMappingProperties().get(strGroupIdNum);
+            confGroupId = configManager.getGroupIdNameByNum(strGroupIdNum);
             if (StringUtils.isBlank(confGroupId)) {
-                source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
-                this.errCode = 
DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE;
-                this.errMsg = String.format("Non-existing groupIdNum(%s) 
configuration", strGroupIdNum);
+                if (configManager.isGroupIdNumConfigEmpty()) {
+                    
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
+                    this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
+                    this.errMsg = "GroupId-Mapping configuration is null";
+                } else {
+                    
source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+                    this.errCode = 
DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE;
+                    this.errMsg = String.format("Non-existing groupIdNum(%s) 
configuration", strGroupIdNum);
+                }
                 return false;
             }
             if (StringUtils.isNotBlank(this.groupId) && 
!this.groupId.equalsIgnoreCase(confGroupId)) {
@@ -296,27 +296,19 @@ public class CodecBinMsg extends AbsV0MsgCodec {
                     return false;
                 }
             } else {
-                if (configManager.getStreamIdMappingProperties() == null) {
-                    
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
-                    this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
-                    this.errMsg = "StreamId-Mapping configuration is null";
-                    return false;
-                }
-                Map<String, String> confStreamIdMap =
-                        
configManager.getStreamIdMappingProperties().get(strGroupIdNum);
-                if (confStreamIdMap == null) {
-                    
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
-                    this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
-                    this.errMsg = "GroupId in StreamId-Mapping configuration 
is null";
-                    return false;
-                }
                 String strStreamIdNum = String.valueOf(this.streamIdNum);
-                confStreamId = confStreamIdMap.get(strStreamIdNum);
+                confStreamId = 
configManager.getStreamIdNameByIdNum(strGroupIdNum, strStreamIdNum);
                 if (StringUtils.isBlank(confStreamId)) {
-                    
source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
-                    this.errCode = 
DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE;
-                    this.errMsg = String.format("Non-existing 
GroupId(%s)-StreamId(%s) configuration",
-                            strGroupIdNum, strStreamIdNum);
+                    if (configManager.isStreamIdNumConfigEmpty()) {
+                        
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
+                        this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
+                        this.errMsg = "StreamId-Mapping configuration is null";
+                    } else {
+                        
source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+                        this.errCode = 
DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE;
+                        this.errMsg = String.format("Non-existing 
GroupId(%s)-StreamId(%s) configuration",
+                                strGroupIdNum, strStreamIdNum);
+                    }
                     return false;
                 }
                 if (StringUtils.isNotBlank(this.streamId) && 
!this.streamId.equalsIgnoreCase(confStreamId)) {
@@ -330,10 +322,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
                 this.streamId = confStreamId;
             }
             // check whether enable num 2 name translate
-            String enableTrans = 
(configManager.getGroupIdEnableMappingProperties() == null)
-                    ? null
-                    : 
configManager.getGroupIdEnableMappingProperties().get(strGroupIdNum);
-            if ("true".equalsIgnoreCase(enableTrans) && this.num2name) {
+            if (configManager.isEnableNum2NameTrans(strGroupIdNum) && 
this.num2name) {
                 this.transNum2Name = true;
             }
         } else {

Reply via email to