This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 60c2676223 [INLONG-10066][DataProxy] Optimize the related configuration settings in the CommonConfigHolder.java file (#10079)
60c2676223 is described below

commit 60c2676223991af57e88d0e3982953f62834a27c
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Apr 26 10:54:06 2024 +0800

    [INLONG-10066][DataProxy] Optimize the related configuration settings in the CommonConfigHolder.java file (#10079)
---
 .../inlong/common/enums/InlongCompressType.java    |   6 +
 inlong-dataproxy/conf/common.properties            |  12 +-
 .../dataproxy/config/CommonConfigHolder.java       | 519 +++++++++++++--------
 .../dataproxy/sink/common/DefaultEventHandler.java |  10 +-
 .../inlong/dataproxy/sink/common/EventHandler.java |   6 +-
 .../dataproxy/sink/mq/MessageQueueZoneSink.java    |   2 +-
 .../sink/mq/MessageQueueZoneSinkContext.java       |   9 +-
 .../inlong/dataproxy/sink/mq/PackProfile.java      |   2 +-
 .../dataproxy/source/ServerMessageHandler.java     |   2 +-
 .../inlong/dataproxy/source/SourceConstants.java   |   2 +
 .../inlong/dataproxy/utils/AddressUtils.java       |  52 +++
 .../config/holder/TestCommonConfigHolder.java      |   2 +-
 .../src/test/resources/common.properties           |   8 +-
 13 files changed, 409 insertions(+), 223 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java
index 24aa16898e..2869a6adad 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java
@@ -17,7 +17,10 @@
 
 package org.apache.inlong.common.enums;
 
+import com.google.common.collect.Sets;
+
 import java.util.Objects;
+import java.util.Set;
 
 public enum InlongCompressType {
 
@@ -26,6 +29,9 @@ public enum InlongCompressType {
     INLONG_SNAPPY(2, "INLONG_SNAPPY", "The message compressed with inlong 
snappy"),
     UNKNOWN(99, "UNKNOWN", "Unknown compress type");
 
+    public static final Set<String> allowedCompressTypes =
+            Sets.newHashSet(NONE.getName(), INLONG_GZ.getName(), 
INLONG_SNAPPY.getName());
+
     private final int id;
     private final String name;
     private final String desc;
diff --git a/inlong-dataproxy/conf/common.properties b/inlong-dataproxy/conf/common.properties
index d622c345af..6d9e399867 100644
--- a/inlong-dataproxy/conf/common.properties
+++ b/inlong-dataproxy/conf/common.properties
@@ -18,24 +18,24 @@
 
 # manager open api address and auth key
 manager.hosts=127.0.0.1:8083
-manager.auth.secretId=
-manager.auth.secretKey=
+manager.auth.secret.id=
+manager.auth.secret.key=
 
 # proxy cluster name
 proxy.cluster.name=default_dataproxy
 proxy.cluster.tag=default_cluster
-proxy.cluster.extTag=default=true
-proxy.cluster.inCharges=admin
+proxy.cluster.ext.tag=default=true
+proxy.cluster.incharges=admin
 # synchronize interval of meta config (millisecond)
 meta.config.sync.interval.ms=10000
 # whether to startup using the local metadata.json file without connecting to the Manager
-startup.using.local.meta.file.enable=false
+meta.config.startup.using.local.file.enable=false
 
 # metric config
 metricDomains=DataProxy
 metricDomains.DataProxy.domainListeners=org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
 metricDomains.DataProxy.snapshotInterval=60000
-prometheusHttpPort=9081
+online.metric.prometheus.http.port=9081
 
 # whether to enable audit
 audit.enable=true
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
index 92d0bba627..a788702489 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -17,10 +17,12 @@
 
 package org.apache.inlong.dataproxy.config;
 
+import org.apache.inlong.common.enums.InlongCompressType;
 import org.apache.inlong.dataproxy.sink.common.DefaultEventHandler;
 import org.apache.inlong.dataproxy.sink.mq.AllCacheClusterSelector;
-import org.apache.inlong.sdk.commons.protocol.ProxySdk;
+import org.apache.inlong.dataproxy.utils.AddressUtils;
 
+import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Context;
@@ -32,7 +34,6 @@ import java.io.InputStream;
 import java.net.URL;
 import java.security.SecureRandom;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -50,6 +51,11 @@ public class CommonConfigHolder {
     // list split separator
     private static final String VAL_CONFIG_ITEMS_SEPARATOR = ",|\\s+";
     // **** allowed keys and default value, begin
+    // proxy node id
+    @Deprecated
+    private static final String KEY_PROXY_NODE_ID = "nodeId";
+    private static final String KEY_PROXY_NODE_IDV2 = "proxy.node.id";
+    private static final String VAL_DEF_PROXY_NODE_ID = "127.0.0.1";
     // cluster tag
     private static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
     private static final String VAL_DEF_CLUSTER_TAG = "default_cluster";
@@ -94,74 +100,95 @@ public class CommonConfigHolder {
     private static final String KEY_ENABLE_STARTUP_USING_LOCAL_META_FILEV2 =
             "meta.config.startup.using.local.file.enable";
     private static final boolean VAL_DEF_ENABLE_STARTUP_USING_LOCAL_META_FILE 
= false;
-    // whether to accept messages without mapping between groupId/streamId and topic
-    public static final String KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT = 
"id2topic.unconfigured.accept.enable";
-    public static final boolean VAL_DEF_ENABLE_UNCONFIGURED_TOPIC_ACCEPT = 
false;
-    // default topics configure key, multiple topic settings are separated by "\\s+".
-    public static final String KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS = 
"id2topic.unconfigured.default.topics";
-    public static final String VAL_DEFAULT_TOPIC = "test";
-    // whether enable whitelist, optional field.
-    public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist";
-    public static final boolean VAL_DEF_ENABLE_WHITELIST = false;
     // whether enable file metric, optional field.
-    public static final String KEY_ENABLE_FILE_METRIC = "file.metric.enable";
-    public static final boolean VAL_DEF_ENABLE_FILE_METRIC = true;
+    private static final String KEY_ENABLE_FILE_METRIC = "file.metric.enable";
+    private static final boolean VAL_DEF_ENABLE_FILE_METRIC = true;
     // file metric statistic interval (second)
-    public static final String KEY_FILE_METRIC_STAT_INTERVAL_SEC = 
"file.metric.stat.interval.sec";
-    public static final int VAL_DEF_FILE_METRIC_STAT_INVL_SEC = 60;
-    public static final int VAL_MIN_FILE_METRIC_STAT_INVL_SEC = 0;
+    private static final String KEY_FILE_METRIC_STAT_INTERVAL_SEC = 
"file.metric.stat.interval.sec";
+    private static final int VAL_DEF_FILE_METRIC_STAT_INVL_SEC = 60;
+    private static final int VAL_MIN_FILE_METRIC_STAT_INVL_SEC = 0;
     // file metric max statistic key count
-    public static final String KEY_FILE_METRIC_MAX_CACHE_CNT = 
"file.metric.max.cache.cnt";
-    public static final int VAL_DEF_FILE_METRIC_MAX_CACHE_CNT = 1000000;
-    public static final int VAL_MIN_FILE_METRIC_MAX_CACHE_CNT = 0;
+    private static final String KEY_FILE_METRIC_MAX_CACHE_CNT = 
"file.metric.max.cache.cnt";
+    private static final int VAL_DEF_FILE_METRIC_MAX_CACHE_CNT = 1000000;
+    private static final int VAL_MIN_FILE_METRIC_MAX_CACHE_CNT = 0;
     // source metric statistic name
-    public static final String KEY_FILE_METRIC_SOURCE_OUTPUT_NAME = 
"file.metric.source.output.name";
-    public static final String VAL_DEF_FILE_METRIC_SOURCE_OUTPUT_NAME = 
"Source";
+    private static final String KEY_FILE_METRIC_SOURCE_OUTPUT_NAME = 
"file.metric.source.output.name";
+    private static final String VAL_DEF_FILE_METRIC_SOURCE_OUTPUT_NAME = 
"Source";
     // sink metric statistic name
-    public static final String KEY_FILE_METRIC_SINK_OUTPUT_NAME = 
"file.metric.sink.output.name";
-    public static final String VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME = "Sink";
+    private static final String KEY_FILE_METRIC_SINK_OUTPUT_NAME = 
"file.metric.sink.output.name";
+    private static final String VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME = "Sink";
     // event metric statistic name
-    public static final String KEY_FILE_METRIC_EVENT_OUTPUT_NAME = 
"file.metric.event.output.name";
-    public static final String VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME = "Stats";
+    private static final String KEY_FILE_METRIC_EVENT_OUTPUT_NAME = 
"file.metric.event.output.name";
+    private static final String VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME = 
"Stats";
+    // prometheus http port
+    @Deprecated
+    private static final String KEY_PROMETHEUS_HTTP_PORT = 
"prometheusHttpPort";
+    private static final String KEY_PROMETHEUS_HTTP_PORTV2 = 
"online.metric.prometheus.http.port";
+    private static final int VAL_DEF_PROMETHEUS_HTTP_PORT = 8080;
     // Audit fields
-    public static final String KEY_ENABLE_AUDIT = "audit.enable";
-    public static final boolean VAL_DEF_ENABLE_AUDIT = true;
-    public static final String KEY_AUDIT_PROXYS = "audit.proxys";
-    public static final String KEY_AUDIT_FILE_PATH = "audit.filePath";
-    public static final String VAL_DEF_AUDIT_FILE_PATH = "/data/inlong/audit/";
-    public static final String KEY_AUDIT_MAX_CACHE_ROWS = "audit.maxCacheRows";
-    public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 2000000;
-    public static final String KEY_AUDIT_FORMAT_INTERVAL_MS = 
"auditFormatInterval";
-    public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 60000L;
+    private static final String KEY_ENABLE_AUDIT = "audit.enable";
+    private static final boolean VAL_DEF_ENABLE_AUDIT = true;
+    private static final String KEY_AUDIT_PROXYS = "audit.proxys";
+    @Deprecated
+    private static final String KEY_AUDIT_FILE_PATH = "audit.filePath";
+    private static final String KEY_AUDIT_FILE_PATHV2 = "audit.file.path";
+    private static final String VAL_DEF_AUDIT_FILE_PATH = 
"/data/inlong/audit/";
+    @Deprecated
+    private static final String KEY_AUDIT_MAX_CACHE_ROWS = 
"audit.maxCacheRows";
+    private static final String KEY_AUDIT_MAX_CACHE_ROWSV2 = 
"audit.max.cache.rows";
+    private static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 2000000;
+    @Deprecated
+    private static final String KEY_AUDIT_FORMAT_INTERVAL_MS = 
"auditFormatInterval";
+    private static final String KEY_AUDIT_TIME_FORMAT_INTERVAL = 
"audit.time.format.intvl.ms";
+    private static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 60000L;
+
+    // v1 msg whether response by sink
+    private static final String KEY_V1MSG_RESPONSE_BY_SINK = 
"isResponseAfterSave";
+    private static final String KEY_V1MSG_RESPONSE_BY_SINKV2 = 
"proxy.v1msg.response.by.sink.enable";
+    private static final boolean VAL_DEF_V1MSG_RESPONSE_BY_SINK = false;
+    // v1 msg sent compress type
+    @Deprecated
+    private static final String KEY_V1MSG_SENT_COMPRESS_TYPE = "compressType";
+    private static final String KEY_V1MSG_SENT_COMPRESS_TYPEV2 = 
"proxy.v1msg.compress.type";
+    private static final InlongCompressType VAL_DEF_V1MSG_COMPRESS_TYPE = 
InlongCompressType.INLONG_SNAPPY;
+    // Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
+    private static final String KEY_MAX_RAS_TIMEOUT_MS = "maxRASTimeoutMs";
+    private static final long VAL_DEF_MAX_RAS_TIMEOUT_MS = 10000L;
+
+    // max buffer queue size in KB
+    @Deprecated
+    private static final String KEY_DEF_BUFFERQUEUE_SIZE_KB = 
"maxBufferQueueSizeKb";
+    private static final String KEY_DEF_BUFFERQUEUE_SIZE_KBV2 = 
"proxy.def.buffer.queue.size.KB";
+    private static final int VAL_DEF_BUFFERQUEUE_SIZE_KB = 128 * 1024;
     // whether to retry after the message send failure
-    public static final String KEY_ENABLE_SEND_RETRY_AFTER_FAILURE = 
"send.retry.after.failure";
-    public static final boolean VAL_DEF_ENABLE_SEND_RETRY_AFTER_FAILURE = true;
+    @Deprecated
+    private static final String KEY_ENABLE_SEND_RETRY_AFTER_FAILURE = 
"send.retry.after.failure";
+    private static final String KEY_ENABLE_SEND_RETRY_AFTER_FAILUREV2 = 
"msg.send.failure.retry.enable";
+    private static final boolean VAL_DEF_ENABLE_SEND_RETRY_AFTER_FAILURE = 
true;
     // max retry count
-    public static final String KEY_MAX_RETRIES_AFTER_FAILURE = 
"max.retries.after.failure";
-    public static final int VAL_DEF_MAX_RETRIES_AFTER_FAILURE = -1;
-    public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
-    public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false;
-    // Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
-    public static final String KEY_MAX_RAS_TIMEOUT_MS = "maxRASTimeoutMs";
-    public static final long VAL_DEF_MAX_RAS_TIMEOUT_MS = 10000L;
-    // max buffer queue size in Kb
-    public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = 
"maxBufferQueueSizeKb";
-    public static final int VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
+    private static final String KEY_MAX_RETRIES_AFTER_FAILURE = 
"max.retries.after.failure";
+    private static final String KEY_MAX_RETRIES_AFTER_FAILUREV2 = 
"msg.max.retries";
+    private static final int VAL_DEF_MAX_RETRIES_AFTER_FAILURE = -1;
+    // whether to accept messages without mapping between groupId/streamId and topic
+    private static final String KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT = 
"id2topic.unconfigured.accept.enable";
+    private static final boolean VAL_DEF_ENABLE_UNCONFIGURED_TOPIC_ACCEPT = 
false;
+    // default topics configure key, multiple topic settings are separated by "\\s+".
+    private static final String KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS = 
"id2topic.unconfigured.default.topics";
+    // whether enable whitelist, optional field.
+    @Deprecated
+    private static final String KEY_ENABLE_WHITELIST = 
"proxy.enable.whitelist";
+    private static final String KEY_ENABLE_WHITELISTV2 = 
"proxy.visit.whitelist.enable";
+    private static final boolean VAL_DEF_ENABLE_WHITELIST = false;
+
     // event handler
-    public static final String KEY_EVENT_HANDLER = "eventHandler";
-    public static final String VAL_DEF_EVENT_HANDLER = 
DefaultEventHandler.class.getName();
+    private static final String KEY_EVENT_HANDLER = "eventHandler";
+    private static final String VAL_DEF_EVENT_HANDLER = 
DefaultEventHandler.class.getName();
     // cache cluster selector
-    public static final String KEY_CACHE_CLUSTER_SELECTOR = 
"cacheClusterSelector";
-    public static final String VAL_DEF_CACHE_CLUSTER_SELECTOR = 
AllCacheClusterSelector.class.getName();
-    // proxy node id
-    public static final String KEY_PROXY_NODE_ID = "nodeId";
-    public static final String VAL_DEF_PROXY_NODE_ID = "127.0.0.1";
-    // msg sent compress type
-    public static final String KEY_MSG_SENT_COMPRESS_TYPE = "compressType";
-    public static final String VAL_DEF_MSG_COMPRESS_TYPE = 
ProxySdk.INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name();
-    // prometheus http port
-    public static final String KEY_PROMETHEUS_HTTP_PORT = "prometheusHttpPort";
-    public static final int VAL_DEF_PROMETHEUS_HTTP_PORT = 8080;
+    @Deprecated
+    private static final String KEY_CACHE_CLUSTER_SELECTOR = 
"cacheClusterSelector";
+    private static final String KEY_CACHE_CLUSTER_SELECTORV2 = 
"proxy.mq.cluster.selector";
+    private static final String VAL_DEF_CACHE_CLUSTER_SELECTOR = 
AllCacheClusterSelector.class.getName();
+
     // **** allowed keys and default value, end
 
     // class instance
@@ -186,24 +213,24 @@ public class CommonConfigHolder {
     private String auditFilePath = VAL_DEF_AUDIT_FILE_PATH;
     private int auditMaxCacheRows = VAL_DEF_AUDIT_MAX_CACHE_ROWS;
     private long auditFormatInvlMs = VAL_DEF_AUDIT_FORMAT_INTERVAL_MS;
-    private boolean responseAfterSave = VAL_DEF_RESPONSE_AFTER_SAVE;
-    private long maxResAfterSaveTimeout = VAL_DEF_MAX_RAS_TIMEOUT_MS;
     private boolean enableUnConfigTopicAccept = 
VAL_DEF_ENABLE_UNCONFIGURED_TOPIC_ACCEPT;
-    private List<String> defaultTopics = Arrays.asList(VAL_DEFAULT_TOPIC);
-    private boolean enableWhiteList = VAL_DEF_ENABLE_WHITELIST;
-    private int maxBufferQueueSizeKb = VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB;
-    private String eventHandler = VAL_DEF_EVENT_HANDLER;
-    private String cacheClusterSelector = VAL_DEF_CACHE_CLUSTER_SELECTOR;
-    private String proxyNodeId = VAL_DEF_PROXY_NODE_ID;
-    private String msgCompressType = VAL_DEF_MSG_COMPRESS_TYPE;
-    private int prometheusHttpPort = VAL_DEF_PROMETHEUS_HTTP_PORT;
+    private final List<String> defaultTopics = new ArrayList<>();
     private boolean enableFileMetric = VAL_DEF_ENABLE_FILE_METRIC;
     private int fileMetricStatInvlSec = VAL_DEF_FILE_METRIC_STAT_INVL_SEC;
     private int fileMetricStatCacheCnt = VAL_DEF_FILE_METRIC_MAX_CACHE_CNT;
     private String fileMetricSourceOutName = 
VAL_DEF_FILE_METRIC_SOURCE_OUTPUT_NAME;
     private String fileMetricSinkOutName = 
VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME;
     private String fileMetricEventOutName = 
VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME;
-    private boolean enableSendRetryAfterFailure = 
VAL_DEF_ENABLE_SEND_RETRY_AFTER_FAILURE;
+    private InlongCompressType defV1MsgCompressType = 
VAL_DEF_V1MSG_COMPRESS_TYPE;
+    private boolean defV1MsgResponseBySink = VAL_DEF_V1MSG_RESPONSE_BY_SINK;
+    private long maxResAfterSaveTimeout = VAL_DEF_MAX_RAS_TIMEOUT_MS;
+    private boolean enableWhiteList = VAL_DEF_ENABLE_WHITELIST;
+    private int defBufferQueueSizeKB = VAL_DEF_BUFFERQUEUE_SIZE_KB;
+    private String eventHandler = VAL_DEF_EVENT_HANDLER;
+    private String cacheClusterSelector = VAL_DEF_CACHE_CLUSTER_SELECTOR;
+    private String proxyNodeId = VAL_DEF_PROXY_NODE_ID;
+    private int prometheusHttpPort = VAL_DEF_PROMETHEUS_HTTP_PORT;
+    private boolean sendRetryAfterFailure = 
VAL_DEF_ENABLE_SEND_RETRY_AFTER_FAILURE;
     private int maxRetriesAfterFailure = VAL_DEF_MAX_RETRIES_AFTER_FAILURE;
 
     /**
@@ -218,6 +245,7 @@ public class CommonConfigHolder {
                 instance = new CommonConfigHolder();
                 if (instance.loadConfigFile()) {
                     instance.preReadFields();
+                    LOG.info("{} load and read result is: {}", 
COMMON_CONFIG_FILE_NAME, instance.toString());
                 }
                 isInit = true;
             }
@@ -337,16 +365,16 @@ public class CommonConfigHolder {
         return auditFormatInvlMs;
     }
 
-    public boolean isResponseAfterSave() {
-        return responseAfterSave;
+    public boolean isDefV1MsgResponseBySink() {
+        return defV1MsgResponseBySink;
     }
 
     public long getMaxResAfterSaveTimeout() {
         return maxResAfterSaveTimeout;
     }
 
-    public int getMaxBufferQueueSizeKb() {
-        return maxBufferQueueSizeKb;
+    public int getDefBufferQueueSizeKB() {
+        return defBufferQueueSizeKB;
     }
 
     public boolean isEnableStartupUsingLocalMetaFile() {
@@ -369,8 +397,8 @@ public class CommonConfigHolder {
         return proxyNodeId;
     }
 
-    public String getMsgCompressType() {
-        return msgCompressType;
+    public InlongCompressType getDefV1MsgCompressType() {
+        return defV1MsgCompressType;
     }
 
     public String getFileMetricSourceOutName() {
@@ -385,8 +413,8 @@ public class CommonConfigHolder {
         return fileMetricEventOutName;
     }
 
-    public boolean isEnableSendRetryAfterFailure() {
-        return enableSendRetryAfterFailure;
+    public boolean isSendRetryAfterFailure() {
+        return sendRetryAfterFailure;
     }
 
     public int getMaxRetriesAfterFailure() {
@@ -406,161 +434,86 @@ public class CommonConfigHolder {
             this.clusterName = tmpValue.trim();
         }
         // read cluster incharges
-        tmpValue = this.props.get(KEY_PROXY_CLUSTER_INCHARGES);
+        tmpValue = compatGetValue(this.props,
+                KEY_PROXY_CLUSTER_INCHARGESV2, KEY_PROXY_CLUSTER_INCHARGES);
         if (StringUtils.isNotEmpty(tmpValue)) {
             this.clusterIncharges = tmpValue.trim();
         }
-        tmpValue = this.props.get(KEY_PROXY_CLUSTER_EXT_TAG);
+        tmpValue = compatGetValue(this.props,
+                KEY_PROXY_CLUSTER_EXT_TAGV2, KEY_PROXY_CLUSTER_EXT_TAG);
         if (StringUtils.isNotEmpty(tmpValue)) {
             this.clusterExtTag = tmpValue.trim();
         }
         // read the manager setting
         this.preReadManagerSetting();
-        // read whether accept msg without id2topic configure
-        tmpValue = this.props.get(KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT);
-        if (StringUtils.isNotEmpty(tmpValue)) {
-            this.enableUnConfigTopicAccept = 
"TRUE".equalsIgnoreCase(tmpValue.trim());
-        }
-        // read default topics
-        tmpValue = this.props.get(KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS);
-        if (StringUtils.isNotBlank(tmpValue)) {
-            List<String> tmpList = new ArrayList<>();
-            String[] topicItems = tmpValue.split(",|\\s+");
-            for (String item : topicItems) {
-                if (StringUtils.isBlank(item)) {
-                    continue;
-                }
-                tmpList.add(item.trim());
-            }
-            if (tmpList.size() > 0) {
-                defaultTopics = tmpList;
-            }
-        }
+        // read the configuration related to the default topics
+        this.preReadDefTopicSetting();
         // read enable whitelist
-        tmpValue = this.props.get(KEY_ENABLE_WHITELIST);
-        if (StringUtils.isNotEmpty(tmpValue)) {
-            this.enableWhiteList = "TRUE".equalsIgnoreCase(tmpValue.trim());
-        }
-        // read whether enable file metric
-        tmpValue = this.props.get(KEY_ENABLE_FILE_METRIC);
-        if (StringUtils.isNotEmpty(tmpValue)) {
-            this.enableFileMetric = "TRUE".equalsIgnoreCase(tmpValue.trim());
-        }
-        // read file metric statistic interval
-        tmpValue = this.props.get(KEY_FILE_METRIC_STAT_INTERVAL_SEC);
-        if (StringUtils.isNotEmpty(tmpValue)) {
-            int statInvl = NumberUtils.toInt(tmpValue.trim(), 
VAL_DEF_FILE_METRIC_STAT_INVL_SEC);
-            if (statInvl >= VAL_MIN_FILE_METRIC_STAT_INVL_SEC) {
-                this.fileMetricStatInvlSec = statInvl;
-            }
-        }
-        // read file metric statistic max cache count
-        tmpValue = this.props.get(KEY_FILE_METRIC_MAX_CACHE_CNT);
-        if (StringUtils.isNotEmpty(tmpValue)) {
-            int maxCacheCnt = NumberUtils.toInt(tmpValue.trim(), 
VAL_DEF_FILE_METRIC_MAX_CACHE_CNT);
-            if (maxCacheCnt >= VAL_MIN_FILE_METRIC_MAX_CACHE_CNT) {
-                this.fileMetricStatCacheCnt = maxCacheCnt;
-            }
-        }
-        // read source file statistic output name
-        tmpValue = this.props.get(KEY_FILE_METRIC_SOURCE_OUTPUT_NAME);
-        if (StringUtils.isNotBlank(tmpValue)) {
-            this.fileMetricSourceOutName = tmpValue.trim();
-        }
-        // read sink file statistic output name
-        tmpValue = this.props.get(KEY_FILE_METRIC_SINK_OUTPUT_NAME);
-        if (StringUtils.isNotBlank(tmpValue)) {
-            this.fileMetricSinkOutName = tmpValue.trim();
-        }
-        // read event file statistic output name
-        tmpValue = this.props.get(KEY_FILE_METRIC_EVENT_OUTPUT_NAME);
-        if (StringUtils.isNotBlank(tmpValue)) {
-            this.fileMetricEventOutName = tmpValue.trim();
-        }
-        // read whether enable audit
-        tmpValue = this.props.get(KEY_ENABLE_AUDIT);
-        if (StringUtils.isNotEmpty(tmpValue)) {
-            this.enableAudit = "TRUE".equalsIgnoreCase(tmpValue.trim());
-        }
-        // read audit proxys
-        tmpValue = this.props.get(KEY_AUDIT_PROXYS);
-        if (StringUtils.isNotBlank(tmpValue)) {
-            String[] ipPorts = tmpValue.split("\\s+");
-            for (String tmpIPPort : ipPorts) {
-                if (StringUtils.isBlank(tmpIPPort)) {
-                    continue;
-                }
-                this.auditProxys.add(tmpIPPort.trim());
-            }
-        }
-        // read audit file path
-        tmpValue = this.props.get(KEY_AUDIT_FILE_PATH);
+        tmpValue = compatGetValue(this.props,
+                KEY_ENABLE_WHITELISTV2, KEY_ENABLE_WHITELIST);
         if (StringUtils.isNotBlank(tmpValue)) {
-            this.auditFilePath = tmpValue.trim();
-        }
-        // read audit max cache rows
-        tmpValue = this.props.get(KEY_AUDIT_MAX_CACHE_ROWS);
-        if (StringUtils.isNotEmpty(tmpValue)) {
-            this.auditMaxCacheRows = NumberUtils.toInt(tmpValue.trim(), 
VAL_DEF_AUDIT_MAX_CACHE_ROWS);
-        }
-        // read audit format interval
-        tmpValue = this.props.get(KEY_AUDIT_FORMAT_INTERVAL_MS);
-        if (StringUtils.isNotEmpty(tmpValue)) {
-            this.auditFormatInvlMs = NumberUtils.toLong(tmpValue.trim(), 
VAL_DEF_AUDIT_FORMAT_INTERVAL_MS);
-        }
-        // read whether response after save
-        tmpValue = this.props.get(KEY_RESPONSE_AFTER_SAVE);
-        if (StringUtils.isNotEmpty(tmpValue)) {
-            this.responseAfterSave = "TRUE".equalsIgnoreCase(tmpValue.trim());
+            this.enableWhiteList = "TRUE".equalsIgnoreCase(tmpValue.trim());
         }
         // read max response after save timeout
         tmpValue = this.props.get(KEY_MAX_RAS_TIMEOUT_MS);
         if (StringUtils.isNotEmpty(tmpValue)) {
             this.maxResAfterSaveTimeout = NumberUtils.toLong(tmpValue.trim(), 
VAL_DEF_MAX_RAS_TIMEOUT_MS);
         }
-        // read max bufferqueue size
-        tmpValue = this.props.get(KEY_MAX_BUFFERQUEUE_SIZE_KB);
+        // read default buffer queue size
+        tmpValue = compatGetValue(this.props,
+                KEY_DEF_BUFFERQUEUE_SIZE_KBV2, KEY_DEF_BUFFERQUEUE_SIZE_KB);
         if (StringUtils.isNotEmpty(tmpValue)) {
-            this.maxBufferQueueSizeKb = NumberUtils.toInt(tmpValue.trim(), 
VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB);
+            this.defBufferQueueSizeKB = NumberUtils.toInt(
+                    tmpValue.trim(), VAL_DEF_BUFFERQUEUE_SIZE_KB);
+        }
+        // read cache cluster selector
+        tmpValue = compatGetValue(this.props,
+                KEY_CACHE_CLUSTER_SELECTORV2, KEY_CACHE_CLUSTER_SELECTOR);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.cacheClusterSelector = tmpValue.trim();
         }
         // read event handler
         tmpValue = this.props.get(KEY_EVENT_HANDLER);
         if (StringUtils.isNotBlank(tmpValue)) {
             this.eventHandler = tmpValue.trim();
         }
-        // read cache cluster selector
-        tmpValue = this.props.get(KEY_CACHE_CLUSTER_SELECTOR);
-        if (StringUtils.isNotBlank(tmpValue)) {
-            this.cacheClusterSelector = tmpValue.trim();
-        }
         // read proxy node id
-        tmpValue = this.props.get(KEY_PROXY_NODE_ID);
-        if (StringUtils.isNotBlank(tmpValue)) {
+        tmpValue = compatGetValue(this.props,
+                KEY_PROXY_NODE_IDV2, KEY_PROXY_NODE_ID);
+        if (StringUtils.isBlank(tmpValue)) {
+            this.proxyNodeId = AddressUtils.getSelfHost();
+        } else {
             this.proxyNodeId = tmpValue.trim();
         }
-        // read msg compress type
-        tmpValue = this.props.get(KEY_MSG_SENT_COMPRESS_TYPE);
-        if (StringUtils.isNotBlank(tmpValue)) {
-            this.msgCompressType = tmpValue.trim();
-        }
         // read prometheus Http Port
-        tmpValue = this.props.get(KEY_PROMETHEUS_HTTP_PORT);
-        if (StringUtils.isNotEmpty(tmpValue)) {
-            this.prometheusHttpPort = NumberUtils.toInt(tmpValue.trim(), 
VAL_DEF_PROMETHEUS_HTTP_PORT);
+        tmpValue = compatGetValue(this.props,
+                KEY_PROMETHEUS_HTTP_PORTV2, KEY_PROMETHEUS_HTTP_PORT);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.prometheusHttpPort = NumberUtils.toInt(
+                    tmpValue.trim(), VAL_DEF_PROMETHEUS_HTTP_PORT);
         }
         // read whether retry send message after sent failure
-        tmpValue = this.props.get(KEY_ENABLE_SEND_RETRY_AFTER_FAILURE);
+        tmpValue = compatGetValue(this.props,
+                KEY_ENABLE_SEND_RETRY_AFTER_FAILUREV2, 
KEY_ENABLE_SEND_RETRY_AFTER_FAILURE);
         if (StringUtils.isNotEmpty(tmpValue)) {
-            this.enableSendRetryAfterFailure = 
"TRUE".equalsIgnoreCase(tmpValue.trim());
+            this.sendRetryAfterFailure = 
"TRUE".equalsIgnoreCase(tmpValue.trim());
         }
         // read max retry count
-        tmpValue = this.props.get(KEY_MAX_RETRIES_AFTER_FAILURE);
+        tmpValue = compatGetValue(this.props,
+                KEY_MAX_RETRIES_AFTER_FAILUREV2, 
KEY_MAX_RETRIES_AFTER_FAILURE);
         if (StringUtils.isNotBlank(tmpValue)) {
-            int retries = NumberUtils.toInt(tmpValue.trim(), 
VAL_DEF_MAX_RETRIES_AFTER_FAILURE);
+            int retries = NumberUtils.toInt(
+                    tmpValue.trim(), VAL_DEF_MAX_RETRIES_AFTER_FAILURE);
             if (retries >= 0) {
                 this.maxRetriesAfterFailure = retries;
             }
         }
+        // Pre-read the fields related to Audit
+        this.preReadAuditSetting();
+        // Pre-read the fields related to File metric
+        this.preReadMetricSetting();
+        // pre-read v1msg default setting
+        this.preReadV1MsgSetting();
     }
 
     private void preReadManagerSetting() {
@@ -626,6 +579,139 @@ public class CommonConfigHolder {
         }
     }
 
+    private void preReadDefTopicSetting() {
+        String tmpValue;
+        // read whether accept msg without id2topic configure
+        tmpValue = this.props.get(KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.enableUnConfigTopicAccept = 
"TRUE".equalsIgnoreCase(tmpValue.trim());
+        }
+        // read default topics
+        tmpValue = this.props.get(KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            String[] topicItems = tmpValue.split(VAL_CONFIG_ITEMS_SEPARATOR);
+            for (String item : topicItems) {
+                if (StringUtils.isBlank(item)) {
+                    continue;
+                }
+                this.defaultTopics.add(item.trim());
+            }
+            LOG.info("Configured {}, size is {}, value is {}",
+                    KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS, 
defaultTopics.size(), defaultTopics);
+        }
+        // check whether configure default topics
+        if (this.enableUnConfigTopicAccept && this.defaultTopics.isEmpty()) {
+            LOG.error("Required {} field value is blank in {} for {} is true, 
exit!",
+                    KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS, 
COMMON_CONFIG_FILE_NAME,
+                    KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT);
+            System.exit(2);
+        }
+    }
+
+    private void preReadMetricSetting() {
+        String tmpValue;
+        // read whether enable file metric
+        tmpValue = this.props.get(KEY_ENABLE_FILE_METRIC);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.enableFileMetric = "TRUE".equalsIgnoreCase(tmpValue.trim());
+        }
+        // read file metric statistic interval
+        tmpValue = this.props.get(KEY_FILE_METRIC_STAT_INTERVAL_SEC);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            int statInvl = NumberUtils.toInt(
+                    tmpValue.trim(), VAL_DEF_FILE_METRIC_STAT_INVL_SEC);
+            if (statInvl >= VAL_MIN_FILE_METRIC_STAT_INVL_SEC) {
+                this.fileMetricStatInvlSec = statInvl;
+            }
+        }
+        // read file metric statistic max cache count
+        tmpValue = this.props.get(KEY_FILE_METRIC_MAX_CACHE_CNT);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            int maxCacheCnt = NumberUtils.toInt(
+                    tmpValue.trim(), VAL_DEF_FILE_METRIC_MAX_CACHE_CNT);
+            if (maxCacheCnt >= VAL_MIN_FILE_METRIC_MAX_CACHE_CNT) {
+                this.fileMetricStatCacheCnt = maxCacheCnt;
+            }
+        }
+        // read source file statistic output name
+        tmpValue = this.props.get(KEY_FILE_METRIC_SOURCE_OUTPUT_NAME);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.fileMetricSourceOutName = tmpValue.trim();
+        }
+        // read sink file statistic output name
+        tmpValue = this.props.get(KEY_FILE_METRIC_SINK_OUTPUT_NAME);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.fileMetricSinkOutName = tmpValue.trim();
+        }
+        // read event file statistic output name
+        tmpValue = this.props.get(KEY_FILE_METRIC_EVENT_OUTPUT_NAME);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.fileMetricEventOutName = tmpValue.trim();
+        }
+    }
+
+    private void preReadAuditSetting() {
+        String tmpValue;
+        // read whether enable audit
+        tmpValue = this.props.get(KEY_ENABLE_AUDIT);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.enableAudit = "TRUE".equalsIgnoreCase(tmpValue.trim());
+        }
+        // read audit proxys
+        tmpValue = this.props.get(KEY_AUDIT_PROXYS);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            String[] ipPorts = tmpValue.split(VAL_CONFIG_ITEMS_SEPARATOR);
+            for (String tmpIPPort : ipPorts) {
+                if (StringUtils.isBlank(tmpIPPort)) {
+                    continue;
+                }
+                this.auditProxys.add(tmpIPPort.trim());
+            }
+        }
+        // read audit file path
+        tmpValue = compatGetValue(this.props,
+                KEY_AUDIT_FILE_PATHV2, KEY_AUDIT_FILE_PATH);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.auditFilePath = tmpValue.trim();
+        }
+        // read audit max cache rows
+        tmpValue = compatGetValue(this.props,
+                KEY_AUDIT_MAX_CACHE_ROWSV2, KEY_AUDIT_MAX_CACHE_ROWS);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.auditMaxCacheRows = NumberUtils.toInt(
+                    tmpValue.trim(), VAL_DEF_AUDIT_MAX_CACHE_ROWS);
+        }
+        // read audit format interval
+        tmpValue = compatGetValue(this.props,
+                KEY_AUDIT_TIME_FORMAT_INTERVAL, KEY_AUDIT_FORMAT_INTERVAL_MS);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.auditFormatInvlMs = NumberUtils.toLong(
+                    tmpValue.trim(), VAL_DEF_AUDIT_FORMAT_INTERVAL_MS);
+        }
+    }
+
+    private void preReadV1MsgSetting() {
+        String tmpValue;
+        // read whether response v1 message by sink
+        tmpValue = compatGetValue(this.props,
+                KEY_V1MSG_RESPONSE_BY_SINKV2, KEY_V1MSG_RESPONSE_BY_SINK);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.defV1MsgResponseBySink = 
"TRUE".equalsIgnoreCase(tmpValue.trim());
+        }
+        // read v1 msg compress type
+        tmpValue = compatGetValue(this.props,
+                KEY_V1MSG_SENT_COMPRESS_TYPEV2, KEY_V1MSG_SENT_COMPRESS_TYPE);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            InlongCompressType tmpCompType = 
InlongCompressType.forType(tmpValue.trim());
+            if (tmpCompType == InlongCompressType.UNKNOWN) {
+                LOG.error("{}'s {}({}) must be in allowed range [{}], exist!", 
COMMON_CONFIG_FILE_NAME,
+                        KEY_V1MSG_SENT_COMPRESS_TYPEV2, tmpValue, 
InlongCompressType.allowedCompressTypes);
+                System.exit(2);
+            }
+            this.defV1MsgCompressType = tmpCompType;
+        }
+    }
+
     private boolean loadConfigFile() {
         InputStream inStream = null;
         try {
@@ -673,6 +759,47 @@ public class CommonConfigHolder {
         return true;
     }
 
+    @Override
+    public String toString() {
+        return new ToStringBuilder(this)
+                .append("props", props)
+                .append("clusterTag", clusterTag)
+                .append("clusterName", clusterName)
+                .append("clusterIncharges", clusterIncharges)
+                .append("clusterExtTag", clusterExtTag)
+                .append("managerIpList", managerIpList)
+                .append("managerAuthSecretId", managerAuthSecretId)
+                .append("managerAuthSecretKey", managerAuthSecretKey)
+                .append("enableStartupUsingLocalMetaFile", 
enableStartupUsingLocalMetaFile)
+                .append("metaConfigSyncInvlMs", metaConfigSyncInvlMs)
+                .append("metaConfigWastAlarmMs", metaConfigWastAlarmMs)
+                .append("enableAudit", enableAudit)
+                .append("auditProxys", auditProxys)
+                .append("auditFilePath", auditFilePath)
+                .append("auditMaxCacheRows", auditMaxCacheRows)
+                .append("auditFormatInvlMs", auditFormatInvlMs)
+                .append("enableUnConfigTopicAccept", enableUnConfigTopicAccept)
+                .append("defaultTopics", defaultTopics)
+                .append("enableFileMetric", enableFileMetric)
+                .append("fileMetricStatInvlSec", fileMetricStatInvlSec)
+                .append("fileMetricStatCacheCnt", fileMetricStatCacheCnt)
+                .append("fileMetricSourceOutName", fileMetricSourceOutName)
+                .append("fileMetricSinkOutName", fileMetricSinkOutName)
+                .append("fileMetricEventOutName", fileMetricEventOutName)
+                .append("defV1MsgCompressType", defV1MsgCompressType)
+                .append("defV1MsgResponseBySink", defV1MsgResponseBySink)
+                .append("maxResAfterSaveTimeout", maxResAfterSaveTimeout)
+                .append("enableWhiteList", enableWhiteList)
+                .append("defBufferQueueSizeKB", defBufferQueueSizeKB)
+                .append("eventHandler", eventHandler)
+                .append("cacheClusterSelector", cacheClusterSelector)
+                .append("proxyNodeId", proxyNodeId)
+                .append("prometheusHttpPort", prometheusHttpPort)
+                .append("sendRetryAfterFailure", sendRetryAfterFailure)
+                .append("maxRetriesAfterFailure", maxRetriesAfterFailure)
+                .toString();
+    }
+
     private String compatGetValue(Map<String, String> attrs, String newKey, 
String depKey) {
         String tmpValue = attrs.get(newKey);
         if (StringUtils.isBlank(tmpValue)) {
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
index 64313adef5..b085653e48 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
@@ -17,13 +17,13 @@
 
 package org.apache.inlong.dataproxy.sink.common;
 
+import org.apache.inlong.common.enums.InlongCompressType;
 import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
 import org.apache.inlong.sdk.commons.protocol.EventConstants;
 import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
-import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
@@ -48,7 +48,7 @@ public class DefaultEventHandler implements EventHandler {
      */
     @Override
     public Map<String, String> parseHeader(IdTopicConfig idConfig, 
BatchPackProfile profile, String nodeId,
-            INLONG_COMPRESSED_TYPE compressType) {
+            InlongCompressType compressType) {
         Map<String, String> headers = new HashMap<>();
         // version int32 protocol version, the value is 1
         headers.put(ConfigConstants.MSG_ENCODE_VER, 
MessageWrapType.INLONG_MSG_V1.getStrId());
@@ -71,7 +71,7 @@ public class DefaultEventHandler implements EventHandler {
         // INLONG_GZ = 1,
         // INLONG_SNAPPY = 2
         headers.put(EventConstants.HEADER_KEY_COMPRESS_TYPE,
-                String.valueOf(compressType.getNumber()));
+                String.valueOf(compressType.getName()));
         // messageKey string partition hash key, optional
         return headers;
     }
@@ -80,7 +80,7 @@ public class DefaultEventHandler implements EventHandler {
      * parseBody
      */
     @Override
-    public byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile, 
INLONG_COMPRESSED_TYPE compressType)
+    public byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile, 
InlongCompressType compressType)
             throws IOException {
         List<ProxyEvent> events = profile.getEvents();
         // encode
@@ -105,7 +105,7 @@ public class DefaultEventHandler implements EventHandler {
             case INLONG_GZ:
                 compressBytes = GzipUtils.compress(srcBytes);
                 break;
-            case INLONG_NO_COMPRESS:
+            case NONE:
             default:
                 compressBytes = srcBytes;
                 break;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/EventHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/EventHandler.java
index 6b8977408c..b26f5cd744 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/EventHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/EventHandler.java
@@ -17,9 +17,9 @@
 
 package org.apache.inlong.dataproxy.sink.common;
 
+import org.apache.inlong.common.enums.InlongCompressType;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
 import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
-import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
 
 import java.util.Map;
 
@@ -33,11 +33,11 @@ public interface EventHandler {
      * parseHeader
      */
     Map<String, String> parseHeader(IdTopicConfig idConfig, BatchPackProfile 
profile, String nodeId,
-            INLONG_COMPRESSED_TYPE compressType) throws Exception;
+            InlongCompressType compressType) throws Exception;
 
     /**
      * parseBody
      */
-    byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile, 
INLONG_COMPRESSED_TYPE compressType)
+    byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile, 
InlongCompressType compressType)
             throws Exception;
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index 8b654a65bb..69b62d094f 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -67,7 +67,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
     // message group
     private BatchPackManager dispatchManager;
     private final BufferQueue<PackProfile> dispatchQueue =
-            new 
BufferQueue<>(CommonConfigHolder.getInstance().getMaxBufferQueueSizeKb());
+            new 
BufferQueue<>(CommonConfigHolder.getInstance().getDefBufferQueueSizeKB());
     // scheduled thread pool
     // reload
     // dispatch
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 0193eea6f9..9bae668283 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -18,12 +18,12 @@
 package org.apache.inlong.dataproxy.sink.mq;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.enums.InlongCompressType;
 import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.common.SinkContext;
-import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
 
 import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -49,7 +49,7 @@ public class MessageQueueZoneSinkContext extends SinkContext {
     private final String nodeId;
     private final Context producerContext;
     //
-    private final INLONG_COMPRESSED_TYPE compressType;
+    private final InlongCompressType compressType;
 
     /**
      * Constructor
@@ -62,8 +62,7 @@ public class MessageQueueZoneSinkContext extends SinkContext {
         // nodeId
         this.nodeId = CommonConfigHolder.getInstance().getProxyNodeId();
         // compressionType
-        String strCompressionType = 
CommonConfigHolder.getInstance().getMsgCompressType();
-        this.compressType = INLONG_COMPRESSED_TYPE.valueOf(strCompressionType);
+        this.compressType = 
CommonConfigHolder.getInstance().getDefV1MsgCompressType();
         // producerContext
         Map<String, String> producerParams = 
context.getSubProperties(PREFIX_PRODUCER);
         this.producerContext = new Context(producerParams);
@@ -115,7 +114,7 @@ public class MessageQueueZoneSinkContext extends 
SinkContext {
      * 
      * @return the compressType
      */
-    public INLONG_COMPRESSED_TYPE getCompressType() {
+    public InlongCompressType getCompressType() {
         return compressType;
     }
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
index 1867e6a8ce..8e0cb6c5eb 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
@@ -51,7 +51,7 @@ public abstract class PackProfile {
         this.inlongGroupId = inlongGroupId;
         this.inlongStreamId = inlongStreamId;
         this.dispatchTime = dispatchTime;
-        this.enableRetryAfterFailure = 
CommonConfigHolder.getInstance().isEnableSendRetryAfterFailure();
+        this.enableRetryAfterFailure = 
CommonConfigHolder.getInstance().isSendRetryAfterFailure();
         this.maxRetries = 
CommonConfigHolder.getInstance().getMaxRetriesAfterFailure();
     }
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index cc83f86e06..fcba591e65 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -339,7 +339,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
             this.responsePackage(ctx, ProxySdk.ResultCode.SUCCUSS, packObject);
         }
         // process
-        if (CommonConfigHolder.getInstance().isResponseAfterSave()) {
+        if (CommonConfigHolder.getInstance().isDefV1MsgResponseBySink()) {
             this.processAndWaitingSave(ctx, packObject, events);
         } else {
             this.processAndResponse(ctx, packObject, events);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
index 9529aa38ff..3eb12d6a8c 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
@@ -25,6 +25,8 @@ public class SourceConstants {
     public static final String SYSENV_HOST_IP = "inlongHostIp";
     // default source host
     public static final String VAL_DEF_HOST_VALUE = "0.0.0.0";
+    // loopback host
+    public static final String VAL_LOOPBACK_HOST_VALUE = "127.0.0.1";
     // source port
     public static final String SRCCXT_CONFIG_PORT = "port";
     // system env source port
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
index a2a347d4aa..71516da004 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
@@ -17,16 +17,48 @@
 
 package org.apache.inlong.dataproxy.utils;
 
+import org.apache.inlong.dataproxy.source.SourceConstants;
+
 import io.netty.channel.Channel;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.DatagramSocket;
+import java.net.InetAddress;
 import java.net.SocketAddress;
+import java.util.Map;
 
 public class AddressUtils {
 
     private static final Logger logger = 
LoggerFactory.getLogger(AddressUtils.class);
 
+    private static final String localIp;
+
+    static {
+        localIp = getLocalIp();
+    }
+
+    public static String getLocalIp() {
+        if (localIp != null) {
+            return localIp;
+        }
+        String ip = "127.0.0.1";
+        DatagramSocket socket = null;
+        try {
+            socket = new DatagramSocket();
+            socket.connect(InetAddress.getByName("8.8.8.8"), 10002);
+            ip = socket.getLocalAddress().getHostAddress();
+        } catch (Throwable ex) {
+            logger.error("Get local IP failure,", ex);
+        } finally {
+            if (socket != null) {
+                socket.close();
+            }
+        }
+        return ip;
+    }
+
     public static String getChannelLocalIP(Channel channel) {
         return getChannelIP(channel, true);
     }
@@ -57,4 +89,24 @@ public class AddressUtils {
         }
     }
 
+    public static String getSelfHost() {
+        String localIp = null;
+        Map<String, String> envMap = System.getenv();
+        if (envMap.containsKey(SourceConstants.SYSENV_HOST_IP)) {
+            String tmpVal = envMap.get(SourceConstants.SYSENV_HOST_IP);
+            if (ConfStringUtils.isValidIp(tmpVal)) {
+                localIp = tmpVal.trim();
+            } else {
+                logger.error("{}({}) config in system env not valid",
+                        SourceConstants.SYSENV_HOST_IP, tmpVal);
+            }
+        }
+        if (StringUtils.isBlank(localIp)
+                || SourceConstants.VAL_LOOPBACK_HOST_VALUE.equals(localIp)
+                || SourceConstants.VAL_DEF_HOST_VALUE.equals(localIp)) {
+            localIp = AddressUtils.getLocalIp();
+        }
+
+        return localIp;
+    }
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
index 8e02c66b0e..45551e6bcc 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
@@ -38,7 +38,7 @@ public class TestCommonConfigHolder {
         Assert.assertEquals(10000, 
CommonConfigHolder.getInstance().getMetaConfigSyncInvlMs());
         
Assert.assertTrue(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept());
         
Assert.assertTrue(CommonConfigHolder.getInstance().getDefTopics().contains("test2"));
-        
Assert.assertTrue(CommonConfigHolder.getInstance().isEnableSendRetryAfterFailure());
+        
Assert.assertTrue(CommonConfigHolder.getInstance().isSendRetryAfterFailure());
         Assert.assertEquals(2, 
CommonConfigHolder.getInstance().getMaxRetriesAfterFailure());
     }
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties 
b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
index 21a785ee25..4077a082ad 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
@@ -20,9 +20,9 @@ metricDomains=DataProxy
 
metricDomains.DataProxy.domainListeners=org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
 metricDomains.DataProxy.snapshotInterval=60000
 # whether to startup using the local metadata.json file without connecting to 
the Manager
-startup.using.local.meta.file.enable=true
+meta.config.startup.using.local.file.enable=true
 
-proxy.enable.whitelist=true
+proxy.visit.whitelist.enable=true
 
 meta.config.sync.interval.ms=10000
 
@@ -31,6 +31,6 @@ id2topic.unconfigured.accept.enable=true
 # the default topic if accept unconfigured topic's message
 id2topic.unconfigured.default.topics= test1 test2 test3
 # whether to retry send after sent failure
-send.retry.after.failure = true
+msg.send.failure.retry.enable = true
 # max retries after sent failure
-max.retries.after.failure=2
+msg.max.retries=2

Reply via email to