This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 73c0b1d148 [INLONG-8267][DataProxy] Add the control of whether to 
retry and the count of retries for the failure message (#8271)
73c0b1d148 is described below

commit 73c0b1d1488ffb9520c2661a867b796bf5d6c444
Author: Goson Zhang <[email protected]>
AuthorDate: Sun Jun 18 19:10:11 2023 +0800

    [INLONG-8267][DataProxy] Add the control of whether to retry and the count 
of retries for the failure message (#8271)
---
 .../dataproxy/config/CommonConfigHolder.java       | 58 +++++++++++++++------
 .../inlong/dataproxy/consts/StatConstants.java     |  1 +
 .../inlong/dataproxy/sink/common/TubeUtils.java    |  3 +-
 .../inlong/dataproxy/sink/mq/BatchPackManager.java | 21 ++++----
 .../inlong/dataproxy/sink/mq/BatchPackProfile.java | 23 +++++----
 .../sink/mq/MessageQueueZoneSinkContext.java       | 43 ++++++++++++++++
 .../inlong/dataproxy/sink/mq/PackProfile.java      | 32 +++++++-----
 .../dataproxy/sink/mq/SimplePackProfile.java       | 18 ++++---
 .../dataproxy/sink/mq/kafka/KafkaHandler.java      | 25 +++++----
 .../dataproxy/sink/mq/pulsar/PulsarHandler.java    | 23 +++++----
 .../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 59 +++++++++++++++-------
 .../dataproxy/source2/InLongMessageHandler.java    |  2 +-
 .../source2/httpMsg/InLongHttpMsgHandler.java      |  4 +-
 .../dataproxy/source2/v0msg/CodecBinMsg.java       |  4 +-
 .../config/holder/TestCommonConfigHolder.java      |  3 ++
 .../src/test/resources/common.properties           |  4 ++
 16 files changed, 220 insertions(+), 103 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
index d6e246978a..be8abdc80b 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -119,7 +119,12 @@ public class CommonConfigHolder {
     public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 2000000;
     public static final String KEY_AUDIT_FORMAT_INTERVAL_MS = 
"auditFormatInterval";
     public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 60000L;
-    // Whether response after save msg
+    // whether to retry after the message send failure
+    public static final String KEY_ENABLE_SEND_RETRY_AFTER_FAILURE = 
"send.retry.after.failure";
+    public static final boolean VAL_DEF_ENABLE_SEND_RETRY_AFTER_FAILURE = true;
+    // max retry count
+    public static final String KEY_MAX_RETRIES_AFTER_FAILURE = 
"max.retries.after.failure";
+    public static final int VAL_DEF_MAX_RETRIES_AFTER_FAILURE = -1;
     public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
     public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false;
     // Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
@@ -182,6 +187,8 @@ public class CommonConfigHolder {
     private String fileMetricSourceOutName = 
VAL_DEF_FILE_METRIC_SOURCE_OUTPUT_NAME;
     private String fileMetricSinkOutName = 
VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME;
     private String fileMetricEventOutName = 
VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME;
+    private boolean enableSendRetryAfterFailure = 
VAL_DEF_ENABLE_SEND_RETRY_AFTER_FAILURE;
+    private int maxRetriesAfterFailure = VAL_DEF_MAX_RETRIES_AFTER_FAILURE;
 
     /**
      * get instance for common.properties config manager
@@ -362,6 +369,14 @@ public class CommonConfigHolder {
         return fileMetricEventOutName;
     }
 
+    public boolean isEnableSendRetryAfterFailure() {
+        return enableSendRetryAfterFailure;
+    }
+
+    public int getMaxRetriesAfterFailure() {
+        return maxRetriesAfterFailure;
+    }
+
     private void preReadFields() {
         String tmpValue;
         // read cluster tag
@@ -403,22 +418,20 @@ public class CommonConfigHolder {
         tmpValue = this.props.get(KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT);
         if (StringUtils.isNotEmpty(tmpValue)) {
             this.enableUnConfigTopicAccept = 
"TRUE".equalsIgnoreCase(tmpValue.trim());
-            if (enableUnConfigTopicAccept) {
-                // read default topics
-                tmpValue = 
this.props.get(KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS);
-                if (StringUtils.isNotBlank(tmpValue)) {
-                    List<String> tmpList = new ArrayList<>();
-                    String[] topicItems = tmpValue.split("\\s+");
-                    for (String item : topicItems) {
-                        if (StringUtils.isBlank(item)) {
-                            continue;
-                        }
-                        tmpList.add(item.trim());
-                    }
-                    if (tmpList.size() > 0) {
-                        defaultTopics = tmpList;
-                    }
+        }
+        // read default topics
+        tmpValue = this.props.get(KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            List<String> tmpList = new ArrayList<>();
+            String[] topicItems = tmpValue.split("\\s+");
+            for (String item : topicItems) {
+                if (StringUtils.isBlank(item)) {
+                    continue;
                 }
+                tmpList.add(item.trim());
+            }
+            if (tmpList.size() > 0) {
+                defaultTopics = tmpList;
             }
         }
         // read enable whitelist
@@ -548,6 +561,19 @@ public class CommonConfigHolder {
         if (StringUtils.isNotEmpty(tmpValue)) {
             this.prometheusHttpPort = NumberUtils.toInt(tmpValue.trim(), 
VAL_DEF_PROMETHEUS_HTTP_PORT);
         }
+        // read whether to retry sending the message after a send failure
+        tmpValue = this.props.get(KEY_ENABLE_SEND_RETRY_AFTER_FAILURE);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.enableSendRetryAfterFailure = 
"TRUE".equalsIgnoreCase(tmpValue.trim());
+        }
+        // read max retry count
+        tmpValue = this.props.get(KEY_MAX_RETRIES_AFTER_FAILURE);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            int retries = NumberUtils.toInt(tmpValue.trim(), 
VAL_DEF_MAX_RETRIES_AFTER_FAILURE);
+            if (retries >= 0) {
+                this.maxRetriesAfterFailure = retries;
+            }
+        }
         // initial ip parser
         try {
             Class<? extends IManagerIpListParser> ipListParserClass =
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 973f01451f..45126cc55b 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -87,6 +87,7 @@ public class StatConstants {
     public static final java.lang.String EVENT_SINK_FAILRETRY = "sink.retry";
     public static final java.lang.String EVENT_SINK_FAILDROPPED = 
"sink.dropped";
     public static final java.lang.String EVENT_SINK_SUCCESS = "sink.success";
+    public static final java.lang.String EVENT_SINK_FAILURE = "sink.failure";
     public static final java.lang.String EVENT_SINK_RECEIVEEXCEPT = 
"sink.rcvexcept";
 
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
index 5542e8026f..ca4bcceed4 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
@@ -67,9 +67,8 @@ public class TubeUtils {
             message.putSystemHeader(headers.get(Constants.INLONG_STREAM_ID),
                     DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
         } else {
-            long dataTimeL = 
Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
             message.putSystemHeader(headers.get(AttributeConstants.STREAM_ID),
-                    DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+                    headers.get(ConfigConstants.PKG_TIME_KEY));
         }
         Map<String, String> extraAttrMap = MessageUtils.getXfsAttrs(headers, 
pkgVersion);
         for (Map.Entry<String, String> entry : extraAttrMap.entrySet()) {
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
index 84a1f5c976..9daebf5c90 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
@@ -54,18 +54,18 @@ public class BatchPackManager {
     private final long dispatchTimeout;
     private final long maxPackCount;
     private final long maxPackSize;
-    private BufferQueue<PackProfile> dispatchQueue;
-    private ConcurrentHashMap<String, PackProfile> profileCache = new 
ConcurrentHashMap<>();
+    private final BufferQueue<PackProfile> dispatchQueue;
+    private final ConcurrentHashMap<String, PackProfile> profileCache = new 
ConcurrentHashMap<>();
     // flag that manager need to output overtime data.
-    private AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
-    private AtomicLong inCounter = new AtomicLong(0);
-    private AtomicLong outCounter = new AtomicLong(0);
+    private final AtomicBoolean needOutputOvertimeData = new 
AtomicBoolean(false);
+    private final AtomicLong inCounter = new AtomicLong(0);
+    private final AtomicLong outCounter = new AtomicLong(0);
 
     /**
      * Constructor
      * 
-     * @param context
-     * @param dispatchQueue
+     * @param context the process context
+     * @param dispatchQueue  the batch queue
      */
     public BatchPackManager(Context context, BufferQueue<PackProfile> 
dispatchQueue) {
         this.dispatchQueue = dispatchQueue;
@@ -76,7 +76,7 @@ public class BatchPackManager {
 
     /**
      * addEvent
-     * @param event
+     * @param event the event to add
      */
     public void addEvent(ProxyEvent event) {
         // parse
@@ -106,7 +106,7 @@ public class BatchPackManager {
 
     /**
      * addPackEvent
-     * @param packEvent
+     * @param packEvent  the event need to add
      */
     public void addPackEvent(ProxyPackEvent packEvent) {
         String eventUid = packEvent.getUid();
@@ -142,7 +142,7 @@ public class BatchPackManager {
 
     /**
      * addSimpleEvent
-     * @param event
+     * @param event  the event need to add
      */
     public void addSimpleEvent(SimpleEvent event) {
         Map<String, String> headers = event.getHeaders();
@@ -162,7 +162,6 @@ public class BatchPackManager {
     /**
      * outputOvertimeData
      * 
-     * @return
      */
     public void outputOvertimeData() {
         if (!needOutputOvertimeData.getAndSet(false)) {
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfile.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfile.java
index 479b26ee88..99cc05e9e5 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfile.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfile.java
@@ -37,10 +37,10 @@ public class BatchPackProfile extends PackProfile {
     /**
      * Constructor
      *
-     * @param uid
-     * @param inlongGroupId
-     * @param inlongStreamId
-     * @param dispatchTime
+     * @param uid   the inlong id
+     * @param inlongGroupId   the group id
+     * @param inlongStreamId  the stream id
+     * @param dispatchTime    the dispatch time
      */
     public BatchPackProfile(String uid, String inlongGroupId, String 
inlongStreamId, long dispatchTime) {
         super(uid, inlongGroupId, inlongStreamId, dispatchTime);
@@ -49,10 +49,10 @@ public class BatchPackProfile extends PackProfile {
     /**
      * addEvent
      * 
-     * @param  event
-     * @param  maxPackCount
-     * @param  maxPackSize
-     * @return
+     * @param  event   the event to add
+     * @param  maxPackCount   the max package count to cache
+     * @param  maxPackSize    the max package size to cache
+     * @return  whether the event was added
      */
     public boolean addEvent(Event event, long maxPackCount, long maxPackSize) {
         long eventLength = event.getBody().length;
@@ -94,7 +94,6 @@ public class BatchPackProfile extends PackProfile {
 
     /**
      * fail
-     * @return
      */
     public void fail(DataProxyErrCode errCode, String errMsg) {
         if (callback != null) {
@@ -104,10 +103,12 @@ public class BatchPackProfile extends PackProfile {
 
     /**
      * isResend
-     * @return
+     * @return  whether resend message
      */
     public boolean isResend() {
-        return callback == null;
+        return callback == null
+                && enableRetryAfterFailure
+                && (maxRetries < 0 || ++retries <= maxRetries);
     }
 
     /**
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index c395af814d..15f0879ad0 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -18,7 +18,10 @@
 package org.apache.inlong.dataproxy.sink.mq;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.util.NetworkUtils;
 import org.apache.inlong.dataproxy.config.CommonConfigHolder;
+import org.apache.inlong.dataproxy.consts.AttrConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
@@ -28,6 +31,7 @@ import 
org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
 
 import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.conf.Configurable;
@@ -272,4 +276,43 @@ public class MessageQueueZoneSinkContext extends 
SinkContext {
         }
         return null;
     }
+
+    public void fileMetricAddSuccCnt(PackProfile packProfile, String topic, 
String remoteId) {
+        if (!CommonConfigHolder.getInstance().isEnableFileMetric()) {
+            return;
+        }
+        if (packProfile instanceof SimplePackProfile) {
+            SimplePackProfile simpleProfile = (SimplePackProfile) packProfile;
+            StringBuilder statsKey = new StringBuilder(512)
+                    .append(sinkName).append(AttrConstants.SEPARATOR)
+                    
.append(simpleProfile.getInlongGroupId()).append(AttrConstants.SEPARATOR)
+                    
.append(simpleProfile.getInlongStreamId()).append(AttrConstants.SEPARATOR)
+                    .append(topic).append(AttrConstants.SEPARATOR)
+                    
.append(NetworkUtils.getLocalIp()).append(AttrConstants.SEPARATOR)
+                    .append(remoteId).append(AttrConstants.SEPARATOR)
+                    
.append(simpleProfile.getProperties().get(ConfigConstants.PKG_TIME_KEY));
+            monitorIndex.addSuccStats(statsKey.toString(), NumberUtils.toInt(
+                    
simpleProfile.getProperties().get(ConfigConstants.MSG_COUNTER_KEY), 1),
+                    1, simpleProfile.getSize());
+        }
+    }
+
+    public void fileMetricAddFailCnt(PackProfile packProfile, String topic, 
String remoteId) {
+        if (!CommonConfigHolder.getInstance().isEnableFileMetric()) {
+            return;
+        }
+
+        if (packProfile instanceof SimplePackProfile) {
+            SimplePackProfile simpleProfile = (SimplePackProfile) packProfile;
+            StringBuilder statsKey = new StringBuilder(512)
+                    .append(sinkName).append(AttrConstants.SEPARATOR)
+                    
.append(simpleProfile.getInlongGroupId()).append(AttrConstants.SEPARATOR)
+                    
.append(simpleProfile.getInlongStreamId()).append(AttrConstants.SEPARATOR)
+                    .append(topic).append(AttrConstants.SEPARATOR)
+                    
.append(NetworkUtils.getLocalIp()).append(AttrConstants.SEPARATOR)
+                    .append(remoteId).append(AttrConstants.SEPARATOR)
+                    
.append(simpleProfile.getProperties().get(ConfigConstants.PKG_TIME_KEY));
+            monitorIndex.addFailStats(statsKey.toString(), 1);
+        }
+    }
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
index 9541091347..1867e6a8ce 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.dataproxy.sink.mq;
 
 import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 
 import org.apache.flume.Event;
 
@@ -34,20 +35,24 @@ public abstract class PackProfile {
     private final String uid;
     protected long count = 0;
     protected long size = 0;
-
+    protected final boolean enableRetryAfterFailure;
+    protected final int maxRetries;
+    protected int retries = 0;
     /**
      * Constructor
      *
-     * @param uid
-     * @param inlongGroupId
-     * @param inlongStreamId
-     * @param dispatchTime
+     * @param uid  the inlong id
+     * @param inlongGroupId   the group id
+     * @param inlongStreamId  the stream id
+     * @param dispatchTime the dispatch time
      */
     public PackProfile(String uid, String inlongGroupId, String 
inlongStreamId, long dispatchTime) {
         this.uid = uid;
         this.inlongGroupId = inlongGroupId;
         this.inlongStreamId = inlongStreamId;
         this.dispatchTime = dispatchTime;
+        this.enableRetryAfterFailure = 
CommonConfigHolder.getInstance().isEnableSendRetryAfterFailure();
+        this.maxRetries = 
CommonConfigHolder.getInstance().getMaxRetriesAfterFailure();
     }
 
     /**
@@ -80,7 +85,7 @@ public abstract class PackProfile {
     /**
      * getDispatchTime
      * 
-     * @return
+     * @return the dispatch time
      */
     public long getDispatchTime() {
         return dispatchTime;
@@ -125,8 +130,8 @@ public abstract class PackProfile {
     /**
      * isTimeout
      *
-     * @param  createThreshold
-     * @return
+     * @param  createThreshold  the creation threshold
+     * @return whether time out
      */
     public boolean isTimeout(long createThreshold) {
         return createThreshold >= createTime;
@@ -139,23 +144,22 @@ public abstract class PackProfile {
 
     /**
      * fail
-     * @return
      */
     public abstract void fail(DataProxyErrCode errCode, String errMsg);
 
     /**
      * isResend
-     * @return
+     * @return whether resend message
      */
     public abstract boolean isResend();
 
     /**
      * addEvent
      *
-     * @param  event
-     * @param  maxPackCount
-     * @param  maxPackSize
-     * @return
+     * @param  event   the event need to add
+     * @param  maxPackCount   the max package count
+     * @param  maxPackSize    the max package size
+     * @return whether add success
      */
     public abstract boolean addEvent(Event event, long maxPackCount, long 
maxPackSize);
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
index e701509822..99d6ac63c9 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
@@ -53,10 +53,10 @@ public class SimplePackProfile extends PackProfile {
 
     /**
      * Constructor
-     * @param uid
-     * @param inlongGroupId
-     * @param inlongStreamId
-     * @param dispatchTime
+     * @param uid   the inlong id
+     * @param inlongGroupId   the group id
+     * @param inlongStreamId  the stream id
+     * @param dispatchTime    the received time
      */
     public SimplePackProfile(String uid, String inlongGroupId, String 
inlongStreamId, long dispatchTime) {
         super(uid, inlongGroupId, inlongStreamId, dispatchTime);
@@ -80,7 +80,9 @@ public class SimplePackProfile extends PackProfile {
 
     @Override
     public boolean isResend() {
-        return !needRspEvent;
+        return !needRspEvent
+                && enableRetryAfterFailure
+                && (maxRetries < 0 || ++retries <= maxRetries);
     }
 
     @Override
@@ -101,9 +103,9 @@ public class SimplePackProfile extends PackProfile {
     }
 
     /**
-     * create
-     * @param event
-     * @return
+     * create simple pack profile
+     * @param event  the event to process
+     * @return  the package profile
      */
     public static SimplePackProfile create(Event event) {
         Map<String, String> headers = event.getHeaders();
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index 86e0fddefc..40a2b4aa7a 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -62,12 +62,12 @@ public class KafkaHandler implements MessageQueueHandler {
 
     // kafka producer
     private KafkaProducer<String, byte[]> producer;
-    private ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();
+    private final ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();
 
     /**
      * init
-     * @param config
-     * @param sinkContext
+     * @param config   the kafka cluster configure
+     * @param sinkContext   the sink context
      */
     @Override
     public void init(CacheClusterConfig config, MessageQueueZoneSinkContext 
sinkContext) {
@@ -112,14 +112,14 @@ public class KafkaHandler implements MessageQueueHandler {
     }
 
     /**
-     * send
-     * @param profile
-     * @return
+     * send message to mq
+     * @param profile the profile need to send
+     * @return whether sent the profile
      */
     @Override
     public boolean send(PackProfile profile) {
+        String topic = null;
         try {
-            String topic;
             // get idConfig
             IdTopicConfig idConfig = 
ConfigManager.getInstance().getIdTopicConfig(
                     profile.getInlongGroupId(), profile.getInlongStreamId());
@@ -158,7 +158,8 @@ public class KafkaHandler implements MessageQueueHandler {
             }
             return true;
         } catch (Exception ex) {
-            
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SEND_EXCEPTION);
+            
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
+                    profile.getUid() + "." + topic);
             sinkContext.processSendFail(profile, clusterName, 
profile.getUid(), 0,
                     DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, 
ex.getMessage());
             if (logCounter.shouldPrint()) {
@@ -201,7 +202,7 @@ public class KafkaHandler implements MessageQueueHandler {
             @Override
             public void onCompletion(RecordMetadata arg0, Exception ex) {
                 if (ex != null) {
-                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILURE);
                     sinkContext.processSendFail(batchProfile, clusterName, 
topic, sendTime,
                             DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                     if (logCounter.shouldPrint()) {
@@ -245,13 +246,17 @@ public class KafkaHandler implements MessageQueueHandler {
             @Override
             public void onCompletion(RecordMetadata arg0, Exception ex) {
                 if (ex != null) {
-                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                    
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_FAILURE, 
topic);
+                    sinkContext.fileMetricAddFailCnt(simpleProfile, topic,
+                            arg0 == null ? "" : 
String.valueOf(arg0.partition()));
                     sinkContext.processSendFail(simpleProfile, clusterName, 
topic, sendTime,
                             DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                     if (logCounter.shouldPrint()) {
                         LOG.error("Send SimplePackProfile to Kafka failure", 
ex);
                     }
                 } else {
+                    sinkContext.fileMetricAddSuccCnt(simpleProfile, topic,
+                            arg0 == null ? "" : 
String.valueOf(arg0.partition()));
                     
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                     sinkContext.addSendResultMetric(simpleProfile, 
clusterName, topic, true, sendTime);
                     
sinkContext.getDispatchQueue().release(simpleProfile.getSize());
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 7800e615fa..e0a57e5e94 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -95,7 +95,7 @@ public class PulsarHandler implements MessageQueueHandler {
 
     private String tenant;
     private String namespace;
-    private ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();
+    private final ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();
 
     /**
      * pulsar client
@@ -103,12 +103,12 @@ public class PulsarHandler implements MessageQueueHandler 
{
     private PulsarClient client;
     private ProducerBuilder<byte[]> baseBuilder;
 
-    private ConcurrentHashMap<String, Producer<byte[]>> producerMap = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Producer<byte[]>> producerMap = 
new ConcurrentHashMap<>();
 
     /**
      * init
-     * @param config
-     * @param sinkContext
+     * @param config   the Pulsar cluster configure
+     * @param sinkContext the sink context
      */
     public void init(CacheClusterConfig config, MessageQueueZoneSinkContext 
sinkContext) {
         this.config = config;
@@ -198,13 +198,13 @@ public class PulsarHandler implements MessageQueueHandler 
{
 
     /**
      * send
-     * @param profile
-     * @return
+     * @param profile   the profile to send
+     * @return   whether sent the profile
      */
     @Override
     public boolean send(PackProfile profile) {
+        String producerTopic = null;
         try {
-            String producerTopic;
             // get idConfig
             IdTopicConfig idConfig = 
ConfigManager.getInstance().getIdTopicConfig(
                     profile.getInlongGroupId(), profile.getInlongStreamId());
@@ -266,7 +266,8 @@ public class PulsarHandler implements MessageQueueHandler {
             }
             return true;
         } catch (Exception ex) {
-            
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SEND_EXCEPTION);
+            
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
+                    profile.getUid() + "." + producerTopic);
             sinkContext.processSendFail(profile, clusterName, 
profile.getUid(), 0,
                     DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, 
ex.getMessage());
             if (logCounter.shouldPrint()) {
@@ -323,7 +324,7 @@ public class PulsarHandler implements MessageQueueHandler {
         // callback
         future.whenCompleteAsync((msgId, ex) -> {
             if (ex != null) {
-                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILURE);
                 sinkContext.processSendFail(batchProfile, clusterName, 
producerTopic, sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                 if (logCounter.shouldPrint()) {
@@ -357,13 +358,15 @@ public class PulsarHandler implements MessageQueueHandler 
{
         // callback
         future.whenCompleteAsync((msgId, ex) -> {
             if (ex != null) {
-                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_FAILURE, 
producerTopic);
+                sinkContext.fileMetricAddFailCnt(simpleProfile, producerTopic, 
msgId.toString());
                 sinkContext.processSendFail(simpleProfile, clusterName, 
producerTopic, sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                 if (logCounter.shouldPrint()) {
                     LOG.error("Send SimpleProfileV0 to Pulsar failure", ex);
                 }
             } else {
+                sinkContext.fileMetricAddSuccCnt(simpleProfile, producerTopic, 
msgId.toString());
                 
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(simpleProfile, clusterName, 
producerTopic, true, sendTime);
                 
sinkContext.getDispatchQueue().release(simpleProfile.getSize());
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 0b5ee8cffa..d78e282d78 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -78,8 +78,8 @@ public class TubeHandler implements MessageQueueHandler {
 
     /**
      * init
-     * @param config
-     * @param sinkContext
+     * @param config   the cluster configure
+     * @param sinkContext the sink context
      */
     @Override
     public void init(CacheClusterConfig config, MessageQueueZoneSinkContext sinkContext) {
@@ -124,7 +124,7 @@ public class TubeHandler implements MessageQueueHandler {
 
     /**
      * initTubeConfig
-     * @return
+     * @return the client configure
      *
      * @throws Exception
      */
@@ -182,9 +182,9 @@ public class TubeHandler implements MessageQueueHandler {
      * send
      */
     public boolean send(PackProfile profile) {
+        String topic = null;
         try {
             // idConfig
-            String topic;
             IdTopicConfig idConfig = ConfigManager.getInstance().getIdTopicConfig(
                     profile.getInlongGroupId(), profile.getInlongStreamId());
             if (idConfig == null) {
@@ -227,11 +227,14 @@ public class TubeHandler implements MessageQueueHandler {
                 this.sendBatchPackProfile((BatchPackProfile) profile, idConfig, topic);
             }
             return true;
-        } catch (Exception ex) {
-            sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SEND_EXCEPTION);
+        } catch (Throwable ex) {
+            sinkContext.fileMetricIncWithDetailStats(
+                    StatConstants.EVENT_SINK_SEND_EXCEPTION, profile.getUid() + "." + topic);
             sinkContext.processSendFail(profile, clusterName, profile.getUid(), 0,
                     DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE, ex.getMessage());
-            LOG.error(ex.getMessage(), ex);
+            if (logCounter.shouldPrint()) {
+                LOG.error("Send Message to Tube failure", ex);
+            }
             return false;
         }
     }
@@ -265,10 +268,19 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onMessageSent(MessageSentResult result) {
-                sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
-                sinkContext.addSendResultMetric(batchProfile, clusterName, topic, true, sendTime);
-                sinkContext.getDispatchQueue().release(batchProfile.getSize());
-                batchProfile.ack();
+                if (result.isSuccess()) {
+                    sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
+                    sinkContext.addSendResultMetric(batchProfile, clusterName, topic, true, sendTime);
+                    sinkContext.getDispatchQueue().release(batchProfile.getSize());
+                    batchProfile.ack();
+                } else {
+                    sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILURE);
+                    sinkContext.processSendFail(batchProfile, clusterName, topic, sendTime,
+                            DataProxyErrCode.MQ_RETURN_ERROR, result.getErrMsg());
+                    if (logCounter.shouldPrint()) {
+                        LOG.error("Send ProfileV1 to tube failure {}", result.getErrMsg());
+                    }
+                }
             }
 
             @Override
@@ -277,7 +289,7 @@ public class TubeHandler implements MessageQueueHandler {
                 sinkContext.processSendFail(batchProfile, clusterName, topic, sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                 if (logCounter.shouldPrint()) {
-                    LOG.error("Send ProfileV1 to tube failure", ex);
+                    LOG.error("Send ProfileV1 to tube exception", ex);
                 }
             }
         };
@@ -299,19 +311,32 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onMessageSent(MessageSentResult result) {
-                sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
-                sinkContext.addSendResultMetric(simpleProfile, clusterName, topic, true, sendTime);
-                sinkContext.getDispatchQueue().release(simpleProfile.getSize());
-                simpleProfile.ack();
+                if (result.isSuccess()) {
+                    sinkContext.fileMetricAddSuccCnt(simpleProfile, topic, result.getPartition().getHost());
+                    sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
+                    sinkContext.addSendResultMetric(simpleProfile, clusterName, topic, true, sendTime);
+                    sinkContext.getDispatchQueue().release(simpleProfile.getSize());
+                    simpleProfile.ack();
+                } else {
+                    sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_FAILURE,
+                            topic + "." + result.getErrCode());
+                    sinkContext.fileMetricAddFailCnt(simpleProfile, topic, result.getPartition().getHost());
+                    sinkContext.processSendFail(simpleProfile, clusterName, topic, sendTime,
+                            DataProxyErrCode.MQ_RETURN_ERROR, result.getErrMsg());
+                    if (logCounter.shouldPrint()) {
+                        LOG.error("Send SimpleProfileV0 to tube failure: {}", result.getErrMsg());
+                    }
+                }
             }
 
             @Override
             public void onException(Throwable ex) {
                 sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                sinkContext.fileMetricAddFailCnt(simpleProfile, topic, "");
                 sinkContext.processSendFail(simpleProfile, clusterName, topic, sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                 if (logCounter.shouldPrint()) {
-                    LOG.error("Send SimpleProfileV0 to tube failure", ex);
+                    LOG.error("Send SimpleProfileV0 to tube exception", ex);
                 }
             }
         };
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
index a4d094872b..1ff018786b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
@@ -295,7 +295,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter {
             }
         } catch (Throwable ex) {
             source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V0_POST_FAILURE);
-            source.fileMetricAddFailCnt(statsKey, msgCodec.getMsgCount());
+            source.fileMetricAddFailCnt(statsKey, 1);
             source.addMetric(false, event.getBody().length, event);
             if (msgCodec.isNeedResp() && !msgCodec.isOrderOrProxy()) {
                 msgCodec.setFailureInfo(DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
index fd016f4857..2244207f06 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
@@ -332,6 +332,7 @@ public class InLongHttpMsgHandler extends SimpleChannelInboundHandler<FullHttpRe
                 .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
         inLongMsg.addMsg(strBuff.toString(), body.getBytes(HttpAttrConst.VAL_DEF_CHARSET));
         byte[] inlongMsgData = inLongMsg.buildArray();
+        long pkgTime = inLongMsg.getCreatetime();
         inLongMsg.reset();
         strBuff.delete(0, strBuff.length());
         // build flume event
@@ -344,6 +345,7 @@ public class InLongHttpMsgHandler extends SimpleChannelInboundHandler<FullHttpRe
         eventHeaders.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
         eventHeaders.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V0.getName());
         eventHeaders.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
+        eventHeaders.put(ConfigConstants.PKG_TIME_KEY, DateTimeUtils.ms2yyyyMMddHHmm(pkgTime));
         Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
         // build metric data item
         dataTime = dataTime / 1000 / 60 / 10;
@@ -366,7 +368,7 @@ public class InLongHttpMsgHandler extends SimpleChannelInboundHandler<FullHttpRe
             return true;
         } catch (Throwable ex) {
             source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V0_POST_FAILURE);
-            source.fileMetricAddFailCnt(statsKey, intMsgCnt);
+            source.fileMetricAddFailCnt(statsKey, 1);
             source.addMetric(false, event.getBody().length, event);
             sendErrorMsg(ctx, DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
                     strBuff.append("Put event to channel failure: ").append(ex.getMessage()).toString());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
index d5dd70c7a4..9f31e7f97e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
@@ -272,7 +272,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
                 }
                 return false;
             }
-            if (StringUtils.isNotBlank(this.groupId) && !this.groupId.equalsIgnoreCase(confGroupId)) {
+            if (StringUtils.isNotBlank(this.groupId) && !this.groupId.equals(confGroupId)) {
                 source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_GROUP_IDNUM_INCONSTANT);
                 this.errCode = DataProxyErrCode.GROUPID_OR_STREAMID_INCONSTANT;
                 this.errMsg = String.format(
@@ -305,7 +305,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
                     }
                     return false;
                 }
-                if (StringUtils.isNotBlank(this.streamId) && !this.streamId.equalsIgnoreCase(confStreamId)) {
+                if (StringUtils.isNotBlank(this.streamId) && !this.streamId.equals(confStreamId)) {
                     source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_STREAM_IDNUM_INCONSTANT);
                     this.errCode = DataProxyErrCode.GROUPID_OR_STREAMID_INCONSTANT;
                     this.errMsg = String.format(
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
index 9f0111e21e..8e02c66b0e 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
@@ -38,5 +38,8 @@ public class TestCommonConfigHolder {
         Assert.assertEquals(10000, CommonConfigHolder.getInstance().getMetaConfigSyncInvlMs());
         Assert.assertTrue(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept());
         Assert.assertTrue(CommonConfigHolder.getInstance().getDefTopics().contains("test2"));
+        Assert.assertTrue(CommonConfigHolder.getInstance().isEnableSendRetryAfterFailure());
+        Assert.assertEquals(2, CommonConfigHolder.getInstance().getMaxRetriesAfterFailure());
     }
+
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
index 4ee6350595..e37ef6cfeb 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
@@ -30,3 +30,7 @@ meta.config.sync.interval.ms=10000
 id2topic.unconfigured.accept.enable=true
 # the default topic if accept unconfigured topic's message
 id2topic.unconfigured.default.topics= test1 test2 test3
+# whether to retry send after sent failure
+send.retry.after.failure = true
+# max retries after sent failure
+max.retries.after.failure=2


Reply via email to