This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6e8d77a48 [INLONG-8106][DataProxy] Optimize ConfigManager
implementation ( part one ) (#8107)
6e8d77a48 is described below
commit 6e8d77a487baaeb6754f65a98be3c3c8d274d1ee
Author: Goson Zhang <[email protected]>
AuthorDate: Mon May 29 16:36:12 2023 +0800
[INLONG-8106][DataProxy] Optimize ConfigManager implementation ( part one )
(#8107)
---
.../dataproxy/channel/FailoverChannelSelector.java | 14 +-
.../inlong/dataproxy/config/ConfigHolder.java | 8 +-
.../inlong/dataproxy/config/ConfigManager.java | 116 ++++++---
.../inlong/dataproxy/config/PropertiesHolder.java | 270 +++++++++++++++++++++
.../config/holder/BlackListConfigHolder.java | 38 +++
.../config/holder/GroupIdNumConfigHolder.java | 268 ++++++++++++++++++++
.../config/holder/GroupIdPropertiesHolder.java | 88 -------
.../config/holder/MQClusterConfigHolder.java | 2 +-
.../config/holder/MxPropertiesHolder.java | 2 +-
.../config/holder/PropertiesConfigHolder.java | 2 +-
...sitConfigHolder.java => VisitConfigHolder.java} | 12 +-
.../config/holder/WeightConfigHolder.java | 157 ++++++++++++
.../config/holder/WhiteListConfigHolder.java | 40 +++
.../dataproxy/heartbeat/HeartbeatManager.java | 17 +-
.../dataproxy/source/ServerMessageHandler.java | 17 +-
.../dataproxy/source/SimpleMessageHandler.java | 17 +-
.../inlong/dataproxy/source2/BaseSource.java | 14 +-
.../inlong/dataproxy/source2/SourceConstants.java | 3 -
.../dataproxy/source2/v0msg/CodecBinMsg.java | 55 ++---
19 files changed, 916 insertions(+), 224 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
index 5bdb1e8da..921d595ed 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
@@ -103,9 +103,7 @@ public class FailoverChannelSelector extends
AbstractChannelSelector {
*/
private List<String> splitChannelName(String channelName) {
List<String> fileMetricList = new ArrayList<String>();
- if (StringUtils.isEmpty(channelName)) {
- LOG.info("channel name is null!");
- } else {
+ if (StringUtils.isNotBlank(channelName)) {
fileMetricList = Arrays.asList(channelName.split("\\s+"));
}
return fileMetricList;
@@ -145,11 +143,9 @@ public class FailoverChannelSelector extends
AbstractChannelSelector {
this.slaveChannels.add(channel);
}
}
- LOG.info("masters:" + this.masterChannels);
- LOG.info("orders:" + this.orderChannels);
- LOG.info("slaves:" + this.slaveChannels);
- LOG.info("transfers:" + this.transferChannels);
- LOG.info("agentFileMetrics:" + this.agentFileMetricChannels);
- LOG.info("slaMetrics:" + this.slaMetricChannels);
+ LOG.info(
+ "Configure channels, masters={}, orders={}, slaves={},
transfers={}, agentFileMetrics={}, slaMetrics={}",
+ this.masterChannels, this.orderChannels, this.slaveChannels,
+ this.transferChannels, this.agentFileMetricChannels,
this.slaMetricChannels);
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
index 7748470f5..d6e533494 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
@@ -79,7 +79,7 @@ public abstract class ConfigHolder {
*
* @return - true if configure updated
*/
- public abstract boolean loadFromFileToHolder();
+ protected abstract boolean loadFromFileToHolder();
/**
* check updater
@@ -92,7 +92,7 @@ public abstract class ConfigHolder {
if (configFile != null) {
this.lastModifyTime = configFile.lastModified();
}
- LOG.info("File {} has changed, reload from local file agent",
getFileName());
+ LOG.info("File {} has changed, reload from local file",
this.fileName);
return loadFromFileToHolder();
}
return false;
@@ -125,8 +125,8 @@ public abstract class ConfigHolder {
if (url != null) {
this.filePath = url.getPath();
this.configFile = new File(this.filePath);
- LOG.info("set file path lastTime: {}, currentTime: {}",
- lastModifyTime, configFile.lastModified());
+ LOG.info("Set {} file path, lastTime: {}, currentTime: {}",
+ fileName, lastModifyTime, configFile.lastModified());
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index e495ef0ad..6307d2bf8 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -20,14 +20,16 @@ package org.apache.inlong.dataproxy.config;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigRequest;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+import org.apache.inlong.dataproxy.config.holder.BlackListConfigHolder;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.config.holder.GroupIdPropertiesHolder;
-import org.apache.inlong.dataproxy.config.holder.IPVisitConfigHolder;
+import org.apache.inlong.dataproxy.config.holder.GroupIdNumConfigHolder;
import org.apache.inlong.dataproxy.config.holder.MQClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.MxPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.PropertiesConfigHolder;
import org.apache.inlong.dataproxy.config.holder.SourceReportConfigHolder;
import org.apache.inlong.dataproxy.config.holder.SourceReportInfo;
+import org.apache.inlong.dataproxy.config.holder.WeightConfigHolder;
+import org.apache.inlong.dataproxy.config.holder.WhiteListConfigHolder;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -52,6 +54,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,15 +68,18 @@ public class ConfigManager {
public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new
ArrayList<>();
private static volatile boolean isInit = false;
private static ConfigManager instance = null;
+ // node weight configure
+ private final WeightConfigHolder weightConfigHolder = new
WeightConfigHolder();
+ // black list configure
+ private final BlackListConfigHolder blacklistConfigHolder = new
BlackListConfigHolder();
+ // whitelist configure
+ private final WhiteListConfigHolder whitelistConfigHolder = new
WhiteListConfigHolder();
+ // group id num 2 name configure
+ private final GroupIdNumConfigHolder groupIdConfig = new
GroupIdNumConfigHolder();
private final MQClusterConfigHolder mqClusterConfigHolder = new
MQClusterConfigHolder("mq_cluster.properties");
private final PropertiesConfigHolder topicConfig = new
PropertiesConfigHolder("topics.properties");
private final MxPropertiesHolder mxConfig = new
MxPropertiesHolder("mx.properties");
-
- private final GroupIdPropertiesHolder groupIdConfig = new
GroupIdPropertiesHolder("groupid_mapping.properties");
- private final PropertiesConfigHolder weightHolder = new
PropertiesConfigHolder("weight.properties");
- private final IPVisitConfigHolder blackListConfig = new
IPVisitConfigHolder(true, "blacklist.properties");
- private final IPVisitConfigHolder whiteListConfig = new
IPVisitConfigHolder(false, "whitelist.properties");
// source report configure holder
private final SourceReportConfigHolder sourceReportConfigHolder = new
SourceReportConfigHolder();
// mq clusters ready
@@ -101,8 +107,25 @@ public class ConfigManager {
return instance;
}
- public Map<String, String> getWeightProperties() {
- return weightHolder.getHolder();
+ // get node weight configure
+ public double getCpuWeight() {
+ return weightConfigHolder.getCachedCpuWeight();
+ }
+
+ public double getNetInWeight() {
+ return weightConfigHolder.getCachedNetInWeight();
+ }
+
+ public double getNetOutWeight() {
+ return weightConfigHolder.getCachedNetOutWeight();
+ }
+
+ public double getTcpWeight() {
+ return weightConfigHolder.getCachedTcpWeight();
+ }
+
+ public double getCpuThresholdWeight() {
+ return weightConfigHolder.getCachedCpuThreshold();
}
/**
@@ -134,24 +157,59 @@ public class ConfigManager {
return updatePropertiesHolder(result, topicConfig, false);
}
- public Map<String, String> getMxProperties() {
- return mxConfig.getHolder();
+ // get groupId num 2 name info
+ public boolean isEnableNum2NameTrans(String groupIdNum) {
+ return groupIdConfig.isEnableNum2NameTrans(groupIdNum);
+ }
+
+ public boolean isGroupIdNumConfigEmpty() {
+ return groupIdConfig.isGroupIdNumConfigEmpty();
+ }
+
+ public boolean isStreamIdNumConfigEmpty() {
+ return groupIdConfig.isStreamIdNumConfigEmpty();
+ }
+
+ public String getGroupIdNameByNum(String groupIdNum) {
+ return groupIdConfig.getGroupIdNameByNum(groupIdNum);
+ }
+
+ public String getStreamIdNameByIdNum(String groupIdNum, String
streamIdNum) {
+ return groupIdConfig.getStreamIdNameByIdNum(groupIdNum, streamIdNum);
+ }
+
+ public ConcurrentHashMap<String, String> getGroupIdNumMap() {
+ return groupIdConfig.getGroupIdNumMap();
}
+ public ConcurrentHashMap<String, ConcurrentHashMap<String, String>>
getStreamIdNumMap() {
+ return groupIdConfig.getStreamIdNumMap();
+ }
+
+ // get blacklist whitelist configure info
public void regIPVisitConfigChgCallback(ConfigUpdateCallback callback) {
- blackListConfig.addUpdateCallback(callback);
- whiteListConfig.addUpdateCallback(callback);
+ blacklistConfigHolder.addUpdateCallback(callback);
+ whitelistConfigHolder.addUpdateCallback(callback);
}
public boolean needChkIllegalIP() {
- return (!blackListConfig.isEmptyConfig()
- || CommonConfigHolder.getInstance().isEnableWhiteList());
+ return (blacklistConfigHolder.needCheckBlacklist()
+ || whitelistConfigHolder.needCheckWhitelist());
}
public boolean isIllegalIP(String strRemoteIP) {
return strRemoteIP == null
- || blackListConfig.isContain(strRemoteIP)
- || (CommonConfigHolder.getInstance().isEnableWhiteList() &&
!whiteListConfig.isContain(strRemoteIP));
+ || blacklistConfigHolder.isIllegalIP(strRemoteIP)
+ || whitelistConfigHolder.isIllegalIP(strRemoteIP);
+ }
+
+ // get mx configure info
+ public Map<String, Map<String, String>> getMxPropertiesMaps() {
+ return mxConfig.getMxPropertiesMaps();
+ }
+
+ public Map<String, String> getMxProperties() {
+ return mxConfig.getHolder();
}
public boolean addMxProperties(Map<String, String> result) {
@@ -260,22 +318,6 @@ public class ConfigManager {
}
}
- public Map<String, Map<String, String>> getMxPropertiesMaps() {
- return mxConfig.getMxPropertiesMaps();
- }
-
- public Map<String, String> getGroupIdMappingProperties() {
- return groupIdConfig.getGroupIdMappingProperties();
- }
-
- public Map<String, Map<String, String>> getStreamIdMappingProperties() {
- return groupIdConfig.getStreamIdMappingProperties();
- }
-
- public Map<String, String> getGroupIdEnableMappingProperties() {
- return groupIdConfig.getGroupIdEnableMappingProperties();
- }
-
public PropertiesConfigHolder getTopicConfig() {
return topicConfig;
}
@@ -395,11 +437,17 @@ public class ConfigManager {
}
for (DataProxyTopicInfo topic :
configJson.getData().getTopicList()) {
+ if (topic == null
+ || !topic.isValid()
+ ||
StringUtils.isBlank(topic.getInlongGroupId())
+ || StringUtils.isBlank(topic.getTopic())) {
+ continue;
+ }
if (!StringUtils.isEmpty(topic.getM())) {
groupIdToMValue.put(topic.getInlongGroupId(),
topic.getM());
}
if (!StringUtils.isEmpty(topic.getTopic())) {
- groupIdToTopic.put(topic.getInlongGroupId(),
topic.getTopic());
+
groupIdToTopic.put(topic.getInlongGroupId().trim(), topic.getTopic().trim());
}
}
configManager.updateMxProperties(groupIdToMValue);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java
new file mode 100644
index 000000000..4104ce409
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * properties to map
+ */
+public abstract class PropertiesHolder extends ConfigHolder {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PropertiesHolder.class);
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ protected final ConcurrentHashMap<String, String> confHolder = new
ConcurrentHashMap<>();
+
+ public PropertiesHolder(String fileName) {
+ super(fileName);
+ }
+
+ public boolean fullUpdateConfigMap(Map<String, String> newConfigMap) {
+ if (newConfigMap == null || newConfigMap.isEmpty()) {
+ return false;
+ }
+ Map<String, String> filterMap = filterInValidRecords(newConfigMap);
+ if (filterMap.isEmpty()) {
+ LOG.info("Update properties {}, but the records are all illegal
{}",
+ getFileName(), newConfigMap);
+ return false;
+ }
+ return compAndStorePropertiesToFile(filterMap);
+ }
+
+ public boolean insertNewConfigMap(Map<String, String> insertConfigMap) {
+ return insertOrRemoveProperties(true, insertConfigMap);
+ }
+
+ public boolean deleteConfigMap(Map<String, String> rmvConfigMap) {
+ return insertOrRemoveProperties(false, rmvConfigMap);
+ }
+
+ protected abstract Map<String, String> filterInValidRecords(Map<String,
String> configMap);
+
+ protected abstract boolean updateCacheData();
+
+ @Override
+ protected boolean loadFromFileToHolder() {
+ readWriteLock.readLock().lock();
+ try {
+ Map<String, String> loadMap = loadConfigFromFile();
+ if (loadMap == null || loadMap.isEmpty()) {
+ LOG.info("Load changed properties {}, but no records
configured", getFileName());
+ return false;
+ }
+ // filter blank items
+ Map<String, String> filteredMap = filterInValidRecords(loadMap);
+ if (filteredMap.isEmpty()) {
+ LOG.info("Load changed properties {}, but the records are all
illegal {}",
+ getFileName(), loadMap);
+ return false;
+ }
+ // remove records
+ Set<String> rmvKeys = new HashSet<>();
+ for (Map.Entry<String, String> entry : confHolder.entrySet()) {
+ if (entry == null || entry.getKey() == null) {
+ continue;
+ }
+ if (!filteredMap.containsKey(entry.getKey())) {
+ rmvKeys.add(entry.getKey());
+ }
+ }
+ for (String tmpKey : rmvKeys) {
+ confHolder.remove(tmpKey);
+ }
+ // insert records
+ Set<String> repKeys = new HashSet<>();
+ for (Map.Entry<String, String> entry : filteredMap.entrySet()) {
+ if (!entry.getValue().equals(confHolder.get(entry.getKey()))) {
+ confHolder.put(entry.getKey(), entry.getValue());
+ repKeys.add(entry.getKey());
+ }
+ }
+ if (rmvKeys.isEmpty() && repKeys.isEmpty()) {
+ return false;
+ }
+ // update cache data
+ boolean result = updateCacheData();
+ // output update result
+ LOG.info("Load changed properties {}, loaded config {}, updated
holder {}, updated cache {}",
+ getFileName(), loadMap, confHolder, result);
+ return true;
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ }
+
+ private synchronized boolean insertOrRemoveProperties(boolean isInsert,
+ Map<String, String> changeConfigMap) {
+ if (changeConfigMap == null || changeConfigMap.isEmpty()) {
+ return false;
+ }
+ Map<String, String> filteredMap =
filterInValidRecords(changeConfigMap);
+ if (filteredMap.isEmpty()) {
+ LOG.info("Part {} properties {}, but the records are all illegal
{}",
+ (isInsert ? "insert" : "remove"), getFileName(),
changeConfigMap);
+ return false;
+ }
+ boolean changed = false;
+ Map<String, String> newConfigMap = forkHolder();
+ if (isInsert) {
+ for (Map.Entry<String, String> entry : filteredMap.entrySet()) {
+ String oldValue = newConfigMap.put(entry.getKey(),
entry.getValue());
+ if (!ObjectUtils.equals(oldValue, entry.getValue())) {
+ changed = true;
+ }
+ }
+ } else {
+ for (Map.Entry<String, String> entry : filteredMap.entrySet()) {
+ String oldValue = newConfigMap.remove(entry.getKey());
+ if (oldValue != null) {
+ changed = true;
+ }
+ }
+ }
+ if (!changed) {
+ return false;
+ }
+ return compAndStorePropertiesToFile(newConfigMap);
+ }
+
+ private boolean compAndStorePropertiesToFile(Map<String, String>
newConfigMap) {
+ if (newConfigMap == null || newConfigMap.isEmpty()) {
+ return false;
+ }
+ boolean changed = false;
+ for (Map.Entry<String, String> entry : newConfigMap.entrySet()) {
+ if (!entry.getValue().equals(confHolder.get(entry.getKey()))) {
+ changed = true;
+ break;
+ }
+ }
+ if (!changed) {
+ for (Map.Entry<String, String> entry : confHolder.entrySet()) {
+ if (entry == null || entry.getKey() == null) {
+ continue;
+ }
+ if (!newConfigMap.containsKey(entry.getKey())) {
+ changed = true;
+ break;
+ }
+ }
+ }
+ if (!changed) {
+ return false;
+ }
+ List<String> lines = new ArrayList<>();
+ for (Map.Entry<String, String> entry : newConfigMap.entrySet()) {
+ lines.add(entry.getKey() + "=" + entry.getValue());
+ }
+ return storeConfigToFile(lines);
+ }
+
+ /**
+ * fork current cached records
+ */
+ private Map<String, String> forkHolder() {
+ Map<String, String> tmpHolder = new HashMap<>();
+ if (confHolder != null) {
+ tmpHolder.putAll(confHolder);
+ }
+ return tmpHolder;
+ }
+
+ /**
+ * load from holder
+ */
+ private boolean storeConfigToFile(List<String> configLines) {
+ boolean isSuccess = false;
+ String filePath = getFilePath();
+ if (StringUtils.isBlank(filePath)) {
+ LOG.error("Error in writing file {} as the file path is null.",
getFileName());
+ }
+ readWriteLock.writeLock().lock();
+ try {
+ File sourceFile = new File(filePath);
+ File targetFile = new File(getNextBackupFileName());
+ File tmpNewFile = new File(getFileName() + ".tmp");
+
+ if (sourceFile.exists()) {
+ FileUtils.copyFile(sourceFile, targetFile);
+ }
+ FileUtils.writeLines(tmpNewFile, configLines);
+ FileUtils.copyFile(tmpNewFile, sourceFile);
+ tmpNewFile.delete();
+ isSuccess = true;
+ getFileChanged().set(true);
+ } catch (Throwable ex) {
+ LOG.error("Error in writing file {}", getFileName(), ex);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ return isSuccess;
+ }
+
+ private Map<String, String> loadConfigFromFile() {
+ Map<String, String> result = new HashMap<>();
+ if (StringUtils.isBlank(getFileName())) {
+ LOG.error("Fail to load properties {} as the file name is null.",
getFileName());
+ return result;
+ }
+ InputStream inStream = null;
+ try {
+ URL url = getClass().getClassLoader().getResource(getFileName());
+ inStream = url != null ? url.openStream() : null;
+ if (inStream == null) {
+ LOG.error("Fail to load properties {} as the input stream is
null", getFileName());
+ return result;
+ }
+ Properties props = new Properties();
+ props.load(inStream);
+ for (Map.Entry<Object, Object> entry : props.entrySet()) {
+ result.put((String) entry.getKey(), (String) entry.getValue());
+ }
+ } catch (Throwable e) {
+ LOG.error("Fail to load properties {}", getFileName(), e);
+ } finally {
+ if (null != inStream) {
+ try {
+ inStream.close();
+ } catch (IOException e) {
+ LOG.error("Fail in inStream.close for file {}",
getFileName(), e);
+ }
+ }
+ }
+ return result;
+ }
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BlackListConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BlackListConfigHolder.java
new file mode 100644
index 000000000..8554046ab
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/BlackListConfigHolder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+/**
+ * save black list configure to list
+ */
+public class BlackListConfigHolder extends VisitConfigHolder {
+
+ private static final String blacklistFileName = "blacklist.properties";
+
+ public BlackListConfigHolder() {
+ super(true, blacklistFileName);
+ }
+
+ public boolean needCheckBlacklist() {
+ return !isEmptyConfig();
+ }
+
+ public boolean isIllegalIP(String remoteIP) {
+ return isContain(remoteIP);
+ }
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java
new file mode 100644
index 000000000..f59b651a9
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import org.apache.inlong.dataproxy.config.PropertiesHolder;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Save cluster configure info
+ */
+public class GroupIdNumConfigHolder extends PropertiesHolder {
+
+ private static final String groupIdNumConfigFileName =
"groupid_mapping.properties";
+ private static final String GROUPID_VALUE_SPLITTER = "#";
+ private static final Logger LOG =
LoggerFactory.getLogger(GroupIdNumConfigHolder.class);
+
+ private ConcurrentHashMap<String, String> groupIdNumMap =
+ new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, ConcurrentHashMap<String, String>>
streamIdNumMap =
+ new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, Boolean> groupIdNumEnableMap = new
ConcurrentHashMap<>();
+
+ public GroupIdNumConfigHolder() {
+ super(groupIdNumConfigFileName);
+ }
+
+ public boolean isEnableNum2NameTrans(String groupIdNum) {
+ return groupIdNumEnableMap.getOrDefault(groupIdNum, Boolean.FALSE);
+ }
+
+ public String getGroupIdNameByNum(String groupIdNum) {
+ return groupIdNumMap.get(groupIdNum);
+ }
+
+ public String getStreamIdNameByIdNum(String groupIdNum, String
streamIdNum) {
+ ConcurrentHashMap<String, String> tmpMap =
streamIdNumMap.get(groupIdNum);
+ if (tmpMap == null) {
+ return null;
+ }
+ return tmpMap.get(streamIdNum);
+ }
+
+ public boolean isGroupIdNumConfigEmpty() {
+ return groupIdNumMap.isEmpty();
+ }
+
+ public boolean isStreamIdNumConfigEmpty() {
+ return streamIdNumMap.isEmpty();
+ }
+
+ public ConcurrentHashMap<String, String> getGroupIdNumMap() {
+ return groupIdNumMap;
+ }
+
+ public ConcurrentHashMap<String, ConcurrentHashMap<String, String>>
getStreamIdNumMap() {
+ return streamIdNumMap;
+ }
+
+ public ConcurrentHashMap<String, Boolean> getGroupIdNumEnableMap() {
+ return groupIdNumEnableMap;
+ }
+
+ @Override
+ protected Map<String, String> filterInValidRecords(Map<String, String>
configMap) {
+ Map<String, String> result = new HashMap<>(configMap.size());
+ for (Map.Entry<String, String> entry : configMap.entrySet()) {
+ if (entry == null
+ || StringUtils.isBlank(entry.getKey())
+ || !entry.getKey().contains(GROUPID_VALUE_SPLITTER)) {
+ continue;
+ }
+ String[] keyArray = StringUtils.split(entry.getKey(),
GROUPID_VALUE_SPLITTER);
+ if (keyArray.length != 3) {
+ continue;
+ }
+ if (StringUtils.isBlank(keyArray[0])
+ || StringUtils.isBlank(keyArray[1])
+ || StringUtils.isBlank(keyArray[2])) {
+ continue;
+ }
+ if (StringUtils.isNotBlank(entry.getValue())) {
+ try {
+ MAP_SPLITTER.split(entry.getValue());
+ } catch (Throwable e) {
+ continue;
+ }
+ }
+ result.put(entry.getKey().trim(), entry.getValue().trim());
+ }
+ return result;
+ }
+
+ @Override
+ protected boolean updateCacheData() {
+ boolean tmpEnable;
+ Map<String, String> tmpValueMap = new HashMap<>();
+ Map<String, String> valueMap = new HashMap<>();
+ Map<String, String> tmpGroupIdNumMap = new HashMap<>();
+ Map<String, Map<String, String>> tmpStreamIdNumMap = new HashMap<>();
+ Map<String, Boolean> tmpGroupIdNumEnableMap = new HashMap<>();
+ // parse configure data
+ for (Map.Entry<String, String> entry : confHolder.entrySet()) {
+ if (entry == null
+ || StringUtils.isBlank(entry.getKey())
+ || !entry.getKey().contains(GROUPID_VALUE_SPLITTER)) {
+ continue;
+ }
+ String[] keyArray = StringUtils.split(entry.getKey(),
GROUPID_VALUE_SPLITTER);
+ if (keyArray.length != 3) {
+ continue;
+ }
+ if (StringUtils.isBlank(keyArray[0])
+ || StringUtils.isBlank(keyArray[1])
+ || StringUtils.isBlank(keyArray[2])) {
+ continue;
+ }
+ tmpEnable = Boolean.parseBoolean(keyArray[2].trim());
+ tmpGroupIdNumMap.put(keyArray[0].trim(), keyArray[1].trim());
+ tmpGroupIdNumEnableMap.put(keyArray[0].trim(), tmpEnable);
+ // parse value
+ if (StringUtils.isNotBlank(entry.getValue())) {
+ tmpValueMap.clear();
+ valueMap.clear();
+ try {
+ tmpValueMap = MAP_SPLITTER.split(entry.getValue());
+ } catch (Throwable e) {
+ continue;
+ }
+ if (tmpValueMap.isEmpty()) {
+ continue;
+ }
+ for (Map.Entry<String, String> entry1 :
tmpValueMap.entrySet()) {
+ if (entry1 == null
+ || StringUtils.isBlank(entry1.getKey())
+ || StringUtils.isBlank(entry1.getValue())) {
+ continue;
+ }
+ valueMap.put(entry1.getKey().trim(),
entry.getValue().trim());
+ }
+ if (!valueMap.isEmpty()) {
+ tmpStreamIdNumMap.put(keyArray[0].trim(), valueMap);
+ }
+ }
+ }
+ // update cached groupId num2Name data
+ updateCachedGroupIdNumMap(tmpGroupIdNumMap);
+ // update cached groupId num2Name enable data
+ updateCachedGroupIdNumEnableMap(tmpGroupIdNumEnableMap);
+ // update cached streamId num2Name enable data
+ updateCachedStreamIdNumMap(tmpStreamIdNumMap);
+ return true;
+ }
+
+ private void updateCachedGroupIdNumMap(Map<String, String>
newGroupIdNumMap) {
+ if (newGroupIdNumMap.isEmpty()) {
+ groupIdNumMap.clear();
+ return;
+ }
+ for (Map.Entry<String, String> entry : newGroupIdNumMap.entrySet()) {
+ if (!entry.getValue().equals(groupIdNumMap.get(entry.getKey()))) {
+ groupIdNumMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ // remove records
+ Set<String> rmvKeys = new HashSet<>();
+ for (Map.Entry<String, String> entry : groupIdNumMap.entrySet()) {
+ if (!newGroupIdNumMap.containsKey(entry.getKey())) {
+ rmvKeys.add(entry.getKey());
+ }
+ }
+ for (String tmpKey : rmvKeys) {
+ groupIdNumMap.remove(tmpKey);
+ }
+ }
+
+ private void updateCachedGroupIdNumEnableMap(Map<String, Boolean>
newGroupNumEnableMap) {
+ if (newGroupNumEnableMap.isEmpty()) {
+ groupIdNumEnableMap.clear();
+ return;
+ }
+ for (Map.Entry<String, Boolean> entry :
newGroupNumEnableMap.entrySet()) {
+ if
(!entry.getValue().equals(groupIdNumEnableMap.get(entry.getKey()))) {
+ groupIdNumEnableMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ // remove records
+ Set<String> rmvKeys = new HashSet<>();
+ for (Map.Entry<String, Boolean> entry :
groupIdNumEnableMap.entrySet()) {
+ if (!newGroupNumEnableMap.containsKey(entry.getKey())) {
+ rmvKeys.add(entry.getKey());
+ }
+ }
+ for (String tmpKey : rmvKeys) {
+ groupIdNumEnableMap.remove(tmpKey);
+ }
+ }
+
+ private void updateCachedStreamIdNumMap(Map<String, Map<String, String>>
newStreamIdNumMap) {
+ if (newStreamIdNumMap.isEmpty()) {
+ streamIdNumMap.clear();
+ return;
+ }
+ Map<String, String> newDataMap;
+ Set<String> rmvKeys = new HashSet<>();
+ ConcurrentHashMap<String, String> storedMap;
+ // insert cached streamId num2Name data
+ for (Map.Entry<String, Map<String, String>> entry :
newStreamIdNumMap.entrySet()) {
+ storedMap = streamIdNumMap.get(entry.getKey());
+ if (storedMap == null) {
+ storedMap = new ConcurrentHashMap<>(entry.getValue().size());
+ storedMap.putAll(entry.getValue());
+ streamIdNumMap.put(entry.getKey(), storedMap);
+ } else {
+ newDataMap = entry.getValue();
+ for (Map.Entry<String, String> entry1 : newDataMap.entrySet())
{
+ if
(!entry1.getValue().equals(storedMap.get(entry.getKey()))) {
+ storedMap.put(entry1.getKey(), entry1.getValue());
+ }
+ }
+ for (Map.Entry<String, String> entry1 : storedMap.entrySet()) {
+ if (!newDataMap.containsKey(entry1.getKey())) {
+ rmvKeys.add(entry.getKey());
+ }
+ }
+ for (String tmpKey : rmvKeys) {
+ storedMap.remove(tmpKey);
+ }
+ if (storedMap.isEmpty()) {
+ streamIdNumMap.remove(entry.getKey());
+ }
+ }
+ }
+ // remove cached streamId num2Name data
+ rmvKeys.clear();
+ for (Map.Entry<String, ConcurrentHashMap<String, String>> entry :
streamIdNumMap.entrySet()) {
+ if (!newStreamIdNumMap.containsKey(entry.getKey())) {
+ rmvKeys.add(entry.getKey());
+ }
+ }
+ for (String tmpKey : rmvKeys) {
+ streamIdNumMap.remove(tmpKey);
+ }
+ }
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdPropertiesHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdPropertiesHolder.java
deleted file mode 100644
index d34b19522..000000000
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdPropertiesHolder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.config.holder;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * groupId to m value
- */
-public class GroupIdPropertiesHolder extends PropertiesConfigHolder {
-
- private static final Logger LOG =
LoggerFactory.getLogger(GroupIdPropertiesHolder.class);
- private static final String GROUPID_VALUE_SPLITTER = "#";
-
- private Map<String, String> groupIdMappingProperties =
- new HashMap<String, String>();
- private Map<String, Map<String, String>> streamIdMappingProperties =
- new HashMap<String, Map<String, String>>();
- private Map<String, String> groupIdEnableMappingProperties =
- new HashMap<String, String>();
-
- public GroupIdPropertiesHolder(String fileName) {
- super(fileName);
- }
-
- @Override
- public boolean loadFromFileToHolder() {
- super.loadFromFileToHolder();
- try {
- Map<String, String> tmpGroupIdMappingProperties =
- new HashMap<>();
- Map<String, Map<String, String>> tmpStreamIdMappingProperties =
- new HashMap<>();
- Map<String, String> tmpGroupIdEnableMappingProperties = new
HashMap<>();
- for (Map.Entry<String, String> entry :
super.getHolder().entrySet()) {
- String[] sArray = StringUtils.split(entry.getKey(),
GROUPID_VALUE_SPLITTER);
- if (sArray.length != 3) {
- LOG.warn("invalid groupId key {}", entry.getKey());
- continue;
- }
- tmpGroupIdMappingProperties.put(sArray[0].trim(),
sArray[1].trim());
- tmpGroupIdEnableMappingProperties.put(sArray[0].trim(),
sArray[2].trim());
- if (StringUtils.isNotBlank(entry.getValue())) {
- tmpStreamIdMappingProperties.put(sArray[0].trim(),
- MAP_SPLITTER.split(entry.getValue()));
- }
- }
- groupIdMappingProperties = tmpGroupIdMappingProperties;
- streamIdMappingProperties = tmpStreamIdMappingProperties;
- groupIdEnableMappingProperties = tmpGroupIdEnableMappingProperties;
- } catch (Exception e) {
- LOG.error("loadConfig error :", e);
- }
- return true;
- }
-
- public Map<String, String> getGroupIdMappingProperties() {
- return groupIdMappingProperties;
- }
-
- public Map<String, Map<String, String>> getStreamIdMappingProperties() {
- return streamIdMappingProperties;
- }
-
- public Map<String, String> getGroupIdEnableMappingProperties() {
- return groupIdEnableMappingProperties;
- }
-}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
index 75f851fdb..6f75320da 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
@@ -43,7 +43,7 @@ public class MQClusterConfigHolder extends
PropertiesConfigHolder {
* load from file
*/
@Override
- public boolean loadFromFileToHolder() {
+ protected boolean loadFromFileToHolder() {
super.loadFromFileToHolder();
Map<String, String> tmpUrl2token = new HashMap<>();
for (Map.Entry<String, String> entry : getHolder().entrySet()) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java
index c5c65ce7d..74b81999e 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java
@@ -40,7 +40,7 @@ public class MxPropertiesHolder extends
PropertiesConfigHolder {
* load m from file
*/
@Override
- public boolean loadFromFileToHolder() {
+ protected boolean loadFromFileToHolder() {
super.loadFromFileToHolder();
try {
for (Map.Entry<String, String> entry : getHolder().entrySet()) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
index a75683c78..2894f19bf 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
@@ -50,7 +50,7 @@ public class PropertiesConfigHolder extends ConfigHolder {
}
@Override
- public boolean loadFromFileToHolder() {
+ protected boolean loadFromFileToHolder() {
readWriteLock.readLock().lock();
try {
Map<String, String> tmpHolder = loadProperties();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IPVisitConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/VisitConfigHolder.java
similarity index 95%
rename from
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IPVisitConfigHolder.java
rename to
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/VisitConfigHolder.java
index 8f73185de..d7a7277b8 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IPVisitConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/VisitConfigHolder.java
@@ -38,27 +38,27 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* save to list
*/
-public class IPVisitConfigHolder extends ConfigHolder {
+public class VisitConfigHolder extends ConfigHolder {
private static final int MIN_NETMASK_BITS = 0;
private static final int MAX_NETMASK_BITS = 32;
private static final String MASKIP_NETMASK_SEP = "/";
private static final String IPV4ADDR_TMP =
"((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))";
- private static final Logger LOG =
LoggerFactory.getLogger(IPVisitConfigHolder.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(VisitConfigHolder.class);
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final boolean isBlackList;
private final ConcurrentHashMap<String, Long> confHolder = new
ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Long> ipAddrHolder = new
ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Pair<Integer, Integer>>
ipSegmentHolder = new ConcurrentHashMap<>();
- public IPVisitConfigHolder(boolean isBlackList, String fileName) {
+ public VisitConfigHolder(boolean isBlackList, String fileName) {
super(fileName);
this.isBlackList = isBlackList;
}
@Override
- public boolean loadFromFileToHolder() {
+ protected boolean loadFromFileToHolder() {
readWriteLock.writeLock().lock();
try {
Map<String, Long> tmpHolder = loadFile();
@@ -90,9 +90,7 @@ public class IPVisitConfigHolder extends ConfigHolder {
int hostAddrMask;
tmpKeys.clear();
for (Map.Entry<String, Long> entry : tmpHolder.entrySet()) {
- if (entry == null
- || entry.getKey() == null
- || StringUtils.isBlank(entry.getKey())) {
+ if (entry == null || StringUtils.isBlank(entry.getKey())) {
continue;
}
if (!confHolder.containsKey(entry.getKey())) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/WeightConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/WeightConfigHolder.java
new file mode 100644
index 000000000..54248482a
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/WeightConfigHolder.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import org.apache.inlong.dataproxy.config.PropertiesHolder;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Save weight configure info
+ */
+public class WeightConfigHolder extends PropertiesHolder {
+
+ private static final String weightConfigFileName = "weight.properties";
+ private static final Logger LOG =
LoggerFactory.getLogger(WeightConfigHolder.class);
+
+ private static final String KEY_WEIGHT_CPU = "cpuWeight";
+ private static final double VAL_DEF_WEIGHT_CPU = 1;
+ private static final String KEY_WEIGHT_NET_IN = "netinWeight";
+ private static final double VAL_DEF_WEIGHT_NET_IN = 0.5;
+ private static final String KEY_WEIGHT_NET_OUT = "netoutWeight";
+ private static final double VAL_DEF_WEIGHT_NET_OUT = 0.5;
+ private static final String KEY_WEIGHT_TCP = "tcpWeight";
+ private static final double VAL_DEF_WEIGHT_TCP = 0;
+ private static final String KEY_WEIGHT_CPU_THRESHOLD = "cpuThreshold";
+ private static final double VAL_DEF_WEIGHT_CPU_THRESHOLD = 85;
+ // cache configure
+ private final AtomicDouble cachedCpuWeight = new
AtomicDouble(VAL_DEF_WEIGHT_CPU);
+ private final AtomicDouble cachedNetInWeight = new
AtomicDouble(VAL_DEF_WEIGHT_NET_IN);
+ private final AtomicDouble cachedNetOutWeight = new
AtomicDouble(VAL_DEF_WEIGHT_NET_OUT);
+ private final AtomicDouble cachedTcpWeight = new
AtomicDouble(VAL_DEF_WEIGHT_TCP);
+ private final AtomicDouble cachedCpuThreshold = new
AtomicDouble(VAL_DEF_WEIGHT_CPU_THRESHOLD);
+
+ public WeightConfigHolder() {
+ super(weightConfigFileName);
+ }
+
+ public double getCachedCpuWeight() {
+ return cachedCpuWeight.get();
+ }
+
+ public double getCachedNetInWeight() {
+ return cachedNetInWeight.get();
+ }
+
+ public double getCachedNetOutWeight() {
+ return cachedNetOutWeight.get();
+ }
+
+ public double getCachedTcpWeight() {
+ return cachedTcpWeight.get();
+ }
+
+ public double getCachedCpuThreshold() {
+ return cachedCpuThreshold.get();
+ }
+
+ @Override
+ protected Map<String, String> filterInValidRecords(Map<String, String>
configMap) {
+ Map<String, String> filteredMap = new HashMap<>(configMap.size());
+ for (Map.Entry<String, String> entry : configMap.entrySet()) {
+ if (entry == null
+ || StringUtils.isBlank(entry.getKey())
+ || StringUtils.isBlank(entry.getValue())) {
+ continue;
+ }
+ try {
+ Double.parseDouble(entry.getValue());
+ } catch (Throwable e) {
+ continue;
+ }
+ filteredMap.put(entry.getKey().trim(), entry.getValue().trim());
+ }
+ return filteredMap;
+ }
+
+ @Override
+ protected boolean updateCacheData() {
+ // get cpu weight
+ double newVal = VAL_DEF_WEIGHT_CPU;
+ String tmpStrVal = confHolder.get(KEY_WEIGHT_CPU);
+ if (StringUtils.isNotBlank(tmpStrVal)) {
+ try {
+ newVal = Double.parseDouble(tmpStrVal);
+ } catch (Throwable e) {
+ //
+ }
+ }
+ cachedCpuWeight.set(newVal);
+ // get net-in weight
+ newVal = VAL_DEF_WEIGHT_NET_IN;
+ tmpStrVal = confHolder.get(KEY_WEIGHT_NET_IN);
+ if (StringUtils.isNotBlank(tmpStrVal)) {
+ try {
+ newVal = Double.parseDouble(tmpStrVal);
+ } catch (Throwable e) {
+ //
+ }
+ }
+ cachedNetInWeight.set(newVal);
+ // get net-out weight
+ newVal = VAL_DEF_WEIGHT_NET_OUT;
+ tmpStrVal = confHolder.get(KEY_WEIGHT_NET_OUT);
+ if (StringUtils.isNotBlank(tmpStrVal)) {
+ try {
+ newVal = Double.parseDouble(tmpStrVal);
+ } catch (Throwable e) {
+ //
+ }
+ }
+ cachedNetOutWeight.set(newVal);
+ // get tcp weight
+ newVal = VAL_DEF_WEIGHT_TCP;
+ tmpStrVal = confHolder.get(KEY_WEIGHT_TCP);
+ if (StringUtils.isNotBlank(tmpStrVal)) {
+ try {
+ newVal = Double.parseDouble(tmpStrVal);
+ } catch (Throwable e) {
+ //
+ }
+ }
+ cachedTcpWeight.set(newVal);
+ // get cpu threshold weight
+ newVal = VAL_DEF_WEIGHT_CPU_THRESHOLD;
+ tmpStrVal = confHolder.get(KEY_WEIGHT_CPU_THRESHOLD);
+ if (StringUtils.isNotBlank(tmpStrVal)) {
+ try {
+ newVal = Double.parseDouble(tmpStrVal);
+ } catch (Throwable e) {
+ //
+ }
+ }
+ cachedCpuThreshold.set(newVal);
+ return true;
+ }
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/WhiteListConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/WhiteListConfigHolder.java
new file mode 100644
index 000000000..f40ac7748
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/WhiteListConfigHolder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
+
+/**
+ * save white list configure to list
+ */
+public class WhiteListConfigHolder extends VisitConfigHolder {
+
+ private static final String whitelistFileName = "whitelist.properties";
+
+ public WhiteListConfigHolder() {
+ super(false, whitelistFileName);
+ }
+
+ public boolean needCheckWhitelist() {
+ return CommonConfigHolder.getInstance().isEnableWhiteList();
+ }
+
+ public boolean isIllegalIP(String remoteIP) {
+ return (CommonConfigHolder.getInstance().isEnableWhiteList() &&
!isContain(remoteIP));
+ }
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
index 8d1395f58..26407038e 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
@@ -44,8 +44,8 @@ import org.apache.http.util.EntityUtils;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -153,27 +153,26 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
heartbeatMsg.setInCharges(CommonConfigHolder.getInstance().getClusterIncharges());
heartbeatMsg.setExtTag(CommonConfigHolder.getInstance().getClusterExtTag());
- Map<String, String> groupIdMappings =
configManager.getGroupIdMappingProperties();
- Map<String, Map<String, String>> streamIdMappings =
configManager.getStreamIdMappingProperties();
- Map<String, String> groupIdEnableMappings =
configManager.getGroupIdEnableMappingProperties();
+ ConcurrentHashMap<String, String> groupIdMappings =
+ configManager.getGroupIdNumMap();
+ ConcurrentHashMap<String, ConcurrentHashMap<String, String>>
streamIdMappings =
+ configManager.getStreamIdNumMap();
List<GroupHeartbeat> groupHeartbeats = new ArrayList<>();
for (Entry<String, String> entry : groupIdMappings.entrySet()) {
String groupIdNum = entry.getKey();
String groupId = entry.getValue();
GroupHeartbeat groupHeartbeat = new GroupHeartbeat();
groupHeartbeat.setInlongGroupId(groupId);
- String status = groupIdEnableMappings.getOrDefault(groupIdNum,
"disabled");
- status = status.equals("TRUE") ? "enabled" : "disabled";
+ String status = configManager.isEnableNum2NameTrans(groupIdNum) ?
"enabled" : "disabled";
groupHeartbeat.setStatus(status);
groupHeartbeats.add(groupHeartbeat);
}
heartbeatMsg.setGroupHeartbeats(groupHeartbeats);
List<StreamHeartbeat> streamHeartbeats = new ArrayList<>();
- for (Entry<String, Map<String, String>> entry :
streamIdMappings.entrySet()) {
+ for (Entry<String, ConcurrentHashMap<String, String>> entry :
streamIdMappings.entrySet()) {
String groupIdNum = entry.getKey();
- String status = groupIdEnableMappings.getOrDefault(groupIdNum,
"disabled");
- status = status.equals("TRUE") ? "enabled" : "disabled";
+ String status = configManager.isEnableNum2NameTrans(groupIdNum) ?
"enabled" : "disabled";
String groupId = groupIdMappings.get(groupIdNum);
for (Entry<String, String> streamEntry :
entry.getValue().entrySet()) {
String streamId = streamEntry.getValue();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 12678b57c..5b2d725fe 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -359,19 +359,12 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
String groupIdNum =
commonAttrMap.get(AttrConstants.GROUPID_NUM);
String streamIdNum =
commonAttrMap.get(AttrConstants.STREAMID_NUM);
// get configured groupId and steamId by numbers
- if (configManager.getGroupIdMappingProperties() != null
- && configManager.getStreamIdMappingProperties() !=
null) {
- groupId =
configManager.getGroupIdMappingProperties().get(groupIdNum);
- streamId =
(configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
- ? null
- :
configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+ if (!configManager.isGroupIdNumConfigEmpty()
+ && !configManager.isStreamIdNumConfigEmpty()) {
+ groupId = configManager.getGroupIdNameByNum(groupIdNum);
+ streamId =
configManager.getStreamIdNameByIdNum(groupIdNum, streamIdNum);
if (groupId != null && streamId != null) {
- String enableTrans =
-
(configManager.getGroupIdEnableMappingProperties() == null)
- ? null
- :
configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
- if (("TRUE".equalsIgnoreCase(enableTrans)
- && "TRUE".equalsIgnoreCase(num2name))) {
+ if ((configManager.isEnableNum2NameTrans(groupIdNum)
&& "TRUE".equalsIgnoreCase(num2name))) {
String extraAttr = "groupId=" + groupId + "&" +
"streamId=" + streamId;
message.setData(newBinMsg(message.getData(),
extraAttr));
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index b348777c3..4666e617f 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -235,18 +235,13 @@ public class SimpleMessageHandler extends
ChannelInboundHandlerAdapter {
String groupIdNum = commonAttrMap.get(AttrConstants.GROUPID_NUM);
String streamIdNum = commonAttrMap.get(AttrConstants.STREAMID_NUM);
- if (configManager.getGroupIdMappingProperties() != null
- && configManager.getStreamIdMappingProperties() != null) {
- groupId =
configManager.getGroupIdMappingProperties().get(groupIdNum);
- streamId =
(configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
- ? null
- :
configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+ if (!configManager.isGroupIdNumConfigEmpty()
+ && !configManager.isStreamIdNumConfigEmpty()) {
+ groupId = configManager.getGroupIdNameByNum(groupIdNum);
+ streamId = configManager.getStreamIdNameByIdNum(groupIdNum,
streamIdNum);
if (groupId != null && streamId != null) {
- String enableTrans =
(configManager.getGroupIdEnableMappingProperties() == null)
- ? null
- :
configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
- if (("TRUE".equalsIgnoreCase(enableTrans) && "TRUE"
- .equalsIgnoreCase(num2name))) {
+ if ((configManager.isEnableNum2NameTrans(groupIdNum)
+ && "TRUE".equalsIgnoreCase(num2name))) {
String extraAttr = "groupId=" + groupId + "&" +
"streamId=" + streamId;
message.setData(newBinMsg(message.getData(),
extraAttr));
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
index 83c9e241e..356561ff2 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
@@ -179,11 +179,9 @@ public abstract class BaseSource
// get max worker threads
this.maxWorkerThreads = ConfStringUtils.getIntValue(context,
SourceConstants.SRCCXT_MAX_WORKER_THREADS,
SourceConstants.VAL_DEF_WORKER_THREADS);
- Preconditions.checkArgument((this.maxWorkerThreads >=
SourceConstants.VAL_MIN_WORKER_THREADS
- && this.maxWorkerThreads <=
SourceConstants.VAL_MAX_WORKER_THREADS),
- SourceConstants.SRCCXT_MAX_WORKER_THREADS + " must be in ["
- + SourceConstants.VAL_MIN_WORKER_THREADS + ", "
- + SourceConstants.VAL_MAX_WORKER_THREADS + "]");
+ Preconditions.checkArgument((this.maxWorkerThreads >=
SourceConstants.VAL_MIN_WORKER_THREADS),
+ SourceConstants.SRCCXT_MAX_WORKER_THREADS + " must be >= "
+ + SourceConstants.VAL_MIN_WORKER_THREADS);
// get max read idle time
this.maxReadIdleTimeMs = ConfStringUtils.getLongValue(context,
SourceConstants.SRCCXT_MAX_READ_IDLE_TIME_MS,
SourceConstants.VAL_DEF_READ_IDLE_TIME_MS);
@@ -204,18 +202,12 @@ public abstract class BaseSource
Preconditions.checkArgument(this.maxRcvBufferSize >=
SourceConstants.VAL_MIN_RECEIVE_BUFFER_SIZE,
SourceConstants.SRCCXT_RECEIVE_BUFFER_SIZE + " must be >= "
+ SourceConstants.VAL_MIN_RECEIVE_BUFFER_SIZE);
- if (this.maxRcvBufferSize >
SourceConstants.VAL_MAX_RECEIVE_BUFFER_SIZE) {
- this.maxRcvBufferSize =
SourceConstants.VAL_MAX_RECEIVE_BUFFER_SIZE;
- }
// get max send buffer size
this.maxSendBufferSize = ConfStringUtils.getIntValue(context,
SourceConstants.SRCCXT_SEND_BUFFER_SIZE,
SourceConstants.VAL_DEF_SEND_BUFFER_SIZE);
Preconditions.checkArgument(this.maxSendBufferSize >=
SourceConstants.VAL_MIN_SEND_BUFFER_SIZE,
SourceConstants.SRCCXT_SEND_BUFFER_SIZE + " must be >= "
+ SourceConstants.VAL_MIN_SEND_BUFFER_SIZE);
- if (this.maxSendBufferSize > SourceConstants.VAL_MAX_SEND_BUFFER_SIZE)
{
- this.maxSendBufferSize = SourceConstants.VAL_MAX_SEND_BUFFER_SIZE;
- }
}
@Override
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
index 64bcf55eb..26f1fad6c 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
@@ -61,7 +61,6 @@ public class SourceConstants {
public static final String SRCCXT_MAX_WORKER_THREADS = "max-threads";
public static final int VAL_DEF_WORKER_THREADS =
Runtime.getRuntime().availableProcessors();
public static final int VAL_MIN_WORKER_THREADS = 1;
- public static final int VAL_MAX_WORKER_THREADS =
Runtime.getRuntime().availableProcessors() * 2;
// max connection count
public static final String SRCCXT_MAX_CONNECTION_CNT = "connections";
public static final int VAL_DEF_MAX_CONNECTION_CNT = 5000;
@@ -70,12 +69,10 @@ public class SourceConstants {
public static final String SRCCXT_RECEIVE_BUFFER_SIZE =
"receiveBufferSize";
public static final int VAL_DEF_RECEIVE_BUFFER_SIZE = 64 * 1024;
public static final int VAL_MIN_RECEIVE_BUFFER_SIZE = 0;
- public static final int VAL_MAX_RECEIVE_BUFFER_SIZE = 100 * 1024 * 1024;
// max send buffer size
public static final String SRCCXT_SEND_BUFFER_SIZE = "sendBufferSize";
public static final int VAL_DEF_SEND_BUFFER_SIZE = 64 * 1024;
public static final int VAL_MIN_SEND_BUFFER_SIZE = 0;
- public static final int VAL_MAX_SEND_BUFFER_SIZE = 100 * 1024 * 1024;
// tcp parameter no delay
public static final String SRCCXT_TCP_NO_DELAY = "tcpNoDelay";
public static final boolean VAL_DEF_TCP_NO_DELAY = true;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
index 58bed5f78..b45581755 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
@@ -265,17 +265,17 @@ public class CodecBinMsg extends AbsV0MsgCodec {
String confGroupId;
String confStreamId;
String strGroupIdNum = String.valueOf(this.groupIdNum);
- if (configManager.getGroupIdMappingProperties() == null) {
- source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
- this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
- this.errMsg = "GroupId-Mapping configuration is null";
- return false;
- }
- confGroupId =
configManager.getGroupIdMappingProperties().get(strGroupIdNum);
+ confGroupId = configManager.getGroupIdNameByNum(strGroupIdNum);
if (StringUtils.isBlank(confGroupId)) {
- source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
- this.errCode =
DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE;
- this.errMsg = String.format("Non-existing groupIdNum(%s)
configuration", strGroupIdNum);
+ if (configManager.isGroupIdNumConfigEmpty()) {
+
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
+ this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
+ this.errMsg = "GroupId-Mapping configuration is null";
+ } else {
+
source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+ this.errCode =
DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE;
+ this.errMsg = String.format("Non-existing groupIdNum(%s)
configuration", strGroupIdNum);
+ }
return false;
}
if (StringUtils.isNotBlank(this.groupId) &&
!this.groupId.equalsIgnoreCase(confGroupId)) {
@@ -296,27 +296,19 @@ public class CodecBinMsg extends AbsV0MsgCodec {
return false;
}
} else {
- if (configManager.getStreamIdMappingProperties() == null) {
-
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
- this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
- this.errMsg = "StreamId-Mapping configuration is null";
- return false;
- }
- Map<String, String> confStreamIdMap =
-
configManager.getStreamIdMappingProperties().get(strGroupIdNum);
- if (confStreamIdMap == null) {
-
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
- this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
- this.errMsg = "GroupId in StreamId-Mapping configuration
is null";
- return false;
- }
String strStreamIdNum = String.valueOf(this.streamIdNum);
- confStreamId = confStreamIdMap.get(strStreamIdNum);
+ confStreamId =
configManager.getStreamIdNameByIdNum(strGroupIdNum, strStreamIdNum);
if (StringUtils.isBlank(confStreamId)) {
-
source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
- this.errCode =
DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE;
- this.errMsg = String.format("Non-existing
GroupId(%s)-StreamId(%s) configuration",
- strGroupIdNum, strStreamIdNum);
+ if (configManager.isStreamIdNumConfigEmpty()) {
+
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
+ this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
+ this.errMsg = "StreamId-Mapping configuration is null";
+ } else {
+
source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+ this.errCode =
DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE;
+ this.errMsg = String.format("Non-existing
GroupId(%s)-StreamId(%s) configuration",
+ strGroupIdNum, strStreamIdNum);
+ }
return false;
}
if (StringUtils.isNotBlank(this.streamId) &&
!this.streamId.equalsIgnoreCase(confStreamId)) {
@@ -330,10 +322,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
this.streamId = confStreamId;
}
// check whether enable num 2 name translate
- String enableTrans =
(configManager.getGroupIdEnableMappingProperties() == null)
- ? null
- :
configManager.getGroupIdEnableMappingProperties().get(strGroupIdNum);
- if ("true".equalsIgnoreCase(enableTrans) && this.num2name) {
+ if (configManager.isEnableNum2NameTrans(strGroupIdNum) &&
this.num2name) {
this.transNum2Name = true;
}
} else {