This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 60c2676223 [INLONG-10066][DataProxy] Optimize the related configuration settings in the CommonConfigHolder.java file (#10079)
60c2676223 is described below
commit 60c2676223991af57e88d0e3982953f62834a27c
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Apr 26 10:54:06 2024 +0800
[INLONG-10066][DataProxy] Optimize the related configuration settings in the CommonConfigHolder.java file (#10079)
---
.../inlong/common/enums/InlongCompressType.java | 6 +
inlong-dataproxy/conf/common.properties | 12 +-
.../dataproxy/config/CommonConfigHolder.java | 519 +++++++++++++--------
.../dataproxy/sink/common/DefaultEventHandler.java | 10 +-
.../inlong/dataproxy/sink/common/EventHandler.java | 6 +-
.../dataproxy/sink/mq/MessageQueueZoneSink.java | 2 +-
.../sink/mq/MessageQueueZoneSinkContext.java | 9 +-
.../inlong/dataproxy/sink/mq/PackProfile.java | 2 +-
.../dataproxy/source/ServerMessageHandler.java | 2 +-
.../inlong/dataproxy/source/SourceConstants.java | 2 +
.../inlong/dataproxy/utils/AddressUtils.java | 52 +++
.../config/holder/TestCommonConfigHolder.java | 2 +-
.../src/test/resources/common.properties | 8 +-
13 files changed, 409 insertions(+), 223 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java
index 24aa16898e..2869a6adad 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java
@@ -17,7 +17,10 @@
package org.apache.inlong.common.enums;
+import com.google.common.collect.Sets;
+
import java.util.Objects;
+import java.util.Set;
public enum InlongCompressType {
@@ -26,6 +29,9 @@ public enum InlongCompressType {
INLONG_SNAPPY(2, "INLONG_SNAPPY", "The message compressed with inlong
snappy"),
UNKNOWN(99, "UNKNOWN", "Unknown compress type");
+ public static final Set<String> allowedCompressTypes =
+ Sets.newHashSet(NONE.getName(), INLONG_GZ.getName(),
INLONG_SNAPPY.getName());
+
private final int id;
private final String name;
private final String desc;
diff --git a/inlong-dataproxy/conf/common.properties
b/inlong-dataproxy/conf/common.properties
index d622c345af..6d9e399867 100644
--- a/inlong-dataproxy/conf/common.properties
+++ b/inlong-dataproxy/conf/common.properties
@@ -18,24 +18,24 @@
# manager open api address and auth key
manager.hosts=127.0.0.1:8083
-manager.auth.secretId=
-manager.auth.secretKey=
+manager.auth.secret.id=
+manager.auth.secret.key=
# proxy cluster name
proxy.cluster.name=default_dataproxy
proxy.cluster.tag=default_cluster
-proxy.cluster.extTag=default=true
-proxy.cluster.inCharges=admin
+proxy.cluster.ext.tag=default=true
+proxy.cluster.incharges=admin
# synchronize interval of meta config (millisecond)
meta.config.sync.interval.ms=10000
# whether to startup using the local metadata.json file without connecting to the Manager
-startup.using.local.meta.file.enable=false
+meta.config.startup.using.local.file.enable=false
# metric config
metricDomains=DataProxy
metricDomains.DataProxy.domainListeners=org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
metricDomains.DataProxy.snapshotInterval=60000
-prometheusHttpPort=9081
+online.metric.prometheus.http.port=9081
# whether to enable audit
audit.enable=true
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
index 92d0bba627..a788702489 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -17,10 +17,12 @@
package org.apache.inlong.dataproxy.config;
+import org.apache.inlong.common.enums.InlongCompressType;
import org.apache.inlong.dataproxy.sink.common.DefaultEventHandler;
import org.apache.inlong.dataproxy.sink.mq.AllCacheClusterSelector;
-import org.apache.inlong.sdk.commons.protocol.ProxySdk;
+import org.apache.inlong.dataproxy.utils.AddressUtils;
+import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Context;
@@ -32,7 +34,6 @@ import java.io.InputStream;
import java.net.URL;
import java.security.SecureRandom;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -50,6 +51,11 @@ public class CommonConfigHolder {
// list split separator
private static final String VAL_CONFIG_ITEMS_SEPARATOR = ",|\\s+";
// **** allowed keys and default value, begin
+ // proxy node id
+ @Deprecated
+ private static final String KEY_PROXY_NODE_ID = "nodeId";
+ private static final String KEY_PROXY_NODE_IDV2 = "proxy.node.id";
+ private static final String VAL_DEF_PROXY_NODE_ID = "127.0.0.1";
// cluster tag
private static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
private static final String VAL_DEF_CLUSTER_TAG = "default_cluster";
@@ -94,74 +100,95 @@ public class CommonConfigHolder {
private static final String KEY_ENABLE_STARTUP_USING_LOCAL_META_FILEV2 =
"meta.config.startup.using.local.file.enable";
private static final boolean VAL_DEF_ENABLE_STARTUP_USING_LOCAL_META_FILE
= false;
- // whether to accept messages without mapping between groupId/streamId and topic
- public static final String KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT =
"id2topic.unconfigured.accept.enable";
- public static final boolean VAL_DEF_ENABLE_UNCONFIGURED_TOPIC_ACCEPT =
false;
- // default topics configure key, multiple topic settings are separated by "\\s+".
- public static final String KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS =
"id2topic.unconfigured.default.topics";
- public static final String VAL_DEFAULT_TOPIC = "test";
- // whether enable whitelist, optional field.
- public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist";
- public static final boolean VAL_DEF_ENABLE_WHITELIST = false;
// whether enable file metric, optional field.
- public static final String KEY_ENABLE_FILE_METRIC = "file.metric.enable";
- public static final boolean VAL_DEF_ENABLE_FILE_METRIC = true;
+ private static final String KEY_ENABLE_FILE_METRIC = "file.metric.enable";
+ private static final boolean VAL_DEF_ENABLE_FILE_METRIC = true;
// file metric statistic interval (second)
- public static final String KEY_FILE_METRIC_STAT_INTERVAL_SEC =
"file.metric.stat.interval.sec";
- public static final int VAL_DEF_FILE_METRIC_STAT_INVL_SEC = 60;
- public static final int VAL_MIN_FILE_METRIC_STAT_INVL_SEC = 0;
+ private static final String KEY_FILE_METRIC_STAT_INTERVAL_SEC =
"file.metric.stat.interval.sec";
+ private static final int VAL_DEF_FILE_METRIC_STAT_INVL_SEC = 60;
+ private static final int VAL_MIN_FILE_METRIC_STAT_INVL_SEC = 0;
// file metric max statistic key count
- public static final String KEY_FILE_METRIC_MAX_CACHE_CNT =
"file.metric.max.cache.cnt";
- public static final int VAL_DEF_FILE_METRIC_MAX_CACHE_CNT = 1000000;
- public static final int VAL_MIN_FILE_METRIC_MAX_CACHE_CNT = 0;
+ private static final String KEY_FILE_METRIC_MAX_CACHE_CNT =
"file.metric.max.cache.cnt";
+ private static final int VAL_DEF_FILE_METRIC_MAX_CACHE_CNT = 1000000;
+ private static final int VAL_MIN_FILE_METRIC_MAX_CACHE_CNT = 0;
// source metric statistic name
- public static final String KEY_FILE_METRIC_SOURCE_OUTPUT_NAME =
"file.metric.source.output.name";
- public static final String VAL_DEF_FILE_METRIC_SOURCE_OUTPUT_NAME =
"Source";
+ private static final String KEY_FILE_METRIC_SOURCE_OUTPUT_NAME =
"file.metric.source.output.name";
+ private static final String VAL_DEF_FILE_METRIC_SOURCE_OUTPUT_NAME =
"Source";
// sink metric statistic name
- public static final String KEY_FILE_METRIC_SINK_OUTPUT_NAME =
"file.metric.sink.output.name";
- public static final String VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME = "Sink";
+ private static final String KEY_FILE_METRIC_SINK_OUTPUT_NAME =
"file.metric.sink.output.name";
+ private static final String VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME = "Sink";
// event metric statistic name
- public static final String KEY_FILE_METRIC_EVENT_OUTPUT_NAME =
"file.metric.event.output.name";
- public static final String VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME = "Stats";
+ private static final String KEY_FILE_METRIC_EVENT_OUTPUT_NAME =
"file.metric.event.output.name";
+ private static final String VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME =
"Stats";
+ // prometheus http port
+ @Deprecated
+ private static final String KEY_PROMETHEUS_HTTP_PORT =
"prometheusHttpPort";
+ private static final String KEY_PROMETHEUS_HTTP_PORTV2 =
"online.metric.prometheus.http.port";
+ private static final int VAL_DEF_PROMETHEUS_HTTP_PORT = 8080;
// Audit fields
- public static final String KEY_ENABLE_AUDIT = "audit.enable";
- public static final boolean VAL_DEF_ENABLE_AUDIT = true;
- public static final String KEY_AUDIT_PROXYS = "audit.proxys";
- public static final String KEY_AUDIT_FILE_PATH = "audit.filePath";
- public static final String VAL_DEF_AUDIT_FILE_PATH = "/data/inlong/audit/";
- public static final String KEY_AUDIT_MAX_CACHE_ROWS = "audit.maxCacheRows";
- public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 2000000;
- public static final String KEY_AUDIT_FORMAT_INTERVAL_MS =
"auditFormatInterval";
- public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 60000L;
+ private static final String KEY_ENABLE_AUDIT = "audit.enable";
+ private static final boolean VAL_DEF_ENABLE_AUDIT = true;
+ private static final String KEY_AUDIT_PROXYS = "audit.proxys";
+ @Deprecated
+ private static final String KEY_AUDIT_FILE_PATH = "audit.filePath";
+ private static final String KEY_AUDIT_FILE_PATHV2 = "audit.file.path";
+ private static final String VAL_DEF_AUDIT_FILE_PATH =
"/data/inlong/audit/";
+ @Deprecated
+ private static final String KEY_AUDIT_MAX_CACHE_ROWS =
"audit.maxCacheRows";
+ private static final String KEY_AUDIT_MAX_CACHE_ROWSV2 =
"audit.max.cache.rows";
+ private static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 2000000;
+ @Deprecated
+ private static final String KEY_AUDIT_FORMAT_INTERVAL_MS =
"auditFormatInterval";
+ private static final String KEY_AUDIT_TIME_FORMAT_INTERVAL =
"audit.time.format.intvl.ms";
+ private static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 60000L;
+
+ // v1 msg whether response by sink
+ private static final String KEY_V1MSG_RESPONSE_BY_SINK =
"isResponseAfterSave";
+ private static final String KEY_V1MSG_RESPONSE_BY_SINKV2 =
"proxy.v1msg.response.by.sink.enable";
+ private static final boolean VAL_DEF_V1MSG_RESPONSE_BY_SINK = false;
+ // v1 msg sent compress type
+ @Deprecated
+ private static final String KEY_V1MSG_SENT_COMPRESS_TYPE = "compressType";
+ private static final String KEY_V1MSG_SENT_COMPRESS_TYPEV2 =
"proxy.v1msg.compress.type";
+ private static final InlongCompressType VAL_DEF_V1MSG_COMPRESS_TYPE =
InlongCompressType.INLONG_SNAPPY;
+ // Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
+ private static final String KEY_MAX_RAS_TIMEOUT_MS = "maxRASTimeoutMs";
+ private static final long VAL_DEF_MAX_RAS_TIMEOUT_MS = 10000L;
+
+ // max buffer queue size in KB
+ @Deprecated
+ private static final String KEY_DEF_BUFFERQUEUE_SIZE_KB =
"maxBufferQueueSizeKb";
+ private static final String KEY_DEF_BUFFERQUEUE_SIZE_KBV2 =
"proxy.def.buffer.queue.size.KB";
+ private static final int VAL_DEF_BUFFERQUEUE_SIZE_KB = 128 * 1024;
// whether to retry after the message send failure
- public static final String KEY_ENABLE_SEND_RETRY_AFTER_FAILURE =
"send.retry.after.failure";
- public static final boolean VAL_DEF_ENABLE_SEND_RETRY_AFTER_FAILURE = true;
+ @Deprecated
+ private static final String KEY_ENABLE_SEND_RETRY_AFTER_FAILURE =
"send.retry.after.failure";
+ private static final String KEY_ENABLE_SEND_RETRY_AFTER_FAILUREV2 =
"msg.send.failure.retry.enable";
+ private static final boolean VAL_DEF_ENABLE_SEND_RETRY_AFTER_FAILURE =
true;
// max retry count
- public static final String KEY_MAX_RETRIES_AFTER_FAILURE =
"max.retries.after.failure";
- public static final int VAL_DEF_MAX_RETRIES_AFTER_FAILURE = -1;
- public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
- public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false;
- // Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
- public static final String KEY_MAX_RAS_TIMEOUT_MS = "maxRASTimeoutMs";
- public static final long VAL_DEF_MAX_RAS_TIMEOUT_MS = 10000L;
- // max buffer queue size in Kb
- public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB =
"maxBufferQueueSizeKb";
- public static final int VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
+ private static final String KEY_MAX_RETRIES_AFTER_FAILURE =
"max.retries.after.failure";
+ private static final String KEY_MAX_RETRIES_AFTER_FAILUREV2 =
"msg.max.retries";
+ private static final int VAL_DEF_MAX_RETRIES_AFTER_FAILURE = -1;
+ // whether to accept messages without mapping between groupId/streamId and topic
+ private static final String KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT =
"id2topic.unconfigured.accept.enable";
+ private static final boolean VAL_DEF_ENABLE_UNCONFIGURED_TOPIC_ACCEPT =
false;
+ // default topics configure key, multiple topic settings are separated by "\\s+".
+ private static final String KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS =
"id2topic.unconfigured.default.topics";
+ // whether enable whitelist, optional field.
+ @Deprecated
+ private static final String KEY_ENABLE_WHITELIST =
"proxy.enable.whitelist";
+ private static final String KEY_ENABLE_WHITELISTV2 =
"proxy.visit.whitelist.enable";
+ private static final boolean VAL_DEF_ENABLE_WHITELIST = false;
+
// event handler
- public static final String KEY_EVENT_HANDLER = "eventHandler";
- public static final String VAL_DEF_EVENT_HANDLER =
DefaultEventHandler.class.getName();
+ private static final String KEY_EVENT_HANDLER = "eventHandler";
+ private static final String VAL_DEF_EVENT_HANDLER =
DefaultEventHandler.class.getName();
// cache cluster selector
- public static final String KEY_CACHE_CLUSTER_SELECTOR =
"cacheClusterSelector";
- public static final String VAL_DEF_CACHE_CLUSTER_SELECTOR =
AllCacheClusterSelector.class.getName();
- // proxy node id
- public static final String KEY_PROXY_NODE_ID = "nodeId";
- public static final String VAL_DEF_PROXY_NODE_ID = "127.0.0.1";
- // msg sent compress type
- public static final String KEY_MSG_SENT_COMPRESS_TYPE = "compressType";
- public static final String VAL_DEF_MSG_COMPRESS_TYPE =
ProxySdk.INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name();
- // prometheus http port
- public static final String KEY_PROMETHEUS_HTTP_PORT = "prometheusHttpPort";
- public static final int VAL_DEF_PROMETHEUS_HTTP_PORT = 8080;
+ @Deprecated
+ private static final String KEY_CACHE_CLUSTER_SELECTOR =
"cacheClusterSelector";
+ private static final String KEY_CACHE_CLUSTER_SELECTORV2 =
"proxy.mq.cluster.selector";
+ private static final String VAL_DEF_CACHE_CLUSTER_SELECTOR =
AllCacheClusterSelector.class.getName();
+
// **** allowed keys and default value, end
// class instance
@@ -186,24 +213,24 @@ public class CommonConfigHolder {
private String auditFilePath = VAL_DEF_AUDIT_FILE_PATH;
private int auditMaxCacheRows = VAL_DEF_AUDIT_MAX_CACHE_ROWS;
private long auditFormatInvlMs = VAL_DEF_AUDIT_FORMAT_INTERVAL_MS;
- private boolean responseAfterSave = VAL_DEF_RESPONSE_AFTER_SAVE;
- private long maxResAfterSaveTimeout = VAL_DEF_MAX_RAS_TIMEOUT_MS;
private boolean enableUnConfigTopicAccept =
VAL_DEF_ENABLE_UNCONFIGURED_TOPIC_ACCEPT;
- private List<String> defaultTopics = Arrays.asList(VAL_DEFAULT_TOPIC);
- private boolean enableWhiteList = VAL_DEF_ENABLE_WHITELIST;
- private int maxBufferQueueSizeKb = VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB;
- private String eventHandler = VAL_DEF_EVENT_HANDLER;
- private String cacheClusterSelector = VAL_DEF_CACHE_CLUSTER_SELECTOR;
- private String proxyNodeId = VAL_DEF_PROXY_NODE_ID;
- private String msgCompressType = VAL_DEF_MSG_COMPRESS_TYPE;
- private int prometheusHttpPort = VAL_DEF_PROMETHEUS_HTTP_PORT;
+ private final List<String> defaultTopics = new ArrayList<>();
private boolean enableFileMetric = VAL_DEF_ENABLE_FILE_METRIC;
private int fileMetricStatInvlSec = VAL_DEF_FILE_METRIC_STAT_INVL_SEC;
private int fileMetricStatCacheCnt = VAL_DEF_FILE_METRIC_MAX_CACHE_CNT;
private String fileMetricSourceOutName =
VAL_DEF_FILE_METRIC_SOURCE_OUTPUT_NAME;
private String fileMetricSinkOutName =
VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME;
private String fileMetricEventOutName =
VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME;
- private boolean enableSendRetryAfterFailure =
VAL_DEF_ENABLE_SEND_RETRY_AFTER_FAILURE;
+ private InlongCompressType defV1MsgCompressType =
VAL_DEF_V1MSG_COMPRESS_TYPE;
+ private boolean defV1MsgResponseBySink = VAL_DEF_V1MSG_RESPONSE_BY_SINK;
+ private long maxResAfterSaveTimeout = VAL_DEF_MAX_RAS_TIMEOUT_MS;
+ private boolean enableWhiteList = VAL_DEF_ENABLE_WHITELIST;
+ private int defBufferQueueSizeKB = VAL_DEF_BUFFERQUEUE_SIZE_KB;
+ private String eventHandler = VAL_DEF_EVENT_HANDLER;
+ private String cacheClusterSelector = VAL_DEF_CACHE_CLUSTER_SELECTOR;
+ private String proxyNodeId = VAL_DEF_PROXY_NODE_ID;
+ private int prometheusHttpPort = VAL_DEF_PROMETHEUS_HTTP_PORT;
+ private boolean sendRetryAfterFailure =
VAL_DEF_ENABLE_SEND_RETRY_AFTER_FAILURE;
private int maxRetriesAfterFailure = VAL_DEF_MAX_RETRIES_AFTER_FAILURE;
/**
@@ -218,6 +245,7 @@ public class CommonConfigHolder {
instance = new CommonConfigHolder();
if (instance.loadConfigFile()) {
instance.preReadFields();
+ LOG.info("{} load and read result is: {}",
COMMON_CONFIG_FILE_NAME, instance.toString());
}
isInit = true;
}
@@ -337,16 +365,16 @@ public class CommonConfigHolder {
return auditFormatInvlMs;
}
- public boolean isResponseAfterSave() {
- return responseAfterSave;
+ public boolean isDefV1MsgResponseBySink() {
+ return defV1MsgResponseBySink;
}
public long getMaxResAfterSaveTimeout() {
return maxResAfterSaveTimeout;
}
- public int getMaxBufferQueueSizeKb() {
- return maxBufferQueueSizeKb;
+ public int getDefBufferQueueSizeKB() {
+ return defBufferQueueSizeKB;
}
public boolean isEnableStartupUsingLocalMetaFile() {
@@ -369,8 +397,8 @@ public class CommonConfigHolder {
return proxyNodeId;
}
- public String getMsgCompressType() {
- return msgCompressType;
+ public InlongCompressType getDefV1MsgCompressType() {
+ return defV1MsgCompressType;
}
public String getFileMetricSourceOutName() {
@@ -385,8 +413,8 @@ public class CommonConfigHolder {
return fileMetricEventOutName;
}
- public boolean isEnableSendRetryAfterFailure() {
- return enableSendRetryAfterFailure;
+ public boolean isSendRetryAfterFailure() {
+ return sendRetryAfterFailure;
}
public int getMaxRetriesAfterFailure() {
@@ -406,161 +434,86 @@ public class CommonConfigHolder {
this.clusterName = tmpValue.trim();
}
// read cluster incharges
- tmpValue = this.props.get(KEY_PROXY_CLUSTER_INCHARGES);
+ tmpValue = compatGetValue(this.props,
+ KEY_PROXY_CLUSTER_INCHARGESV2, KEY_PROXY_CLUSTER_INCHARGES);
if (StringUtils.isNotEmpty(tmpValue)) {
this.clusterIncharges = tmpValue.trim();
}
- tmpValue = this.props.get(KEY_PROXY_CLUSTER_EXT_TAG);
+ tmpValue = compatGetValue(this.props,
+ KEY_PROXY_CLUSTER_EXT_TAGV2, KEY_PROXY_CLUSTER_EXT_TAG);
if (StringUtils.isNotEmpty(tmpValue)) {
this.clusterExtTag = tmpValue.trim();
}
// read the manager setting
this.preReadManagerSetting();
- // read whether accept msg without id2topic configure
- tmpValue = this.props.get(KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT);
- if (StringUtils.isNotEmpty(tmpValue)) {
- this.enableUnConfigTopicAccept =
"TRUE".equalsIgnoreCase(tmpValue.trim());
- }
- // read default topics
- tmpValue = this.props.get(KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS);
- if (StringUtils.isNotBlank(tmpValue)) {
- List<String> tmpList = new ArrayList<>();
- String[] topicItems = tmpValue.split(",|\\s+");
- for (String item : topicItems) {
- if (StringUtils.isBlank(item)) {
- continue;
- }
- tmpList.add(item.trim());
- }
- if (tmpList.size() > 0) {
- defaultTopics = tmpList;
- }
- }
+ // read the configuration related to the default topics
+ this.preReadDefTopicSetting();
// read enable whitelist
- tmpValue = this.props.get(KEY_ENABLE_WHITELIST);
- if (StringUtils.isNotEmpty(tmpValue)) {
- this.enableWhiteList = "TRUE".equalsIgnoreCase(tmpValue.trim());
- }
- // read whether enable file metric
- tmpValue = this.props.get(KEY_ENABLE_FILE_METRIC);
- if (StringUtils.isNotEmpty(tmpValue)) {
- this.enableFileMetric = "TRUE".equalsIgnoreCase(tmpValue.trim());
- }
- // read file metric statistic interval
- tmpValue = this.props.get(KEY_FILE_METRIC_STAT_INTERVAL_SEC);
- if (StringUtils.isNotEmpty(tmpValue)) {
- int statInvl = NumberUtils.toInt(tmpValue.trim(),
VAL_DEF_FILE_METRIC_STAT_INVL_SEC);
- if (statInvl >= VAL_MIN_FILE_METRIC_STAT_INVL_SEC) {
- this.fileMetricStatInvlSec = statInvl;
- }
- }
- // read file metric statistic max cache count
- tmpValue = this.props.get(KEY_FILE_METRIC_MAX_CACHE_CNT);
- if (StringUtils.isNotEmpty(tmpValue)) {
- int maxCacheCnt = NumberUtils.toInt(tmpValue.trim(),
VAL_DEF_FILE_METRIC_MAX_CACHE_CNT);
- if (maxCacheCnt >= VAL_MIN_FILE_METRIC_MAX_CACHE_CNT) {
- this.fileMetricStatCacheCnt = maxCacheCnt;
- }
- }
- // read source file statistic output name
- tmpValue = this.props.get(KEY_FILE_METRIC_SOURCE_OUTPUT_NAME);
- if (StringUtils.isNotBlank(tmpValue)) {
- this.fileMetricSourceOutName = tmpValue.trim();
- }
- // read sink file statistic output name
- tmpValue = this.props.get(KEY_FILE_METRIC_SINK_OUTPUT_NAME);
- if (StringUtils.isNotBlank(tmpValue)) {
- this.fileMetricSinkOutName = tmpValue.trim();
- }
- // read event file statistic output name
- tmpValue = this.props.get(KEY_FILE_METRIC_EVENT_OUTPUT_NAME);
- if (StringUtils.isNotBlank(tmpValue)) {
- this.fileMetricEventOutName = tmpValue.trim();
- }
- // read whether enable audit
- tmpValue = this.props.get(KEY_ENABLE_AUDIT);
- if (StringUtils.isNotEmpty(tmpValue)) {
- this.enableAudit = "TRUE".equalsIgnoreCase(tmpValue.trim());
- }
- // read audit proxys
- tmpValue = this.props.get(KEY_AUDIT_PROXYS);
- if (StringUtils.isNotBlank(tmpValue)) {
- String[] ipPorts = tmpValue.split("\\s+");
- for (String tmpIPPort : ipPorts) {
- if (StringUtils.isBlank(tmpIPPort)) {
- continue;
- }
- this.auditProxys.add(tmpIPPort.trim());
- }
- }
- // read audit file path
- tmpValue = this.props.get(KEY_AUDIT_FILE_PATH);
+ tmpValue = compatGetValue(this.props,
+ KEY_ENABLE_WHITELISTV2, KEY_ENABLE_WHITELIST);
if (StringUtils.isNotBlank(tmpValue)) {
- this.auditFilePath = tmpValue.trim();
- }
- // read audit max cache rows
- tmpValue = this.props.get(KEY_AUDIT_MAX_CACHE_ROWS);
- if (StringUtils.isNotEmpty(tmpValue)) {
- this.auditMaxCacheRows = NumberUtils.toInt(tmpValue.trim(),
VAL_DEF_AUDIT_MAX_CACHE_ROWS);
- }
- // read audit format interval
- tmpValue = this.props.get(KEY_AUDIT_FORMAT_INTERVAL_MS);
- if (StringUtils.isNotEmpty(tmpValue)) {
- this.auditFormatInvlMs = NumberUtils.toLong(tmpValue.trim(),
VAL_DEF_AUDIT_FORMAT_INTERVAL_MS);
- }
- // read whether response after save
- tmpValue = this.props.get(KEY_RESPONSE_AFTER_SAVE);
- if (StringUtils.isNotEmpty(tmpValue)) {
- this.responseAfterSave = "TRUE".equalsIgnoreCase(tmpValue.trim());
+ this.enableWhiteList = "TRUE".equalsIgnoreCase(tmpValue.trim());
}
// read max response after save timeout
tmpValue = this.props.get(KEY_MAX_RAS_TIMEOUT_MS);
if (StringUtils.isNotEmpty(tmpValue)) {
this.maxResAfterSaveTimeout = NumberUtils.toLong(tmpValue.trim(),
VAL_DEF_MAX_RAS_TIMEOUT_MS);
}
- // read max bufferqueue size
- tmpValue = this.props.get(KEY_MAX_BUFFERQUEUE_SIZE_KB);
+ // read default buffer queue size
+ tmpValue = compatGetValue(this.props,
+ KEY_DEF_BUFFERQUEUE_SIZE_KBV2, KEY_DEF_BUFFERQUEUE_SIZE_KB);
if (StringUtils.isNotEmpty(tmpValue)) {
- this.maxBufferQueueSizeKb = NumberUtils.toInt(tmpValue.trim(),
VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB);
+ this.defBufferQueueSizeKB = NumberUtils.toInt(
+ tmpValue.trim(), VAL_DEF_BUFFERQUEUE_SIZE_KB);
+ }
+ // read cache cluster selector
+ tmpValue = compatGetValue(this.props,
+ KEY_CACHE_CLUSTER_SELECTORV2, KEY_CACHE_CLUSTER_SELECTOR);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.cacheClusterSelector = tmpValue.trim();
}
// read event handler
tmpValue = this.props.get(KEY_EVENT_HANDLER);
if (StringUtils.isNotBlank(tmpValue)) {
this.eventHandler = tmpValue.trim();
}
- // read cache cluster selector
- tmpValue = this.props.get(KEY_CACHE_CLUSTER_SELECTOR);
- if (StringUtils.isNotBlank(tmpValue)) {
- this.cacheClusterSelector = tmpValue.trim();
- }
// read proxy node id
- tmpValue = this.props.get(KEY_PROXY_NODE_ID);
- if (StringUtils.isNotBlank(tmpValue)) {
+ tmpValue = compatGetValue(this.props,
+ KEY_PROXY_NODE_IDV2, KEY_PROXY_NODE_ID);
+ if (StringUtils.isBlank(tmpValue)) {
+ this.proxyNodeId = AddressUtils.getSelfHost();
+ } else {
this.proxyNodeId = tmpValue.trim();
}
- // read msg compress type
- tmpValue = this.props.get(KEY_MSG_SENT_COMPRESS_TYPE);
- if (StringUtils.isNotBlank(tmpValue)) {
- this.msgCompressType = tmpValue.trim();
- }
// read prometheus Http Port
- tmpValue = this.props.get(KEY_PROMETHEUS_HTTP_PORT);
- if (StringUtils.isNotEmpty(tmpValue)) {
- this.prometheusHttpPort = NumberUtils.toInt(tmpValue.trim(),
VAL_DEF_PROMETHEUS_HTTP_PORT);
+ tmpValue = compatGetValue(this.props,
+ KEY_PROMETHEUS_HTTP_PORTV2, KEY_PROMETHEUS_HTTP_PORT);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.prometheusHttpPort = NumberUtils.toInt(
+ tmpValue.trim(), VAL_DEF_PROMETHEUS_HTTP_PORT);
}
// read whether retry send message after sent failure
- tmpValue = this.props.get(KEY_ENABLE_SEND_RETRY_AFTER_FAILURE);
+ tmpValue = compatGetValue(this.props,
+ KEY_ENABLE_SEND_RETRY_AFTER_FAILUREV2,
KEY_ENABLE_SEND_RETRY_AFTER_FAILURE);
if (StringUtils.isNotEmpty(tmpValue)) {
- this.enableSendRetryAfterFailure =
"TRUE".equalsIgnoreCase(tmpValue.trim());
+ this.sendRetryAfterFailure =
"TRUE".equalsIgnoreCase(tmpValue.trim());
}
// read max retry count
- tmpValue = this.props.get(KEY_MAX_RETRIES_AFTER_FAILURE);
+ tmpValue = compatGetValue(this.props,
+ KEY_MAX_RETRIES_AFTER_FAILUREV2,
KEY_MAX_RETRIES_AFTER_FAILURE);
if (StringUtils.isNotBlank(tmpValue)) {
- int retries = NumberUtils.toInt(tmpValue.trim(),
VAL_DEF_MAX_RETRIES_AFTER_FAILURE);
+ int retries = NumberUtils.toInt(
+ tmpValue.trim(), VAL_DEF_MAX_RETRIES_AFTER_FAILURE);
if (retries >= 0) {
this.maxRetriesAfterFailure = retries;
}
}
+ // Pre-read the fields related to Audit
+ this.preReadAuditSetting();
+ // Pre-read the fields related to File metric
+ this.preReadMetricSetting();
+ // pre-read v1msg default setting
+ this.preReadV1MsgSetting();
}
private void preReadManagerSetting() {
@@ -626,6 +579,139 @@ public class CommonConfigHolder {
}
}
+ private void preReadDefTopicSetting() {
+ String tmpValue;
+ // read whether accept msg without id2topic configure
+ tmpValue = this.props.get(KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.enableUnConfigTopicAccept =
"TRUE".equalsIgnoreCase(tmpValue.trim());
+ }
+ // read default topics
+ tmpValue = this.props.get(KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ String[] topicItems = tmpValue.split(VAL_CONFIG_ITEMS_SEPARATOR);
+ for (String item : topicItems) {
+ if (StringUtils.isBlank(item)) {
+ continue;
+ }
+ this.defaultTopics.add(item.trim());
+ }
+ LOG.info("Configured {}, size is {}, value is {}",
+ KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS,
defaultTopics.size(), defaultTopics);
+ }
+ // check whether configure default topics
+ if (this.enableUnConfigTopicAccept && this.defaultTopics.isEmpty()) {
+ LOG.error("Required {} field value is blank in {} for {} is true,
exit!",
+ KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS,
COMMON_CONFIG_FILE_NAME,
+ KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT);
+ System.exit(2);
+ }
+ }
+
+ private void preReadMetricSetting() {
+ String tmpValue;
+ // read whether enable file metric
+ tmpValue = this.props.get(KEY_ENABLE_FILE_METRIC);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.enableFileMetric = "TRUE".equalsIgnoreCase(tmpValue.trim());
+ }
+ // read file metric statistic interval
+ tmpValue = this.props.get(KEY_FILE_METRIC_STAT_INTERVAL_SEC);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ int statInvl = NumberUtils.toInt(
+ tmpValue.trim(), VAL_DEF_FILE_METRIC_STAT_INVL_SEC);
+ if (statInvl >= VAL_MIN_FILE_METRIC_STAT_INVL_SEC) {
+ this.fileMetricStatInvlSec = statInvl;
+ }
+ }
+ // read file metric statistic max cache count
+ tmpValue = this.props.get(KEY_FILE_METRIC_MAX_CACHE_CNT);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ int maxCacheCnt = NumberUtils.toInt(
+ tmpValue.trim(), VAL_DEF_FILE_METRIC_MAX_CACHE_CNT);
+ if (maxCacheCnt >= VAL_MIN_FILE_METRIC_MAX_CACHE_CNT) {
+ this.fileMetricStatCacheCnt = maxCacheCnt;
+ }
+ }
+ // read source file statistic output name
+ tmpValue = this.props.get(KEY_FILE_METRIC_SOURCE_OUTPUT_NAME);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.fileMetricSourceOutName = tmpValue.trim();
+ }
+ // read sink file statistic output name
+ tmpValue = this.props.get(KEY_FILE_METRIC_SINK_OUTPUT_NAME);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.fileMetricSinkOutName = tmpValue.trim();
+ }
+ // read event file statistic output name
+ tmpValue = this.props.get(KEY_FILE_METRIC_EVENT_OUTPUT_NAME);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.fileMetricEventOutName = tmpValue.trim();
+ }
+ }
+
+ private void preReadAuditSetting() {
+ String tmpValue;
+ // read whether enable audit
+ tmpValue = this.props.get(KEY_ENABLE_AUDIT);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.enableAudit = "TRUE".equalsIgnoreCase(tmpValue.trim());
+ }
+ // read audit proxys
+ tmpValue = this.props.get(KEY_AUDIT_PROXYS);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ String[] ipPorts = tmpValue.split(VAL_CONFIG_ITEMS_SEPARATOR);
+ for (String tmpIPPort : ipPorts) {
+ if (StringUtils.isBlank(tmpIPPort)) {
+ continue;
+ }
+ this.auditProxys.add(tmpIPPort.trim());
+ }
+ }
+ // read audit file path
+ tmpValue = compatGetValue(this.props,
+ KEY_AUDIT_FILE_PATHV2, KEY_AUDIT_FILE_PATH);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.auditFilePath = tmpValue.trim();
+ }
+ // read audit max cache rows
+ tmpValue = compatGetValue(this.props,
+ KEY_AUDIT_MAX_CACHE_ROWSV2, KEY_AUDIT_MAX_CACHE_ROWS);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.auditMaxCacheRows = NumberUtils.toInt(
+ tmpValue.trim(), VAL_DEF_AUDIT_MAX_CACHE_ROWS);
+ }
+ // read audit format interval
+ tmpValue = compatGetValue(this.props,
+ KEY_AUDIT_TIME_FORMAT_INTERVAL, KEY_AUDIT_FORMAT_INTERVAL_MS);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.auditFormatInvlMs = NumberUtils.toLong(
+ tmpValue.trim(), VAL_DEF_AUDIT_FORMAT_INTERVAL_MS);
+ }
+ }
+
+ private void preReadV1MsgSetting() {
+ String tmpValue;
+ // read whether response v1 message by sink
+ tmpValue = compatGetValue(this.props,
+ KEY_V1MSG_RESPONSE_BY_SINKV2, KEY_V1MSG_RESPONSE_BY_SINK);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.defV1MsgResponseBySink =
"TRUE".equalsIgnoreCase(tmpValue.trim());
+ }
+ // read v1 msg compress type
+ tmpValue = compatGetValue(this.props,
+ KEY_V1MSG_SENT_COMPRESS_TYPEV2, KEY_V1MSG_SENT_COMPRESS_TYPE);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ InlongCompressType tmpCompType =
InlongCompressType.forType(tmpValue.trim());
+ if (tmpCompType == InlongCompressType.UNKNOWN) {
+ LOG.error("{}'s {}({}) must be in allowed range [{}], exist!",
COMMON_CONFIG_FILE_NAME,
+ KEY_V1MSG_SENT_COMPRESS_TYPEV2, tmpValue,
InlongCompressType.allowedCompressTypes);
+ System.exit(2);
+ }
+ this.defV1MsgCompressType = tmpCompType;
+ }
+ }
+
private boolean loadConfigFile() {
InputStream inStream = null;
try {
@@ -673,6 +759,47 @@ public class CommonConfigHolder {
return true;
}
+ /**
+  * Builds a diagnostic string of the effective configuration.
+  *
+  * <p>Credentials are deliberately kept out of the result so that it is safe
+  * to write to logs: the manager secret key is masked, and only the names of
+  * the raw properties are listed because their values may carry the same
+  * secret.
+  */
+ @Override
+ public String toString() {
+     return new ToStringBuilder(this)
+             // key names only - raw values may contain credentials
+             .append("props", props == null ? null : props.keySet())
+             .append("clusterTag", clusterTag)
+             .append("clusterName", clusterName)
+             .append("clusterIncharges", clusterIncharges)
+             .append("clusterExtTag", clusterExtTag)
+             .append("managerIpList", managerIpList)
+             .append("managerAuthSecretId", managerAuthSecretId)
+             // never print the secret itself, only whether it is set
+             .append("managerAuthSecretKey", managerAuthSecretKey == null ? null : "******")
+             .append("enableStartupUsingLocalMetaFile", enableStartupUsingLocalMetaFile)
+             .append("metaConfigSyncInvlMs", metaConfigSyncInvlMs)
+             .append("metaConfigWastAlarmMs", metaConfigWastAlarmMs)
+             .append("enableAudit", enableAudit)
+             .append("auditProxys", auditProxys)
+             .append("auditFilePath", auditFilePath)
+             .append("auditMaxCacheRows", auditMaxCacheRows)
+             .append("auditFormatInvlMs", auditFormatInvlMs)
+             .append("enableUnConfigTopicAccept", enableUnConfigTopicAccept)
+             .append("defaultTopics", defaultTopics)
+             .append("enableFileMetric", enableFileMetric)
+             .append("fileMetricStatInvlSec", fileMetricStatInvlSec)
+             .append("fileMetricStatCacheCnt", fileMetricStatCacheCnt)
+             .append("fileMetricSourceOutName", fileMetricSourceOutName)
+             .append("fileMetricSinkOutName", fileMetricSinkOutName)
+             .append("fileMetricEventOutName", fileMetricEventOutName)
+             .append("defV1MsgCompressType", defV1MsgCompressType)
+             .append("defV1MsgResponseBySink", defV1MsgResponseBySink)
+             .append("maxResAfterSaveTimeout", maxResAfterSaveTimeout)
+             .append("enableWhiteList", enableWhiteList)
+             .append("defBufferQueueSizeKB", defBufferQueueSizeKB)
+             .append("eventHandler", eventHandler)
+             .append("cacheClusterSelector", cacheClusterSelector)
+             .append("proxyNodeId", proxyNodeId)
+             .append("prometheusHttpPort", prometheusHttpPort)
+             .append("sendRetryAfterFailure", sendRetryAfterFailure)
+             .append("maxRetriesAfterFailure", maxRetriesAfterFailure)
+             .toString();
+ }
+
private String compatGetValue(Map<String, String> attrs, String newKey,
String depKey) {
String tmpValue = attrs.get(newKey);
if (StringUtils.isBlank(tmpValue)) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
index 64313adef5..b085653e48 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
@@ -17,13 +17,13 @@
package org.apache.inlong.dataproxy.sink.common;
+import org.apache.inlong.common.enums.InlongCompressType;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.sdk.commons.protocol.EventConstants;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
-import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
@@ -48,7 +48,7 @@ public class DefaultEventHandler implements EventHandler {
*/
@Override
public Map<String, String> parseHeader(IdTopicConfig idConfig,
BatchPackProfile profile, String nodeId,
- INLONG_COMPRESSED_TYPE compressType) {
+ InlongCompressType compressType) {
Map<String, String> headers = new HashMap<>();
// version int32 protocol version, the value is 1
headers.put(ConfigConstants.MSG_ENCODE_VER,
MessageWrapType.INLONG_MSG_V1.getStrId());
@@ -71,7 +71,7 @@ public class DefaultEventHandler implements EventHandler {
// INLONG_GZ = 1,
// INLONG_SNAPPY = 2
headers.put(EventConstants.HEADER_KEY_COMPRESS_TYPE,
- String.valueOf(compressType.getNumber()));
+ String.valueOf(compressType.getName()));
// messageKey string partition hash key, optional
return headers;
}
@@ -80,7 +80,7 @@ public class DefaultEventHandler implements EventHandler {
* parseBody
*/
@Override
- public byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile,
INLONG_COMPRESSED_TYPE compressType)
+ public byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile,
InlongCompressType compressType)
throws IOException {
List<ProxyEvent> events = profile.getEvents();
// encode
@@ -105,7 +105,7 @@ public class DefaultEventHandler implements EventHandler {
case INLONG_GZ:
compressBytes = GzipUtils.compress(srcBytes);
break;
- case INLONG_NO_COMPRESS:
+ case NONE:
default:
compressBytes = srcBytes;
break;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/EventHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/EventHandler.java
index 6b8977408c..b26f5cd744 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/EventHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/EventHandler.java
@@ -17,9 +17,9 @@
package org.apache.inlong.dataproxy.sink.common;
+import org.apache.inlong.common.enums.InlongCompressType;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
-import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
import java.util.Map;
@@ -33,11 +33,11 @@ public interface EventHandler {
* parseHeader
*/
Map<String, String> parseHeader(IdTopicConfig idConfig, BatchPackProfile
profile, String nodeId,
- INLONG_COMPRESSED_TYPE compressType) throws Exception;
+ InlongCompressType compressType) throws Exception;
/**
* parseBody
*/
- byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile,
INLONG_COMPRESSED_TYPE compressType)
+ byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile,
InlongCompressType compressType)
throws Exception;
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index 8b654a65bb..69b62d094f 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -67,7 +67,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
// message group
private BatchPackManager dispatchManager;
private final BufferQueue<PackProfile> dispatchQueue =
- new
BufferQueue<>(CommonConfigHolder.getInstance().getMaxBufferQueueSizeKb());
+ new
BufferQueue<>(CommonConfigHolder.getInstance().getDefBufferQueueSizeKB());
// scheduled thread pool
// reload
// dispatch
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 0193eea6f9..9bae668283 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -18,12 +18,12 @@
package org.apache.inlong.dataproxy.sink.mq;
import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.enums.InlongCompressType;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.common.SinkContext;
-import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang3.StringUtils;
@@ -49,7 +49,7 @@ public class MessageQueueZoneSinkContext extends SinkContext {
private final String nodeId;
private final Context producerContext;
//
- private final INLONG_COMPRESSED_TYPE compressType;
+ private final InlongCompressType compressType;
/**
* Constructor
@@ -62,8 +62,7 @@ public class MessageQueueZoneSinkContext extends SinkContext {
// nodeId
this.nodeId = CommonConfigHolder.getInstance().getProxyNodeId();
// compressionType
- String strCompressionType =
CommonConfigHolder.getInstance().getMsgCompressType();
- this.compressType = INLONG_COMPRESSED_TYPE.valueOf(strCompressionType);
+ this.compressType =
CommonConfigHolder.getInstance().getDefV1MsgCompressType();
// producerContext
Map<String, String> producerParams =
context.getSubProperties(PREFIX_PRODUCER);
this.producerContext = new Context(producerParams);
@@ -115,7 +114,7 @@ public class MessageQueueZoneSinkContext extends
SinkContext {
*
* @return the compressType
*/
- public INLONG_COMPRESSED_TYPE getCompressType() {
+ public InlongCompressType getCompressType() {
return compressType;
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
index 1867e6a8ce..8e0cb6c5eb 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
@@ -51,7 +51,7 @@ public abstract class PackProfile {
this.inlongGroupId = inlongGroupId;
this.inlongStreamId = inlongStreamId;
this.dispatchTime = dispatchTime;
- this.enableRetryAfterFailure =
CommonConfigHolder.getInstance().isEnableSendRetryAfterFailure();
+ this.enableRetryAfterFailure =
CommonConfigHolder.getInstance().isSendRetryAfterFailure();
this.maxRetries =
CommonConfigHolder.getInstance().getMaxRetriesAfterFailure();
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index cc83f86e06..fcba591e65 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -339,7 +339,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
this.responsePackage(ctx, ProxySdk.ResultCode.SUCCUSS, packObject);
}
// process
- if (CommonConfigHolder.getInstance().isResponseAfterSave()) {
+ if (CommonConfigHolder.getInstance().isDefV1MsgResponseBySink()) {
this.processAndWaitingSave(ctx, packObject, events);
} else {
this.processAndResponse(ctx, packObject, events);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
index 9529aa38ff..3eb12d6a8c 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
@@ -25,6 +25,8 @@ public class SourceConstants {
public static final String SYSENV_HOST_IP = "inlongHostIp";
// default source host
public static final String VAL_DEF_HOST_VALUE = "0.0.0.0";
+ // loopback host
+ public static final String VAL_LOOPBACK_HOST_VALUE = "127.0.0.1";
// source port
public static final String SRCCXT_CONFIG_PORT = "port";
// system env source port
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
index a2a347d4aa..71516da004 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
@@ -17,16 +17,48 @@
package org.apache.inlong.dataproxy.utils;
+import org.apache.inlong.dataproxy.source.SourceConstants;
+
import io.netty.channel.Channel;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
import java.net.SocketAddress;
+import java.util.Map;
public class AddressUtils {
private static final Logger logger =
LoggerFactory.getLogger(AddressUtils.class);
+ // Resolved once while the class initializes; getLocalIp() still sees null
+ // for this field at that moment and therefore performs the actual lookup.
+ private static final String localIp = getLocalIp();
+
+ /**
+  * Gets the local IP address of this host.
+  *
+  * <p>The value is resolved once and cached in {@code localIp}; later calls
+  * return the cached value. Resolution connects a UDP socket to a public
+  * address and reads back the local address the socket was given.
+  *
+  * @return the detected local IP, or the loopback address when detection
+  *         fails or yields only the wildcard address
+  */
+ public static String getLocalIp() {
+     // null only while the class is still initializing
+     if (localIp != null) {
+         return localIp;
+     }
+     String ip = SourceConstants.VAL_LOOPBACK_HOST_VALUE;
+     // try-with-resources closes the socket on every path
+     try (DatagramSocket socket = new DatagramSocket()) {
+         socket.connect(InetAddress.getByName("8.8.8.8"), 10002);
+         InetAddress localAddr = socket.getLocalAddress();
+         // getLocalAddress() may return the wildcard address (0.0.0.0),
+         // which is useless as a host identity - keep the loopback default
+         if (localAddr != null && !localAddr.isAnyLocalAddress()) {
+             ip = localAddr.getHostAddress();
+         } else {
+             logger.warn("Get local IP returned wildcard address, use {} instead", ip);
+         }
+     } catch (Throwable ex) {
+         // best effort: this runs inside the static initializer, so it must never throw
+         logger.error("Get local IP failure,", ex);
+     }
+     return ip;
+ }
+
public static String getChannelLocalIP(Channel channel) {
return getChannelIP(channel, true);
}
@@ -57,4 +89,24 @@ public class AddressUtils {
}
}
+ /**
+  * Determines the host address this node should report as its own.
+  *
+  * <p>The value of the system environment variable named by
+  * {@code SourceConstants.SYSENV_HOST_IP} is used when it passes
+  * {@code ConfStringUtils.isValidIp}; an invalid value is logged and ignored.
+  * If no usable value results - blank, the loopback value or the default
+  * "any" host value - the address from {@link #getLocalIp()} is returned.
+  *
+  * @return the configured host IP, or the detected local IP as fallback
+  */
+ public static String getSelfHost() {
+     String selfHost = null;
+     // environment maps never hold null values, so null means "not set"
+     final String envHostIp = System.getenv().get(SourceConstants.SYSENV_HOST_IP);
+     if (envHostIp != null) {
+         if (ConfStringUtils.isValidIp(envHostIp)) {
+             selfHost = envHostIp.trim();
+         } else {
+             logger.error("{}({}) config in system env not valid",
+                     SourceConstants.SYSENV_HOST_IP, envHostIp);
+         }
+     }
+     final boolean unusable = StringUtils.isBlank(selfHost)
+             || SourceConstants.VAL_LOOPBACK_HOST_VALUE.equals(selfHost)
+             || SourceConstants.VAL_DEF_HOST_VALUE.equals(selfHost);
+     return unusable ? AddressUtils.getLocalIp() : selfHost;
+ }
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
index 8e02c66b0e..45551e6bcc 100644
---
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
@@ -38,7 +38,7 @@ public class TestCommonConfigHolder {
Assert.assertEquals(10000,
CommonConfigHolder.getInstance().getMetaConfigSyncInvlMs());
Assert.assertTrue(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept());
Assert.assertTrue(CommonConfigHolder.getInstance().getDefTopics().contains("test2"));
-
Assert.assertTrue(CommonConfigHolder.getInstance().isEnableSendRetryAfterFailure());
+
Assert.assertTrue(CommonConfigHolder.getInstance().isSendRetryAfterFailure());
Assert.assertEquals(2,
CommonConfigHolder.getInstance().getMaxRetriesAfterFailure());
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
index 21a785ee25..4077a082ad 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
@@ -20,9 +20,9 @@ metricDomains=DataProxy
metricDomains.DataProxy.domainListeners=org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
metricDomains.DataProxy.snapshotInterval=60000
# whether to startup using the local metadata.json file without connecting to
the Manager
-startup.using.local.meta.file.enable=true
+meta.config.startup.using.local.file.enable=true
-proxy.enable.whitelist=true
+proxy.visit.whitelist.enable=true
meta.config.sync.interval.ms=10000
@@ -31,6 +31,6 @@ id2topic.unconfigured.accept.enable=true
# the default topic if accept unconfigured topic's message
id2topic.unconfigured.default.topics= test1 test2 test3
# whether to retry send after sent failure
-send.retry.after.failure = true
+msg.send.failure.retry.enable = true
# max retries after sent failure
-max.retries.after.failure=2
+msg.max.retries=2