This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 370815119 [INLONG-8084][DataProxy] Add abnormal event statistics in sink (#8085)
370815119 is described below

commit 370815119d7d8898bbf13febc5b70628893ef43f
Author: Goson Zhang <[email protected]>
AuthorDate: Thu May 25 14:05:29 2023 +0800

    [INLONG-8084][DataProxy] Add abnormal event statistics in sink (#8085)
---
 .../dataproxy/config/CommonConfigHolder.java       | 86 ++++++++++++++++++++++
 .../inlong/dataproxy/consts/StatConstants.java     | 27 ++++---
 .../inlong/dataproxy/sink/common/SinkContext.java  | 38 ++++++++++
 .../sink/mq/MessageQueueZoneSinkContext.java       |  3 +
 .../dataproxy/sink/mq/kafka/KafkaHandler.java      | 43 +++++------
 .../dataproxy/sink/mq/pulsar/PulsarHandler.java    | 28 +++++--
 .../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 36 +++++++--
 .../inlong/dataproxy/source2/BaseSource.java       | 38 ++++------
 .../inlong/dataproxy/source2/SourceConstants.java  | 11 ---
 .../source2/httpMsg/InLongHttpMsgHandler.java      |  3 +-
 10 files changed, 227 insertions(+), 86 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
index d6f3aec76..71a514444 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -75,6 +75,26 @@ public class CommonConfigHolder {
     // whether enable whitelist, optional field.
     public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist";
     public static final boolean VAL_DEF_ENABLE_WHITELIST = false;
+    // whether enable file metric, optional field.
+    public static final String KEY_ENABLE_FILE_METRIC = "file.metric.enable";
+    public static final boolean VAL_DEF_ENABLE_FILE_METRIC = true;
+    // file metric statistic interval (second)
+    public static final String KEY_FILE_METRIC_STAT_INTERVAL_SEC = 
"file.metric.stat.interval.sec";
+    public static final int VAL_DEF_FILE_METRIC_STAT_INVL_SEC = 60;
+    public static final int VAL_MIN_FILE_METRIC_STAT_INVL_SEC = 0;
+    // file metric max statistic key count
+    public static final String KEY_FILE_METRIC_MAX_CACHE_CNT = 
"file.metric.max.cache.cnt";
+    public static final int VAL_DEF_FILE_METRIC_MAX_CACHE_CNT = 1000000;
+    public static final int VAL_MIN_FILE_METRIC_MAX_CACHE_CNT = 0;
+    // source metric statistic name
+    public static final String KEY_FILE_METRIC_SOURCE_OUTPUT_NAME = 
"file.metric.source.output.name";
+    public static final String VAL_DEF_FILE_METRIC_SOURCE_OUTPUT_NAME = 
"Source";
+    // sink metric statistic name
+    public static final String KEY_FILE_METRIC_SINK_OUTPUT_NAME = 
"file.metric.sink.output.name";
+    public static final String VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME = "Sink";
+    // event metric statistic name
+    public static final String KEY_FILE_METRIC_EVENT_OUTPUT_NAME = 
"file.metric.event.output.name";
+    public static final String VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME = 
"DataProxy_monitors";
     // Audit fields
     public static final String KEY_ENABLE_AUDIT = "audit.enable";
     public static final boolean VAL_DEF_ENABLE_AUDIT = true;
@@ -140,6 +160,12 @@ public class CommonConfigHolder {
     private String proxyNodeId = VAL_DEF_PROXY_NODE_ID;
     private String msgCompressType = VAL_DEF_MSG_COMPRESS_TYPE;
     private int prometheusHttpPort = VAL_DEF_PROMETHEUS_HTTP_PORT;
+    private boolean enableFileMetric = VAL_DEF_ENABLE_FILE_METRIC;
+    private int fileMetricStatInvlSec = VAL_DEF_FILE_METRIC_STAT_INVL_SEC;
+    private int fileMetricStatCacheCnt = VAL_DEF_FILE_METRIC_MAX_CACHE_CNT;
+    private String fileMetricSourceOutName = 
VAL_DEF_FILE_METRIC_SOURCE_OUTPUT_NAME;
+    private String fileMetricSinkOutName = 
VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME;
+    private String fileMetricEventOutName = 
VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME;
 
     /**
      * get instance for common.properties config manager
@@ -232,6 +258,18 @@ public class CommonConfigHolder {
         return enableAudit;
     }
 
+    public boolean isEnableFileMetric() {
+        return enableFileMetric;
+    }
+
+    public int getFileMetricStatInvlSec() {
+        return fileMetricStatInvlSec;
+    }
+
+    public int getFileMetricStatCacheCnt() {
+        return fileMetricStatCacheCnt;
+    }
+
     public HashSet<String> getAuditProxys() {
         return auditProxys;
     }
@@ -280,6 +318,18 @@ public class CommonConfigHolder {
         return msgCompressType;
     }
 
+    public String getFileMetricSourceOutName() {
+        return fileMetricSourceOutName;
+    }
+
+    public String getFileMetricSinkOutName() {
+        return fileMetricSinkOutName;
+    }
+
+    public String getFileMetricEventOutName() {
+        return fileMetricEventOutName;
+    }
+
     private void preReadFields() {
         String tmpValue;
         // read cluster tag
@@ -331,6 +381,42 @@ public class CommonConfigHolder {
         if (StringUtils.isNotBlank(tmpValue)) {
             this.managerAuthSecretKey = tmpValue.trim();
         }
+        // read whether enable file metric
+        tmpValue = this.props.get(KEY_ENABLE_FILE_METRIC);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.enableFileMetric = "TRUE".equalsIgnoreCase(tmpValue.trim());
+        }
+        // read file metric statistic interval
+        tmpValue = this.props.get(KEY_FILE_METRIC_STAT_INTERVAL_SEC);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            int statInvl = NumberUtils.toInt(tmpValue.trim(), 
VAL_DEF_FILE_METRIC_STAT_INVL_SEC);
+            if (statInvl >= VAL_MIN_FILE_METRIC_MAX_CACHE_CNT) {
+                this.fileMetricStatInvlSec = statInvl;
+            }
+        }
+        // read file metric statistic max cache count
+        tmpValue = this.props.get(KEY_FILE_METRIC_MAX_CACHE_CNT);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            int maxCacheCnt = NumberUtils.toInt(tmpValue.trim(), 
VAL_DEF_FILE_METRIC_MAX_CACHE_CNT);
+            if (maxCacheCnt >= VAL_MIN_FILE_METRIC_STAT_INVL_SEC) {
+                this.fileMetricStatCacheCnt = maxCacheCnt;
+            }
+        }
+        // read source file statistic output name
+        tmpValue = this.props.get(KEY_FILE_METRIC_SOURCE_OUTPUT_NAME);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.fileMetricSourceOutName = tmpValue.trim();
+        }
+        // read sink file statistic output name
+        tmpValue = this.props.get(KEY_FILE_METRIC_SINK_OUTPUT_NAME);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.fileMetricSinkOutName = tmpValue.trim();
+        }
+        // read event file statistic output name
+        tmpValue = this.props.get(KEY_FILE_METRIC_EVENT_OUTPUT_NAME);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.fileMetricEventOutName = tmpValue.trim();
+        }
         // read whether enable audit
         tmpValue = this.props.get(KEY_ENABLE_AUDIT);
         if (StringUtils.isNotEmpty(tmpValue)) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 6dbc4f14b..c445cadd3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -23,17 +23,6 @@ public class StatConstants {
     public static final java.lang.String EVENT_SERVICE_UNREADY = 
"sink.unready";
     public static final java.lang.String EVENT_VISITIP_ILLEGAL = 
"links.illegal";
     public static final java.lang.String EVENT_NOTOPIC = "config.notopic";
-
-    public static final java.lang.String METASINK_SUCCESS = "metasink.success";
-    public static final java.lang.String METASINK_DROPPED = "metasink.dropped";
-    public static final java.lang.String METASINK_RETRY = "metasink.retry";
-    public static final java.lang.String METASINK_OTHEREXP = 
"metasink.otherexp";
-    public static final java.lang.String METASINK_NOTOPIC = "metasink.notopic";
-    public static final java.lang.String METASINK_NOSLAVE = "metasink.noslave";
-    public static final java.lang.String METASINK_MSG_NOTOPIC = 
"metasink.msgnotopic";
-    public static final java.lang.String METASINK_PROCESS_SPEED = 
"metasink.process.speed";
-    public static final java.lang.String EVENT_OTHEREXP = "socketmsg.otherexp";
-    public static final java.lang.String EVENT_INVALID = "socketmsg.invalid";
     // source
     public static final java.lang.String EVENT_LINKS_OVERMAX = "links.overmax";
     public static final java.lang.String EVENT_LINKS_IN = "links.linkin";
@@ -72,6 +61,22 @@ public class StatConstants {
     public static final java.lang.String EVENT_HTTP_POST_SUCCESS = 
"httpmsg.success";
     public static final java.lang.String EVENT_HTTP_POST_DROPPED = 
"httpmsg.dropped";
 
+    public static final java.lang.String EVENT_SINK_NOUID = "sink.nouid";
+    public static final java.lang.String EVENT_SINK_NOTOPIC = "sink.notopic";
+    public static final java.lang.String EVENT_SINK_NOPRODUCER = 
"sink.noproducer";
+    public static final java.lang.String EVENT_SINK_SENDEXCEPT = 
"sink.sendexcept";
+    public static final java.lang.String EVENT_SINK_FAILRETRY = "sink.retry";
+    public static final java.lang.String EVENT_SINK_FAILDROPPED = 
"sink.dropped";
+    public static final java.lang.String EVENT_SINK_SUCCESS = "sink.success";
+    public static final java.lang.String EVENT_SINK_RECEIVEEXCEPT = 
"sink.rcvexcept";
+
+    public static final java.lang.String METASINK_OTHEREXP = 
"metasink.otherexp";
+    public static final java.lang.String METASINK_NOSLAVE = "metasink.noslave";
+    public static final java.lang.String METASINK_MSG_NOTOPIC = 
"metasink.msgnotopic";
+    public static final java.lang.String METASINK_PROCESS_SPEED = 
"metasink.process.speed";
+    public static final java.lang.String EVENT_OTHEREXP = "socketmsg.otherexp";
+    public static final java.lang.String EVENT_INVALID = "socketmsg.invalid";
+
     public static final java.lang.String AGENT_MESSAGES_SENT_SUCCESS = 
"agent.messages.success";
     public static final java.lang.String AGENT_PACKAGES_SENT_SUCCESS = 
"agent.packages.success";
     public static final java.lang.String MSG_COUNTER_KEY = "msgcnt";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
index 8641d0606..95e00d382 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
@@ -21,8 +21,11 @@ import org.apache.commons.lang.ClassUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.common.monitor.MonitorIndex;
+import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.inlong.dataproxy.consts.AttrConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
 import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
@@ -59,6 +62,9 @@ public class SinkContext {
     //
     protected final DataProxyMetricItemSet metricItemSet;
     protected Timer reloadTimer;
+    // file metric statistic
+    protected MonitorIndex monitorIndex = null;
+    private MonitorIndexExt monitorIndexExt = null;
 
     /**
      * Constructor
@@ -80,6 +86,17 @@ public class SinkContext {
      * start
      */
     public void start() {
+        // init monitor logic
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+            this.monitorIndex = new 
MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSinkOutName(),
+                    
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+                    
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
+            this.monitorIndexExt = new MonitorIndexExt(
+                    
CommonConfigHolder.getInstance().getFileMetricEventOutName()
+                            + AttrConstants.SEP_HASHTAG + this.getSinkName(),
+                    
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+                    
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
+        }
         try {
             this.reload();
             this.setReloadTimer();
@@ -97,6 +114,27 @@ public class SinkContext {
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
         }
+        // stop file statistic index
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+            if (monitorIndex != null) {
+                monitorIndex.shutDown();
+            }
+            if (monitorIndexExt != null) {
+                monitorIndexExt.shutDown();
+            }
+        }
+    }
+
+    public void fileMetricEventInc(String eventKey) {
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+            monitorIndexExt.incrementAndGet(eventKey);
+        }
+    }
+
+    public void fileMetricRecordAdd(String key, int cnt, int packCnt, long 
packSize, int failCnt) {
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+            monitorIndex.addAndGet(key, cnt, packCnt, packSize, failCnt);
+        }
     }
 
     /**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 8308af8ec..2f9d54375 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -25,6 +25,7 @@ import org.apache.flume.conf.Configurable;
 import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
+import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.common.SinkContext;
@@ -271,9 +272,11 @@ public class MessageQueueZoneSinkContext extends SinkContext {
     public void processSendFail(BatchPackProfile currentRecord, String mqName, 
String topic, long sendTime) {
         if (currentRecord.isResend()) {
             dispatchQueue.offer(currentRecord);
+            fileMetricEventInc(StatConstants.EVENT_SINK_FAILRETRY);
             this.addSendResultMetric(currentRecord, mqName, topic, false, 
sendTime);
         } else {
             currentRecord.fail();
+            fileMetricEventInc(StatConstants.EVENT_SINK_FAILDROPPED);
         }
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index bce84aba9..dc3bc94a9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -23,6 +23,7 @@ import org.apache.flume.Context;
 import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.sink.common.EventHandler;
 import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
 import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
@@ -111,20 +112,24 @@ public class KafkaHandler implements MessageQueueHandler {
             // idConfig
             IdTopicConfig idConfig = 
sinkContext.getIdTopicHolder().getIdConfig(event.getUid());
             if (idConfig == null) {
+                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
                 sinkContext.addSendResultMetric(event, clusterName, 
event.getUid(), false, 0);
                 sinkContext.getDispatchQueue().release(event.getSize());
+                event.fail();
                 return false;
             }
             String baseTopic = idConfig.getTopicName();
             if (baseTopic == null) {
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
                 sinkContext.addSendResultMetric(event, clusterName, 
event.getUid(), false, 0);
                 sinkContext.getDispatchQueue().release(event.getSize());
+                event.fail();
                 return false;
             }
             String topic = getProducerTopic(baseTopic, idConfig);
-
             // create producer failed
             if (producer == null) {
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
                 sinkContext.processSendFail(event, clusterName, topic, 0);
                 return false;
             }
@@ -138,8 +143,9 @@ public class KafkaHandler implements MessageQueueHandler {
             }
             return true;
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
             sinkContext.processSendFail(event, clusterName, event.getUid(), 0);
+            LOG.error(e.getMessage(), e);
             return false;
         }
     }
@@ -188,14 +194,11 @@ public class KafkaHandler implements MessageQueueHandler {
             @Override
             public void onCompletion(RecordMetadata arg0, Exception ex) {
                 if (ex != null) {
-                    LOG.error("Send fail:{}", ex.getMessage());
-                    LOG.error(ex.getMessage(), ex);
-                    if (event.isResend()) {
-                        sinkContext.processSendFail(event, clusterName, topic, 
sendTime);
-                    } else {
-                        event.fail();
-                    }
+                    
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                    sinkContext.processSendFail(event, clusterName, topic, 
sendTime);
+                    LOG.error("Send ProfileV1 to Kafka failure", ex);
                 } else {
+                    
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
                     sinkContext.addSendResultMetric(event, clusterName, topic, 
true, sendTime);
                     sinkContext.getDispatchQueue().release(event.getSize());
                     event.ack();
@@ -235,14 +238,11 @@ public class KafkaHandler implements MessageQueueHandler {
             @Override
             public void onCompletion(RecordMetadata arg0, Exception ex) {
                 if (ex != null) {
-                    LOG.error("Send fail:{}", ex.getMessage());
-                    LOG.error(ex.getMessage(), ex);
-                    if (event.isResend()) {
-                        sinkContext.processSendFail(event, clusterName, topic, 
sendTime);
-                    } else {
-                        event.fail();
-                    }
+                    
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                    sinkContext.processSendFail(event, clusterName, topic, 
sendTime);
+                    LOG.error("Send SimpleProfileV0 to Kafka failure", ex);
                 } else {
+                    
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
                     sinkContext.addSendResultMetric(event, clusterName, topic, 
true, sendTime);
                     sinkContext.getDispatchQueue().release(event.getSize());
                     event.ack();
@@ -279,14 +279,11 @@ public class KafkaHandler implements MessageQueueHandler {
             @Override
             public void onCompletion(RecordMetadata arg0, Exception ex) {
                 if (ex != null) {
-                    LOG.error("Send fail:{}", ex.getMessage());
-                    LOG.error(ex.getMessage(), ex);
-                    if (event.isResend()) {
-                        sinkContext.processSendFail(event, clusterName, topic, 
sendTime);
-                    } else {
-                        event.fail();
-                    }
+                    
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                    sinkContext.processSendFail(event, clusterName, topic, 
sendTime);
+                    LOG.error("Send OrderProfileV0 to Kafka failure", ex);
                 } else {
+                    
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
                     sinkContext.addSendResultMetric(event, clusterName, topic, 
true, sendTime);
                     sinkContext.getDispatchQueue().release(event.getSize());
                     event.ack();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index b500485d7..35205ee62 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -21,8 +21,10 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
+import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.sink.common.EventHandler;
 import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
 import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
@@ -58,6 +60,8 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_STATS_INTER
 public class PulsarHandler implements MessageQueueHandler {
 
     public static final Logger LOG = 
LoggerFactory.getLogger(PulsarHandler.class);
+    // log print count
+    private static final LogCounter logCounter = new LogCounter(10, 100000, 30 
* 1000);
 
     public static final String KEY_TENANT = "tenant";
     public static final String KEY_NAMESPACE = "namespace";
@@ -194,19 +198,24 @@ public class PulsarHandler implements MessageQueueHandler {
             // idConfig
             IdTopicConfig idConfig = 
sinkContext.getIdTopicHolder().getIdConfig(event.getUid());
             if (idConfig == null) {
+                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
                 sinkContext.addSendResultMetric(event, clusterName, 
event.getUid(), false, 0);
                 sinkContext.getDispatchQueue().release(event.getSize());
+                event.fail();
                 return false;
             }
             String baseTopic = idConfig.getTopicName();
             if (baseTopic == null) {
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
                 sinkContext.addSendResultMetric(event, clusterName, 
event.getUid(), false, 0);
                 sinkContext.getDispatchQueue().release(event.getSize());
+                event.fail();
                 return false;
             }
             // topic
             String producerTopic = this.getProducerTopic(baseTopic, idConfig);
             if (producerTopic == null) {
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
                 sinkContext.addSendResultMetric(event, clusterName, 
event.getUid(), false, 0);
                 sinkContext.getDispatchQueue().release(event.getSize());
                 event.fail();
@@ -236,6 +245,7 @@ public class PulsarHandler implements MessageQueueHandler {
             }
             // create producer failed
             if (producer == null) {
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
                 sinkContext.processSendFail(event, clusterName, producerTopic, 
0);
                 return false;
             }
@@ -249,8 +259,9 @@ public class PulsarHandler implements MessageQueueHandler {
             }
             return true;
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
             sinkContext.processSendFail(event, clusterName, event.getUid(), 0);
+            LOG.error(e.getMessage(), e);
             return false;
         }
     }
@@ -321,10 +332,11 @@ public class PulsarHandler implements MessageQueueHandler {
         // callback
         future.whenCompleteAsync((msgId, ex) -> {
             if (ex != null) {
-                LOG.error("Send fail:{}", ex.getMessage());
-                LOG.error(ex.getMessage(), ex);
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                 sinkContext.processSendFail(event, clusterName, producerTopic, 
sendTime);
+                LOG.error("Send ProfileV1 to Pulsar failure", ex);
             } else {
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(event, clusterName, 
producerTopic, true, sendTime);
                 sinkContext.getDispatchQueue().release(event.getSize());
                 event.ack();
@@ -354,10 +366,11 @@ public class PulsarHandler implements MessageQueueHandler {
         // callback
         future.whenCompleteAsync((msgId, ex) -> {
             if (ex != null) {
-                LOG.error("Send fail:{}", ex.getMessage());
-                LOG.error(ex.getMessage(), ex);
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                 sinkContext.processSendFail(event, clusterName, producerTopic, 
sendTime);
+                LOG.error("Send SimpleProfileV0 to Pulsar failure", ex);
             } else {
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(event, clusterName, 
producerTopic, true, sendTime);
                 sinkContext.getDispatchQueue().release(event.getSize());
                 event.ack();
@@ -383,10 +396,11 @@ public class PulsarHandler implements MessageQueueHandler {
         // callback
         future.whenCompleteAsync((msgId, ex) -> {
             if (ex != null) {
-                LOG.error("Send fail:{}", ex.getMessage());
-                LOG.error(ex.getMessage(), ex);
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                 sinkContext.processSendFail(event, clusterName, producerTopic, 
sendTime);
+                LOG.error("Send OrderProfileV0 to Pulsar failure", ex);
             } else {
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(event, clusterName, 
producerTopic, true, sendTime);
                 sinkContext.getDispatchQueue().release(event.getSize());
                 event.ack();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index ecd1ddb36..cad19c2ee 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -19,9 +19,11 @@ package org.apache.inlong.dataproxy.sink.mq.tube;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
+import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.sink.common.EventHandler;
 import org.apache.inlong.dataproxy.sink.common.TubeUtils;
 import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
@@ -49,6 +51,9 @@ import java.util.Set;
 public class TubeHandler implements MessageQueueHandler {
 
     public static final Logger LOG = 
LoggerFactory.getLogger(TubeHandler.class);
+    // log print count
+    private static final LogCounter logCounter = new LogCounter(10, 100000, 30 
* 1000);
+
     private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
     public static final String KEY_NAMESPACE = "namespace";
 
@@ -172,20 +177,25 @@ public class TubeHandler implements MessageQueueHandler {
             // idConfig
             IdTopicConfig idConfig = 
sinkContext.getIdTopicHolder().getIdConfig(event.getUid());
             if (idConfig == null) {
+                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
                 sinkContext.addSendResultMetric(event, clusterName, 
event.getUid(), false, 0);
                 sinkContext.getDispatchQueue().release(event.getSize());
+                event.fail();
                 return false;
             }
             String topic = getTubeTopic(idConfig);
             if (topic == null) {
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
                 sinkContext.addSendResultMetric(event, clusterName, 
event.getUid(), false, 0);
                 sinkContext.getDispatchQueue().release(event.getSize());
+                event.fail();
                 return false;
             }
             // create producer failed
             if (producer == null) {
-                LOG.error("producer is null");
+                
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
                 sinkContext.processSendFail(event, clusterName, topic, 0);
+                LOG.error("producer is null");
                 return false;
             }
             // publish
@@ -203,8 +213,9 @@ public class TubeHandler implements MessageQueueHandler {
             }
             return true;
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
             sinkContext.processSendFail(event, clusterName, event.getUid(), 0);
+            LOG.error(e.getMessage(), e);
             return false;
         }
     }
@@ -238,6 +249,7 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onMessageSent(MessageSentResult result) {
+                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(event, clusterName, topic, true, sendTime);
                 sinkContext.getDispatchQueue().release(event.getSize());
                 event.ack();
@@ -245,9 +257,11 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onException(Throwable ex) {
-                LOG.error("Send fail:{}", ex.getMessage());
-                LOG.error(ex.getMessage(), ex);
+                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                 sinkContext.processSendFail(event, clusterName, topic, sendTime);
+                if (logCounter.shouldPrint()) {
+                    LOG.error("Send ProfileV1 to tube failure", ex);
+                }
             }
         };
         producer.sendMessage(message, callback);
@@ -268,6 +282,7 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onMessageSent(MessageSentResult result) {
+                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(event, clusterName, topic, true, sendTime);
                 sinkContext.getDispatchQueue().release(event.getSize());
                 event.ack();
@@ -275,9 +290,11 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onException(Throwable ex) {
-                LOG.error("Send fail:{}", ex.getMessage());
-                LOG.error(ex.getMessage(), ex);
+                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                 sinkContext.processSendFail(event, clusterName, topic, sendTime);
+                if (logCounter.shouldPrint()) {
+                    LOG.error("Send SimpleProfileV0 to tube failure", ex);
+                }
             }
         };
         producer.sendMessage(message, callback);
@@ -304,6 +321,7 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onMessageSent(MessageSentResult result) {
+                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
                 sinkContext.addSendResultMetric(event, clusterName, topic, true, sendTime);
                 sinkContext.getDispatchQueue().release(event.getSize());
                 event.ack();
@@ -312,9 +330,11 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onException(Throwable ex) {
-                LOG.error("Send fail:{}", ex.getMessage());
-                LOG.error(ex.getMessage(), ex);
+                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                 sinkContext.processSendFail(event, clusterName, topic, sendTime);
+                if (logCounter.shouldPrint()) {
+                    LOG.error("Send OrderProfileV0 to tube failure", ex);
+                }
             }
         };
         producer.sendMessage(message, callback);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
index d843f8b1e..7a86784c6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
@@ -33,6 +33,7 @@ import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.dataproxy.admin.ProxyServiceMBean;
 import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
 import org.apache.inlong.dataproxy.config.CommonConfigHolder;
+import org.apache.inlong.dataproxy.consts.AttrConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
@@ -109,13 +110,10 @@ public abstract class BaseSource
     // send buffer size
     protected int maxSendBufferSize;
     // file metric statistic
-    protected boolean fileMetricOn;
-    protected int monitorStatInvlSec;
-    protected int maxMonitorStatCnt;
     protected MonitorIndex monitorIndex = null;
+    private MonitorIndexExt monitorIndexExt = null;
     // metric set
     protected DataProxyMetricItemSet metricItemSet;
-    private MonitorIndexExt monitorIndexExt = null;
 
     public BaseSource() {
         super();
@@ -195,27 +193,12 @@ public abstract class BaseSource
                 SourceConstants.SRCCXT_MAX_READ_IDLE_TIME_MS + " must be in ["
                         + SourceConstants.VAL_MIN_READ_IDLE_TIME_MS + ", "
                         + SourceConstants.VAL_MAX_READ_IDLE_TIME_MS + "]");
-        // get file metric statistic
-        this.monitorStatInvlSec = ConfStringUtils.getIntValue(context,
-                SourceConstants.SRCCXT_STAT_INTERVAL_SEC, SourceConstants.VAL_DEF_STAT_INVL_SEC);
-        Preconditions.checkArgument((this.monitorStatInvlSec >= SourceConstants.VAL_MIN_STAT_INVL_SEC),
-                SourceConstants.SRCCXT_STAT_INTERVAL_SEC + " must be >= "
-                        + SourceConstants.VAL_MIN_STAT_INVL_SEC);
-        // get max monitor key count
-        this.maxMonitorStatCnt = ConfStringUtils.getIntValue(context,
-                SourceConstants.SRCCXT_MAX_MONITOR_STAT_CNT, SourceConstants.VAL_DEF_MON_STAT_CNT);
-        Preconditions.checkArgument(this.maxMonitorStatCnt >= SourceConstants.VAL_MIN_MON_STAT_CNT,
-                SourceConstants.SRCCXT_MAX_MONITOR_STAT_CNT + " must be >= "
-                        + SourceConstants.VAL_MIN_MON_STAT_CNT);
         // get max connect count
         this.maxConnections = ConfStringUtils.getIntValue(context,
                 SourceConstants.SRCCXT_MAX_CONNECTION_CNT, SourceConstants.VAL_DEF_MAX_CONNECTION_CNT);
         Preconditions.checkArgument(this.maxConnections >= SourceConstants.VAL_MIN_CONNECTION_CNT,
                 SourceConstants.SRCCXT_MAX_CONNECTION_CNT + " must be >= "
                         + SourceConstants.VAL_MIN_CONNECTION_CNT);
-        // get whether enable file metric
-        this.fileMetricOn = context.getBoolean(SourceConstants.SRCCXT_FILE_METRIC_ON,
-                SourceConstants.VAL_DEF_FILE_METRIC_ON);
         // get max receive buffer size
         this.maxRcvBufferSize = ConfStringUtils.getIntValue(context,
                 SourceConstants.SRCCXT_RECEIVE_BUFFER_SIZE, SourceConstants.VAL_DEF_RECEIVE_BUFFER_SIZE);
@@ -251,10 +234,15 @@ public abstract class BaseSource
                 CommonConfigHolder.getInstance().getClusterName(), getName(), String.valueOf(srcPort));
         MetricRegister.register(metricItemSet);
         // init monitor logic
-        if (fileMetricOn) {
-            this.monitorIndex = new MonitorIndex("Source", monitorStatInvlSec, maxMonitorStatCnt);
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+            this.monitorIndex = new MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSourceOutName(),
+                    CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+                    CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
             this.monitorIndexExt = new MonitorIndexExt(
-                    "DataProxy_monitors#" + this.getProtocolName(), monitorStatInvlSec, maxMonitorStatCnt);
+                    CommonConfigHolder.getInstance().getFileMetricEventOutName()
+                            + AttrConstants.SEP_HASHTAG + this.getProtocolName(),
+                    CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+                    CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
         }
         startSource();
         // register
@@ -285,7 +273,7 @@ public abstract class BaseSource
         // stop super class
         super.stop();
         // stop file statistic index
-        if (fileMetricOn) {
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
             if (monitorIndex != null) {
                 monitorIndex.shutDown();
             }
@@ -373,13 +361,13 @@ public abstract class BaseSource
     }
 
     public void fileMetricEventInc(String eventKey) {
-        if (fileMetricOn) {
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
             monitorIndexExt.incrementAndGet(eventKey);
         }
     }
 
     public void fileMetricRecordAdd(String key, int cnt, int packCnt, long packSize, int failCnt) {
-        if (fileMetricOn) {
+        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
             monitorIndex.addAndGet(key, cnt, packCnt, packSize, failCnt);
         }
     }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
index 1f556c895..64bcf55eb 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
@@ -62,17 +62,6 @@ public class SourceConstants {
     public static final int VAL_DEF_WORKER_THREADS = Runtime.getRuntime().availableProcessors();
     public static final int VAL_MIN_WORKER_THREADS = 1;
     public static final int VAL_MAX_WORKER_THREADS = Runtime.getRuntime().availableProcessors() * 2;
-    // file metric statistic interval(second)
-    public static final String SRCCXT_STAT_INTERVAL_SEC = "stat-interval-sec";
-    public static final int VAL_DEF_STAT_INVL_SEC = 60;
-    public static final int VAL_MIN_STAT_INVL_SEC = 0;
-    // max file statistic key count
-    public static final String SRCCXT_MAX_MONITOR_STAT_CNT = "max-monitor-cnt";
-    public static final int VAL_DEF_MON_STAT_CNT = 1000000;
-    public static final int VAL_MIN_MON_STAT_CNT = 0;
-    // max file statistic key count
-    public static final String SRCCXT_FILE_METRIC_ON = "file-metric-on";
-    public static final boolean VAL_DEF_FILE_METRIC_ON = true;
     // max connection count
     public static final String SRCCXT_MAX_CONNECTION_CNT = "connections";
     public static final int VAL_DEF_MAX_CONNECTION_CNT = 5000;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
index 924760a7e..8de181843 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
@@ -274,7 +274,8 @@ public class InLongHttpMsgHandler extends SimpleChannelInboundHandler<FullHttpRe
         // build metric data item
         dataTime = dataTime / 1000 / 60 / 10;
         dataTime = dataTime * 1000 * 60 * 10;
-        strBuff.append("http").append(AttrConstants.SEP_HASHTAG).append(topicName)
+        strBuff.append(source.getProtocolName())
+                .append(AttrConstants.SEP_HASHTAG).append(topicName)
                 .append(AttrConstants.SEP_HASHTAG).append(streamId)
                 .append(AttrConstants.SEP_HASHTAG).append(clientIp)
                 
.append(AttrConstants.SEP_HASHTAG).append(NetworkUtils.getLocalIp())


Reply via email to