This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 083b1dfb76 [INLONG-8914][DataProxy] Optimize DataProxy event statistics (#8915)
083b1dfb76 is described below

commit 083b1dfb76ffdd0ede39277d2afa9208ec3b64cb
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Sep 14 22:21:39 2023 +0800

    [INLONG-8914][DataProxy] Optimize DataProxy event statistics (#8915)
---
 .../inlong/dataproxy/config/ConfigManager.java     |  4 +--
 .../inlong/dataproxy/consts/ConfigConstants.java   |  2 --
 .../inlong/dataproxy/consts/StatConstants.java     |  8 +++--
 .../dataproxy/sink/mq/MessageQueueZoneSink.java    |  4 +--
 .../dataproxy/sink/mq/kafka/KafkaHandler.java      | 24 +++++++------
 .../dataproxy/sink/mq/pulsar/PulsarHandler.java    | 41 +++++++++++++---------
 .../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 29 ++++++++-------
 .../apache/inlong/dataproxy/source/BaseSource.java | 16 ++-------
 .../dataproxy/source/ServerMessageHandler.java     | 10 +++---
 .../inlong/dataproxy/source/SourceConstants.java   |  2 --
 .../source/httpMsg/HttpMessageHandler.java         | 14 ++++----
 .../inlong/dataproxy/source/v0msg/CodecBinMsg.java |  8 ++---
 .../dataproxy/source/v0msg/CodecTextMsg.java       |  4 +--
 .../inlong/dataproxy/source/UdpSourceTest.java     |  1 -
 14 files changed, 86 insertions(+), 81 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index c72af67ae4..647a41936e 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -359,8 +359,8 @@ public class ConfigManager {
                 String returnStr = EntityUtils.toString(response.getEntity());
                 long dltTime = System.currentTimeMillis() - startTime;
                 if (dltTime >= 
CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs()) {
-                    LOG.warn("End to request {} to get config info:{}, WAIST 
{} ms",
-                            url, returnStr, dltTime);
+                    LOG.warn("End to request {} to get config info, WAIST {} 
ms, over alarm value {} ms",
+                            url, dltTime, 
CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs());
                 } else {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("End to request {} to get config info:{}, 
WAIST {} ms",
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index 47f2e9fcb9..0622f7aa3c 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -37,8 +37,6 @@ public class ConfigConstants {
 
     public static final String TOPIC = "topic";
 
-    public static final String ATTR = "attr";
-
     public static final String FILTER_EMPTY_MSG = "filter-empty-msg";
 
     public static final String TCP_NO_DELAY = "tcpNoDelay";
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index b67b3537fe..8617f74b8a 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -41,7 +41,7 @@ public class StatConstants {
     public static final java.lang.String EVENT_HTTP_LINK_UNKNOWN_EXCEPTION = 
"http.link.unknown.exception";
     public static final java.lang.String EVENT_HTTP_LINK_UNWRITABLE = 
"http.link.unwritable";
     // configure
-    public static final java.lang.String EVENT_CONFIG_TOPIC_MISSING = 
"config.topic.missing";
+    public static final java.lang.String EVENT_SOURCE_TOPIC_MISSING = 
"source.topic.missing";
     public static final java.lang.String EVENT_CONFIG_IDNUM_EMPTY = 
"config.idnum.empty";
     public static final java.lang.String EVENT_CONFIG_GROUPIDNUM_MISSING = 
"config.groupidnum.missing";
     public static final java.lang.String EVENT_CONFIG_GROUP_IDNUM_INCONSTANT = 
"config.group.idnum.incons";
@@ -91,12 +91,14 @@ public class StatConstants {
     public static final java.lang.String EVENT_SINK_EVENT_V1_MALFORMED = 
"sink.event.v1.malformed";
     public static final java.lang.String EVENT_SINK_EVENT_TAKE_SUCCESS = 
"sink.event.take.success";
     public static final java.lang.String EVENT_SINK_EVENT_TAKE_FAILURE = 
"sink.event.take.failure";
-    public static final java.lang.String EVENT_SINK_EVENT_V1_FILE = 
"sink.event.v1.file";
-    public static final java.lang.String EVENT_SINK_EVENT_V0_FILE = 
"sink.event.v1.file";
+    public static final java.lang.String EVENT_SINK_FILE_V1_TAKE_SUCCESS = 
"sink.file.v1.take.success";
+    public static final java.lang.String EVENT_SINK_FILE_V0_TAKE_SUCCESS = 
"sink.file.v0.take.success";
     public static final java.lang.String EVENT_SINK_CONFIG_TOPIC_MISSING = 
"sink.topic.missing";
     public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_MISSING = 
"default.topic.empty";
     public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_USED = 
"default.topic.used";
     public static final java.lang.String EVENT_SINK_PRODUCER_NULL = 
"sink.producer.null";
+    public static final java.lang.String EVENT_SINK_PRODUCER_CREATE_SUCCESS = 
"sink.producer.create.success";
+    public static final java.lang.String EVENT_SINK_PRODUCER_CREATE_FAILURE = 
"sink.producer.create.failure";
     public static final java.lang.String EVENT_SINK_CLUSTER_EMPTY = 
"sink.cluster.empty";
     public static final java.lang.String EVENT_SINK_CLUSTER_UNMATCHED = 
"sink.cluster.unmatched";
     public static final java.lang.String EVENT_SINK_CPRODUCER_NULL = 
"sink.cluster.producer.null";
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index a9897641fe..8b654a65bb 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -234,7 +234,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
                     ProxyEvent proxyEvent = new ProxyEvent(groupId, streamId, 
msgTimeStr,
                             sourceIp, sourceTimeStr, event.getHeaders(), 
event.getBody());
                     this.dispatchManager.addEvent(proxyEvent);
-                    
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V1_FILE);
+                    
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_FILE_V1_TAKE_SUCCESS);
                 } else {
                     
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V1_MALFORMED);
                 }
@@ -243,7 +243,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
                 simpleEvent.setBody(event.getBody());
                 simpleEvent.setHeaders(event.getHeaders());
                 this.dispatchManager.addSimpleEvent(simpleEvent);
-                
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V0_FILE);
+                
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_FILE_V0_TAKE_SUCCESS);
             }
             tx.commit();
             return Status.READY;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index b282cc31ca..e9ab048781 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -124,7 +124,20 @@ public class KafkaHandler implements MessageQueueHandler {
             IdTopicConfig idConfig = 
ConfigManager.getInstance().getSinkIdTopicConfig(
                     profile.getInlongGroupId(), profile.getInlongStreamId());
             if (idConfig == null) {
-                if 
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+                // add default topics first
+                if 
(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+                    topic = 
CommonConfigHolder.getInstance().getRandDefTopics();
+                    if (StringUtils.isEmpty(topic)) {
+                        sinkContext.fileMetricIncWithDetailStats(
+                                
StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING, profile.getUid());
+                        sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
+                        
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
+                        
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+                        return false;
+                    }
+                    sinkContext.fileMetricIncWithDetailStats(
+                            StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED, 
profile.getUid());
+                } else {
                     sinkContext.fileMetricIncWithDetailStats(
                             StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, 
profile.getUid());
                     sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
@@ -132,15 +145,6 @@ public class KafkaHandler implements MessageQueueHandler {
                     
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                     return false;
                 }
-                topic = CommonConfigHolder.getInstance().getRandDefTopics();
-                if (StringUtils.isEmpty(topic)) {
-                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
-                    sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
-                    
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
-                    
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
-                    return false;
-                }
-                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED);
             } else {
                 topic = idConfig.getTopicName();
             }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 650aafb637..cdddc4e87a 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -209,7 +209,20 @@ public class PulsarHandler implements MessageQueueHandler {
             IdTopicConfig idConfig = 
ConfigManager.getInstance().getSinkIdTopicConfig(
                     profile.getInlongGroupId(), profile.getInlongStreamId());
             if (idConfig == null) {
-                if 
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+                // add default topics first
+                if 
(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+                    producerTopic = 
CommonConfigHolder.getInstance().getRandDefTopics();
+                    if (StringUtils.isEmpty(producerTopic)) {
+                        sinkContext.fileMetricIncWithDetailStats(
+                                
StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING, profile.getUid());
+                        sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
+                        
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
+                        
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+                        return false;
+                    }
+                    sinkContext.fileMetricIncWithDetailStats(
+                            StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED, 
profile.getUid());
+                } else {
                     sinkContext.fileMetricIncWithDetailStats(
                             StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, 
profile.getUid());
                     sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
@@ -217,15 +230,6 @@ public class PulsarHandler implements MessageQueueHandler {
                     
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                     return false;
                 }
-                producerTopic = 
CommonConfigHolder.getInstance().getRandDefTopics();
-                if (StringUtils.isEmpty(producerTopic)) {
-                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
-                    sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
-                    
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
-                    
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
-                    return false;
-                }
-                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED);
             } else {
                 producerTopic = idConfig.getPulsarTopicName(tenant, namespace);
             }
@@ -250,13 +254,16 @@ public class PulsarHandler implements MessageQueueHandler 
{
                 } catch (Throwable ex) {
                     logger.error("create new producer failed", ex);
                 }
-            }
-            // create producer failed
-            if (producer == null) {
-                
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_PRODUCER_NULL,
 producerTopic);
-                sinkContext.processSendFail(profile, clusterName, 
producerTopic, 0,
-                        DataProxyErrCode.PRODUCER_IS_NULL, "");
-                return false;
+                // create producer failed
+                if (producer == null) {
+                    sinkContext.fileMetricIncWithDetailStats(
+                            StatConstants.EVENT_SINK_PRODUCER_CREATE_FAILURE, 
producerTopic);
+                    sinkContext.processSendFail(profile, clusterName, 
producerTopic, 0,
+                            DataProxyErrCode.PRODUCER_IS_NULL, "");
+                    return false;
+                }
+                sinkContext.fileMetricIncWithDetailStats(
+                        StatConstants.EVENT_SINK_PRODUCER_CREATE_SUCCESS, 
producerTopic);
             }
             // send
             if (profile instanceof SimplePackProfile) {
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 60b3910dc0..922ff492a1 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -189,7 +189,20 @@ public class TubeHandler implements MessageQueueHandler {
             IdTopicConfig idConfig = 
ConfigManager.getInstance().getSinkIdTopicConfig(
                     profile.getInlongGroupId(), profile.getInlongStreamId());
             if (idConfig == null) {
-                if 
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+                // add default topics first
+                if 
(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+                    topic = 
CommonConfigHolder.getInstance().getRandDefTopics();
+                    if (StringUtils.isEmpty(topic)) {
+                        sinkContext.fileMetricIncWithDetailStats(
+                                
StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING, profile.getUid());
+                        sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
+                        
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
+                        
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+                        return false;
+                    }
+                    sinkContext.fileMetricIncWithDetailStats(
+                            StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED, 
profile.getUid());
+                } else {
                     sinkContext.fileMetricIncWithDetailStats(
                             StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING, 
profile.getUid());
                     sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
@@ -197,15 +210,6 @@ public class TubeHandler implements MessageQueueHandler {
                     
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
                     return false;
                 }
-                topic = CommonConfigHolder.getInstance().getRandDefTopics();
-                if (StringUtils.isEmpty(topic)) {
-                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
-                    sinkContext.addSendResultMetric(profile, clusterName, 
profile.getUid(), false, 0);
-                    
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
-                    
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
-                    return false;
-                }
-                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED);
             } else {
                 topic = idConfig.getTopicName();
             }
@@ -273,7 +277,8 @@ public class TubeHandler implements MessageQueueHandler {
                     
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(batchProfile);
                     batchProfile.ack();
                 } else {
-                    
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILURE);
+                    
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_FAILURE,
+                            topic + "." + result.getErrCode());
                     sinkContext.processSendFail(batchProfile, clusterName, 
topic, sendTime,
                             DataProxyErrCode.MQ_RETURN_ERROR, 
result.getErrMsg());
                     if (logCounter.shouldPrint()) {
@@ -284,7 +289,7 @@ public class TubeHandler implements MessageQueueHandler {
 
             @Override
             public void onException(Throwable ex) {
-                
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+                
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT,
 topic);
                 sinkContext.processSendFail(batchProfile, clusterName, topic, 
sendTime,
                         DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
                 if (logCounter.shouldPrint()) {
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index 814aae2620..07ed1efe01 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -90,8 +90,6 @@ public abstract class BaseSource
     protected String msgFactoryName;
     // message handler name
     protected String messageHandlerName;
-    // source default append attribute
-    protected String defAttr = "";
     // allowed max message length
     protected int maxMsgLength;
     // whether compress message
@@ -161,11 +159,6 @@ public abstract class BaseSource
         Preconditions.checkArgument(StringUtils.isNotBlank(tmpVal),
                 SourceConstants.SRCCXT_MESSAGE_HANDLER_NAME + " config is 
blank");
         this.messageHandlerName = tmpVal;
-        // get default attributes
-        tmpVal = context.getString(SourceConstants.SRCCXT_DEF_ATTR);
-        if (StringUtils.isNotBlank(tmpVal)) {
-            this.defAttr = tmpVal.trim();
-        }
         // get allowed max message length
         this.maxMsgLength = ConfStringUtils.getIntValue(context,
                 SourceConstants.SRCCXT_MAX_MSG_LENGTH, 
SourceConstants.VAL_DEF_MAX_MSG_LENGTH);
@@ -371,10 +364,6 @@ public abstract class BaseSource
         return strPort;
     }
 
-    public String getDefAttr() {
-        return defAttr;
-    }
-
     public int getMaxMsgLength() {
         return maxMsgLength;
     }
@@ -417,9 +406,10 @@ public abstract class BaseSource
         }
     }
 
-    public void fileMetricIncDetailStats(String eventKey) {
+    public void fileMetricIncWithDetailStats(String eventKey, String 
detailInfoKey) {
         if (enableFileMetric) {
-            monitorStats.incDetailStats(eventKey);
+            monitorStats.incSumStats(eventKey);
+            monitorStats.incDetailStats(eventKey + "#" + detailInfoKey);
         }
     }
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 61788496ae..37b48121b2 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -308,12 +308,13 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
             source.addMetric(false, event.getBody().length, event);
             if (msgCodec.isNeedResp()) {
                 
msgCodec.setFailureInfo(DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
-                        strBuff.append("Put event to channel failure: 
").append(ex.getMessage()).toString());
+                        strBuff.append("Put msg event to channel failure: 
").append(ex.getMessage()).toString());
                 strBuff.delete(0, strBuff.length());
                 responseV0Msg(channel, msgCodec, strBuff);
             }
             if (logCounter.shouldPrint()) {
-                logger.error("Error writing event to channel failure.", ex);
+                logger.error("Error writing msg event to channel failure, 
attrs={}",
+                        msgCodec.getAttr(), ex);
             }
         }
     }
@@ -428,8 +429,9 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
         for (ProxyEvent event : events) {
             // get configured topic name
             String topic = 
configManager.getTopicName(event.getInlongGroupId(), event.getInlongStreamId());
-            if (StringUtils.isBlank(topic)) {
-                
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+            if (StringUtils.isEmpty(topic)) {
+                source.fileMetricIncWithDetailStats(
+                        StatConstants.EVENT_SOURCE_TOPIC_MISSING, 
event.getInlongGroupId());
                 source.addMetric(false, event.getBody().length, event);
                 this.responsePackage(ctx, ProxySdk.ResultCode.ERR_ID_ERROR, 
packObject);
                 return;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
index 98cf98b677..67758db01c 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
@@ -33,8 +33,6 @@ public class SourceConstants {
     public static final String SRCCXT_MSG_FACTORY_NAME = "msg-factory-name";
     // message handler name
     public static final String SRCCXT_MESSAGE_HANDLER_NAME = 
"message-handler-name";
-    // default attributes
-    public static final String SRCCXT_DEF_ATTR = "attr";
     // max message length
     public static final String SRCCXT_MAX_MSG_LENGTH = "max-msg-length";
     // allowed max message length
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
index bf7d92a190..7d98d199f3 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
@@ -275,7 +275,7 @@ public class HttpMessageHandler extends 
SimpleChannelInboundHandler<FullHttpRequ
         // get and check streamId
         String streamId = reqAttrs.get(HttpAttrConst.KEY_STREAM_ID);
         if (StringUtils.isBlank(streamId)) {
-            
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_STREAMID_MISSING);
+            
source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_STREAMID_MISSING, 
groupId);
             sendResponse(ctx, 
DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
                     strBuff.append("Field 
").append(HttpAttrConst.KEY_STREAM_ID)
                             .append(" must exist and not blank!").toString(),
@@ -284,8 +284,8 @@ public class HttpMessageHandler extends 
SimpleChannelInboundHandler<FullHttpRequ
         }
         // get and check topicName
         String topicName = ConfigManager.getInstance().getTopicName(groupId, 
streamId);
-        if (StringUtils.isBlank(topicName)) {
-            
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+        if (StringUtils.isEmpty(topicName)) {
+            
source.fileMetricIncWithDetailStats(StatConstants.EVENT_SOURCE_TOPIC_MISSING, 
groupId);
             sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
                     strBuff.append("Topic not configured for 
").append(HttpAttrConst.KEY_GROUP_ID)
                             .append("(").append(groupId).append("),")
@@ -308,13 +308,13 @@ public class HttpMessageHandler extends 
SimpleChannelInboundHandler<FullHttpRequ
         String body = reqAttrs.get(HttpAttrConst.KEY_BODY);
         if (StringUtils.isBlank(body)) {
             if (body == null) {
-                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_MISSING);
+                
source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_MISSING, 
groupId);
                 sendResponse(ctx, 
DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
                         strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
                                 .append(" is not exist!").toString(),
                         isCloseCon, callback);
             } else {
-                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_BLANK);
+                
source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_BLANK, 
groupId);
                 sendResponse(ctx, DataProxyErrCode.EMPTY_MSG.getErrCode(),
                         strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
                                 .append(" is Blank!").toString(),
@@ -323,7 +323,7 @@ public class HttpMessageHandler extends 
SimpleChannelInboundHandler<FullHttpRequ
             return;
         }
         if (body.length() > source.getMaxMsgLength()) {
-            source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_OVERMAX);
+            
source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_OVERMAX, 
groupId);
             sendResponse(ctx, 
DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
                     strBuff.append("Error msg, the 
").append(HttpAttrConst.KEY_BODY)
                             .append(" length(").append(body.length())
@@ -377,7 +377,7 @@ public class HttpMessageHandler extends 
SimpleChannelInboundHandler<FullHttpRequ
                     "b2b", dataTime, pkgTime, 1);
             source.addMetric(false, event.getBody().length, event);
             sendErrorMsg(ctx, DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
-                    strBuff.append("Put event to channel failure: 
").append(ex.getMessage()).toString(), callback);
+                    strBuff.append("Put HTTP event to channel failure: 
").append(ex.getMessage()).toString(), callback);
             if (logCounter.shouldPrint()) {
                 logger.error("Error writing HTTP event to channel failure.", 
ex);
             }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
index 017191dc67..71014d3058 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
@@ -259,7 +259,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
             String confStreamId;
             String strGroupIdNum = String.valueOf(this.groupIdNum);
             confGroupId = configManager.getGroupIdNameByNum(strGroupIdNum);
-            if (StringUtils.isBlank(confGroupId)) {
+            if (StringUtils.isEmpty(confGroupId)) {
                 if (configManager.isGroupIdNumConfigEmpty()) {
                     
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_IDNUM_EMPTY);
                     this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
@@ -291,7 +291,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
             } else {
                 String strStreamIdNum = String.valueOf(this.streamIdNum);
                 confStreamId = 
configManager.getStreamIdNameByIdNum(strGroupIdNum, strStreamIdNum);
-                if (StringUtils.isBlank(confStreamId)) {
+                if (StringUtils.isEmpty(confStreamId)) {
                     if (configManager.isStreamIdNumConfigEmpty()) {
                         
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_IDNUM_EMPTY);
                         this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
@@ -327,8 +327,8 @@ public class CodecBinMsg extends AbsV0MsgCodec {
         }
         // get and check topic configure
         this.topicName = configManager.getTopicName(this.groupId, 
this.streamId);
-        if (StringUtils.isBlank(this.topicName)) {
-            
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+        if (StringUtils.isEmpty(this.topicName)) {
+            
source.fileMetricIncWithDetailStats(StatConstants.EVENT_SOURCE_TOPIC_MISSING, 
this.groupId);
             this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
             this.errMsg = String.format("Topic not configured for 
groupId=(%s), streamId=(%s)",
                     this.groupId, this.streamId);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
index c3bf9281ac..1b06d7e2f0 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
@@ -152,8 +152,8 @@ public class CodecTextMsg extends AbsV0MsgCodec {
         }
         // get and check topic configure
         String tmpTopicName = 
ConfigManager.getInstance().getTopicName(tmpGroupId, tmpStreamId);
-        if (StringUtils.isBlank(tmpTopicName)) {
-            
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+        if (StringUtils.isEmpty(tmpTopicName)) {
+            
source.fileMetricIncWithDetailStats(StatConstants.EVENT_SOURCE_TOPIC_MISSING, 
tmpGroupId);
             this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
             this.errMsg = String.format(
                     "Topic not configured for groupId=(%s), streamId=(%s)", 
tmpGroupId, tmpStreamId);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java
 
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java
index ca349e9247..0060d96fdc 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java
@@ -35,7 +35,6 @@ public class UdpSourceTest {
         map.put(ConfigConstants.CONFIG_PORT, "8080");
         map.put(ConfigConstants.CONFIG_HOST, "127.0.0.1");
         map.put(ConfigConstants.TOPIC, "topic");
-        map.put(ConfigConstants.ATTR, "{}");
         Context context = new Context(map);
         SimpleUdpSource udpSource = new SimpleUdpSource();
         udpSource.configure(context);

Reply via email to