This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b5714f8e26 [INLONG-10081][DataProxy] Modify the data format of
metadata saved in the metadata.json file (#10083)
b5714f8e26 is described below
commit b5714f8e26dcd1e8633c409d3e767f8dfeb18c0e
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Apr 26 17:06:07 2024 +0800
[INLONG-10081][DataProxy] Modify the data format of metadata saved in the
metadata.json file (#10083)
---
.../inlong/dataproxy/config/ConfigManager.java | 260 ++++++++++++--
.../dataproxy/config/holder/MetaConfigHolder.java | 372 ++++++++-------------
.../dataproxy/config/pojo/CacheClusterConfig.java | 20 ++
.../inlong/dataproxy/config/pojo/CacheType.java | 11 +-
.../dataproxy/config/pojo/IdTopicConfig.java | 54 ++-
.../dataproxy/config/pojo/InLongMetaConfig.java | 69 ++++
.../{source => consts}/SourceConstants.java | 93 +-----
.../apache/inlong/dataproxy/source/BaseSource.java | 1 +
.../dataproxy/source/ServerMessageFactory.java | 2 +
.../inlong/dataproxy/source/SimpleHttpSource.java | 1 +
.../inlong/dataproxy/source/SimpleTcpSource.java | 1 +
.../inlong/dataproxy/source/SimpleUdpSource.java | 1 +
.../inlong/dataproxy/utils/AddressUtils.java | 2 +-
.../src/test/resources/metadata.json | 2 +-
14 files changed, 535 insertions(+), 354 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 01ef883b44..68772a8c78 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -17,10 +17,17 @@
package org.apache.inlong.dataproxy.config;
+import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.enums.InlongCompressType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.heartbeat.AddressInfo;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
+import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
+import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigRequest;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
+import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
+import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
import org.apache.inlong.dataproxy.config.holder.BlackListConfigHolder;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.config.holder.GroupIdNumConfigHolder;
@@ -29,12 +36,15 @@ import
org.apache.inlong.dataproxy.config.holder.SourceReportConfigHolder;
import org.apache.inlong.dataproxy.config.holder.WeightConfigHolder;
import org.apache.inlong.dataproxy.config.holder.WhiteListConfigHolder;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.CacheType;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.apache.inlong.dataproxy.config.pojo.InLongMetaConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.utils.HttpUtils;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
@@ -45,6 +55,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.SecureRandom;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -141,8 +152,8 @@ public class ConfigManager {
return metaConfigHolder.getConfigMd5();
}
- public boolean updateMetaConfigInfo(String inDataMd5, String
inDataJsonStr) {
- return metaConfigHolder.updateConfigMap(inDataMd5, inDataJsonStr);
+ public boolean updateMetaConfigInfo(InLongMetaConfig metaConfig) {
+ return metaConfigHolder.updateConfigMap(metaConfig);
}
// register meta-config callback
@@ -331,7 +342,7 @@ public class ConfigManager {
}
httpPost.setEntity(HttpUtils.getEntity(request));
if (LOG.isDebugEnabled()) {
- LOG.debug("Start to request {} to get config info, with
params: {}, headers: {}",
+ LOG.debug("Sync meta: start to get config, to:{}, params:
{}, headers: {}",
url, request, httpPost.getAllHeaders());
}
// request with post
@@ -340,52 +351,69 @@ public class ConfigManager {
String returnStr = EntityUtils.toString(response.getEntity());
long dltTime = System.currentTimeMillis() - startTime;
if (dltTime >=
CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs()) {
- LOG.warn("End to request {} to get config info, WAIST {}
ms, over alarm value {} ms",
- url, dltTime,
CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs());
+ LOG.warn("Sync meta: end to get config, WAIST {} ms, over
alarm: {} ms, from:{}",
+ dltTime,
CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs(), url);
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("End to request {} to get config info:{},
WAIST {} ms",
- url, returnStr, dltTime);
+ LOG.debug("Sync meta: end to get config, WAIST {} ms,
from:{}, result:{}",
+ dltTime, url, returnStr);
}
}
if (response.getStatusLine().getStatusCode() != 200) {
- LOG.warn("Failed to request {}, with params: {}, headers:
{}, the response is {}",
- url, request, httpPost.getAllHeaders(), returnStr);
+ LOG.error(
+ "Sync meta: return failure, errCode {}, from:{},
params:{}, headers:{}, response:{}",
+ response.getStatusLine().getStatusCode(), url,
request, httpPost.getAllHeaders(),
+ returnStr);
return false;
}
// get groupId <-> topic and m value.
- DataProxyConfigResponse proxyResponse =
- gson.fromJson(returnStr,
DataProxyConfigResponse.class);
- if (!proxyResponse.isResult()) {
- LOG.warn("Fail to get config from url {}, with params {},
error code is {}",
- url, request, proxyResponse.getErrCode());
+ DataProxyConfigResponse proxyResponse;
+ try {
+ proxyResponse = gson.fromJson(returnStr,
DataProxyConfigResponse.class);
+ } catch (Throwable e) {
+ LOG.error("Sync meta: exception thrown while parsing
config, from:{}, params:{}, response:{}",
+ url, request, returnStr, e);
return false;
}
- if (proxyResponse.getErrCode() !=
DataProxyConfigResponse.SUCC) {
+ // check required fields
+ ImmutablePair<Boolean, String> validResult =
validRequiredFields(proxyResponse);
+ if (!validResult.getLeft()) {
if (proxyResponse.getErrCode() !=
DataProxyConfigResponse.NOUPDATE) {
- LOG.warn("Get config failure from url:{}, with params
{}, error code is {}",
- url, request, proxyResponse.getErrCode());
+ LOG.error("Sync meta: {}, from:{}, params:{},
return:{}",
+ validResult.getRight(), url, request,
returnStr);
}
return true;
}
- DataProxyCluster dataProxyCluster = proxyResponse.getData();
- if (dataProxyCluster == null
- || dataProxyCluster.getCacheClusterSet() == null
- ||
dataProxyCluster.getCacheClusterSet().getCacheClusters().isEmpty()) {
- LOG.warn("Get config empty from url:{}, with params {},
return:{}, cluster is empty!",
+ // get mq cluster info
+ ImmutablePair<CacheType, Map<String, CacheClusterConfig>>
clusterInfo =
+
buildCacheClusterConfig(proxyResponse.getData().getCacheClusterSet());
+ if (clusterInfo.getLeft() == CacheType.N) {
+ LOG.error("Sync meta: unsupported mq type {}, from:{},
params:{}, return:{}",
+ clusterInfo.getLeft(), url, request, returnStr);
+ return true;
+ }
+ if (clusterInfo.getRight().isEmpty()) {
+ LOG.error("Sync meta: cacheClusters is empty, from:{},
params:{}, return:{}",
url, request, returnStr);
return true;
}
+ // get ID to Topic configure
+ Map<String, IdTopicConfig> idTopicConfigMap =
buildCacheTopicConfig(
+ clusterInfo.getLeft(),
proxyResponse.getData().getProxyCluster());
+ InLongMetaConfig inLongMetaConfig = new
InLongMetaConfig(proxyResponse.getMd5(),
+ clusterInfo.getLeft(), clusterInfo.getRight(),
idTopicConfigMap);
// update meta configure
- if (configManager.updateMetaConfigInfo(proxyResponse.getMd5(),
returnStr)) {
- if (!ConfigManager.handshakeManagerOk.get()) {
- ConfigManager.handshakeManagerOk.set(true);
- LOG.info("Get config success from manager and updated,
set handshake status is ok!");
- }
+ configManager.updateMetaConfigInfo(inLongMetaConfig);
+ // update handshake to manager status
+ if (ConfigManager.handshakeManagerOk.get()) {
+ LOG.info("Sync meta: sync config success, from:{}", url);
+ } else {
+ ConfigManager.handshakeManagerOk.set(true);
+ LOG.info("Sync meta: sync config success, handshake
manager ok, from:{}", url);
}
return true;
} catch (Throwable ex) {
- LOG.error("Request manager {} failure, throw exception", url,
ex);
+ LOG.error("Sync meta: process throw exception, from:{}", url,
ex);
return false;
} finally {
if (httpPost != null) {
@@ -393,5 +421,179 @@ public class ConfigManager {
}
}
}
+
+ /**
+ * check required fields status
+ *
+ * @param response response from Manager
+ *
+ * @return check result
+ */
+ public ImmutablePair<Boolean, String>
validRequiredFields(DataProxyConfigResponse response) {
+ if (response == null) {
+ return ImmutablePair.of(false, "parse result is null");
+ } else if (!response.isResult()) {
+ return ImmutablePair.of(false, "result is NOT true");
+ } else if (response.getErrCode() != DataProxyConfigResponse.SUCC) {
+ return ImmutablePair.of(false, "errCode is "
+ + response.getErrCode() + ", NOT success");
+ } else if (response.getMd5() == null) {
+ return ImmutablePair.of(false, "md5 field is null");
+ } else if (response.getData() == null) {
+ return ImmutablePair.of(false, "data field is null");
+ } else if (response.getData().getProxyCluster() == null) {
+ return ImmutablePair.of(false, "proxyCluster field is null");
+ } else if (response.getData().getCacheClusterSet() == null) {
+ return ImmutablePair.of(false, "cacheClusterSet field is
null");
+ } else if (response.getData().getProxyCluster().getInlongIds() ==
null) {
+ return ImmutablePair.of(false, "inlongIds field is null");
+ } else if
(response.getData().getCacheClusterSet().getCacheClusters() == null) {
+ return ImmutablePair.of(false, "cacheClusters field is null");
+ }
+ return ImmutablePair.of(true, "ok");
+ }
+
+ /**
+ * build cluster config based on cluster set object
+ *
+ * @param clusterSetObject mq cluster set obect
+ *
+ * @return mq type and cluster set configure
+ */
+ private ImmutablePair<CacheType, Map<String, CacheClusterConfig>>
buildCacheClusterConfig(
+ CacheClusterSetObject clusterSetObject) {
+ CacheType mqType = CacheType.convert(clusterSetObject.getType());
+ Map<String, CacheClusterConfig> result = new HashMap<>();
+ for (CacheClusterObject clusterObject :
clusterSetObject.getCacheClusters()) {
+ if (clusterObject == null ||
StringUtils.isBlank(clusterObject.getName())) {
+ continue;
+ }
+ CacheClusterConfig config = new CacheClusterConfig();
+ config.setClusterName(clusterObject.getName());
+ config.setToken(clusterObject.getToken());
+ config.getParams().putAll(clusterObject.getParams());
+ result.put(config.getClusterName(), config);
+ }
+ return ImmutablePair.of(mqType, result);
+ }
+
+ /**
+ * build id2field config based on id2Topic configure
+ *
+ * @param mqType mq cluster type
+ * @param proxyClusterObject cluster object info
+ *
+ * @return ID to Topic configures
+ */
+ private Map<String, IdTopicConfig> buildCacheTopicConfig(
+ CacheType mqType, ProxyClusterObject proxyClusterObject) {
+ Map<String, IdTopicConfig> tmpTopicConfigMap = new HashMap<>();
+ List<InLongIdObject> inLongIds = proxyClusterObject.getInlongIds();
+ if (inLongIds.isEmpty()) {
+ return tmpTopicConfigMap;
+ }
+ int index;
+ String[] idItems;
+ String groupId;
+ String streamId;
+ String topicName;
+ String tenant;
+ String nameSpace;
+ for (InLongIdObject idObject : inLongIds) {
+ if (idObject == null
+ || StringUtils.isBlank(idObject.getInlongId())
+ || StringUtils.isBlank(idObject.getTopic())) {
+ continue;
+ }
+ // parse inlong id
+ idItems = idObject.getInlongId().split("\\.");
+ if (idItems.length == 2) {
+ if (StringUtils.isBlank(idItems[0])) {
+ continue;
+ }
+ groupId = idItems[0].trim();
+ streamId = idItems[1].trim();
+ } else {
+ groupId = idObject.getInlongId().trim();
+ streamId = "";
+ }
+ topicName = idObject.getTopic().trim();
+ // change full topic name "pulsar-xxx/test/base_topic_name" to
+ // base topic name "base_topic_name"
+ index = topicName.lastIndexOf('/');
+ if (index >= 0) {
+ topicName = topicName.substring(index + 1).trim();
+ }
+ tenant =
idObject.getParams().getOrDefault(ConfigConstants.KEY_TENANT, "");
+ nameSpace =
idObject.getParams().getOrDefault(ConfigConstants.KEY_NAMESPACE, "");
+ if (StringUtils.isBlank(idObject.getTopic())) {
+ // namespace field must exist and value not be empty,
+ // otherwise it is an illegal configuration item.
+ continue;
+ }
+ if (mqType.equals(CacheType.TUBE)) {
+ topicName = nameSpace;
+ } else if (mqType.equals(CacheType.KAFKA)) {
+ if (topicName.equals(streamId)) {
+ topicName =
String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, nameSpace, topicName);
+ }
+ }
+ IdTopicConfig tmpConfig = new IdTopicConfig();
+ tmpConfig.setInlongGroupIdAndStreamId(groupId, streamId);
+ tmpConfig.setTenantAndNameSpace(tenant, nameSpace);
+ tmpConfig.setTopicName(topicName);
+ tmpConfig.setParams(idObject.getParams());
+ tmpConfig.setDataType(DataTypeEnum.convert(
+ idObject.getParams().getOrDefault("dataType",
DataTypeEnum.TEXT.getType())));
+
tmpConfig.setFieldDelimiter(idObject.getParams().getOrDefault("fieldDelimiter",
"|"));
+
tmpConfig.setFileDelimiter(idObject.getParams().getOrDefault("fileDelimiter",
"\n"));
+ tmpConfig.setUseExtendedFields(Boolean.valueOf(
+ idObject.getParams().getOrDefault("useExtendedFields",
"false")));
+ tmpConfig.setMsgWrapType(getPbWrapType(idObject));
+ tmpConfig.setV1CompressType(getPbCompressType(idObject));
+ tmpTopicConfigMap.put(tmpConfig.getUid(), tmpConfig);
+ // add only groupId object for tube
+ if (mqType.equals(CacheType.TUBE)
+ &&
!tmpConfig.getUid().equals(tmpConfig.getInlongGroupId())
+ && tmpTopicConfigMap.get(tmpConfig.getInlongGroupId())
== null) {
+ IdTopicConfig tmpConfig2 = new IdTopicConfig();
+ tmpConfig2.setInlongGroupIdAndStreamId(groupId, "");
+ tmpConfig2.setTenantAndNameSpace(tenant, nameSpace);
+ tmpConfig2.setTopicName(topicName);
+ tmpConfig2.setDataType(tmpConfig.getDataType());
+
tmpConfig2.setFieldDelimiter(tmpConfig.getFieldDelimiter());
+ tmpConfig2.setFileDelimiter(tmpConfig.getFileDelimiter());
+ tmpConfig2.setParams(new HashMap<>(tmpConfig.getParams()));
+
tmpConfig2.setUseExtendedFields(tmpConfig.isUseExtendedFields());
+ tmpConfig2.setMsgWrapType(tmpConfig.getMsgWrapType());
+
tmpConfig2.setV1CompressType(tmpConfig.getV1CompressType());
+ tmpTopicConfigMap.put(tmpConfig2.getUid(), tmpConfig2);
+ }
+ }
+ return tmpTopicConfigMap;
+ }
+
+ private MessageWrapType getPbWrapType(InLongIdObject idObject) {
+ String strWrapType = idObject.getParams().get("wrapType");
+ if (StringUtils.isBlank(strWrapType)) {
+ return MessageWrapType.UNKNOWN;
+ } else {
+ return MessageWrapType.forType(strWrapType);
+ }
+ }
+
+ private InlongCompressType getPbCompressType(InLongIdObject idObject) {
+ String strCompressType =
idObject.getParams().get("inlongCompressType");
+ if (StringUtils.isBlank(strCompressType)) {
+ return
CommonConfigHolder.getInstance().getDefV1MsgCompressType();
+ } else {
+ InlongCompressType msgCompType =
InlongCompressType.forType(strCompressType);
+ if (msgCompType == InlongCompressType.UNKNOWN) {
+ return
CommonConfigHolder.getInstance().getDefV1MsgCompressType();
+ } else {
+ return msgCompType;
+ }
+ }
+ }
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 96c5bd820a..555c55f9cc 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -17,26 +17,19 @@
package org.apache.inlong.dataproxy.config.holder;
-import org.apache.inlong.common.constant.Constants;
-import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
-import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
-import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
-import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.CacheType;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.config.pojo.InLongMetaConfig;
import org.apache.inlong.sdk.commons.protocol.InlongId;
import com.google.gson.Gson;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,7 +67,6 @@ public class MetaConfigHolder extends ConfigHolder {
private String tmpDataMd5 = "";
private final AtomicLong lastSyncVersion = new AtomicLong(0);
// cached data
- private final List<String> defTopics = new ArrayList<>();
private final AtomicInteger clusterType = new
AtomicInteger(CacheType.N.getId());
private final ConcurrentHashMap<String, CacheClusterConfig> mqClusterMap =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, IdTopicConfig> id2TopicSrcMap =
new ConcurrentHashMap<>();
@@ -84,13 +76,6 @@ public class MetaConfigHolder extends ConfigHolder {
super(metaConfigFileName);
}
- public void addDefTopic(String defTopic) {
- if (StringUtils.isBlank(defTopic)) {
- return;
- }
- defTopics.add(defTopic);
- }
-
/**
* get source topic by groupId and streamId
*/
@@ -153,28 +138,46 @@ public class MetaConfigHolder extends ConfigHolder {
}
}
- public boolean updateConfigMap(String inDataMd5, String inDataJsonStr) {
- if (StringUtils.isBlank(inDataMd5)
- || StringUtils.isBlank(inDataJsonStr)) {
- return false;
- }
+ public boolean updateConfigMap(InLongMetaConfig metaConfig) {
+ String inDataJsonStr;
+ // check cache data
synchronized (this.lastSyncVersion) {
if (this.lastSyncVersion.get() > this.lastUpdVersion.get()) {
- if (inDataJsonStr.equals(tmpDataMd5)) {
+ if (tmpDataMd5.equals(metaConfig.getMd5())) {
return false;
}
- LOG.info("Load changed metadata {} , but reloading content,
over {} ms",
+ LOG.info("Update metadata: NOT UPDATE, {} is loading, but wast
over {} ms",
getFileName(), System.currentTimeMillis() -
this.lastSyncVersion.get());
return false;
} else {
- if (inDataMd5.equals(dataMd5)) {
+ if (dataMd5.equals(metaConfig.getMd5())) {
return false;
}
}
- return storeConfigToFile(inDataMd5, inDataJsonStr);
+ InLongMetaConfig newMetaConfig = buildMixedMetaConfig(metaConfig);
+ try {
+ inDataJsonStr = GSON.toJson(newMetaConfig);
+ } catch (Throwable e) {
+ LOG.error("Update metadata: failure to serial meta config to
json", e);
+ return false;
+ }
+ return storeConfigToFile(inDataJsonStr, newMetaConfig);
}
}
+ private InLongMetaConfig buildMixedMetaConfig(InLongMetaConfig metaConfig)
{
+ // process and check cluster info
+ Map<String, CacheClusterConfig> newClusterConfigMap =
+ new HashMap<>(metaConfig.getClusterConfigMap().size());
+ newClusterConfigMap.putAll(metaConfig.getClusterConfigMap());
+ // process id2topic info
+ Map<String, IdTopicConfig> newIdTopicConfigMap =
+ new HashMap<>(metaConfig.getIdTopicConfigMap().size());
+ newIdTopicConfigMap.putAll(metaConfig.getIdTopicConfigMap());
+ return new InLongMetaConfig(metaConfig.getMd5(),
+ metaConfig.getMqType(), newClusterConfigMap,
newIdTopicConfigMap);
+ }
+
public List<CacheClusterConfig> forkCachedCLusterConfig() {
List<CacheClusterConfig> result = new ArrayList<>();
if (mqClusterMap.isEmpty()) {
@@ -214,264 +217,146 @@ public class MetaConfigHolder extends ConfigHolder {
// check meta update setting
if
(!CommonConfigHolder.getInstance().isEnableStartupUsingLocalMetaFile()
&& !ConfigManager.handshakeManagerOk.get()) {
- LOG.warn("Failed to load json config from {}, don't obtain
metadata from the Manager,"
- + " and the startup via the cache file is false",
getFileName());
+ LOG.warn("Load metadata: StartupUsingLocalMetaFile is false, don't
obtain metadata from {}"
+ + " before handshake with Manager", getFileName());
return false;
}
String jsonString = "";
+ InLongMetaConfig metaConfig;
readWriteLock.writeLock().lock();
try {
jsonString = loadConfigFromFile();
if (StringUtils.isBlank(jsonString)) {
- LOG.warn("Load changed json {}, but no records configured",
getFileName());
+ LOG.error("Load metadata: NOT LOADED, changed but empty
records, file:{}", getFileName());
return true;
}
- DataProxyConfigResponse metaConfig =
- GSON.fromJson(jsonString, DataProxyConfigResponse.class);
- // check result tag
- if (!metaConfig.isResult() || metaConfig.getErrCode() !=
DataProxyConfigResponse.SUCC) {
- LOG.warn("Load failed json config from {}, error code is {}",
- getFileName(), metaConfig.getErrCode());
+ try {
+ metaConfig = GSON.fromJson(jsonString, InLongMetaConfig.class);
+ } catch (Throwable e) {
+ LOG.error("Load metadata: NOT LOADED, parse json config
failure, file:{}", getFileName(), e);
return true;
}
- // check cluster data
- DataProxyCluster clusterObj = metaConfig.getData();
- if (clusterObj == null) {
- LOG.warn("Load failed json config from {}, malformed content,
data is null", getFileName());
+ // check required fields
+ ImmutablePair<Boolean, String> paramChkResult =
validRequiredFields(metaConfig);
+ if (!paramChkResult.getLeft()) {
+ LOG.error("Load metadata: NOT LOADED, {}, file:{}",
+ paramChkResult.getRight(), getFileName());
return true;
}
- // update cache data
- if (updateCacheData(jsonString, metaConfig)) {
- LOG.info("Load changed {} file success!", getFileName());
- }
+ // update cached data
+ replaceCacheConfig(metaConfig.getMqType(),
+ metaConfig.getClusterConfigMap(),
metaConfig.getIdTopicConfigMap());
+ this.dataMd5 = metaConfig.getMd5();
+ this.dataStr = jsonString;
+ LOG.info("Load metadata: LOADED success, from {}!", getFileName());
return true;
} catch (Throwable e) {
- LOG.warn("Process json {} changed data {} failure", getFileName(),
jsonString, e);
+ LOG.error("Load metadata: NOT LOADED, load from {} throw
exception", getFileName(), e);
return false;
} finally {
+ if (this.lastSyncVersion.get() == 0) {
+ this.lastUpdVersion.set(System.currentTimeMillis());
+ this.lastSyncVersion.compareAndSet(0,
this.lastUpdVersion.get());
+ } else {
+ this.lastUpdVersion.set(this.lastSyncVersion.get());
+ }
readWriteLock.writeLock().unlock();
}
}
- private boolean updateCacheData(String jsonString, DataProxyConfigResponse
metaConfig) {
- // get and valid inlongIds configure
- ProxyClusterObject proxyClusterObject =
metaConfig.getData().getProxyCluster();
- if (proxyClusterObject == null) {
- LOG.warn("Load failed json config from {}, malformed content,
proxyCluster field is null",
- getFileName());
- return false;
- }
- CacheClusterSetObject clusterSetObject =
metaConfig.getData().getCacheClusterSet();
- if (clusterSetObject == null) {
- LOG.warn("Load failed json config from {}, malformed content,
cacheClusterSet field is null",
- getFileName());
- return false;
- }
- List<InLongIdObject> inLongIds = proxyClusterObject.getInlongIds();
- if (inLongIds == null) {
- LOG.warn("Load failed json config from {}, malformed content,
inlongIds field is null",
- getFileName());
- return false;
- }
- // get mq type
- CacheType mqType = CacheType.convert(clusterSetObject.getType());
- if (mqType == CacheType.N) {
- LOG.warn("Load failed json config from {}, unsupported mq type {}",
- getFileName(), clusterSetObject.getType());
- return false;
+ /**
+ * store meta config to file
+ *
+ * @param metaJsonStr meta info string
+ * @param metaConfig meta info object
+ *
+ * @return store result
+ */
+ private boolean storeConfigToFile(String metaJsonStr, InLongMetaConfig
metaConfig) {
+ boolean isSuccess = false;
+ String filePath = getFilePath();
+ if (StringUtils.isBlank(filePath)) {
+ LOG.error("Store metadata: error in writing file {} as the file
path is null.", getFileName());
+ return isSuccess;
}
- // get mq cluster info
- Map<String, CacheClusterConfig> tmpClusterConfigMap = new HashMap<>();
- for (CacheClusterObject clusterObject :
clusterSetObject.getCacheClusters()) {
- if (clusterObject == null ||
StringUtils.isBlank(clusterObject.getName())) {
- continue;
+ readWriteLock.writeLock().lock();
+ try {
+ File sourceFile = new File(filePath);
+ File targetFile = new File(getNextBackupFileName());
+ File tmpNewFile = new File(getFileName() + ".tmp");
+
+ if (sourceFile.exists()) {
+ FileUtils.copyFile(sourceFile, targetFile);
}
- CacheClusterConfig config = new CacheClusterConfig();
- config.setClusterName(clusterObject.getName());
- config.setToken(clusterObject.getToken());
- config.getParams().putAll(clusterObject.getParams());
- tmpClusterConfigMap.put(config.getClusterName(), config);
- }
- if (tmpClusterConfigMap.isEmpty()) {
- LOG.warn("Load failed json config from {}, no valid {} mq cluster",
- getFileName(), clusterSetObject.getType());
- return false;
- }
- // get topic config info
- Map<String, IdTopicConfig> tmpTopicConfigMap =
buildCacheTopicConfig(mqType, inLongIds);
- replaceCacheConfig(mqType, tmpClusterConfigMap, tmpTopicConfigMap);
- // update cached data
- this.dataMd5 = metaConfig.getMd5();
- this.dataStr = jsonString;
- if (this.lastSyncVersion.get() == 0) {
- this.lastUpdVersion.set(System.currentTimeMillis());
- this.lastSyncVersion.compareAndSet(0, this.lastUpdVersion.get());
- } else {
- this.lastUpdVersion.set(this.lastSyncVersion.get());
+ FileUtils.writeStringToFile(tmpNewFile, metaJsonStr,
StandardCharsets.UTF_8);
+ FileUtils.copyFile(tmpNewFile, sourceFile);
+ tmpNewFile.delete();
+ tmpDataMd5 = metaConfig.getMd5();
+ lastSyncVersion.set(System.currentTimeMillis());
+ isSuccess = true;
+ setFileChanged();
+ } catch (Throwable ex) {
+ LOG.error("Store metadata: exception thrown while writing to file
{}", getFileName(), ex);
+ } finally {
+ readWriteLock.writeLock().unlock();
}
- return true;
+ return isSuccess;
}
+ /**
+ * update locally cached configuration with input information
+ *
+ * @param cacheType mq cluster type
+ * @param clusterConfigMap mq cluster config
+ * @param idTopicConfigMap id to topic config
+ */
private void replaceCacheConfig(CacheType cacheType,
Map<String, CacheClusterConfig> clusterConfigMap,
- Map<String, IdTopicConfig> topicConfigMap) {
+ Map<String, IdTopicConfig> idTopicConfigMap) {
this.clusterType.getAndSet(cacheType.getId());
// remove deleted id2topic config
- Set<String> tmpKeys = new HashSet<>();
+ Set<String> tmpRmvKeys = new HashSet<>();
for (Map.Entry<String, IdTopicConfig> entry :
id2TopicSrcMap.entrySet()) {
if (entry == null || entry.getKey() == null || entry.getValue() ==
null) {
continue;
}
- if (!topicConfigMap.containsKey(entry.getKey())) {
- tmpKeys.add(entry.getKey());
+ if (!idTopicConfigMap.containsKey(entry.getKey())) {
+ tmpRmvKeys.add(entry.getKey());
}
}
- for (String key : tmpKeys) {
+ for (String key : tmpRmvKeys) {
id2TopicSrcMap.remove(key);
}
// add new id2topic source config
- id2TopicSrcMap.putAll(topicConfigMap);
+ id2TopicSrcMap.putAll(idTopicConfigMap);
// add new id2topic sink config
- id2TopicSinkMap.putAll(topicConfigMap);
+ id2TopicSinkMap.putAll(idTopicConfigMap);
// remove deleted cluster config
- tmpKeys.clear();
+ tmpRmvKeys.clear();
for (Map.Entry<String, CacheClusterConfig> entry :
mqClusterMap.entrySet()) {
if (entry == null || entry.getKey() == null || entry.getValue() ==
null) {
continue;
}
if (!clusterConfigMap.containsKey(entry.getKey())) {
- tmpKeys.add(entry.getKey());
+ tmpRmvKeys.add(entry.getKey());
}
}
- for (String key : tmpKeys) {
+ for (String key : tmpRmvKeys) {
mqClusterMap.remove(key);
}
// add new mq cluster config
mqClusterMap.putAll(clusterConfigMap);
}
- private Map<String, IdTopicConfig> buildCacheTopicConfig(
- CacheType mqType, List<InLongIdObject> inLongIds) {
- Map<String, IdTopicConfig> tmpTopicConfigMap = new HashMap<>();
- if (inLongIds.isEmpty()) {
- return tmpTopicConfigMap;
- }
- int index;
- String[] idItems;
- String groupId;
- String streamId;
- String topicName;
- String tenant;
- String nameSpace;
- for (InLongIdObject idObject : inLongIds) {
- if (idObject == null
- || StringUtils.isBlank(idObject.getInlongId())
- || StringUtils.isBlank(idObject.getTopic())) {
- continue;
- }
- // parse inlong id
- idItems = idObject.getInlongId().split("\\.");
- if (idItems.length == 2) {
- if (StringUtils.isBlank(idItems[0])) {
- continue;
- }
- groupId = idItems[0].trim();
- streamId = idItems[1].trim();
- } else {
- groupId = idObject.getInlongId().trim();
- streamId = "";
- }
- topicName = idObject.getTopic().trim();
- // change full topic name "pulsar-xxx/test/base_topic_name" to
- // base topic name "base_topic_name"
- index = topicName.lastIndexOf('/');
- if (index >= 0) {
- topicName = topicName.substring(index + 1).trim();
- }
- tenant =
idObject.getParams().getOrDefault(ConfigConstants.KEY_TENANT, "");
- nameSpace =
idObject.getParams().getOrDefault(ConfigConstants.KEY_NAMESPACE, "");
- if (StringUtils.isBlank(idObject.getTopic())) {
- // namespace field must exist and value not be empty,
- // otherwise it is an illegal configuration item.
- continue;
- }
- if (mqType.equals(CacheType.TUBE)) {
- topicName = nameSpace;
- } else if (mqType.equals(CacheType.KAFKA)) {
- if (topicName.equals(streamId)) {
- topicName =
String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, nameSpace, topicName);
- }
- }
- IdTopicConfig tmpConfig = new IdTopicConfig();
- tmpConfig.setInlongGroupIdAndStreamId(groupId, streamId);
- tmpConfig.setTenantAndNameSpace(tenant, nameSpace);
- tmpConfig.setTopicName(topicName);
- tmpConfig.setParams(idObject.getParams());
- tmpConfig.setDataType(DataTypeEnum.convert(
- idObject.getParams().getOrDefault("dataType",
DataTypeEnum.TEXT.getType())));
-
tmpConfig.setFieldDelimiter(idObject.getParams().getOrDefault("fieldDelimiter",
"|"));
-
tmpConfig.setFileDelimiter(idObject.getParams().getOrDefault("fileDelimiter",
"\n"));
- tmpConfig.setUseExtendedFields(Boolean.valueOf(
- idObject.getParams().getOrDefault("useExtendedFields",
"false")));
- tmpTopicConfigMap.put(tmpConfig.getUid(), tmpConfig);
- if (mqType.equals(CacheType.TUBE)
- && !tmpConfig.getUid().equals(tmpConfig.getInlongGroupId())
- && tmpTopicConfigMap.get(tmpConfig.getInlongGroupId()) ==
null) {
- IdTopicConfig tmpConfig2 = new IdTopicConfig();
- tmpConfig2.setInlongGroupIdAndStreamId(groupId, "");
- tmpConfig.setTenantAndNameSpace(tenant, nameSpace);
- tmpConfig2.setTopicName(topicName);
- tmpConfig2.setDataType(tmpConfig.getDataType());
- tmpConfig2.setFieldDelimiter(tmpConfig.getFieldDelimiter());
- tmpConfig2.setFileDelimiter(tmpConfig.getFileDelimiter());
- tmpConfig2.setParams(tmpConfig.getParams());
- tmpTopicConfigMap.put(tmpConfig.getUid(), tmpConfig2);
- }
- }
- return tmpTopicConfigMap;
- }
-
/**
- * store meta config to file
- */
- private boolean storeConfigToFile(String inDataMd5, String metaJsonStr) {
- boolean isSuccess = false;
- String filePath = getFilePath();
- if (StringUtils.isBlank(filePath)) {
- LOG.error("Error in writing file {} as the file path is null.",
getFileName());
- return isSuccess;
- }
- readWriteLock.writeLock().lock();
- try {
- File sourceFile = new File(filePath);
- File targetFile = new File(getNextBackupFileName());
- File tmpNewFile = new File(getFileName() + ".tmp");
-
- if (sourceFile.exists()) {
- FileUtils.copyFile(sourceFile, targetFile);
- }
- FileUtils.writeStringToFile(tmpNewFile, metaJsonStr,
StandardCharsets.UTF_8);
- FileUtils.copyFile(tmpNewFile, sourceFile);
- tmpNewFile.delete();
- tmpDataMd5 = inDataMd5;
- lastSyncVersion.set(System.currentTimeMillis());
- isSuccess = true;
- setFileChanged();
- } catch (Throwable ex) {
- LOG.error("Error in writing file {}", getFileName(), ex);
- } finally {
- readWriteLock.writeLock().unlock();
- }
- return isSuccess;
- }
-
- /**
- * load from holder
+ * load configure from holder
+ *
+ * @return the configure info
*/
private String loadConfigFromFile() {
String result = "";
if (StringUtils.isBlank(getFileName())) {
- LOG.error("Fail to load json {} as the file name is null.",
getFileName());
+ LOG.error("Load metadata: fail to load json {} as the file name is
null.", getFileName());
return result;
}
InputStream inStream = null;
@@ -479,12 +364,12 @@ public class MetaConfigHolder extends ConfigHolder {
URL url = getClass().getClassLoader().getResource(getFileName());
inStream = url != null ? url.openStream() : null;
if (inStream == null) {
- LOG.error("Fail to load json {} as the input stream is null",
getFileName());
+ LOG.error("Load metadata: fail to load json {} as the input
stream is null", getFileName());
return result;
}
int size = inStream.available();
if (size > MAX_ALLOWED_JSON_FILE_SIZE) {
- LOG.error("Fail to load json {} as the content size({}) over
max allowed size({})",
+ LOG.error("Load metadata: fail to load json {} as the content
size({}) over max allowed size({})",
getFileName(), size, MAX_ALLOWED_JSON_FILE_SIZE);
return result;
}
@@ -492,16 +377,45 @@ public class MetaConfigHolder extends ConfigHolder {
inStream.read(buffer);
result = new String(buffer, StandardCharsets.UTF_8);
} catch (Throwable e) {
- LOG.error("Fail to load json {}", getFileName(), e);
+ LOG.error("Load metadata: exception thrown while load from file
{}", getFileName(), e);
} finally {
if (null != inStream) {
try {
inStream.close();
} catch (IOException e) {
- LOG.error("Fail in inStream.close for file {}",
getFileName(), e);
+ LOG.error("Load metadata: fail in inStream.close for file
{}", getFileName(), e);
}
}
}
return result;
}
+
+ /**
+ * check required fields status
+ *
+ * @param metaConfig response from Manager
+ *
+ * @return check result
+ */
+ public ImmutablePair<Boolean, String> validRequiredFields(InLongMetaConfig
metaConfig) {
+ if (metaConfig == null) {
+ return ImmutablePair.of(false, "metaConfig object is null");
+ } else if (metaConfig.getMd5() == null) {
+ return ImmutablePair.of(false, "metaConfig.md5 field is null");
+ } else if (metaConfig.getMqType() == null) {
+ return ImmutablePair.of(false, "metaConfig.mqType field is null");
+ } else if (metaConfig.getMqType() == CacheType.N) {
+ return ImmutablePair.of(false, "metaConfig.mqType value is
CacheType.N");
+ } else if (metaConfig.getClusterConfigMap() == null) {
+ return ImmutablePair.of(false, "metaConfig.clusterConfigMap field
is null");
+ } else if (metaConfig.getClusterConfigMap().isEmpty()) {
+ return ImmutablePair.of(false, "metaConfig.clusterConfigMap field
is empty");
+ } else if (metaConfig.getIdTopicConfigMap() == null) {
+ return ImmutablePair.of(false, "metaConfig.idTopicConfigMap field
is null");
+ } else if (metaConfig.getIdTopicConfigMap().isEmpty()) {
+ return ImmutablePair.of(false, "metaConfig.idTopicConfigMap is
empty");
+ }
+ return ImmutablePair.of(true, "ok");
+ }
+
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
index 094aee59b3..6c48467c1e 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/**
*
@@ -94,4 +95,23 @@ public class CacheClusterConfig {
.append("params", params)
.toString();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof CacheClusterConfig)) {
+ return false;
+ }
+ CacheClusterConfig that = (CacheClusterConfig) o;
+ return Objects.equals(clusterName, that.clusterName)
+ && Objects.equals(token, that.token)
+ && Objects.equals(params, that.params);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(clusterName, token, params);
+ }
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
index 0d8a5430d4..2ac72981a2 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
@@ -73,7 +73,16 @@ public enum CacheType {
*/
public static CacheType convert(String value) {
for (CacheType v : values()) {
- if (v.value().equals(value)) {
+ if (v.value().equalsIgnoreCase(value)) {
+ return v;
+ }
+ }
+ return N;
+ }
+
+ public static CacheType valueOf(int idValue) {
+ for (CacheType v : values()) {
+ if (v.getId() == idValue) {
return v;
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
index 48b19d446d..8d2a01915d 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
@@ -18,6 +18,8 @@
package org.apache.inlong.dataproxy.config.pojo;
import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.enums.InlongCompressType;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.sdk.commons.protocol.InlongId;
import org.apache.commons.lang.StringUtils;
@@ -25,6 +27,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/**
* IdTopicConfig
@@ -42,6 +45,8 @@ public class IdTopicConfig {
private String fieldDelimiter = "|";
private String fileDelimiter = "\n";
private Boolean useExtendedFields = false;
+ private MessageWrapType msgWrapType = MessageWrapType.UNKNOWN;
+ private InlongCompressType v1CompressType =
InlongCompressType.INLONG_SNAPPY;
private Map<String, String> params = new HashMap<>();
@@ -49,7 +54,7 @@ public class IdTopicConfig {
}
- public Boolean getUseExtendedFields() {
+ public boolean isUseExtendedFields() {
return useExtendedFields;
}
@@ -57,6 +62,22 @@ public class IdTopicConfig {
this.useExtendedFields = useExtendedFields;
}
+ public MessageWrapType getMsgWrapType() {
+ return msgWrapType;
+ }
+
+ public void setMsgWrapType(MessageWrapType msgWrapType) {
+ this.msgWrapType = msgWrapType;
+ }
+
+ public InlongCompressType getV1CompressType() {
+ return v1CompressType;
+ }
+
+ public void setV1CompressType(InlongCompressType v1CompressType) {
+ this.v1CompressType = v1CompressType;
+ }
+
/**
* get uid
* @return the uid
@@ -221,11 +242,42 @@ public class IdTopicConfig {
.append("inlongGroupId", inlongGroupId)
.append("inlongStreamid", inlongStreamid)
.append("topicName", topicName)
+ .append("tenant", tenant)
.append("nameSpace", nameSpace)
.append("dataType", dataType)
.append("fieldDelimiter", fieldDelimiter)
.append("fileDelimiter", fileDelimiter)
+ .append("useExtendedFields", useExtendedFields)
+ .append("msgWrapType", msgWrapType)
+ .append("pbCompressType", v1CompressType)
.append("params", params)
.toString();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IdTopicConfig)) {
+ return false;
+ }
+ IdTopicConfig that = (IdTopicConfig) o;
+ return uid.equals(that.uid) && Objects.equals(inlongGroupId,
that.inlongGroupId)
+ && Objects.equals(inlongStreamid, that.inlongStreamid) &&
topicName.equals(that.topicName)
+ && Objects.equals(tenant, that.tenant) &&
Objects.equals(nameSpace, that.nameSpace)
+ && dataType == that.dataType && Objects.equals(fieldDelimiter,
that.fieldDelimiter)
+ && Objects.equals(fileDelimiter, that.fileDelimiter)
+ && Objects.equals(useExtendedFields, that.useExtendedFields)
+ && Objects.equals(msgWrapType, that.msgWrapType)
+ && v1CompressType == that.v1CompressType
+ && Objects.equals(params, that.params);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(uid, inlongGroupId, inlongStreamid, topicName,
tenant, nameSpace,
+ dataType, fieldDelimiter, fileDelimiter, useExtendedFields,
msgWrapType,
+ v1CompressType, params);
+ }
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/InLongMetaConfig.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/InLongMetaConfig.java
new file mode 100644
index 0000000000..4d80ee4d13
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/InLongMetaConfig.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.pojo;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+import java.util.Map;
+
+public class InLongMetaConfig {
+
+ private String md5;
+ private CacheType mqType;
+ private Map<String, CacheClusterConfig> clusterConfigMap;
+ private Map<String, IdTopicConfig> idTopicConfigMap;
+
+ public InLongMetaConfig() {
+
+ }
+
+ public InLongMetaConfig(String md5, CacheType mqType,
+ Map<String, CacheClusterConfig> clusterConfigMap,
+ Map<String, IdTopicConfig> idTopicConfigMap) {
+ this.md5 = md5;
+ this.mqType = mqType;
+ this.clusterConfigMap = clusterConfigMap;
+ this.idTopicConfigMap = idTopicConfigMap;
+ }
+
+ public String getMd5() {
+ return md5;
+ }
+
+ public CacheType getMqType() {
+ return mqType;
+ }
+
+ public Map<String, CacheClusterConfig> getClusterConfigMap() {
+ return clusterConfigMap;
+ }
+
+ public Map<String, IdTopicConfig> getIdTopicConfigMap() {
+ return idTopicConfigMap;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("md5", md5)
+ .append("mqType", mqType)
+ .append("clusterConfigMap", clusterConfigMap)
+ .append("idTopicConfigMap", idTopicConfigMap)
+ .toString();
+ }
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java
similarity index 51%
rename from
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
rename to
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java
index 3eb12d6a8c..9ead853540 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.dataproxy.source;
+package org.apache.inlong.dataproxy.consts;
public class SourceConstants {
@@ -105,95 +105,4 @@ public class SourceConstants {
public static final String SRC_PROTOCOL_TYPE_TCP = "tcp";
public static final String SRC_PROTOCOL_TYPE_UDP = "udp";
public static final String SRC_PROTOCOL_TYPE_HTTP = "http";
-
- public static final String SERVICE_PROCESSOR_NAME = "service-decoder-name";
- public static final String ENABLE_EXCEPTION_RETURN =
"enableExceptionReturn";
-
- public static final String TRAFFIC_CLASS = "trafficClass";
-
- public static final String HEART_INTERVAL_SEC = "heart-interval-sec";
-
- public static final String PACKAGE_TIMEOUT_SEC = "package-timeout-sec";
-
- public static final String HEART_SERVERS = "heart-servers";
-
- public static final String TOPIC_KEY = "topic";
- public static final String REMOTE_IP_KEY = "srcIp";
- public static final String DATAPROXY_IP_KEY = "dpIp";
- public static final String MSG_ENCODE_VER = "msgEnType";
- public static final String REMOTE_IDC_KEY = "idc";
- public static final String MSG_COUNTER_KEY = "msgcnt";
- public static final String PKG_COUNTER_KEY = "pkgcnt";
- public static final String PKG_TIME_KEY = "msg.pkg.time";
- public static final String TRANSFER_KEY = "transfer";
- public static final String DEST_IP_KEY = "dstIp";
- public static final String INTERFACE_KEY = "interface";
- public static final String SINK_MIN_METRIC_KEY = "sink-min-metric-topic";
- public static final String SINK_HOUR_METRIC_KEY = "sink-hour-metric-topic";
- public static final String SINK_TEN_METRIC_KEY = "sink-ten-metric-topic";
- public static final String SINK_QUA_METRIC_KEY = "sink-qua-metric-topic";
- public static final String L5_MIN_METRIC_KEY = "l5-min-metric-topic";
- public static final String L5_MIN_FAIL_METRIC_KEY =
"l5-min-fail-metric-key";
- public static final String L5_HOUR_METRIC_KEY = "l5-hour-metric-topic";
- public static final String L5_ID_KEY = "l5id";
- public static final String SET_KEY = "set";
- public static final String CLUSTER_ID_KEY = "clusterId";
-
- public static final String DECODER_BODY = "body";
- public static final String DECODER_TOPICKEY = "topic_key";
- public static final String DECODER_ATTRS = "attrs";
- public static final String MSG_TYPE = "msg_type";
- public static final String COMPRESS_TYPE = "compress_type";
- public static final String EXTRA_ATTR = "extra_attr";
- public static final String COMMON_ATTR_MAP = "common_attr_map";
- public static final String MSG_LIST = "msg_list";
- public static final String VERSION_TYPE = "version";
- public static final String FILE_CHECK_DATA = "file-check-data";
- public static final String MINUTE_CHECK_DATA = "minute-check-data";
- public static final String SLA_METRIC_DATA = "sla-metric-data";
- public static final String SLA_METRIC_GROUPID = "manager_sla_metric";
-
- public static final String FILE_BODY = "file-body";
- public static final int MSG_MAX_LENGTH_BYTES = 20 * 1024 * 1024;
-
- public static final String SEQUENCE_ID = "sequencial_id";
-
- public static final String TOTAL_LEN = "totalLen";
-
- public static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT =
"link_max_allowed_delayed_msg_count";
- public static final String SESSION_WARN_DELAYED_MSG_COUNT =
"session_warn_delayed_msg_count";
- public static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT =
"session_max_allowed_delayed_msg_count";
- public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK =
"netty_write_buffer_high_water_mark";
- public static final String RECOVER_THREAD_COUNT = "recover_thread_count";
-
- public static final String MANAGER_PATH = "/inlong/manager/openapi";
- public static final String MANAGER_GET_CONFIG_PATH =
"/dataproxy/getConfig";
- public static final String MANAGER_GET_ALL_CONFIG_PATH =
"/dataproxy/getAllConfig";
- public static final String MANAGER_HEARTBEAT_REPORT = "/heartbeat/report";
-
- public static final String MANAGER_AUTH_SECRET_ID =
"manager.auth.secretId";
- public static final String MANAGER_AUTH_SECRET_KEY =
"manager.auth.secretKey";
- // Pulsar config
- public static final String KEY_TENANT = "tenant";
- public static final String KEY_NAMESPACE = "namespace";
-
- public static final String KEY_SERVICE_URL = "serviceUrl";
- public static final String KEY_AUTHENTICATION = "authentication";
- public static final String KEY_STATS_INTERVAL_SECONDS =
"statsIntervalSeconds";
-
- public static final String KEY_ENABLEBATCHING = "enableBatching";
- public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes";
- public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages";
- public static final String KEY_BATCHINGMAXPUBLISHDELAY =
"batchingMaxPublishDelay";
- public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages";
- public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS =
"maxPendingMessagesAcrossPartitions";
- public static final String KEY_SENDTIMEOUT = "sendTimeout";
- public static final String KEY_COMPRESSIONTYPE = "compressionType";
- public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull";
- public static final String
KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouter"
- + "BatchingPartitionSwitchFrequency";
-
- public static final String KEY_IOTHREADS = "ioThreads";
- public static final String KEY_MEMORYLIMIT = "memoryLimit";
- public static final String KEY_CONNECTIONSPERBROKER =
"connectionsPerBroker";
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index c317bba799..acf0e4d8a4 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -25,6 +25,7 @@ import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.consts.AttrConstants;
+import org.apache.inlong.dataproxy.consts.SourceConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index d68a8b4df2..517c11113b 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -17,6 +17,8 @@
package org.apache.inlong.dataproxy.source;
+import org.apache.inlong.dataproxy.consts.SourceConstants;
+
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
index 64375733fd..3a90e9f10a 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
@@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.source;
import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.consts.SourceConstants;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;
import com.google.common.base.Preconditions;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index 5455cdddb4..b343dc4dcb 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.source;
import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.consts.SourceConstants;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;
import org.apache.inlong.dataproxy.utils.EventLoopUtil;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
index fef78bdd1b..380fc3dab9 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
@@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.source;
import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.consts.SourceConstants;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
index 71516da004..04322fe339 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
@@ -17,7 +17,7 @@
package org.apache.inlong.dataproxy.utils;
-import org.apache.inlong.dataproxy.source.SourceConstants;
+import org.apache.inlong.dataproxy.consts.SourceConstants;
import io.netty.channel.Channel;
import org.apache.commons.lang3.StringUtils;
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/metadata.json
b/inlong-dataproxy/dataproxy-source/src/test/resources/metadata.json
index 074be5958c..b9de51bafb 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/metadata.json
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/metadata.json
@@ -1 +1 @@
-{"result":true,"errCode":0,"md5":"5a3f5939bb7368f493bf41c1d785b8f3","data":{"proxyCluster":{"name":"test_dataproxy","setName":"test_set","zone":"default\u003dtrue","channels":[],"inlongIds":[{"inlongId":"test_group.stream1","topic":"stream1","params":{"namespace":"test_group","ignoreParseError":"true","appGroupName":"app_test_group","productId":"58","productName":"test_meta","wrapWithInlongMsg":"true"}}],"sources":[],"sinks":[]},"cacheClusterSet":{"setName":"test_set","type":"TUBEMQ","ca
[...]
+{"md5":"5a3f5939bb7368f493bf41c1d785b8f3","mqType":"TUBE","clusterConfigMap":{"test_tubemq":{"clusterName":
"test_tubemq","token": "******","zone": "default=true","params":
{"masterWebUrl": "http://127.0.0.1:8080","messageQueueHandler":
"org.apache.inlong.dataproxy.sink.mq.tube.TubeHandler","master-host-port-list":
"127.0.0.1:8000"}}},"idTopicConfigMap":{"test_group.stream1":{"uid":"test_group.stream1","inlongGroupId":"test_group","inlongStreamid":"stream1","topicName":"test_group","data
[...]
\ No newline at end of file