This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a629f1cf6f [INLONG-8252][DataProxy] Adjust default Topic settings from 
Source to Sink (#8259)
a629f1cf6f is described below

commit a629f1cf6feedbe548bffe3b3cfab3660c8ec763
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Jun 15 19:46:04 2023 +0800

    [INLONG-8252][DataProxy] Adjust default Topic settings from Source to Sink 
(#8259)
---
 .../dataproxy/config/CommonConfigHolder.java       | 52 ++++++++++++++---
 .../dataproxy/config/holder/MetaConfigHolder.java  |  7 ++-
 .../inlong/dataproxy/consts/StatConstants.java     | 23 +++-----
 .../inlong/dataproxy/http/MessageFilter.java       |  2 +-
 .../dataproxy/http/SimpleMessageHandler.java       | 13 ++---
 .../inlong/dataproxy/sink/common/SinkContext.java  | 41 +++++++++-----
 .../sink/mq/MessageQueueZoneSinkContext.java       |  4 +-
 .../dataproxy/sink/mq/kafka/KafkaHandler.java      | 65 ++++++++++++++--------
 .../dataproxy/sink/mq/pulsar/PulsarHandler.java    | 61 ++++++++++++--------
 .../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 49 +++++++++-------
 .../dataproxy/source/ServerMessageHandler.java     |  4 +-
 .../inlong/dataproxy/source2/BaseSource.java       |  9 ---
 .../dataproxy/source2/InLongMessageFactory.java    |  2 +-
 .../dataproxy/source2/InLongMessageHandler.java    | 13 ++---
 .../inlong/dataproxy/source2/SourceConstants.java  |  2 -
 .../dataproxy/source2/v0msg/CodecBinMsg.java       | 17 ++----
 .../dataproxy/source2/v0msg/CodecTextMsg.java      | 38 ++++++++-----
 .../config/holder/TestCommonConfigHolder.java      |  8 +--
 .../src/test/resources/common.properties           |  5 ++
 19 files changed, 243 insertions(+), 172 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
index 81ba91e4df..d6e246978a 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -30,6 +30,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -78,8 +81,11 @@ public class CommonConfigHolder {
             "startup.using.local.meta.file.enable";
     public static final boolean VAL_DEF_ENABLE_STARTUP_USING_LOCAL_META_FILE = 
false;
     // whether to accept messages without mapping between groupId/streamId and 
topic
-    public static final String KEY_NOTFOUND_TOPIC_ACCEPT = 
"source.topic.notfound.accept";
-    public static final boolean VAL_DEF_NOTFOUND_TOPIC_ACCEPT = false;
+    public static final String KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT = 
"id2topic.unconfigured.accept.enable";
+    public static final boolean VAL_DEF_ENABLE_UNCONFIGURED_TOPIC_ACCEPT = 
false;
+    // default topics configure key, multiple topic settings are separated by 
"\\s+".
+    public static final String KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS = 
"id2topic.unconfigured.default.topics";
+    public static final String VAL_DEFAULT_TOPIC = "test";
     // whether enable whitelist, optional field.
     public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist";
     public static final boolean VAL_DEF_ENABLE_WHITELIST = false;
@@ -161,7 +167,8 @@ public class CommonConfigHolder {
     private long auditFormatInvlMs = VAL_DEF_AUDIT_FORMAT_INTERVAL_MS;
     private boolean responseAfterSave = VAL_DEF_RESPONSE_AFTER_SAVE;
     private long maxResAfterSaveTimeout = VAL_DEF_MAX_RAS_TIMEOUT_MS;
-    private boolean noTopicAccept = VAL_DEF_NOTFOUND_TOPIC_ACCEPT;
+    private boolean enableUnConfigTopicAccept = 
VAL_DEF_ENABLE_UNCONFIGURED_TOPIC_ACCEPT;
+    private List<String> defaultTopics = Arrays.asList(VAL_DEFAULT_TOPIC);
     private boolean enableWhiteList = VAL_DEF_ENABLE_WHITELIST;
     private int maxBufferQueueSizeKb = VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB;
     private String eventHandler = VAL_DEF_EVENT_HANDLER;
@@ -239,8 +246,20 @@ public class CommonConfigHolder {
         return metaConfigSyncInvlMs;
     }
 
-    public boolean isNoTopicAccept() {
-        return noTopicAccept;
+    public boolean isEnableUnConfigTopicAccept() {
+        return enableUnConfigTopicAccept;
+    }
+
+    public List<String> getDefTopics() {
+        return defaultTopics;
+    }
+
+    public String getRandDefTopics() {
+        if (defaultTopics.isEmpty()) {
+            return null;
+        }
+        SecureRandom rand = new SecureRandom();
+        return defaultTopics.get(rand.nextInt(defaultTopics.size()));
     }
 
     public boolean isEnableWhiteList() {
@@ -380,10 +399,27 @@ public class CommonConfigHolder {
         if (StringUtils.isNotEmpty(tmpValue)) {
             this.enableStartupUsingLocalMetaFile = 
"TRUE".equalsIgnoreCase(tmpValue.trim());
         }
-        // read whether accept msg without topic
-        tmpValue = this.props.get(KEY_NOTFOUND_TOPIC_ACCEPT);
+        // read whether accept msg without id2topic configure
+        tmpValue = this.props.get(KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT);
         if (StringUtils.isNotEmpty(tmpValue)) {
-            this.noTopicAccept = "TRUE".equalsIgnoreCase(tmpValue.trim());
+            this.enableUnConfigTopicAccept = 
"TRUE".equalsIgnoreCase(tmpValue.trim());
+            if (enableUnConfigTopicAccept) {
+                // read default topics
+                tmpValue = 
this.props.get(KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS);
+                if (StringUtils.isNotBlank(tmpValue)) {
+                    List<String> tmpList = new ArrayList<>();
+                    String[] topicItems = tmpValue.split("\\s+");
+                    for (String item : topicItems) {
+                        if (StringUtils.isBlank(item)) {
+                            continue;
+                        }
+                        tmpList.add(item.trim());
+                    }
+                    if (tmpList.size() > 0) {
+                        defaultTopics = tmpList;
+                    }
+                }
+            }
         }
         // read enable whitelist
         tmpValue = this.props.get(KEY_ENABLE_WHITELIST);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 70a07bde2c..6d0da8d5ea 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -177,7 +177,12 @@ public class MetaConfigHolder extends ConfigHolder {
     }
 
     public Set<String> getAllTopicName() {
-        Set<String> result = new HashSet<>(defTopics);
+        Set<String> result = new HashSet<>();
+        // add default topics first
+        if (CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+            result.addAll(CommonConfigHolder.getInstance().getDefTopics());
+        }
+        // add configured topics
         for (IdTopicConfig topicConfig : id2TopicMap.values()) {
             if (topicConfig == null) {
                 continue;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 48e087c10c..973f01451f 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -57,6 +57,7 @@ public class StatConstants {
     public static final java.lang.String EVENT_MSG_BODY_NEGATIVE = 
"msg.body.negative";
     public static final java.lang.String EVENT_MSG_BODY_UNPRESS_EXP = 
"msg.body.unpress.exp";
     public static final java.lang.String EVENT_MSG_BODY_OVERMAX = 
"msg.body.overmax";
+    public static final java.lang.String EVENT_MSG_BODY_TRIP = "msg.body.trip";
     public static final java.lang.String EVENT_MSG_ATTR_NEGATIVE = 
"msg.attr.negative";
     public static final java.lang.String EVENT_MSG_MAGIC_UNEQUAL = 
"msg.magic.unequal";
     public static final java.lang.String EVENT_MSG_HB_TOTALLEN_BELOWMIN = 
"msg.hb.totallen.belowmin";
@@ -76,26 +77,16 @@ public class StatConstants {
     public static final java.lang.String EVENT_MSG_V0_POST_FAILURE = 
"msg.post.v0.failure";
     public static final java.lang.String EVENT_MSG_V1_POST_SUCCESS = 
"msg.post.v1.success";
     public static final java.lang.String EVENT_MSG_V1_POST_DROPPED = 
"msg.post.v1.dropped";
+    // sink
+    public static final java.lang.String EVENT_SINK_CONFIG_TOPIC_MISSING = 
"sink.topic.missing";
+    public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_MISSING = 
"default.topic.empty";
+    public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_USED = 
"default.topic.used";
+    public static final java.lang.String EVENT_SINK_PRODUCER_NULL = 
"sink.producer.null";
+    public static final java.lang.String EVENT_SINK_SEND_EXCEPTION = 
"sink.send.exception";
 
-    public static final java.lang.String EVENT_SINK_NOUID = "sink.nouid";
-    public static final java.lang.String EVENT_SINK_NOTOPIC = "sink.notopic";
-    public static final java.lang.String EVENT_SINK_NOPRODUCER = 
"sink.noproducer";
-    public static final java.lang.String EVENT_SINK_SENDEXCEPT = 
"sink.sendexcept";
     public static final java.lang.String EVENT_SINK_FAILRETRY = "sink.retry";
     public static final java.lang.String EVENT_SINK_FAILDROPPED = 
"sink.dropped";
     public static final java.lang.String EVENT_SINK_SUCCESS = "sink.success";
     public static final java.lang.String EVENT_SINK_RECEIVEEXCEPT = 
"sink.rcvexcept";
 
-    public static final java.lang.String METASINK_OTHEREXP = 
"metasink.otherexp";
-    public static final java.lang.String METASINK_NOSLAVE = "metasink.noslave";
-    public static final java.lang.String METASINK_MSG_NOTOPIC = 
"metasink.msgnotopic";
-    public static final java.lang.String METASINK_PROCESS_SPEED = 
"metasink.process.speed";
-    public static final java.lang.String EVENT_OTHEREXP = "socketmsg.otherexp";
-    public static final java.lang.String EVENT_INVALID = "socketmsg.invalid";
-
-    public static final java.lang.String AGENT_MESSAGES_SENT_SUCCESS = 
"agent.messages.success";
-    public static final java.lang.String AGENT_PACKAGES_SENT_SUCCESS = 
"agent.packages.success";
-    public static final java.lang.String MSG_COUNTER_KEY = "msgcnt";
-    public static final java.lang.String MSG_PKG_TIME_KEY = "msg.pkg.time";
-    public static final java.lang.String AGENT_MESSAGES_COUNT_PREFIX = 
"agent.messages.count.";
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
index 8c65690faa..e2272cd5c0 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
@@ -106,7 +106,7 @@ public class MessageFilter implements Filter {
         // get and check topicName
         String topicName = ConfigManager.getInstance().getTopicName(groupId, 
streamId);
         if (StringUtils.isBlank(topicName)
-                && !CommonConfigHolder.getInstance().isNoTopicAccept()) {
+                && 
!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
             returnRspPackage(resp, req.getCharacterEncoding(),
                     DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
                     DataProxyErrCode.TOPIC_IS_BLANK.getErrMsg());
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 59a7655338..e2eba723a3 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -22,7 +22,6 @@ import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.common.util.NetworkUtils;
-import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttrConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -103,13 +102,9 @@ public class SimpleMessageHandler implements 
MessageHandler {
         // get topicName
         String topicName = configManager.getTopicName(groupId, streamId);
         if (StringUtils.isBlank(topicName)) {
-            if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
-                topicName = "test";
-            } else {
-                throw new MessageProcessException(strBuff
-                        .append("Topic for message is null, inlongGroupId = ")
-                        .append(groupId).append(", inlongStreamId = 
").append(streamId).toString());
-            }
+            throw new MessageProcessException(strBuff
+                    .append("Topic for message is null, inlongGroupId = ")
+                    .append(groupId).append(", inlongStreamId = 
").append(streamId).toString());
         }
         // get message data time
         final long msgRcvTime = System.currentTimeMillis();
@@ -171,7 +166,7 @@ public class SimpleMessageHandler implements MessageHandler 
{
         // build metric data item
         longDataTime = longDataTime / 1000 / 60 / 10;
         longDataTime = longDataTime * 1000 * 60 * 10;
-        
strBuff.append("http").append(SEP_HASHTAG).append(topicName).append(SEP_HASHTAG)
+        
strBuff.append("http").append(SEP_HASHTAG).append(groupId).append(SEP_HASHTAG)
                 
.append(streamId).append(SEP_HASHTAG).append(strRemoteIP).append(SEP_HASHTAG)
                 .append(NetworkUtils.getLocalIp()).append(SEP_HASHTAG)
                 .append(evenProcType.getRight()).append(SEP_HASHTAG)
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
index 4ebe570975..102c88a37d 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
@@ -18,12 +18,12 @@
 package org.apache.inlong.dataproxy.sink.common;
 
 import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.monitor.MonitorIndex;
-import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttrConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.dataproxy.metrics.stats.MonitorIndex;
+import org.apache.inlong.dataproxy.metrics.stats.MonitorStats;
 import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
 import org.apache.inlong.dataproxy.sink.mq.PackProfile;
 import org.apache.inlong.dataproxy.sink.mq.pulsar.PulsarHandler;
@@ -65,7 +65,7 @@ public class SinkContext {
     protected Timer reloadTimer;
     // file metric statistic
     protected MonitorIndex monitorIndex = null;
-    private MonitorIndexExt monitorIndexExt = null;
+    private MonitorStats monitorStats = null;
 
     /**
      * Constructor
@@ -90,13 +90,15 @@ public class SinkContext {
         // init monitor logic
         if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
             this.monitorIndex = new 
MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSinkOutName(),
-                    
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+                    
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
                     
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
-            this.monitorIndexExt = new MonitorIndexExt(
+            this.monitorStats = new MonitorStats(
                     
CommonConfigHolder.getInstance().getFileMetricEventOutName()
                             + AttrConstants.SEP_HASHTAG + this.getSinkName(),
-                    
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+                    
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
                     
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
+            this.monitorIndex.start();
+            this.monitorStats.start();
         }
         try {
             this.reload();
@@ -118,23 +120,36 @@ public class SinkContext {
         // stop file statistic index
         if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
             if (monitorIndex != null) {
-                monitorIndex.shutDown();
+                monitorIndex.stop();
             }
-            if (monitorIndexExt != null) {
-                monitorIndexExt.shutDown();
+            if (monitorStats != null) {
+                monitorStats.stop();
             }
         }
     }
 
-    public void fileMetricEventInc(String eventKey) {
+    public void fileMetricIncSumStats(String eventKey) {
         if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
-            monitorIndexExt.incrementAndGet(eventKey);
+            monitorStats.incSumStats(eventKey);
         }
     }
 
-    public void fileMetricRecordAdd(String key, int cnt, int packCnt, long 
packSize, int failCnt) {
+    public void fileMetricIncWithDetailStats(String eventKey, String 
detailInfoKey) {
         if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
-            monitorIndex.addAndGet(key, cnt, packCnt, packSize, failCnt);
+            monitorStats.incSumStats(eventKey);
+            monitorStats.incDetailStats(eventKey + "#" + detailInfoKey);
+        }
+    }
+
+    public void fileMetricAddSuccCnt(String key, int cnt, int packCnt, long 
packSize) {
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+            monitorIndex.addSuccStats(key, cnt, packCnt, packSize);
+        }
+    }
+
+    public void fileMetricAddFailCnt(String key, int failCnt) {
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+            monitorIndex.addFailStats(key, failCnt);
         }
     }
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 151b812ff0..c395af814d 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -242,11 +242,11 @@ public class MessageQueueZoneSinkContext extends 
SinkContext {
             DataProxyErrCode errCode, String errMsg) {
         if (currentRecord.isResend()) {
             dispatchQueue.offer(currentRecord);
-            fileMetricEventInc(StatConstants.EVENT_SINK_FAILRETRY);
+            fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILRETRY);
             this.addSendResultMetric(currentRecord, mqName, topic, false, 
sendTime);
         } else {
             currentRecord.fail(errCode, errMsg);
-            fileMetricEventInc(StatConstants.EVENT_SINK_FAILDROPPED);
+            fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILDROPPED);
         }
     }
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index b8eef4f7ca..86e0fddefc 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.dataproxy.sink.mq.kafka;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
@@ -29,6 +31,7 @@ import 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
 import org.apache.inlong.dataproxy.sink.mq.PackProfile;
 import org.apache.inlong.dataproxy.sink.mq.SimplePackProfile;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -50,7 +53,8 @@ import java.util.Set;
 public class KafkaHandler implements MessageQueueHandler {
 
     public static final Logger LOG = 
LoggerFactory.getLogger(KafkaHandler.class);
-    public static final String KEY_NAMESPACE = "namespace";
+    // log print count
+    private static final LogCounter logCounter = new LogCounter(10, 100000, 30 
* 1000);
 
     private CacheClusterConfig config;
     private String clusterName;
@@ -115,27 +119,34 @@ public class KafkaHandler implements MessageQueueHandler {
     @Override
     public boolean send(PackProfile profile) {
         try {
-            // idConfig
+            String topic;
+            // get idConfig
             IdTopicConfig idConfig = 
ConfigManager.getInstance().getIdTopicConfig(
                     profile.getInlongGroupId(), profile.getInlongStreamId());
             if (idConfig == null) {
-                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
-                sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
-                sinkContext.getDispatchQueue().release(profile.getSize());
-                
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
-                return false;
-            }
-            String topic = idConfig.getTopicName();
-            if (topic == null) {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
-                sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
-                sinkContext.getDispatchQueue().release(profile.getSize());
-                profile.fail(DataProxyErrCode.TOPIC_IS_BLANK, "");
-                return false;
+                if 
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+                    sinkContext.fileMetricIncWithDetailStats(
+                            StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, 
profile.getUid());
+                    sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
+                    sinkContext.getDispatchQueue().release(profile.getSize());
+                    
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+                    return false;
+                }
+                topic = CommonConfigHolder.getInstance().getRandDefTopics();
+                if (StringUtils.isEmpty(topic)) {
+                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
+                    sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
+                    sinkContext.getDispatchQueue().release(profile.getSize());
+                    
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+                    return false;
+                }
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED);
+            } else {
+                topic = idConfig.getTopicName();
             }
             // create producer failed
             if (producer == null) {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_PRODUCER_NULL);
                 sinkContext.processSendFail(profile, clusterName, topic, 0, 
DataProxyErrCode.PRODUCER_IS_NULL, "");
                 return false;
             }
@@ -147,10 +158,12 @@ public class KafkaHandler implements MessageQueueHandler {
             }
             return true;
         } catch (Exception ex) {
-            
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
+            
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SEND_EXCEPTION);
             sinkContext.processSendFail(profile, clusterName, 
profile.getUid(), 0,
                     DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, 
ex.getMessage());
-            LOG.error(ex.getMessage(), ex);
+            if (logCounter.shouldPrint()) {
+                LOG.error("Send Message to Kafka failure", ex);
+            }
             return false;
         }
     }
@@ -188,12 +201,14 @@ public class KafkaHandler implements MessageQueueHandler {
             @Override
             public void onCompletion(RecordMetadata arg0, Exception ex) {
                 if (ex != null) {
-                    
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                     sinkContext.processSendFail(batchProfile, clusterName, 
topic, sendTime,
                             DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
-                    LOG.error("Send BatchPackProfile to Kafka failure", ex);
+                    if (logCounter.shouldPrint()) {
+                        LOG.error("Send BatchPackProfile to Kafka failure", 
ex);
+                    }
                 } else {
-                    
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
+                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                     sinkContext.addSendResultMetric(batchProfile, clusterName, 
topic, true, sendTime);
                     
sinkContext.getDispatchQueue().release(batchProfile.getSize());
                     batchProfile.ack();
@@ -230,12 +245,14 @@ public class KafkaHandler implements MessageQueueHandler {
             @Override
             public void onCompletion(RecordMetadata arg0, Exception ex) {
                 if (ex != null) {
-                    
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                     sinkContext.processSendFail(simpleProfile, clusterName, 
topic, sendTime,
                             DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
-                    LOG.error("Send SimplePackProfile to Kafka failure", ex);
+                    if (logCounter.shouldPrint()) {
+                        LOG.error("Send SimplePackProfile to Kafka failure", 
ex);
+                    }
                 } else {
-                    
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
+                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                     sinkContext.addSendResultMetric(simpleProfile, 
clusterName, topic, true, sendTime);
                     
sinkContext.getDispatchQueue().release(simpleProfile.getSize());
                     simpleProfile.ack();
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 0765197431..7800e615fa 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -19,6 +19,7 @@ package org.apache.inlong.dataproxy.sink.mq.pulsar;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
@@ -203,24 +204,30 @@ public class PulsarHandler implements MessageQueueHandler 
{
     @Override
     public boolean send(PackProfile profile) {
         try {
-            // idConfig
+            String producerTopic;
+            // get idConfig
             IdTopicConfig idConfig = 
ConfigManager.getInstance().getIdTopicConfig(
                     profile.getInlongGroupId(), profile.getInlongStreamId());
             if (idConfig == null) {
-                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
-                sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
-                sinkContext.getDispatchQueue().release(profile.getSize());
-                
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
-                return false;
-            }
-            // topic
-            String producerTopic = idConfig.getPulsarTopicName(tenant, 
namespace);
-            if (producerTopic == null) {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
-                sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
-                sinkContext.getDispatchQueue().release(profile.getSize());
-                profile.fail(DataProxyErrCode.TOPIC_IS_BLANK, "");
-                return false;
+                if 
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+                    sinkContext.fileMetricIncWithDetailStats(
+                            StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, 
profile.getUid());
+                    sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
+                    sinkContext.getDispatchQueue().release(profile.getSize());
+                    
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+                    return false;
+                }
+                producerTopic = 
CommonConfigHolder.getInstance().getRandDefTopics();
+                if (StringUtils.isEmpty(producerTopic)) {
+                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
+                    sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
+                    sinkContext.getDispatchQueue().release(profile.getSize());
+                    
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+                    return false;
+                }
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED);
+            } else {
+                producerTopic = idConfig.getPulsarTopicName(tenant, namespace);
             }
             // get producer
             Producer<byte[]> producer = this.producerMap.get(producerTopic);
@@ -246,7 +253,7 @@ public class PulsarHandler implements MessageQueueHandler {
             }
             // create producer failed
             if (producer == null) {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_PRODUCER_NULL);
                 sinkContext.processSendFail(profile, clusterName, 
producerTopic, 0,
                         DataProxyErrCode.PRODUCER_IS_NULL, "");
                 return false;
@@ -259,10 +266,12 @@ public class PulsarHandler implements MessageQueueHandler 
{
             }
             return true;
         } catch (Exception ex) {
-            
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
+            
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SEND_EXCEPTION);
             sinkContext.processSendFail(profile, clusterName, 
profile.getUid(), 0,
                     DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, 
ex.getMessage());
-            LOG.error(ex.getMessage(), ex);
+            if (logCounter.shouldPrint()) {
+                LOG.error("Send Message to Pulsar failure", ex);
+            }
             return false;
         }
     }
@@ -314,12 +323,14 @@ public class PulsarHandler implements MessageQueueHandler 
{
         // callback
         future.whenCompleteAsync((msgId, ex) -> {
             if (ex != null) {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                 sinkContext.processSendFail(batchProfile, clusterName, 
producerTopic, sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
-                LOG.error("Send ProfileV1 to Pulsar failure", ex);
+                if (logCounter.shouldPrint()) {
+                    LOG.error("Send ProfileV1 to Pulsar failure", ex);
+                }
             } else {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(batchProfile, clusterName, 
producerTopic, true, sendTime);
                 sinkContext.getDispatchQueue().release(batchProfile.getSize());
                 batchProfile.ack();
@@ -346,12 +357,14 @@ public class PulsarHandler implements MessageQueueHandler 
{
         // callback
         future.whenCompleteAsync((msgId, ex) -> {
             if (ex != null) {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                 sinkContext.processSendFail(simpleProfile, clusterName, 
producerTopic, sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
-                LOG.error("Send SimpleProfileV0 to Pulsar failure", ex);
+                if (logCounter.shouldPrint()) {
+                    LOG.error("Send SimpleProfileV0 to Pulsar failure", ex);
+                }
             } else {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(simpleProfile, clusterName, 
producerTopic, true, sendTime);
                 
sinkContext.getDispatchQueue().release(simpleProfile.getSize());
                 simpleProfile.ack();
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 0f6b4fcce1..0b5ee8cffa 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -19,6 +19,7 @@ package org.apache.inlong.dataproxy.sink.mq.tube;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
@@ -39,6 +40,7 @@ import 
org.apache.inlong.tubemq.client.producer.MessageSentCallback;
 import org.apache.inlong.tubemq.client.producer.MessageSentResult;
 import org.apache.inlong.tubemq.corebase.Message;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +59,6 @@ public class TubeHandler implements MessageQueueHandler {
     private static final LogCounter logCounter = new LogCounter(10, 100000, 30 
* 1000);
 
     private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
-    public static final String KEY_NAMESPACE = "namespace";
 
     private CacheClusterConfig config;
     private String clusterName;
@@ -183,29 +184,35 @@ public class TubeHandler implements MessageQueueHandler {
     public boolean send(PackProfile profile) {
         try {
             // idConfig
+            String topic;
             IdTopicConfig idConfig = 
ConfigManager.getInstance().getIdTopicConfig(
                     profile.getInlongGroupId(), profile.getInlongStreamId());
             if (idConfig == null) {
-                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
-                sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
-                sinkContext.getDispatchQueue().release(profile.getSize());
-                
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
-                return false;
-            }
-            String topic = idConfig.getTopicName();
-            if (topic == null) {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
-                sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
-                sinkContext.getDispatchQueue().release(profile.getSize());
-                profile.fail(DataProxyErrCode.TOPIC_IS_BLANK, "");
-                return false;
+                if 
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+                    sinkContext.fileMetricIncWithDetailStats(
+                            StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, 
profile.getUid());
+                    sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
+                    sinkContext.getDispatchQueue().release(profile.getSize());
+                    
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+                    return false;
+                }
+                topic = CommonConfigHolder.getInstance().getRandDefTopics();
+                if (StringUtils.isEmpty(topic)) {
+                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
+                    sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
+                    sinkContext.getDispatchQueue().release(profile.getSize());
+                    
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+                    return false;
+                }
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED);
+            } else {
+                topic = idConfig.getTopicName();
             }
             // create producer failed
             if (producer == null) {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_PRODUCER_NULL);
                 sinkContext.processSendFail(profile, clusterName, topic, 0,
                         DataProxyErrCode.PRODUCER_IS_NULL, "");
-                LOG.error("producer is null");
                 return false;
             }
             // publish
@@ -221,7 +228,7 @@ public class TubeHandler implements MessageQueueHandler {
             }
             return true;
         } catch (Exception ex) {
-            
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
+            
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SEND_EXCEPTION);
             sinkContext.processSendFail(profile, clusterName, 
profile.getUid(), 0,
                     DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, 
ex.getMessage());
             LOG.error(ex.getMessage(), ex);
@@ -258,7 +265,7 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onMessageSent(MessageSentResult result) {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(batchProfile, clusterName, 
topic, true, sendTime);
                 sinkContext.getDispatchQueue().release(batchProfile.getSize());
                 batchProfile.ack();
@@ -266,7 +273,7 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onException(Throwable ex) {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                 sinkContext.processSendFail(batchProfile, clusterName, topic, 
sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                 if (logCounter.shouldPrint()) {
@@ -292,7 +299,7 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onMessageSent(MessageSentResult result) {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(simpleProfile, clusterName, 
topic, true, sendTime);
                 
sinkContext.getDispatchQueue().release(simpleProfile.getSize());
                 simpleProfile.ack();
@@ -300,7 +307,7 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onException(Throwable ex) {
-                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                 sinkContext.processSendFail(simpleProfile, clusterName, topic, 
sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                 if (logCounter.shouldPrint()) {
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index ab1d77b53e..5669e8de08 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -381,7 +381,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
             }
             // check topic configure
             if (StringUtils.isEmpty(configTopic)) {
-                if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
+                if 
(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
                     configTopic = this.defaultTopic;
                 } else {
                     
commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
@@ -518,7 +518,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
                 longDataTime = longDataTime / 1000 / 60 / 10;
                 longDataTime = longDataTime * 1000 * 60 * 10;
                 strBuff.append(protocolType).append(AttrConstants.SEPARATOR)
-                        
.append(topicEntry.getKey()).append(AttrConstants.SEPARATOR)
+                        .append(groupId).append(AttrConstants.SEPARATOR)
                         
.append(streamIdEntry.getKey()).append(AttrConstants.SEPARATOR)
                         .append(strRemoteIP).append(AttrConstants.SEPARATOR)
                         .append(getLocalIp()).append(AttrConstants.SEPARATOR)
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
index 0cf601495f..6800fd01cf 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
@@ -153,11 +153,6 @@ public abstract class BaseSource
         Preconditions.checkArgument(StringUtils.isNotBlank(tmpVal),
                 SourceConstants.SRCCXT_MESSAGE_HANDLER_NAME + " config is 
blank");
         this.messageHandlerName = tmpVal;
-        // get default topic
-        tmpVal = context.getString(SourceConstants.SRCCXT_DEF_TOPIC);
-        if (StringUtils.isNotBlank(tmpVal)) {
-            this.defTopic = tmpVal.trim();
-        }
         // get default attributes
         tmpVal = context.getString(SourceConstants.SRCCXT_DEF_ATTR);
         if (StringUtils.isNotBlank(tmpVal)) {
@@ -367,10 +362,6 @@ public abstract class BaseSource
         return strPort;
     }
 
-    public String getDefTopic() {
-        return defTopic;
-    }
-
     public String getDefAttr() {
         return defAttr;
     }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageFactory.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageFactory.java
index 0dc9b326d7..605dfd8872 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageFactory.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageFactory.java
@@ -34,7 +34,7 @@ public class InLongMessageFactory extends 
ChannelInitializer<SocketChannel> {
 
     public static final int INLONG_LENGTH_FIELD_OFFSET = 0;
     public static final int INLONG_LENGTH_FIELD_LENGTH = 4;
-    public static final int INLONG_LENGTH_ADJUSTMENT = -4;
+    public static final int INLONG_LENGTH_ADJUSTMENT = 0;
     public static final int INLONG_INITIAL_BYTES_TO_STRIP = 0;
     public static final boolean DEFAULT_FAIL_FAST = true;
     private static final Logger LOG = 
LoggerFactory.getLogger(InLongMessageFactory.class);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
index 9fb5144aa2..a4d094872b 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
@@ -75,7 +75,6 @@ public class InLongMessageHandler extends 
ChannelInboundHandlerAdapter {
     private static final LogCounter logCounter = new LogCounter(10, 100000, 30 
* 1000);
 
     private static final int INLONG_MSG_V1 = 1;
-    private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
 
     private static final ConfigManager configManager = 
ConfigManager.getInstance();
     private final BaseSource source;
@@ -421,14 +420,10 @@ public class InLongMessageHandler extends 
ChannelInboundHandlerAdapter {
             // get configured topic name
             String topic = 
configManager.getTopicName(event.getInlongGroupId(), event.getInlongStreamId());
             if (StringUtils.isBlank(topic)) {
-                if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
-                    topic = source.getDefTopic();
-                } else {
-                    
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
-                    source.addMetric(false, event.getBody().length, event);
-                    this.responsePackage(ctx, 
ProxySdk.ResultCode.ERR_ID_ERROR, packObject);
-                    return;
-                }
+                
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+                source.addMetric(false, event.getBody().length, event);
+                this.responsePackage(ctx, ProxySdk.ResultCode.ERR_ID_ERROR, 
packObject);
+                return;
             }
             event.setTopic(topic);
             // put to channel
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
index 8cb0e018ce..c47e6d5cac 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
@@ -33,8 +33,6 @@ public class SourceConstants {
     public static final String SRCCXT_MSG_FACTORY_NAME = "msg-factory-name";
     // message handler name
     public static final String SRCCXT_MESSAGE_HANDLER_NAME = 
"message-handler-name";
-    // default topic name
-    public static final String SRCCXT_DEF_TOPIC = "topic";
     // default attributes
     public static final String SRCCXT_DEF_ATTR = "attr";
     // max message length
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
index 4fe96f57ec..d5dd70c7a4 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
@@ -22,7 +22,6 @@ import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.common.msg.MsgType;
 import org.apache.inlong.dataproxy.base.SinkRspEvent;
-import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.source2.BaseSource;
@@ -149,7 +148,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
             return false;
         }
         // build message seqId
-        this.msgSeqId = strBuff.append(this.topicName)
+        this.msgSeqId = strBuff.append(this.groupId)
                 .append(AttributeConstants.SEPARATOR).append(this.streamId)
                 .append(AttributeConstants.SEPARATOR).append(strRemoteIP)
                 
.append("#").append(dataTimeMs).append("#").append(uniq).toString();
@@ -330,15 +329,11 @@ public class CodecBinMsg extends AbsV0MsgCodec {
         // get and check topic configure
         this.topicName = configManager.getTopicName(this.groupId, 
this.streamId);
         if (StringUtils.isBlank(this.topicName)) {
-            if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
-                this.topicName = source.getDefTopic();
-            } else {
-                
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
-                this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
-                this.errMsg = String.format("Topic not configured for 
groupId=(%s), streamId=(%s)",
-                        this.groupId, this.streamId);
-                return false;
-            }
+            
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+            this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
+            this.errMsg = String.format("Topic not configured for 
groupId=(%s), streamId=(%s)",
+                    this.groupId, this.streamId);
+            return false;
         }
         if (StringUtils.isBlank(this.streamId)) {
             this.streamId = "";
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
index 50f7552a32..07a8f1aac5 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
@@ -21,7 +21,6 @@ import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.common.msg.MsgType;
-import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.source2.BaseSource;
@@ -120,17 +119,19 @@ public class CodecTextMsg extends AbsV0MsgCodec {
             int readPos = 0;
             int singleMsgLen;
             ByteBuffer bodyBuffer = ByteBuffer.wrap(bodyData);
-            while (bodyBuffer.remaining() > 0) {
+            int remaining = bodyBuffer.remaining();
+            while (remaining > 0) {
                 singleMsgLen = bodyBuffer.getInt(readPos);
-                if (singleMsgLen <= 0 || singleMsgLen > 
bodyBuffer.remaining()) {
+                if (singleMsgLen <= 0 || singleMsgLen > remaining) {
                     
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ITEM_LEN_MALFORMED);
                     this.errCode = DataProxyErrCode.BODY_EXCEED_MAX_LEN;
                     this.errMsg = String.format(
                             "Malformed data len, singleMsgLen(%d), buffer 
remaining(%d), attr: (%s)",
-                            singleMsgLen, bodyBuffer.remaining(), origAttr);
+                            singleMsgLen, remaining, origAttr);
                     return false;
                 }
                 readPos += 4 + singleMsgLen;
+                remaining -= 4 + singleMsgLen;
             }
         }
         return true;
@@ -148,15 +149,11 @@ public class CodecTextMsg extends AbsV0MsgCodec {
         // get and check topic configure
         String tmpTopicName = 
ConfigManager.getInstance().getTopicName(tmpGroupId, tmpStreamId);
         if (StringUtils.isBlank(tmpTopicName)) {
-            if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
-                tmpTopicName = source.getDefTopic();
-            } else {
-                
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
-                this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
-                this.errMsg = String.format(
-                        "Topic not configured for groupId=(%s), 
streamId=(%s)", tmpGroupId, tmpStreamId);
-                return false;
-            }
+            
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+            this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
+            this.errMsg = String.format(
+                    "Topic not configured for groupId=(%s), streamId=(%s)", 
tmpGroupId, tmpStreamId);
+            return false;
         }
         this.groupId = tmpGroupId;
         this.topicName = tmpTopicName;
@@ -190,7 +187,7 @@ public class CodecTextMsg extends AbsV0MsgCodec {
         // process sequence id
         String sequenceId = attrMap.get(AttributeConstants.SEQUENCE_ID);
         if (StringUtils.isNotBlank(sequenceId)) {
-            
strBuff.append(topicName).append(AttributeConstants.SEPARATOR).append(streamId)
+            
strBuff.append(groupId).append(AttributeConstants.SEPARATOR).append(streamId)
                     .append(AttributeConstants.SEPARATOR).append(sequenceId)
                     .append("#").append(strRemoteIP);
             msgSeqId = strBuff.toString();
@@ -251,6 +248,18 @@ public class CodecTextMsg extends AbsV0MsgCodec {
             inLongMsg.addMsg(mapJoiner.join(attrMap), bodyData);
             attrMap.put(AttributeConstants.MESSAGE_COUNT, 
String.valueOf(this.msgCount));
         } else {
+            if (!"pb".equals(attrMap.get(AttributeConstants.MESSAGE_TYPE))) {
+                if (bodyData[bodyData.length - 1] == '\n') {
+                    int tripDataLen = bodyData.length - 1;
+                    if (bodyData[bodyData.length - 2] == '\r') {
+                        tripDataLen = bodyData.length - 2;
+                    }
+                    byte[] tripData = new byte[tripDataLen];
+                    System.arraycopy(bodyData, 0, tripData, 0, tripDataLen);
+                    bodyData = tripData;
+                    
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_TRIP);
+                }
+            }
             inLongMsg.addMsg(mapJoiner.join(attrMap), bodyData);
         }
         long pkgTime = inLongMsg.getCreatetime();
@@ -258,5 +267,4 @@ public class CodecTextMsg extends AbsV0MsgCodec {
         inLongMsg.reset();
         return event;
     }
-
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
index 930eccc700..9f0111e21e 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
@@ -23,8 +23,6 @@ import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
 /**
  * Test for {@link CommonConfigHolder}
  */
@@ -35,8 +33,10 @@ public class TestCommonConfigHolder {
         Assert.assertEquals("proxy_inlong5th_sz",
                 CommonConfigHolder.getInstance().getClusterName());
         
Assert.assertTrue(CommonConfigHolder.getInstance().isEnableWhiteList());
-        assertEquals("DataProxy",
+        Assert.assertEquals("DataProxy",
                 
CommonConfigHolder.getInstance().getProperties().get(MetricListener.KEY_METRIC_DOMAINS));
-        assertEquals(10000, 
CommonConfigHolder.getInstance().getMetaConfigSyncInvlMs());
+        Assert.assertEquals(10000, 
CommonConfigHolder.getInstance().getMetaConfigSyncInvlMs());
+        
Assert.assertTrue(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept());
+        
Assert.assertTrue(CommonConfigHolder.getInstance().getDefTopics().contains("test2"));
     }
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties 
b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
index 49d0e277c0..4ee6350595 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
@@ -25,3 +25,8 @@ startup.using.local.meta.file.enable=true
 proxy.enable.whitelist=true
 
 meta.config.sync.interval.ms=10000
+
+# whether to accept messages of unconfigured topics
+id2topic.unconfigured.accept.enable=true
+# the default topic if accept unconfigured topic's message
+id2topic.unconfigured.default.topics= test1 test2 test3

Reply via email to