This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 62d832e83e [INLONG-8228][DataProxy] Optimize the implementation of the index output to files (#8229)
62d832e83e is described below

commit 62d832e83ee9c8e4cf6000b97b14164f1f36cd46
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Jun 14 11:00:38 2023 +0800

    [INLONG-8228][DataProxy] Optimize the implementation of the index output to files (#8229)
---
 .../inlong/common/enums/DataProxyErrCode.java      |   6 +-
 inlong-dataproxy/conf/log4j2.xml                   |   4 +-
 .../channel/FailoverChannelProcessor.java          |   9 +-
 .../dataproxy/config/CommonConfigHolder.java       |   2 +-
 .../inlong/dataproxy/consts/StatConstants.java     |  57 ++++---
 .../exception/MainChannelFullException.java        |  33 ++++
 .../dataproxy/metrics/stats/AbsStatsDaemon.java    | 141 +++++++++++++++++
 .../dataproxy/metrics/stats/MonitorIndex.java      | 174 +++++++++++++++++++++
 .../dataproxy/metrics/stats/MonitorStats.java      | 148 ++++++++++++++++++
 .../inlong/dataproxy/source2/BaseSource.java       |  75 ++++++---
 .../dataproxy/source2/InLongMessageHandler.java    | 126 ++++++++-------
 .../inlong/dataproxy/source2/SimpleHttpSource.java |   5 +
 .../inlong/dataproxy/source2/SimpleTcpSource.java  |   5 +
 .../inlong/dataproxy/source2/SimpleUdpSource.java  |   5 +
 .../inlong/dataproxy/source2/SourceConstants.java  |  10 ++
 .../source2/httpMsg/InLongHttpMsgHandler.java      |  77 +++++----
 .../dataproxy/source2/v0msg/AbsV0MsgCodec.java     |  34 +++-
 .../dataproxy/source2/v0msg/CodecBinMsg.java       |  57 +++----
 .../dataproxy/source2/v0msg/CodecTextMsg.java      |  19 ++-
 19 files changed, 802 insertions(+), 185 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
index 5caeb2d875..c4ef15ce86 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
@@ -38,7 +38,8 @@ public enum DataProxyErrCode {
     HTTP_UNSUPPORTED_SERVICE_URI(35, "Un-supported service uri"),
     HTTP_UNSUPPORTED_CONTENT_TYPE(36, "Un-supported content type"),
 
-    FIELD_VALUE_NOT_EQUAL(95, "Field value not equal"),
+    FIELD_MAGIC_NOT_EQUAL(94, "Magic value not equal"),
+    FIELD_LENGTH_VALUE_NOT_EQUAL(95, "Field length value not equal"),
     UNCOMPRESS_DATA_ERROR(96, "Uncompress data error"),
 
     MISS_REQUIRED_GROUPID_ARGUMENT(100, "Parameter groupId is required"),
@@ -68,6 +69,9 @@ public enum DataProxyErrCode {
     GROUPID_OR_STREAMID_NOT_CONFIGURE(121, "GroupId or StreamId not found in 
configure"),
     GROUPID_OR_STREAMID_INCONSTANT(122, "GroupId or StreamId inconstant"),
 
+    ATTR_ORDER_CONTROL_CONFLICT_ERROR(150, "Require order send but isAck is 
false"),
+    ATTR_PROXY_CONTROL_CONFLICT_ERROR(151, "Require proxy send but isAck is 
false"),
+
     UNKNOWN_ERROR(Integer.MAX_VALUE, "Unknown error");
 
     private final int errCode;
diff --git a/inlong-dataproxy/conf/log4j2.xml b/inlong-dataproxy/conf/log4j2.xml
index f05e67a039..6a48d1b2f3 100644
--- a/inlong-dataproxy/conf/log4j2.xml
+++ b/inlong-dataproxy/conf/log4j2.xml
@@ -159,10 +159,10 @@
     </appenders>
 
     <loggers>
-        <logger name="org.apache.inlong.common.monitor.MonitorIndexExt" 
level="info" additivity="false">
+        <logger name="org.apache.inlong.dataproxy.metrics.stats.MonitorStats" 
level="info" additivity="false">
             <appender-ref ref="MonitorFile"/>
         </logger>
-        <logger name="org.apache.inlong.common.monitor.MonitorIndex" 
level="info" additivity="false">
+        <logger name="org.apache.inlong.dataproxy.metrics.stats.MonitorIndex" 
level="info" additivity="false">
             <appender-ref ref="IndexFile"/>
         </logger>
         <logger name="org.apache.pulsar" level="info" additivity="false">
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
index 5940da14ce..2162019317 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
@@ -17,10 +17,9 @@
 
 package org.apache.inlong.dataproxy.channel;
 
-import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.common.monitor.LogCounter;
-import org.apache.inlong.dataproxy.base.SinkRspEvent;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.exception.MainChannelFullException;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
 
 import com.google.common.base.Preconditions;
@@ -258,7 +257,7 @@ public class FailoverChannelProcessor
                 tx.commit();
 
             } catch (Throwable t) {
-                errMsg = "Unable to put event on channel" + 
reqChannel.getName()
+                errMsg = "Unable to put event on channel " + 
reqChannel.getName()
                         + ", error message is " + t.getMessage();
                 if (logPrinter.shouldPrint()) {
                     LOG.error("FailoverChannelProcessor Unable to put event on 
required channel: "
@@ -279,9 +278,7 @@ public class FailoverChannelProcessor
         }
         if (!success) {
             if (MessageUtils.isSyncSendForOrder(event)) {
-                MessageUtils.sinkReturnRspPackage((SinkRspEvent) event,
-                        DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE, errMsg);
-                return;
+                throw new MainChannelFullException(errMsg);
             }
             List<Channel> optionalChannels = 
selector.getOptionalChannels(event);
             for (Channel optChannel : optionalChannels) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
index 02afd1a80e..81ba91e4df 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -102,7 +102,7 @@ public class CommonConfigHolder {
     public static final String VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME = "Sink";
     // event metric statistic name
     public static final String KEY_FILE_METRIC_EVENT_OUTPUT_NAME = 
"file.metric.event.output.name";
-    public static final String VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME = 
"DataProxy_monitors";
+    public static final String VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME = "Stats";
     // Audit fields
     public static final String KEY_ENABLE_AUDIT = "audit.enable";
     public static final boolean VAL_DEF_ENABLE_AUDIT = true;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 1d262a8112..48e087c10c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -27,9 +27,24 @@ public class StatConstants {
     public static final java.lang.String EVENT_VISIT_LINKIN = "visit.linkin";
     public static final java.lang.String EVENT_VISIT_LINKOUT = "visit.linkout";
     public static final java.lang.String EVENT_VISIT_EXCEPTION = 
"visit.exception";
+    // channel
+    public static final java.lang.String EVENT_REMOTE_UNWRITABLE = 
"socket.unwritable";
     // configure
     public static final java.lang.String EVENT_CONFIG_TOPIC_MISSING = 
"config.topic.missing";
+    public static final java.lang.String EVENT_CONFIG_IDNUM_EMPTY = 
"config.idnum.empty";
+    public static final java.lang.String EVENT_CONFIG_GROUPIDNUM_MISSING = 
"config.groupidnum.missing";
+    public static final java.lang.String EVENT_CONFIG_GROUP_IDNUM_INCONSTANT = 
"config.group.idnum.incons";
+    public static final java.lang.String EVENT_CONFIG_STREAMIDNUM_MISSING = 
"config.streamidnum.missing";
+    public static final java.lang.String EVENT_CONFIG_STREAM_IDNUM_INCONSTANT 
= "config.stream.idnum.incons";
     // source
+    public static final java.lang.String EVENT_PKG_READABLE_EMPTY = 
"pkg.readable.empty";
+    public static final java.lang.String EVENT_PKG_READABLE_OVERMAX = 
"pkg.readable.overmax";
+    public static final java.lang.String EVENT_PKG_READABLE_UNFILLED = 
"pkg.readable.unfilled";
+    public static final java.lang.String EVENT_PKG_MSGTYPE_V0_INVALID = 
"pkg.msgtype.v0.invalid";
+    public static final java.lang.String EVENT_PKG_MSGTYPE_V1_INVALID = 
"pkg.msgtype.v1.invalid";
+    // message
+    public static final java.lang.String EVENT_MSG_BIN_TOTALLEN_BELOWMIN = 
"msg.bin.totallen.belowmin";
+    public static final java.lang.String EVENT_MSG_TXT_TOTALLEN_BELOWMIN = 
"msg.txt.totallen.belowmin";
     public static final java.lang.String EVENT_MSG_DECODE_FAIL = 
"msg.decode.failure";
     public static final java.lang.String EVENT_MSG_METHOD_INVALID = 
"msg.method.invalid";
     public static final java.lang.String EVENT_MSG_PATH_INVALID = 
"msg.path.invalid";
@@ -38,29 +53,29 @@ public class StatConstants {
     public static final java.lang.String EVENT_MSG_STREAMID_MISSING = 
"msg.streamid.missing";
     public static final java.lang.String EVENT_MSG_BODY_MISSING = 
"msg.body.missing";
     public static final java.lang.String EVENT_MSG_BODY_BLANK = 
"msg.body.blank";
+    public static final java.lang.String EVENT_MSG_BODY_ZERO = "msg.body.zero";
+    public static final java.lang.String EVENT_MSG_BODY_NEGATIVE = 
"msg.body.negative";
+    public static final java.lang.String EVENT_MSG_BODY_UNPRESS_EXP = 
"msg.body.unpress.exp";
     public static final java.lang.String EVENT_MSG_BODY_OVERMAX = 
"msg.body.overmax";
+    public static final java.lang.String EVENT_MSG_ATTR_NEGATIVE = 
"msg.attr.negative";
+    public static final java.lang.String EVENT_MSG_MAGIC_UNEQUAL = 
"msg.magic.unequal";
+    public static final java.lang.String EVENT_MSG_HB_TOTALLEN_BELOWMIN = 
"msg.hb.totallen.belowmin";
+    public static final java.lang.String EVENT_MSG_HB_MAGIC_UNEQUAL = 
"msg.hb.magic.unequal";
+    public static final java.lang.String EVENT_MSG_HB_LEN_MALFORMED = 
"msg.hb.len.malformed";
+    public static final java.lang.String EVENT_MSG_BIN_LEN_MALFORMED = 
"msg.bin.len.malformed";
+    public static final java.lang.String EVENT_MSG_TXT_LEN_MALFORMED = 
"msg.txt.len.malformed";
+    public static final java.lang.String EVENT_MSG_ITEM_LEN_MALFORMED = 
"msg.item.len.malformed";
+    public static final java.lang.String EVENT_MSG_ATTR_INVALID = 
"msg.attr.invalid";
+    public static final java.lang.String EVENT_MSG_ORDER_ACK_INVALID = 
"msg.attr.order.noack";
+    public static final java.lang.String EVENT_MSG_PROXY_ACK_INVALID = 
"msg.attr.proxy.noack";
+    public static final java.lang.String EVENT_MSG_INDEXMSG_ILLEGAL = 
"msg.index.illegal";
+    public static final java.lang.String EVENT_MSG_GROUPIDNUM_ZERO = 
"msg.groupidnum.zero";
+    public static final java.lang.String EVENT_MSG_STREAMIDNUM_ZERO = 
"msg.streamidnum.zero";
     public static final java.lang.String EVENT_MSG_HB_SUCCESS = 
"msg.hb.success";
-    public static final java.lang.String EVENT_MSG_POST_SUCCESS = 
"msg.post.success";
-    public static final java.lang.String EVENT_MSG_POST_FAILURE = 
"msg.post.failure";
-
-    public static final java.lang.String EVENT_EMPTY = "socketmsg.empty";
-    public static final java.lang.String EVENT_OVERMAXLEN = 
"socketmsg.overmaxlen";
-    public static final java.lang.String EVENT_NOTEQUALLEN = 
"socketmsg.notequallen";
-    public static final java.lang.String EVENT_MSGUNKNOWN_V0 = 
"socketmsg.unknownV0";
-    public static final java.lang.String EVENT_MSGUNKNOWN_V1 = 
"socketmsg.unknownV1";
-    public static final java.lang.String EVENT_MALFORMED = 
"socketmsg.malformed";
-    public static final java.lang.String EVENT_NOBODY = "socketmsg.nobody";
-    public static final java.lang.String EVENT_NEGBODY = "socketmsg.negbody";
-    public static final java.lang.String EVENT_NEGATTR = "socketmsg.negattr";
-    public static final java.lang.String EVENT_INVALIDATTR = 
"socketmsg.invattr";
-    public static final java.lang.String EVENT_UNSUPMSG = "socketmsg.unsupmsg";
-    public static final java.lang.String EVENT_UNPRESSEXP = 
"socketmsg.upressexp";
-    public static final java.lang.String EVENT_WITHOUTGROUPID = 
"socketmsg.wogroupid";
-    public static final java.lang.String EVENT_INCONSGROUPORSTREAMID = 
"socketmsg.inconsids";
-    public static final java.lang.String EVENT_CHANNEL_NOT_WRITABLE = 
"socketch.notwritable";
-    public static final java.lang.String EVENT_POST_SUCCESS = 
"socketmsg.success";
-    public static final java.lang.String EVENT_POST_DROPPED = 
"socketmsg.dropped";
-    // http
+    public static final java.lang.String EVENT_MSG_V0_POST_SUCCESS = 
"msg.post.v0.success";
+    public static final java.lang.String EVENT_MSG_V0_POST_FAILURE = 
"msg.post.v0.failure";
+    public static final java.lang.String EVENT_MSG_V1_POST_SUCCESS = 
"msg.post.v1.success";
+    public static final java.lang.String EVENT_MSG_V1_POST_DROPPED = 
"msg.post.v1.dropped";
 
     public static final java.lang.String EVENT_SINK_NOUID = "sink.nouid";
     public static final java.lang.String EVENT_SINK_NOTOPIC = "sink.notopic";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/MainChannelFullException.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/MainChannelFullException.java
new file mode 100644
index 0000000000..6b5379ac72
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/MainChannelFullException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.exception;
+
+import org.apache.flume.ChannelException;
+
+/**
+ * MainChannelFullException
+ *
+ * When orderly sending and proxy sending messages, if the main channel is 
full,
+ * it will no longer cache and wait, and directly throw this exception.
+ */
+public class MainChannelFullException extends ChannelException {
+
+    public MainChannelFullException(String message) {
+        super(message);
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/AbsStatsDaemon.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/AbsStatsDaemon.java
new file mode 100644
index 0000000000..93bb0c73e2
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/AbsStatsDaemon.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics.stats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * AbsStatsDaemon
+ *
+ *
+ * Statistics daemon that periodically outputs metrics data to a file.
+ */
+
+public abstract class AbsStatsDaemon implements Runnable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbsStatsDaemon.class);
+    private static final long MAX_PRINT_TIME_MS = 10000L;
+    private final String name;
+    private final String threadName;
+    private final long intervalMs;
+    private final int maxStatsCnt;
+    private final Thread daemon;
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
+    private final AtomicInteger writeIndex = new AtomicInteger(0);
+
+    public AbsStatsDaemon(String name, long intervalMs, int maxCnt) {
+        this.name = name;
+        this.maxStatsCnt = maxCnt;
+        this.intervalMs = intervalMs;
+        this.threadName = "Daemon_Thread_" + name;
+        this.daemon = new Thread(this, this.threadName);
+        this.daemon.setDaemon(true);
+    }
+
+    public void start() {
+        this.daemon.start();
+        LOGGER.info("{} is started!", this.name);
+    }
+
+    public boolean isStopped() {
+        return this.shutdown.get();
+    }
+
+    public boolean stop() {
+        if (this.shutdown.get()) {
+            return true;
+        }
+        if (this.shutdown.compareAndSet(false, true)) {
+            LOGGER.info("{} is closing ......", this.name);
+            try {
+                if (this.daemon != null) {
+                    this.daemon.interrupt();
+                    this.daemon.join();
+                }
+            } catch (Throwable e) {
+                //
+            }
+            LOGGER.info("{} is stopped", this.name);
+            return false;
+        }
+        return true;
+    }
+
+    protected abstract int loopProcess(long startTime);
+
+    protected abstract int exitProcess(long startTime);
+
+    /**
+     * Get the writable index
+     *
+     * @return the writable block index
+     */
+    protected int getWriteIndex() {
+        return Math.abs(writeIndex.get() % 2);
+    }
+
+    /**
+     * Gets the read index
+     *
+     * @return the read block index
+     */
+    protected int getReadIndex() {
+        return Math.abs((writeIndex.get() - 1) % 2);
+    }
+
+    @Override
+    public void run() {
+        int printCnt;
+        long startTime;
+        LOGGER.info("{} is started", this.threadName);
+        // process daemon task
+        while (!shutdown.get()) {
+            try {
+                Thread.sleep(intervalMs);
+                writeIndex.incrementAndGet();
+                startTime = System.currentTimeMillis();
+                printCnt = loopProcess(startTime);
+                checkAndPrintStatus(printCnt, System.currentTimeMillis() - 
startTime);
+            } catch (InterruptedException e) {
+                LOGGER.info("{} has been interrupted", this.threadName);
+                break;
+            } catch (Throwable t) {
+                LOGGER.info("{} throw a exception", this.threadName);
+            }
+        }
+        // process exit output
+        startTime = System.currentTimeMillis();
+        printCnt = exitProcess(startTime);
+        checkAndPrintStatus(printCnt, System.currentTimeMillis() - startTime);
+        LOGGER.info("{} is stopped", this.threadName);
+    }
+
+    private void checkAndPrintStatus(int printCnt, long outputTime) {
+        if (printCnt > maxStatsCnt) {
+            LOGGER.warn("{} print {} items, over max allowed count {}",
+                    this.threadName, printCnt, this.maxStatsCnt);
+        }
+        if (outputTime > MAX_PRINT_TIME_MS) {
+            LOGGER.warn("{} print items wasts {} ms", this.threadName, 
outputTime);
+        }
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/MonitorIndex.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/MonitorIndex.java
new file mode 100644
index 0000000000..1aaa7c0ddb
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/MonitorIndex.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics.stats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * MonitorIndex
+ *
+ *
+ * The index statistics received or sent by DataProxy nodes, and output to 
file.
+ */
+public class MonitorIndex extends AbsStatsDaemon {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MonitorIndex.class);
+
+    private static final AtomicLong RECODE_ID = new AtomicLong(0);
+    private final StatsUnit[] statsUnits = new StatsUnit[2];
+
+    public MonitorIndex(String name, long intervalMill, int maxCnt) {
+        super(name, intervalMill, maxCnt);
+        this.statsUnits[0] = new StatsUnit(name);
+        this.statsUnits[1] = new StatsUnit(name);
+    }
+
+    /**
+     * Add success statistic
+     *
+     * @param key the statistic key
+     * @param msgCnt  the message count
+     * @param packCnt the package count
+     * @param packSize the package size
+     */
+    public void addSuccStats(String key, int msgCnt, int packCnt, long 
packSize) {
+        if (isStopped()) {
+            return;
+        }
+        statsUnits[getWriteIndex()].addSuccCnt(key, msgCnt, packCnt, packSize);
+    }
+
+    /**
+     * Add failure statistic
+     *
+     * @param key  the statistic key
+     * @param failCnt  the failure count
+     */
+    public void addFailStats(String key, int failCnt) {
+        if (isStopped()) {
+            return;
+        }
+        statsUnits[getWriteIndex()].addFailCnt(key, failCnt);
+    }
+
+    @Override
+    protected int loopProcess(long startTime) {
+        return statsUnits[getReadIndex()].printAndResetStatsInfo(startTime);
+    }
+
+    @Override
+    protected int exitProcess(long startTime) {
+        int totalCnt = 0;
+        if (!statsUnits[getReadIndex()].isEmpty()) {
+            totalCnt += 
statsUnits[getReadIndex()].printAndResetStatsInfo(startTime);
+        }
+        if (!statsUnits[getWriteIndex()].isEmpty()) {
+            totalCnt += 
statsUnits[getWriteIndex()].printAndResetStatsInfo(startTime);
+        }
+        return totalCnt;
+    }
+
+    private static class StatsUnit {
+
+        private final String statsName;
+        private final ConcurrentHashMap<String, StatsItem> counterMap = new 
ConcurrentHashMap<>();
+
+        public StatsUnit(String statsName) {
+            this.statsName = statsName;
+        }
+
+        public boolean isEmpty() {
+            return counterMap.isEmpty();
+        }
+
+        public void addSuccCnt(String key, int cnt, int packcnt, long 
packsize) {
+            StatsItem statsItem = counterMap.get(key);
+            if (statsItem == null) {
+                StatsItem tmpItem = new StatsItem();
+                statsItem = counterMap.putIfAbsent(key, tmpItem);
+                if (statsItem == null) {
+                    statsItem = tmpItem;
+                }
+            }
+            statsItem.addSuccessCnt(cnt, packcnt, packsize);
+        }
+
+        public void addFailCnt(String key, int failCnt) {
+            StatsItem statsItem = counterMap.get(key);
+            if (statsItem == null) {
+                StatsItem tmpItem = new StatsItem();
+                statsItem = counterMap.putIfAbsent(key, tmpItem);
+                if (statsItem == null) {
+                    statsItem = tmpItem;
+                }
+            }
+            statsItem.addFailCnt(failCnt);
+        }
+
+        public int printAndResetStatsInfo(long startTime) {
+            int printCnt = 0;
+            // get print time (second)
+            long printTime = startTime / 1000;
+            for (Map.Entry<String, StatsItem> entry : counterMap.entrySet()) {
+                if (entry == null || entry.getKey() == null || 
entry.getValue() == null) {
+                    continue;
+                }
+                LOGGER.info("{}#{}_{}#{}#{}", this.statsName, printTime,
+                        RECODE_ID.incrementAndGet(), entry.getKey(), 
entry.getValue().toString());
+                printCnt++;
+            }
+            counterMap.clear();
+            return printCnt;
+        }
+    }
+
+    private static class StatsItem {
+
+        private final LongAdder msgCnt = new LongAdder();
+        private final LongAdder packCnt = new LongAdder();
+        private final LongAdder packSize = new LongAdder();
+        private final LongAdder failCnt = new LongAdder();
+
+        public StatsItem() {
+
+        }
+
+        public void addSuccessCnt(int msgCnt, int packCnt, long packSize) {
+            this.msgCnt.add(msgCnt);
+            this.packCnt.add(packCnt);
+            this.packSize.add(packSize);
+        }
+
+        public void addFailCnt(int failCnt) {
+            this.failCnt.add(failCnt);
+        }
+
+        @Override
+        public String toString() {
+            return msgCnt.longValue() + "#" + packCnt.longValue() + "#"
+                    + packSize.longValue() + "#" + failCnt.longValue();
+        }
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/MonitorStats.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/MonitorStats.java
new file mode 100644
index 0000000000..4fd25337bc
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/MonitorStats.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics.stats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * MonitorStats
+ *
+ *
+ * Summary statistics and detailed statistics on failure, output to file.
+ */
+public class MonitorStats extends AbsStatsDaemon {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MonitorStats.class);
+
+    private final StatsUnit[] statsUnits = new StatsUnit[2];
+
+    public MonitorStats(String name, long intervalMill, int maxCnt) {
+        super(name, intervalMill, maxCnt);
+        this.statsUnits[0] = new StatsUnit(name);
+        this.statsUnits[1] = new StatsUnit(name);
+    }
+
+    /**
+     * add summary statistic items
+     *
+     * @param sumKey the summary key
+     */
+    public void incSumStats(String sumKey) {
+        if (isStopped()) {
+            return;
+        }
+        statsUnits[getWriteIndex()].incSumStats(sumKey);
+    }
+
+    /**
+     * add detail statistic items
+     *
+     * @param detailKey the detail key
+     */
+    public void incDetailStats(String detailKey) {
+        if (isStopped()) {
+            return;
+        }
+        statsUnits[getWriteIndex()].incDetailStats(detailKey);
+    }
+
+    @Override
+    protected int loopProcess(long startTime) {
+        return statsUnits[getReadIndex()].printAndResetStatsInfo();
+    }
+
+    @Override
+    protected int exitProcess(long startTime) {
+        int totalCnt = 0;
+        if (!statsUnits[getReadIndex()].isEmpty()) {
+            totalCnt += statsUnits[getReadIndex()].printAndResetStatsInfo();
+        }
+        if (!statsUnits[getWriteIndex()].isEmpty()) {
+            totalCnt += statsUnits[getWriteIndex()].printAndResetStatsInfo();
+        }
+        return totalCnt;
+    }
+
+    private static class StatsUnit {
+
+        private final String statsName;
+        private final ConcurrentHashMap<String, LongAdder> sumMap = new 
ConcurrentHashMap<>();
+        private final ConcurrentHashMap<String, LongAdder> detailsMap = new 
ConcurrentHashMap<>();
+
+        public StatsUnit(String statsName) {
+            this.statsName = statsName;
+        }
+
+        public boolean isEmpty() {
+            return sumMap.isEmpty() && detailsMap.isEmpty();
+        }
+
+        public void incSumStats(String key) {
+            LongAdder statsItem = sumMap.get(key);
+            if (statsItem == null) {
+                LongAdder tmpItem = new LongAdder();
+                statsItem = sumMap.putIfAbsent(key, tmpItem);
+                if (statsItem == null) {
+                    statsItem = tmpItem;
+                }
+            }
+            statsItem.increment();
+        }
+
+        public void incDetailStats(String key) {
+            LongAdder statsItem = detailsMap.get(key);
+            if (statsItem == null) {
+                LongAdder tmpItem = new LongAdder();
+                statsItem = detailsMap.putIfAbsent(key, tmpItem);
+                if (statsItem == null) {
+                    statsItem = tmpItem;
+                }
+            }
+            statsItem.increment();
+        }
+
+        public int printAndResetStatsInfo() {
+            int sumCnt = 0;
+            // print summary info
+            for (Map.Entry<String, LongAdder> entry : sumMap.entrySet()) {
+                if (entry == null || entry.getKey() == null || 
entry.getValue() == null) {
+                    continue;
+                }
+                LOGGER.info("{}.summary.{}={}", this.statsName, 
entry.getKey(), entry.getValue());
+                sumCnt++;
+            }
+            // print detail info
+            for (Map.Entry<String, LongAdder> entry : detailsMap.entrySet()) {
+                if (entry == null || entry.getKey() == null || 
entry.getValue() == null) {
+                    continue;
+                }
+                LOGGER.info("{}.detail.{}={}", this.statsName, entry.getKey(), 
entry.getValue());
+                sumCnt++;
+            }
+            sumMap.clear();
+            detailsMap.clear();
+            return sumCnt;
+        }
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
index fd9ae39caf..0cf601495f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
@@ -18,8 +18,6 @@
 package org.apache.inlong.dataproxy.source2;
 
 import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.monitor.MonitorIndex;
-import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.dataproxy.admin.ProxyServiceMBean;
 import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
 import org.apache.inlong.dataproxy.config.CommonConfigHolder;
@@ -29,6 +27,8 @@ import org.apache.inlong.dataproxy.consts.AttrConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
+import org.apache.inlong.dataproxy.metrics.stats.MonitorIndex;
+import org.apache.inlong.dataproxy.metrics.stats.MonitorStats;
 import org.apache.inlong.dataproxy.source2.httpMsg.InLongHttpMsgHandler;
 import org.apache.inlong.dataproxy.utils.AddressUtils;
 import org.apache.inlong.dataproxy.utils.ConfStringUtils;
@@ -105,6 +105,12 @@ public abstract class BaseSource
     protected long maxReadIdleTimeMs;
     // max connection count
     protected int maxConnections;
+    // reuse address
+    protected boolean reuseAddress;
+    // connect backlog
+    protected int conBacklog;
+    // connect linger
+    protected int conLinger = -1;
     // netty parameters
     protected EventLoopGroup acceptorGroup;
     protected EventLoopGroup workerGroup;
@@ -116,7 +122,7 @@ public abstract class BaseSource
     protected int maxSendBufferSize;
     // file metric statistic
     protected MonitorIndex monitorIndex = null;
-    private MonitorIndexExt monitorIndexExt = null;
+    private MonitorStats monitorStats = null;
     // metric set
     protected DataProxyMetricItemSet metricItemSet;
 
@@ -202,6 +208,24 @@ public abstract class BaseSource
         Preconditions.checkArgument(this.maxConnections >= 
SourceConstants.VAL_MIN_CONNECTION_CNT,
                 SourceConstants.SRCCXT_MAX_CONNECTION_CNT + " must be >= "
                         + SourceConstants.VAL_MIN_CONNECTION_CNT);
+        // get connect backlog
+        this.conBacklog = ConfStringUtils.getIntValue(context,
+                SourceConstants.SRCCXT_CONN_BACKLOG, 
SourceConstants.VAL_DEF_CONN_BACKLOG);
+        Preconditions.checkArgument(this.conBacklog >= 
SourceConstants.VAL_MIN_CONN_BACKLOG,
+                SourceConstants.SRCCXT_CONN_BACKLOG + " must be >= "
+                        + SourceConstants.VAL_MIN_CONN_BACKLOG);
+        // get connect linger
+        Integer tmpValue = 
context.getInteger(SourceConstants.SRCCXT_CONN_LINGER);
+        if (tmpValue != null && tmpValue >= 0) {
+            this.conLinger = tmpValue;
+        }
+        // get whether reuse address
+        this.reuseAddress = 
context.getBoolean(SourceConstants.SRCCXT_REUSE_ADDRESS,
+                SourceConstants.VAL_DEF_REUSE_ADDRESS);
+
+        // get whether custom channel processor
+        this.customProcessor = 
context.getBoolean(SourceConstants.SRCCXT_CUSTOM_CHANNEL_PROCESSOR,
+                SourceConstants.VAL_DEF_CUSTOM_CH_PROCESSOR);
         // get max receive buffer size
         this.maxRcvBufferSize = ConfStringUtils.getIntValue(context,
                 SourceConstants.SRCCXT_RECEIVE_BUFFER_SIZE, 
SourceConstants.VAL_DEF_RECEIVE_BUFFER_SIZE);
@@ -233,13 +257,15 @@ public abstract class BaseSource
         // init monitor logic
         if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
             this.monitorIndex = new 
MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSourceOutName(),
-                    
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+                    
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
                     
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
-            this.monitorIndexExt = new MonitorIndexExt(
+            this.monitorIndex.start();
+            this.monitorStats = new MonitorStats(
                     
CommonConfigHolder.getInstance().getFileMetricEventOutName()
                             + AttrConstants.SEP_HASHTAG + 
this.getProtocolName(),
-                    
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+                    
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
                     
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
+            this.monitorStats.start();
         }
         startSource();
         // register
@@ -269,15 +295,6 @@ public abstract class BaseSource
         }
         // stop super class
         super.stop();
-        // stop file statistic index
-        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
-            if (monitorIndex != null) {
-                monitorIndex.shutDown();
-            }
-            if (monitorIndexExt != null) {
-                monitorIndexExt.shutDown();
-            }
-        }
         // stop workers
         if (this.acceptorGroup != null) {
             this.acceptorGroup.shutdownGracefully();
@@ -285,6 +302,15 @@ public abstract class BaseSource
         if (this.workerGroup != null) {
             this.workerGroup.shutdownGracefully();
         }
+        // stop file statistic index
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+            if (monitorIndex != null) {
+                monitorIndex.stop();
+            }
+            if (monitorStats != null) {
+                monitorStats.stop();
+            }
+        }
         logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(), 
this.getName());
     }
 
@@ -385,18 +411,29 @@ public abstract class BaseSource
         return maxWorkerThreads;
     }
 
-    public void fileMetricEventInc(String eventKey) {
+    public void fileMetricIncSumStats(String eventKey) {
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+            monitorStats.incSumStats(eventKey);
+        }
+    }
+
+    public void fileMetricIncDetailStats(String eventKey) {
         if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
-            monitorIndexExt.incrementAndGet(eventKey);
+            monitorStats.incDetailStats(eventKey);
         }
     }
 
-    public void fileMetricRecordAdd(String key, int cnt, int packCnt, long 
packSize, int failCnt) {
+    public void fileMetricAddSuccCnt(String key, int cnt, int packCnt, long 
packSize) {
         if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
-            monitorIndex.addAndGet(key, cnt, packCnt, packSize, failCnt);
+            monitorIndex.addSuccStats(key, cnt, packCnt, packSize);
         }
     }
 
+    public void fileMetricAddFailCnt(String key, int failCnt) {
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+            monitorIndex.addFailStats(key, failCnt);
+        }
+    }
     /**
      * addMetric
      *
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
index 0594a8682b..9fb5144aa2 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
@@ -44,7 +44,6 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,8 +92,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
         if (msg == null) {
-            source.fileMetricEventInc(StatConstants.EVENT_EMPTY);
-            logger.debug("Get null msg, just skip!");
+            
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_EMPTY);
             return;
         }
         ByteBuf cb = (ByteBuf) msg;
@@ -102,12 +100,11 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
             int readableLength = cb.readableBytes();
             if (readableLength == 0 && source.isFilterEmptyMsg()) {
                 cb.clear();
-                source.fileMetricEventInc(StatConstants.EVENT_EMPTY);
-                logger.debug("skip empty msg.");
+                
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_EMPTY);
                 return;
             }
             if (readableLength > source.getMaxMsgLength()) {
-                source.fileMetricEventInc(StatConstants.EVENT_OVERMAXLEN);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_OVERMAX);
                 throw new Exception("Error msg, readableLength(" + 
readableLength +
                         ") > max allowed message length (" + 
source.getMaxMsgLength() + ")");
             }
@@ -118,9 +115,9 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
             if (readableLength < totalDataLen + INLONG_LENGTH_FIELD_LENGTH) {
                 // reset index when buffer is not satisfied.
                 cb.resetReaderIndex();
-                source.fileMetricEventInc(StatConstants.EVENT_NOTEQUALLEN);
-                throw new Exception("Error msg, channel buffer is not 
satisfied, and  readableLength="
-                        + readableLength + ", and totalPackLength=" + 
totalDataLen + " + 4");
+                
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_UNFILLED);
+                throw new Exception("Error msg, buffer is unfilled, 
readableLength="
+                        + readableLength + ", totalPackLength=" + totalDataLen 
+ " + 4");
             }
             // read type
             int msgTypeValue = cb.readByte();
@@ -133,7 +130,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
                     processV1Msg(ctx, cb, bodyLength);
                 } else {
                     // unknown message type
-                    
source.fileMetricEventInc(StatConstants.EVENT_MSGUNKNOWN_V1);
+                    
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_MSGTYPE_V1_INVALID);
                     throw new Exception("Unknown V1 message version, version = 
" + msgTypeValue);
                 }
             } else {
@@ -142,7 +139,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
                 MsgType msgType = MsgType.valueOf(msgTypeValue);
                 final long msgRcvTime = System.currentTimeMillis();
                 if (MsgType.MSG_UNKNOWN == msgType) {
-                    
source.fileMetricEventInc(StatConstants.EVENT_MSGUNKNOWN_V0);
+                    
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_MSGTYPE_V0_INVALID);
                     if (logger.isDebugEnabled()) {
                         logger.debug("Received unknown message, channel {}", 
channel);
                     }
@@ -162,7 +159,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
                 // check whether totalDataLen is valid.
                 if (MsgType.MSG_BIN_MULTI_BODY == msgType) {
                     if (totalDataLen < BIN_MSG_FIXED_CONTENT_SIZE) {
-                        
source.fileMetricEventInc(StatConstants.EVENT_MALFORMED);
+                        
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BIN_TOTALLEN_BELOWMIN);
                         String errMsg = String.format("Malformed msg, 
totalDataLen(%d) < min bin7-msg length(%d)",
                                 totalDataLen, BIN_MSG_FIXED_CONTENT_SIZE);
                         if (logger.isDebugEnabled()) {
@@ -173,7 +170,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
                     msgCodec = new CodecBinMsg(totalDataLen, msgTypeValue, 
msgRcvTime, strRemoteIP);
                 } else {
                     if (totalDataLen < TXT_MSG_FIXED_CONTENT_SIZE) {
-                        
source.fileMetricEventInc(StatConstants.EVENT_MALFORMED);
+                        
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_TXT_TOTALLEN_BELOWMIN);
                         String errMsg = String.format("Malformed msg, 
totalDataLen(%d) < min txt-msg length(%d)",
                                 totalDataLen, TXT_MSG_FIXED_CONTENT_SIZE);
                         if (logger.isDebugEnabled()) {
@@ -198,7 +195,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
             String strRemoteIp = 
AddressUtils.getChannelRemoteIP(ctx.channel());
             if (strRemoteIp != null
                     && ConfigManager.getInstance().isIllegalIP(strRemoteIp)) {
-                source.fileMetricEventInc(StatConstants.EVENT_VISIT_ILLEGAL);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_ILLEGAL);
                 ctx.channel().disconnect();
                 ctx.channel().close();
                 logger.error(strRemoteIp + " is Illegal IP, so refuse it !");
@@ -207,7 +204,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
         }
         // check max allowed connection count
         if (source.getAllChannels().size() >= source.getMaxConnections()) {
-            source.fileMetricEventInc(StatConstants.EVENT_VISIT_OVERMAX);
+            source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_OVERMAX);
             ctx.channel().disconnect();
             ctx.channel().close();
             logger.warn("{} refuse to connect = {} , connections = {}, 
maxConnections = {}",
@@ -217,32 +214,33 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
         // add legal channel
         source.getAllChannels().add(ctx.channel());
         ctx.fireChannelActive();
-        source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKIN);
+        source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKIN);
         logger.info("{} added new channel {}, current connections = {}, 
maxConnections = {}",
                 source.getName(), ctx.channel(), 
source.getAllChannels().size(), source.getMaxConnections());
     }
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) {
-        logger.error("{} channel {} inactive", source.getName(), 
ctx.channel());
+        source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKOUT);
         ctx.fireChannelInactive();
         source.getAllChannels().remove(ctx.channel());
-        source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKOUT);
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
-        source.fileMetricEventInc(StatConstants.EVENT_VISIT_EXCEPTION);
-        logger.error("{} channel {} throw exception", source.getName(), 
ctx.channel(), cause);
-        ctx.fireExceptionCaught(cause);
+        source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_EXCEPTION);
+        if (logCounter.shouldPrint()) {
+            logger.warn("{} received an exception from channel {}",
+                    source.getName(), ctx.channel(), cause);
+        }
         if (ctx.channel() != null) {
+            source.getAllChannels().remove(ctx.channel());
             try {
                 ctx.channel().disconnect();
                 ctx.channel().close();
             } catch (Exception ex) {
                 //
             }
-            source.getAllChannels().remove(ctx.channel());
         }
         ctx.close();
     }
@@ -256,14 +254,14 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
         }
         // check service status.
         if (source.isRejectService()) {
-            source.fileMetricEventInc(StatConstants.EVENT_SERVICE_CLOSED);
+            source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_CLOSED);
             msgCodec.setFailureInfo(DataProxyErrCode.SERVICE_CLOSED);
             responseV0Msg(channel, msgCodec, strBuff);
             return;
         }
         // check if the node is linked to the Manager.
         if (!ConfigManager.getInstance().isMqClusterReady()) {
-            
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_SINK_UNREADY);
             msgCodec.setFailureInfo(DataProxyErrCode.SINK_SERVICE_UNREADY);
             responseV0Msg(channel, msgCodec, strBuff);
             return;
@@ -278,29 +276,37 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
         // build metric data item
         long longDataTime = msgCodec.getDataTimeMs() / 1000 / 60 / 10;
         longDataTime = longDataTime * 1000 * 60 * 10;
-        
strBuff.append(source.getProtocolName()).append(AttrConstants.SEPARATOR)
-                
.append(msgCodec.getTopicName()).append(AttrConstants.SEPARATOR)
-                .append(msgCodec.getStreamId()).append(AttrConstants.SEPARATOR)
-                
.append(msgCodec.getStrRemoteIP()).append(AttrConstants.SEPARATOR)
-                .append(source.getStrPort()).append(AttrConstants.SEPARATOR)
-                
.append(msgCodec.getMsgProcType()).append(AttrConstants.SEPARATOR)
-                .append(DateTimeUtils.ms2yyyyMMddHHmm(longDataTime))
-                .append(AttrConstants.SEPARATOR).append(
-                        
DateTimeUtils.ms2yyyyMMddHHmm(msgCodec.getMsgRcvTime()));
+        String statsKey = 
strBuff.append(source.getProtocolName()).append(AttrConstants.SEP_HASHTAG)
+                
.append(msgCodec.getGroupId()).append(AttrConstants.SEP_HASHTAG)
+                
.append(msgCodec.getStreamId()).append(AttrConstants.SEP_HASHTAG)
+                
.append(msgCodec.getStrRemoteIP()).append(AttrConstants.SEP_HASHTAG)
+                .append(source.getSrcHost()).append(AttrConstants.SEP_HASHTAG)
+                
.append(msgCodec.getMsgProcType()).append(AttrConstants.SEP_HASHTAG)
+                
.append(DateTimeUtils.ms2yyyyMMddHHmm(longDataTime)).append(AttrConstants.SEP_HASHTAG)
+                
.append(DateTimeUtils.ms2yyyyMMddHHmm(msgCodec.getMsgRcvTime())).toString();
+        strBuff.delete(0, strBuff.length());
         try {
             source.getChannelProcessor().processEvent(event);
-            source.fileMetricEventInc(StatConstants.EVENT_POST_SUCCESS);
-            source.fileMetricRecordAdd(strBuff.toString(),
-                    msgCodec.getMsgCount(), 1, msgCodec.getBodyLength(), 0);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V0_POST_SUCCESS);
+            source.fileMetricAddSuccCnt(statsKey, msgCodec.getMsgCount(), 1, 
msgCodec.getBodyLength());
             source.addMetric(true, event.getBody().length, event);
-            strBuff.delete(0, strBuff.length());
+            if (msgCodec.isNeedResp() && !msgCodec.isOrderOrProxy()) {
+                msgCodec.setSuccessInfo();
+                responseV0Msg(channel, msgCodec, strBuff);
+            }
         } catch (Throwable ex) {
-            logger.error("Error writting to channel, data will discard.", ex);
-            source.fileMetricEventInc(StatConstants.EVENT_POST_DROPPED);
-            source.fileMetricRecordAdd(strBuff.toString(), 0, 0, 0, 
msgCodec.getMsgCount());
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V0_POST_FAILURE);
+            source.fileMetricAddFailCnt(statsKey, msgCodec.getMsgCount());
             source.addMetric(false, event.getBody().length, event);
-            strBuff.delete(0, strBuff.length());
-            throw new ChannelException("ProcessEvent error can't write event 
to channel.");
+            if (msgCodec.isNeedResp() && !msgCodec.isOrderOrProxy()) {
+                
msgCodec.setFailureInfo(DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
+                        strBuff.append("Put event to channel failure: 
").append(ex.getMessage()).toString());
+                strBuff.delete(0, strBuff.length());
+                responseV0Msg(channel, msgCodec, strBuff);
+            }
+            if (logCounter.shouldPrint()) {
+                logger.error("Error writing event to channel failure.", ex);
+            }
         }
     }
 
@@ -313,7 +319,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
         // reject service
         if (source.isRejectService()) {
             source.addMetric(false, 0, null);
-            source.fileMetricEventInc(StatConstants.EVENT_SERVICE_CLOSED);
+            source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_CLOSED);
             this.responsePackage(ctx, ProxySdk.ResultCode.ERR_REJECT, 
packObject);
             return;
         }
@@ -380,7 +386,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
             source.getChannelProcessor().processEvent(packEvent);
             events.forEach(event -> {
                 source.addMetric(true, event.getBody().length, event);
-                source.fileMetricEventInc(StatConstants.EVENT_POST_SUCCESS);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_SUCCESS);
             });
             boolean awaitResult = callback.getLatch().await(
                     
CommonConfigHolder.getInstance().getMaxResAfterSaveTimeout(), 
TimeUnit.MILLISECONDS);
@@ -393,7 +399,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
             logger.error("Process Controller Event error can't write event to 
channel.", ex);
             events.forEach(event -> {
                 source.addMetric(false, event.getBody().length, event);
-                source.fileMetricEventInc(StatConstants.EVENT_POST_DROPPED);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_DROPPED);
             });
             if (!callback.getHasResponsed().getAndSet(true)) {
                 this.responsePackage(ctx, ProxySdk.ResultCode.ERR_REJECT, 
packObject);
@@ -418,7 +424,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
                 if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
                     topic = source.getDefTopic();
                 } else {
-                    
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+                    
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
                     source.addMetric(false, event.getBody().length, event);
                     this.responsePackage(ctx, 
ProxySdk.ResultCode.ERR_ID_ERROR, packObject);
                     return;
@@ -429,12 +435,12 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
             try {
                 source.getChannelProcessor().processEvent(event);
                 source.addMetric(true, event.getBody().length, event);
-                source.fileMetricEventInc(StatConstants.EVENT_POST_SUCCESS);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_SUCCESS);
             } catch (Throwable ex) {
                 logger.error("Process Controller Event error can't write event 
to channel.", ex);
                 source.addMetric(false, event.getBody().length, event);
                 this.responsePackage(ctx, ProxySdk.ResultCode.ERR_REJECT, 
packObject);
-                source.fileMetricEventInc(StatConstants.EVENT_POST_DROPPED);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_DROPPED);
                 return;
             }
         }
@@ -447,7 +453,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
     private void responseV0Msg(Channel channel, AbsV0MsgCodec msgObj, 
StringBuilder strBuff) throws Exception {
         // check channel status
         if (channel == null || !channel.isWritable()) {
-            
source.fileMetricEventInc(StatConstants.EVENT_CHANNEL_NOT_WRITABLE);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_REMOTE_UNWRITABLE);
             if (logCounter.shouldPrint()) {
                 logger.warn("Prepare send msg but channel full, msgType={}, 
attr={}, channel={}",
                         msgObj.getMsgType(), msgObj.getAttr(), channel);
@@ -491,7 +497,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
             ByteBuf cb, int totalDataLen) throws Exception {
         // Check if the message is complete and legal
         if (totalDataLen < BIN_HB_FIXED_CONTENT_SIZE) {
-            source.fileMetricEventInc(StatConstants.EVENT_MALFORMED);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_TOTALLEN_BELOWMIN);
             String errMsg = String.format("Malformed msg, totalDataLen(%d) < 
min hb-msg length(%d)",
                     totalDataLen, BIN_HB_FIXED_CONTENT_SIZE);
             if (logger.isDebugEnabled()) {
@@ -505,12 +511,20 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
         int attrLen = cb.getShort(msgHeadPos + BIN_HB_BODY_OFFSET + bodyLen);
         int msgMagic = cb.getUnsignedShort(msgHeadPos
                 + BIN_HB_BODY_OFFSET + bodyLen + BIN_HB_ATTRLEN_SIZE + 
attrLen);
-        if ((totalDataLen + BIN_HB_TOTALLEN_SIZE < (bodyLen + attrLen + 
BIN_HB_FORMAT_SIZE))
-                || (msgMagic != BIN_MSG_MAGIC)) {
-            source.fileMetricEventInc(StatConstants.EVENT_MALFORMED);
+        if (msgMagic != BIN_MSG_MAGIC) {
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_MAGIC_UNEQUAL);
+            String errMsg = String.format(
+                    "Malformed msg, msgMagic(%d) != %d", msgMagic, 
BIN_MSG_MAGIC);
+            if (logger.isDebugEnabled()) {
+                logger.debug(errMsg + ", channel {}", channel);
+            }
+            throw new Exception(errMsg);
+        }
+        if (totalDataLen + BIN_HB_TOTALLEN_SIZE < (bodyLen + attrLen + 
BIN_HB_FORMAT_SIZE)) {
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_LEN_MALFORMED);
             String errMsg = String.format(
-                    "Malformed msg, bodyLen(%d) + attrLen(%d) > 
totalDataLen(%d) or msgMagic(%d) != %d",
-                    bodyLen, attrLen, totalDataLen, msgMagic, BIN_MSG_MAGIC);
+                    "Malformed msg, bodyLen(%d) + attrLen(%d) > 
totalDataLen(%d)",
+                    bodyLen, attrLen, totalDataLen);
             if (logger.isDebugEnabled()) {
                 logger.debug(errMsg + ", channel {}", channel);
             }
@@ -643,7 +657,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
         if (channel == null || !channel.isWritable()) {
             // release allocated ByteBuf
             binBuffer.release();
-            
source.fileMetricEventInc(StatConstants.EVENT_CHANNEL_NOT_WRITABLE);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_REMOTE_UNWRITABLE);
             if (logCounter.shouldPrint()) {
                 logger.warn("Send msg but channel full, attr={}, channel={}", 
orgAttr, channel);
             }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
index 74cc4435c0..7bae8c2852 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
@@ -76,6 +76,11 @@ public class SimpleHttpSource extends BaseSource implements Configurable {
                 new DefaultThreadFactory(this.getName() + "-worker-group"));
         // init boostrap
         bootstrap = new ServerBootstrap();
+        if (conLinger >= 0) {
+            bootstrap.option(ChannelOption.SO_LINGER, conLinger);
+        }
+        bootstrap.option(ChannelOption.SO_BACKLOG, conBacklog);
+        bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
         bootstrap.childOption(ChannelOption.ALLOCATOR, 
ByteBufAllocator.DEFAULT);
         bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay);
         bootstrap.childOption(ChannelOption.SO_RCVBUF, maxRcvBufferSize);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
index acda3cf4f7..ef298ce805 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
@@ -83,6 +83,11 @@ public class SimpleTcpSource extends BaseSource implements Configurable {
                 new DefaultThreadFactory(this.getName() + "-worker-group"));
         // init boostrap
         bootstrap = new ServerBootstrap();
+        if (conLinger >= 0) {
+            bootstrap.option(ChannelOption.SO_LINGER, conLinger);
+        }
+        bootstrap.option(ChannelOption.SO_BACKLOG, conBacklog);
+        bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
         bootstrap.childOption(ChannelOption.ALLOCATOR, 
ByteBufAllocator.DEFAULT);
         bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay);
         bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleUdpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleUdpSource.java
index f3ea695c56..7010c4d933 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleUdpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleUdpSource.java
@@ -53,6 +53,11 @@ public class SimpleUdpSource extends BaseSource implements Configurable {
         logger.info("start " + this.getName());
         bootstrap = new Bootstrap();
         bootstrap.channel(NioDatagramChannel.class);
+        if (conLinger >= 0) {
+            bootstrap.option(ChannelOption.SO_LINGER, conLinger);
+        }
+        bootstrap.option(ChannelOption.SO_BACKLOG, conBacklog);
+        bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
         bootstrap.option(ChannelOption.SO_RCVBUF, maxRcvBufferSize);
         bootstrap.option(ChannelOption.SO_SNDBUF, maxSendBufferSize);
         bootstrap.handler(this.getChannelInitializerFactory());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
index 26f1fad6cb..8cb0e018ce 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
@@ -73,6 +73,16 @@ public class SourceConstants {
     public static final String SRCCXT_SEND_BUFFER_SIZE = "sendBufferSize";
     public static final int VAL_DEF_SEND_BUFFER_SIZE = 64 * 1024;
     public static final int VAL_MIN_SEND_BUFFER_SIZE = 0;
+    // connect backlog
+    public static final String SRCCXT_CONN_BACKLOG = "con-backlog";
+    public static final int VAL_DEF_CONN_BACKLOG = 128;
+    public static final int VAL_MIN_CONN_BACKLOG = 0;
+    // connect linger
+    public static final String SRCCXT_CONN_LINGER = "con-linger";
+    public static final int VAL_MIN_CONN_LINGER = 0;
+    // connect reuse address
+    public static final String SRCCXT_REUSE_ADDRESS = "reuse-address";
+    public static final boolean VAL_DEF_REUSE_ADDRESS = true;
     // tcp parameter no delay
     public static final String SRCCXT_TCP_NO_DELAY = "tcpNoDelay";
     public static final boolean VAL_DEF_TCP_NO_DELAY = true;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
index 4f636f8173..fd016f4857 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
@@ -94,25 +94,25 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
         final String clientIp = AddressUtils.getChannelRemoteIP(ctx.channel());
         // check request decode result
         if (!req.decoderResult().isSuccess()) {
-            source.fileMetricEventInc(StatConstants.EVENT_MSG_DECODE_FAIL);
+            source.fileMetricIncSumStats(StatConstants.EVENT_MSG_DECODE_FAIL);
             sendErrorMsg(ctx, DataProxyErrCode.HTTP_DECODE_REQ_FAILURE);
             return;
         }
         // check service status.
         if (source.isRejectService()) {
-            source.fileMetricEventInc(StatConstants.EVENT_SERVICE_CLOSED);
+            source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_CLOSED);
             sendErrorMsg(ctx, DataProxyErrCode.SERVICE_CLOSED);
             return;
         }
         // check sink service status
         if (!ConfigManager.getInstance().isMqClusterReady()) {
-            
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_SINK_UNREADY);
             sendErrorMsg(ctx, DataProxyErrCode.SINK_SERVICE_UNREADY);
             return;
         }
         // check request method
         if (req.method() != HttpMethod.GET && req.method() != HttpMethod.POST) 
{
-            source.fileMetricEventInc(StatConstants.EVENT_MSG_METHOD_INVALID);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_METHOD_INVALID);
             sendErrorMsg(ctx, DataProxyErrCode.HTTP_UNSUPPORTED_METHOD,
                     "Only support [" + HttpMethod.GET.name() + ", "
                             + HttpMethod.POST.name() + "] methods");
@@ -125,7 +125,7 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
         if (!HttpAttrConst.KEY_SRV_URL_HEARTBEAT.equals(uriDecoder.path())
                 && 
!HttpAttrConst.KEY_SRV_URL_REPORT_MSG.equals(uriDecoder.path())) {
             if (!HttpAttrConst.KEY_URL_FAVICON_ICON.equals(uriDecoder.path())) 
{
-                
source.fileMetricEventInc(StatConstants.EVENT_MSG_PATH_INVALID);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_PATH_INVALID);
                 sendErrorMsg(ctx, 
DataProxyErrCode.HTTP_UNSUPPORTED_SERVICE_URI,
                         "Only support [" + HttpAttrConst.KEY_SRV_URL_HEARTBEAT 
+ ", "
                                 + HttpAttrConst.KEY_SRV_URL_REPORT_MSG + "] 
paths!");
@@ -136,7 +136,7 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
         boolean closeConnection = isCloseConnection(req);
         // process hb service
         if (HttpAttrConst.KEY_SRV_URL_HEARTBEAT.equals(uriDecoder.path())) {
-            source.fileMetricEventInc(StatConstants.EVENT_MSG_HB_SUCCESS);
+            source.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_SUCCESS);
             sendResponse(ctx, closeConnection);
             return;
         }
@@ -152,7 +152,7 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
                     cntType = cntType.trim();
                     if (!cntType.equalsIgnoreCase(
                             
HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString())) {
-                        
source.fileMetricEventInc(StatConstants.EVENT_MSG_CONTYPE_INVALID);
+                        
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_CONTYPE_INVALID);
                         sendErrorMsg(ctx, 
DataProxyErrCode.HTTP_UNSUPPORTED_CONTENT_TYPE,
                                 "Only support [" + 
HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED
                                         + "] content type!");
@@ -175,13 +175,13 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKIN);
+        source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKIN);
         // check illegal ip
         if (ConfigManager.getInstance().needChkIllegalIP()) {
             String strRemoteIp = 
AddressUtils.getChannelRemoteIP(ctx.channel());
             if (strRemoteIp != null
                     && ConfigManager.getInstance().isIllegalIP(strRemoteIp)) {
-                source.fileMetricEventInc(StatConstants.EVENT_VISIT_ILLEGAL);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_ILLEGAL);
                 ctx.channel().disconnect();
                 ctx.channel().close();
                 if (logCounter.shouldPrint()) {
@@ -192,7 +192,7 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
         }
         // check max allowed connection count
         if (source.getAllChannels().size() >= source.getMaxConnections()) {
-            source.fileMetricEventInc(StatConstants.EVENT_VISIT_OVERMAX);
+            source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_OVERMAX);
             ctx.channel().disconnect();
             ctx.channel().close();
             if (logCounter.shouldPrint()) {
@@ -208,18 +208,18 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) {
-        source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKOUT);
+        source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKOUT);
         ctx.fireChannelInactive();
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        source.fileMetricEventInc(StatConstants.EVENT_VISIT_EXCEPTION);
+        source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_EXCEPTION);
+        if (logCounter.shouldPrint()) {
+            logger.warn("{} received an exception from channel {}",
+                    source.getName(), ctx.channel(), cause);
+        }
         if (cause instanceof IOException) {
-            if (logCounter.shouldPrint()) {
-                logger.warn("{} received an IOException from channel {}",
-                        source.getName(), ctx.channel(), cause);
-            }
             ctx.close();
         } else {
             sendErrorMsg(ctx, DataProxyErrCode.UNKNOWN_ERROR,
@@ -250,7 +250,7 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
         StringBuilder strBuff = new StringBuilder(512);
         String groupId = reqAttrs.get(HttpAttrConst.KEY_GROUP_ID);
         if (StringUtils.isBlank(groupId)) {
-            source.fileMetricEventInc(StatConstants.EVENT_MSG_GROUPID_MISSING);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_GROUPID_MISSING);
             sendResponse(ctx, 
DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
                     strBuff.append("Field ").append(HttpAttrConst.KEY_GROUP_ID)
                             .append(" must exist and not blank!").toString(),
@@ -260,7 +260,7 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
         // get and check streamId
         String streamId = reqAttrs.get(HttpAttrConst.KEY_STREAM_ID);
         if (StringUtils.isBlank(streamId)) {
-            
source.fileMetricEventInc(StatConstants.EVENT_MSG_STREAMID_MISSING);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_STREAMID_MISSING);
             sendResponse(ctx, 
DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
                     strBuff.append("Field 
").append(HttpAttrConst.KEY_STREAM_ID)
                             .append(" must exist and not blank!").toString(),
@@ -270,9 +270,9 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
         // get and check topicName
         String topicName = ConfigManager.getInstance().getTopicName(groupId, 
streamId);
         if (StringUtils.isBlank(topicName)) {
-            
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
             sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
-                    strBuff.append("Topic not configured for 
").append(HttpAttrConst.KEY_STREAM_ID)
+                    strBuff.append("Topic not configured for 
").append(HttpAttrConst.KEY_GROUP_ID)
                             .append("(").append(groupId).append("),")
                             .append(HttpAttrConst.KEY_STREAM_ID)
                             
.append("(,").append(streamId).append(")").toString(),
@@ -293,13 +293,13 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
         String body = reqAttrs.get(HttpAttrConst.KEY_BODY);
         if (StringUtils.isBlank(body)) {
             if (body == null) {
-                
source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_MISSING);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_MISSING);
                 sendResponse(ctx, 
DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
                         strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
                                 .append(" is not exist!").toString(),
                         isCloseCon);
             } else {
-                source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_BLANK);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_BLANK);
                 sendResponse(ctx, DataProxyErrCode.EMPTY_MSG.getErrCode(),
                         strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
                                 .append(" is Blank!").toString(),
@@ -308,7 +308,7 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
             return false;
         }
         if (body.length() > source.getMaxMsgLength()) {
-            source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_OVERMAX);
+            source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_OVERMAX);
             sendResponse(ctx, 
DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
                     strBuff.append("Error msg, the 
").append(HttpAttrConst.KEY_BODY)
                             .append(" length(").append(body.length())
@@ -348,28 +348,27 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
         // build metric data item
         dataTime = dataTime / 1000 / 60 / 10;
         dataTime = dataTime * 1000 * 60 * 10;
-        strBuff.append(source.getProtocolName())
-                .append(AttrConstants.SEP_HASHTAG).append(topicName)
-                .append(AttrConstants.SEP_HASHTAG).append(streamId)
-                .append(AttrConstants.SEP_HASHTAG).append(clientIp)
-                .append(AttrConstants.SEP_HASHTAG).append(source.getSrcHost())
-                .append(AttrConstants.SEP_HASHTAG).append("b2b")
-                
.append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(dataTime))
-                
.append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime));
+        String statsKey = 
strBuff.append(source.getProtocolName()).append(AttrConstants.SEP_HASHTAG)
+                .append(groupId).append(AttrConstants.SEP_HASHTAG)
+                .append(streamId).append(AttrConstants.SEP_HASHTAG)
+                .append(clientIp).append(AttrConstants.SEP_HASHTAG)
+                .append(source.getSrcHost()).append(AttrConstants.SEP_HASHTAG)
+                .append("b2b").append(AttrConstants.SEP_HASHTAG)
+                
.append(DateTimeUtils.ms2yyyyMMddHHmm(dataTime)).append(AttrConstants.SEP_HASHTAG)
+                .append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime)).toString();
+        strBuff.delete(0, strBuff.length());
         try {
             source.getChannelProcessor().processEvent(event);
-            source.fileMetricEventInc(StatConstants.EVENT_MSG_POST_SUCCESS);
-            source.fileMetricRecordAdd(strBuff.toString(), intMsgCnt, 1, 
event.getBody().length, 0);
-            strBuff.delete(0, strBuff.length());
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V0_POST_SUCCESS);
+            source.fileMetricAddSuccCnt(statsKey, intMsgCnt, 1, 
event.getBody().length);
             source.addMetric(true, event.getBody().length, event);
             sendResponse(ctx, isCloseCon);
             return true;
         } catch (Throwable ex) {
-            source.fileMetricEventInc(StatConstants.EVENT_MSG_POST_FAILURE);
-            source.fileMetricRecordAdd(strBuff.toString(), 0, 0, 0, intMsgCnt);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V0_POST_FAILURE);
+            source.fileMetricAddFailCnt(statsKey, intMsgCnt);
             source.addMetric(false, event.getBody().length, event);
-            strBuff.delete(0, strBuff.length());
-            sendErrorMsg(ctx, DataProxyErrCode.UNKNOWN_ERROR,
+            sendErrorMsg(ctx, DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
                     strBuff.append("Put event to channel failure: 
").append(ex.getMessage()).toString());
             if (logCounter.shouldPrint()) {
                 logger.error("Error writing HTTP event to channel failure.", 
ex);
@@ -422,7 +421,7 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
             return;
         }
         if (!ctx.channel().isWritable()) {
-            
source.fileMetricEventInc(StatConstants.EVENT_CHANNEL_NOT_WRITABLE);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_REMOTE_UNWRITABLE);
             if (logCounter.shouldPrint()) {
                 logger.warn("Send msg but channel full, channel={}", 
ctx.channel());
             }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
index b99d28cddc..69fee83565 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
@@ -61,6 +61,7 @@ public abstract class AbsV0MsgCodec {
     protected String topicName;
     protected String msgSeqId = "";
     protected long uniq = -1L;
+    protected boolean isOrderOrProxy = false;
     protected String msgProcType = "b2b";
     protected boolean needResp = true;
 
@@ -90,6 +91,10 @@ public abstract class AbsV0MsgCodec {
         return this.needResp;
     }
 
+    public boolean isOrderOrProxy() {
+        return isOrderOrProxy;
+    }
+
     public byte getMsgType() {
         return this.msgType;
     }
@@ -142,6 +147,11 @@ public abstract class AbsV0MsgCodec {
         return msgRcvTime;
     }
 
+    public void setSuccessInfo() {
+        this.errCode = DataProxyErrCode.SUCCESS;
+        this.errMsg = "";
+    }
+
     public void setFailureInfo(DataProxyErrCode errCode) {
         setFailureInfo(errCode, "");
     }
@@ -168,7 +178,7 @@ public abstract class AbsV0MsgCodec {
             try {
                 this.attrMap.putAll(mapSplitter.split(this.origAttr));
             } catch (Exception e) {
-                source.fileMetricEventInc(StatConstants.EVENT_INVALIDATTR);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ATTR_INVALID);
                 this.errCode = DataProxyErrCode.SPLIT_ATTR_ERROR;
                 this.errMsg = String.format("Parse attr (%s) failure", 
this.origAttr);
                 return false;
@@ -178,6 +188,28 @@ public abstract class AbsV0MsgCodec {
         if 
("false".equalsIgnoreCase(attrMap.get(AttributeConstants.MESSAGE_IS_ACK))) {
             this.needResp = false;
         }
+        // get whether sync send
+        if 
("true".equalsIgnoreCase(attrMap.get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+            if (!this.needResp) {
+                this.needResp = true;
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ORDER_ACK_INVALID);
+                this.errCode = 
DataProxyErrCode.ATTR_ORDER_CONTROL_CONFLICT_ERROR;
+                return false;
+            }
+            this.isOrderOrProxy = true;
+            this.msgProcType = "order";
+        }
+        // get whether proxy send
+        if 
("true".equalsIgnoreCase(attrMap.get(AttributeConstants.MESSAGE_PROXY_SEND))) {
+            if (!this.needResp) {
+                this.needResp = true;
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_PROXY_ACK_INVALID);
+                this.errCode = 
DataProxyErrCode.ATTR_PROXY_CONTROL_CONFLICT_ERROR;
+                return false;
+            }
+            this.isOrderOrProxy = true;
+            this.msgProcType = "proxy";
+        }
         return true;
     }
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
index 027e33e74f..4fe96f57ec 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
@@ -26,12 +26,10 @@ import 
org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.source2.BaseSource;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 
@@ -63,7 +61,6 @@ public class CodecBinMsg extends AbsV0MsgCodec {
     private long dataTimeSec;
     private boolean num2name = false;
     private boolean transNum2Name = false;
-    private boolean isOrderOrProxy = false;
     private boolean indexMsg = false;
     private boolean fileCheckMsg = false;
     private boolean needTraceMsg = false;
@@ -91,28 +88,32 @@ public class CodecBinMsg extends AbsV0MsgCodec {
                 + bodyLen + BIN_MSG_ATTRLEN_SIZE + attrLen);
         if (bodyLen <= 0) {
             if (bodyLen == 0) {
-                source.fileMetricEventInc(StatConstants.EVENT_NOBODY);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_ZERO);
                 this.errCode = DataProxyErrCode.BODY_LENGTH_ZERO;
             } else {
-                source.fileMetricEventInc(StatConstants.EVENT_NEGBODY);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_NEGATIVE);
                 this.errCode = DataProxyErrCode.BODY_LENGTH_LESS_ZERO;
             }
             return false;
         }
         // get attribute length
         if (attrLen < 0) {
-            source.fileMetricEventInc(StatConstants.EVENT_NEGATTR);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ATTR_NEGATIVE);
             this.errCode = DataProxyErrCode.ATTR_LENGTH_LESS_ZERO;
             return false;
         }
         // get msg magic
-        if ((msgMagic != BIN_MSG_MAGIC)
-                || (totalDataLen + BIN_MSG_TOTALLEN_SIZE < (bodyLen + attrLen 
+ BIN_MSG_FORMAT_SIZE))) {
-            source.fileMetricEventInc(StatConstants.EVENT_MALFORMED);
-            this.errCode = DataProxyErrCode.FIELD_VALUE_NOT_EQUAL;
-            this.errMsg = String.format(
-                    "fixedLen(%d) + bodyLen(%d) + attrLen(%d) > 
totalDataLen(%d) + 4 or msgMagic(%d) != %d",
-                    BIN_MSG_FORMAT_SIZE, bodyLen, attrLen, totalDataLen, 
msgMagic, BIN_MSG_MAGIC);
+        if (msgMagic != BIN_MSG_MAGIC) {
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_MAGIC_UNEQUAL);
+            this.errCode = DataProxyErrCode.FIELD_MAGIC_NOT_EQUAL;
+            this.errMsg = String.format("magicInMsg(%d) != %d", msgMagic, 
BIN_MSG_MAGIC);
+            return false;
+        }
+        if (totalDataLen + BIN_MSG_TOTALLEN_SIZE < (bodyLen + attrLen + 
BIN_MSG_FORMAT_SIZE)) {
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BIN_LEN_MALFORMED);
+            this.errCode = DataProxyErrCode.FIELD_LENGTH_VALUE_NOT_EQUAL;
+            this.errMsg = String.format("fixedLen(%d) + bodyLen(%d) + 
attrLen(%d) > totalDataLen(%d) + 4",
+                    BIN_MSG_FORMAT_SIZE, bodyLen, attrLen, totalDataLen);
             return false;
         }
         // extract attr bytes
@@ -133,19 +134,13 @@ public class CodecBinMsg extends AbsV0MsgCodec {
         if (((extendField & 0x4) >> 2) == 0x0) {
             this.num2name = true;
         }
-        // parse required fields
-        Pair<Boolean, String> evenProcType =
-                
MessageUtils.getEventProcType(attrMap.get(AttributeConstants.MESSAGE_SYNC_SEND),
-                        attrMap.get(AttributeConstants.MESSAGE_PROXY_SEND));
-        this.isOrderOrProxy = evenProcType.getLeft();
-        this.msgProcType = evenProcType.getRight();
         return true;
     }
 
     public boolean validAndFillFields(BaseSource source, StringBuilder 
strBuff) {
         // reject unsupported index messages
         if (indexMsg) {
-            source.fileMetricEventInc(StatConstants.EVENT_UNSUPMSG);
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_INDEXMSG_ILLEGAL);
             this.errCode = DataProxyErrCode.UNSUPPORTED_EXTEND_FIELD_VALUE;
             return false;
         }
@@ -257,7 +252,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
         this.streamId = this.attrMap.get(AttributeConstants.STREAM_ID);
         if (num2name) {
             if (this.groupIdNum == 0) {
-                source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_GROUPIDNUM_ZERO);
                 this.errCode = DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT;
                 this.errMsg = "groupIdNum is 0 in message";
                 return false;
@@ -268,18 +263,18 @@ public class CodecBinMsg extends AbsV0MsgCodec {
             confGroupId = configManager.getGroupIdNameByNum(strGroupIdNum);
             if (StringUtils.isBlank(confGroupId)) {
                 if (configManager.isGroupIdNumConfigEmpty()) {
-                    
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
+                    
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_IDNUM_EMPTY);
                     this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
                     this.errMsg = "GroupId-Mapping configuration is null";
                 } else {
-                    
source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+                    
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_GROUPIDNUM_MISSING);
                     this.errCode = 
DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE;
                     this.errMsg = String.format("Non-existing groupIdNum(%s) 
configuration", strGroupIdNum);
                 }
                 return false;
             }
             if (StringUtils.isNotBlank(this.groupId) && 
!this.groupId.equalsIgnoreCase(confGroupId)) {
-                
source.fileMetricEventInc(StatConstants.EVENT_INCONSGROUPORSTREAMID);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_GROUP_IDNUM_INCONSTANT);
                 this.errCode = DataProxyErrCode.GROUPID_OR_STREAMID_INCONSTANT;
                 this.errMsg = String.format(
                         "Inconstant GroupId not equal, (%s) in attr but (%s) 
in configure by groupIdNum(%s)",
@@ -290,7 +285,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
             // check streamId
             if (this.streamIdNum == 0) {
                 if (StringUtils.isNotBlank(this.streamId)) {
-                    
source.fileMetricEventInc(StatConstants.EVENT_INCONSGROUPORSTREAMID);
+                    
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_STREAMIDNUM_ZERO);
                     this.errCode = 
DataProxyErrCode.GROUPID_OR_STREAMID_INCONSTANT;
                     this.errMsg = String.format("Inconstant streamId(%s) in 
attr but streamIdNum=0", this.streamId);
                     return false;
@@ -300,11 +295,11 @@ public class CodecBinMsg extends AbsV0MsgCodec {
                 confStreamId = 
configManager.getStreamIdNameByIdNum(strGroupIdNum, strStreamIdNum);
                 if (StringUtils.isBlank(confStreamId)) {
                     if (configManager.isStreamIdNumConfigEmpty()) {
-                        
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
+                        
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_IDNUM_EMPTY);
                         this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
                         this.errMsg = "StreamId-Mapping configuration is null";
                     } else {
-                        
source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+                        
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_STREAMIDNUM_MISSING);
                         this.errCode = 
DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE;
                         this.errMsg = String.format("Non-existing 
GroupId(%s)-StreamId(%s) configuration",
                                 strGroupIdNum, strStreamIdNum);
@@ -312,7 +307,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
                     return false;
                 }
                 if (StringUtils.isNotBlank(this.streamId) && 
!this.streamId.equalsIgnoreCase(confStreamId)) {
-                    
source.fileMetricEventInc(StatConstants.EVENT_INCONSGROUPORSTREAMID);
+                    
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_STREAM_IDNUM_INCONSTANT);
                     this.errCode = 
DataProxyErrCode.GROUPID_OR_STREAMID_INCONSTANT;
                     this.errMsg = String.format(
                             "Inconstant StreamId, (%s) in attr but (%s) in 
configure by groupIdNum(%s), streamIdNum(%s)",
@@ -327,7 +322,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
             }
         } else {
             if (StringUtils.isBlank(groupId)) {
-                source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_GROUPID_MISSING);
                 this.errCode = DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT;
                 return false;
             }
@@ -338,9 +333,9 @@ public class CodecBinMsg extends AbsV0MsgCodec {
             if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
                 this.topicName = source.getDefTopic();
             } else {
-                
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
                 this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
-                this.errMsg = String.format("Topic is null for 
inlongGroupId=(%s), inlongStreamId=(%s)",
+                this.errMsg = String.format("Topic not configured for 
groupId=(%s), streamId=(%s)",
                         this.groupId, this.streamId);
                 return false;
             }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
index f02752ca10..50f7552a32 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
@@ -54,15 +54,16 @@ public class CodecTextMsg extends AbsV0MsgCodec {
         int bodyLen = cb.getInt(msgHeadPos + TXT_MSG_BODYLEN_OFFSET);
         if (bodyLen <= 0) {
             if (bodyLen == 0) {
-                source.fileMetricEventInc(StatConstants.EVENT_NOBODY);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_ZERO);
                 this.errCode = DataProxyErrCode.BODY_LENGTH_ZERO;
             } else {
-                source.fileMetricEventInc(StatConstants.EVENT_NEGBODY);
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_NEGATIVE);
                 this.errCode = DataProxyErrCode.BODY_LENGTH_LESS_ZERO;
             }
             return false;
         }
         if (bodyLen + TXT_MSG_FORMAT_SIZE > totalDataLen + 
TXT_MSG_TOTALLEN_SIZE) {
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_TXT_LEN_MALFORMED);
             this.errCode = DataProxyErrCode.BODY_EXCEED_MAX_LEN;
             this.errMsg = String.format("Error msg, bodyLen(%d) + 
fixedLength(%d) > totalDataLen(%d) + 4",
                     bodyLen, TXT_MSG_FORMAT_SIZE, totalDataLen);
@@ -74,11 +75,13 @@ public class CodecTextMsg extends AbsV0MsgCodec {
         // get attribute length
         int attrLen = cb.getInt(msgHeadPos + TXT_MSG_BODY_OFFSET + bodyLen);
         if (attrLen < 0) {
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ATTR_NEGATIVE);
             this.errCode = DataProxyErrCode.ATTR_LENGTH_LESS_ZERO;
             return false;
         }
         // check attribute length
         if (totalDataLen + TXT_MSG_TOTALLEN_SIZE != TXT_MSG_FORMAT_SIZE + 
bodyLen + attrLen) {
+            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_TXT_LEN_MALFORMED);
             this.errCode = DataProxyErrCode.BODY_EXCEED_MAX_LEN;
             this.errMsg = String.format(
                     "Error msg, totalDataLen(%d) + 4 != fixedLength(%d) + 
bodyLen(%d) + attrLen(%d)",
@@ -97,14 +100,14 @@ public class CodecTextMsg extends AbsV0MsgCodec {
                 unCompressedData = new byte[uncompressedLen];
                 Snappy.uncompress(bodyData, 0, bodyData.length, unCompressedData, 0);
             } catch (IOException e) {
-                source.fileMetricEventInc(StatConstants.EVENT_UNPRESSEXP);
+                source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_UNPRESS_EXP);
                 this.errCode = DataProxyErrCode.UNCOMPRESS_DATA_ERROR;
                 this.errMsg = String.format("Error to uncompress msg, compress type(%s), attr: (%s), error: (%s)",
                         attrMap.get(AttributeConstants.COMPRESS_TYPE), origAttr, e.getCause());
                 return false;
             }
             if (unCompressedData.length == 0) {
-                source.fileMetricEventInc(StatConstants.EVENT_UNPRESSEXP);
+                source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_UNPRESS_EXP);
                 this.errCode = DataProxyErrCode.UNCOMPRESS_DATA_ERROR;
                 this.errMsg = String.format("Error to uncompress msg, compress type(%s), attr: (%s), error: 2",
                         attrMap.get(AttributeConstants.COMPRESS_TYPE), origAttr);
@@ -120,7 +123,7 @@ public class CodecTextMsg extends AbsV0MsgCodec {
             while (bodyBuffer.remaining() > 0) {
                 singleMsgLen = bodyBuffer.getInt(readPos);
                 if (singleMsgLen <= 0 || singleMsgLen > bodyBuffer.remaining()) {
-                    source.fileMetricEventInc(StatConstants.EVENT_MALFORMED);
+                    source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ITEM_LEN_MALFORMED);
                     this.errCode = DataProxyErrCode.BODY_EXCEED_MAX_LEN;
                     this.errMsg = String.format(
                             "Malformed data len, singleMsgLen(%d), buffer remaining(%d), attr: (%s)",
@@ -138,7 +141,7 @@ public class CodecTextMsg extends AbsV0MsgCodec {
         String tmpGroupId = attrMap.get(AttributeConstants.GROUP_ID);
         String tmpStreamId = attrMap.get(AttributeConstants.STREAM_ID);
         if (StringUtils.isBlank(tmpGroupId)) {
-            source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+            source.fileMetricIncSumStats(StatConstants.EVENT_MSG_GROUPID_MISSING);
             this.errCode = DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT;
             return false;
         }
@@ -148,10 +151,10 @@ public class CodecTextMsg extends AbsV0MsgCodec {
             if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
                 tmpTopicName = source.getDefTopic();
             } else {
-                source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+                source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
                 this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
                 this.errMsg = String.format(
-                        "Topic is null for inlongGroupId=(%s), inlongStreamId=(%s)", tmpGroupId, tmpStreamId);
+                        "Topic not configured for groupId=(%s), streamId=(%s)", tmpGroupId, tmpStreamId);
                 return false;
             }
         }

Reply via email to