This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 083b1dfb76 [INLONG-8914][DataProxy] Optimize DataProxy event statistics (#8915)
083b1dfb76 is described below
commit 083b1dfb76ffdd0ede39277d2afa9208ec3b64cb
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Sep 14 22:21:39 2023 +0800
[INLONG-8914][DataProxy] Optimize DataProxy event statistics (#8915)
---
.../inlong/dataproxy/config/ConfigManager.java | 4 +--
.../inlong/dataproxy/consts/ConfigConstants.java | 2 --
.../inlong/dataproxy/consts/StatConstants.java | 8 +++--
.../dataproxy/sink/mq/MessageQueueZoneSink.java | 4 +--
.../dataproxy/sink/mq/kafka/KafkaHandler.java | 24 +++++++------
.../dataproxy/sink/mq/pulsar/PulsarHandler.java | 41 +++++++++++++---------
.../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 29 ++++++++-------
.../apache/inlong/dataproxy/source/BaseSource.java | 16 ++-------
.../dataproxy/source/ServerMessageHandler.java | 10 +++---
.../inlong/dataproxy/source/SourceConstants.java | 2 --
.../source/httpMsg/HttpMessageHandler.java | 14 ++++----
.../inlong/dataproxy/source/v0msg/CodecBinMsg.java | 8 ++---
.../dataproxy/source/v0msg/CodecTextMsg.java | 4 +--
.../inlong/dataproxy/source/UdpSourceTest.java | 1 -
14 files changed, 86 insertions(+), 81 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index c72af67ae4..647a41936e 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -359,8 +359,8 @@ public class ConfigManager {
String returnStr = EntityUtils.toString(response.getEntity());
long dltTime = System.currentTimeMillis() - startTime;
if (dltTime >=
CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs()) {
- LOG.warn("End to request {} to get config info:{}, WAIST
{} ms",
- url, returnStr, dltTime);
+ LOG.warn("End to request {} to get config info, WAIST {}
ms, over alarm value {} ms",
+ url, dltTime,
CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("End to request {} to get config info:{},
WAIST {} ms",
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index 47f2e9fcb9..0622f7aa3c 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -37,8 +37,6 @@ public class ConfigConstants {
public static final String TOPIC = "topic";
- public static final String ATTR = "attr";
-
public static final String FILTER_EMPTY_MSG = "filter-empty-msg";
public static final String TCP_NO_DELAY = "tcpNoDelay";
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index b67b3537fe..8617f74b8a 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -41,7 +41,7 @@ public class StatConstants {
public static final java.lang.String EVENT_HTTP_LINK_UNKNOWN_EXCEPTION =
"http.link.unknown.exception";
public static final java.lang.String EVENT_HTTP_LINK_UNWRITABLE =
"http.link.unwritable";
// configure
- public static final java.lang.String EVENT_CONFIG_TOPIC_MISSING =
"config.topic.missing";
+ public static final java.lang.String EVENT_SOURCE_TOPIC_MISSING =
"source.topic.missing";
public static final java.lang.String EVENT_CONFIG_IDNUM_EMPTY =
"config.idnum.empty";
public static final java.lang.String EVENT_CONFIG_GROUPIDNUM_MISSING =
"config.groupidnum.missing";
public static final java.lang.String EVENT_CONFIG_GROUP_IDNUM_INCONSTANT =
"config.group.idnum.incons";
@@ -91,12 +91,14 @@ public class StatConstants {
public static final java.lang.String EVENT_SINK_EVENT_V1_MALFORMED =
"sink.event.v1.malformed";
public static final java.lang.String EVENT_SINK_EVENT_TAKE_SUCCESS =
"sink.event.take.success";
public static final java.lang.String EVENT_SINK_EVENT_TAKE_FAILURE =
"sink.event.take.failure";
- public static final java.lang.String EVENT_SINK_EVENT_V1_FILE =
"sink.event.v1.file";
- public static final java.lang.String EVENT_SINK_EVENT_V0_FILE =
"sink.event.v1.file";
+ public static final java.lang.String EVENT_SINK_FILE_V1_TAKE_SUCCESS =
"sink.file.v1.take.success";
+ public static final java.lang.String EVENT_SINK_FILE_V0_TAKE_SUCCESS =
"sink.file.v0.take.success";
public static final java.lang.String EVENT_SINK_CONFIG_TOPIC_MISSING =
"sink.topic.missing";
public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_MISSING =
"default.topic.empty";
public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_USED =
"default.topic.used";
public static final java.lang.String EVENT_SINK_PRODUCER_NULL =
"sink.producer.null";
+ public static final java.lang.String EVENT_SINK_PRODUCER_CREATE_SUCCESS =
"sink.producer.create.success";
+ public static final java.lang.String EVENT_SINK_PRODUCER_CREATE_FAILURE =
"sink.producer.create.failure";
public static final java.lang.String EVENT_SINK_CLUSTER_EMPTY =
"sink.cluster.empty";
public static final java.lang.String EVENT_SINK_CLUSTER_UNMATCHED =
"sink.cluster.unmatched";
public static final java.lang.String EVENT_SINK_CPRODUCER_NULL =
"sink.cluster.producer.null";
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index a9897641fe..8b654a65bb 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -234,7 +234,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
ProxyEvent proxyEvent = new ProxyEvent(groupId, streamId,
msgTimeStr,
sourceIp, sourceTimeStr, event.getHeaders(),
event.getBody());
this.dispatchManager.addEvent(proxyEvent);
-
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V1_FILE);
+
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_FILE_V1_TAKE_SUCCESS);
} else {
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V1_MALFORMED);
}
@@ -243,7 +243,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
simpleEvent.setBody(event.getBody());
simpleEvent.setHeaders(event.getHeaders());
this.dispatchManager.addSimpleEvent(simpleEvent);
-
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V0_FILE);
+
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_FILE_V0_TAKE_SUCCESS);
}
tx.commit();
return Status.READY;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index b282cc31ca..e9ab048781 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -124,7 +124,20 @@ public class KafkaHandler implements MessageQueueHandler {
IdTopicConfig idConfig =
ConfigManager.getInstance().getSinkIdTopicConfig(
profile.getInlongGroupId(), profile.getInlongStreamId());
if (idConfig == null) {
- if
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+ // add default topics first
+ if
(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+ topic =
CommonConfigHolder.getInstance().getRandDefTopics();
+ if (StringUtils.isEmpty(topic)) {
+ sinkContext.fileMetricIncWithDetailStats(
+
StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING, profile.getUid());
+ sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
+
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
+
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+ return false;
+ }
+ sinkContext.fileMetricIncWithDetailStats(
+ StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED,
profile.getUid());
+ } else {
sinkContext.fileMetricIncWithDetailStats(
StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING,
profile.getUid());
sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
@@ -132,15 +145,6 @@ public class KafkaHandler implements MessageQueueHandler {
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
return false;
}
- topic = CommonConfigHolder.getInstance().getRandDefTopics();
- if (StringUtils.isEmpty(topic)) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
- sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
-
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
-
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
- return false;
- }
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED);
} else {
topic = idConfig.getTopicName();
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 650aafb637..cdddc4e87a 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -209,7 +209,20 @@ public class PulsarHandler implements MessageQueueHandler {
IdTopicConfig idConfig =
ConfigManager.getInstance().getSinkIdTopicConfig(
profile.getInlongGroupId(), profile.getInlongStreamId());
if (idConfig == null) {
- if
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+ // add default topics first
+ if
(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+ producerTopic =
CommonConfigHolder.getInstance().getRandDefTopics();
+ if (StringUtils.isEmpty(producerTopic)) {
+ sinkContext.fileMetricIncWithDetailStats(
+
StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING, profile.getUid());
+ sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
+
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
+
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+ return false;
+ }
+ sinkContext.fileMetricIncWithDetailStats(
+ StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED,
profile.getUid());
+ } else {
sinkContext.fileMetricIncWithDetailStats(
StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING,
profile.getUid());
sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
@@ -217,15 +230,6 @@ public class PulsarHandler implements MessageQueueHandler {
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
return false;
}
- producerTopic =
CommonConfigHolder.getInstance().getRandDefTopics();
- if (StringUtils.isEmpty(producerTopic)) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
- sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
-
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
-
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
- return false;
- }
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED);
} else {
producerTopic = idConfig.getPulsarTopicName(tenant, namespace);
}
@@ -250,13 +254,16 @@ public class PulsarHandler implements MessageQueueHandler
{
} catch (Throwable ex) {
logger.error("create new producer failed", ex);
}
- }
- // create producer failed
- if (producer == null) {
-
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_PRODUCER_NULL,
producerTopic);
- sinkContext.processSendFail(profile, clusterName,
producerTopic, 0,
- DataProxyErrCode.PRODUCER_IS_NULL, "");
- return false;
+ // create producer failed
+ if (producer == null) {
+ sinkContext.fileMetricIncWithDetailStats(
+ StatConstants.EVENT_SINK_PRODUCER_CREATE_FAILURE,
producerTopic);
+ sinkContext.processSendFail(profile, clusterName,
producerTopic, 0,
+ DataProxyErrCode.PRODUCER_IS_NULL, "");
+ return false;
+ }
+ sinkContext.fileMetricIncWithDetailStats(
+ StatConstants.EVENT_SINK_PRODUCER_CREATE_SUCCESS,
producerTopic);
}
// send
if (profile instanceof SimplePackProfile) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 60b3910dc0..922ff492a1 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -189,7 +189,20 @@ public class TubeHandler implements MessageQueueHandler {
IdTopicConfig idConfig =
ConfigManager.getInstance().getSinkIdTopicConfig(
profile.getInlongGroupId(), profile.getInlongStreamId());
if (idConfig == null) {
- if
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+ // add default topics first
+ if
(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+ topic =
CommonConfigHolder.getInstance().getRandDefTopics();
+ if (StringUtils.isEmpty(topic)) {
+ sinkContext.fileMetricIncWithDetailStats(
+
StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING, profile.getUid());
+ sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
+
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
+
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+ return false;
+ }
+ sinkContext.fileMetricIncWithDetailStats(
+ StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED,
profile.getUid());
+ } else {
sinkContext.fileMetricIncWithDetailStats(
StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING,
profile.getUid());
sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
@@ -197,15 +210,6 @@ public class TubeHandler implements MessageQueueHandler {
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
return false;
}
- topic = CommonConfigHolder.getInstance().getRandDefTopics();
- if (StringUtils.isEmpty(topic)) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
- sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
-
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(profile);
-
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
- return false;
- }
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED);
} else {
topic = idConfig.getTopicName();
}
@@ -273,7 +277,8 @@ public class TubeHandler implements MessageQueueHandler {
sinkContext.getMqZoneSink().releaseAcquiredSizePermit(batchProfile);
batchProfile.ack();
} else {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILURE);
+
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_FAILURE,
+ topic + "." + result.getErrCode());
sinkContext.processSendFail(batchProfile, clusterName,
topic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR,
result.getErrMsg());
if (logCounter.shouldPrint()) {
@@ -284,7 +289,7 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onException(Throwable ex) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT,
topic);
sinkContext.processSendFail(batchProfile, clusterName, topic,
sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index 814aae2620..07ed1efe01 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -90,8 +90,6 @@ public abstract class BaseSource
protected String msgFactoryName;
// message handler name
protected String messageHandlerName;
- // source default append attribute
- protected String defAttr = "";
// allowed max message length
protected int maxMsgLength;
// whether compress message
@@ -161,11 +159,6 @@ public abstract class BaseSource
Preconditions.checkArgument(StringUtils.isNotBlank(tmpVal),
SourceConstants.SRCCXT_MESSAGE_HANDLER_NAME + " config is
blank");
this.messageHandlerName = tmpVal;
- // get default attributes
- tmpVal = context.getString(SourceConstants.SRCCXT_DEF_ATTR);
- if (StringUtils.isNotBlank(tmpVal)) {
- this.defAttr = tmpVal.trim();
- }
// get allowed max message length
this.maxMsgLength = ConfStringUtils.getIntValue(context,
SourceConstants.SRCCXT_MAX_MSG_LENGTH,
SourceConstants.VAL_DEF_MAX_MSG_LENGTH);
@@ -371,10 +364,6 @@ public abstract class BaseSource
return strPort;
}
- public String getDefAttr() {
- return defAttr;
- }
-
public int getMaxMsgLength() {
return maxMsgLength;
}
@@ -417,9 +406,10 @@ public abstract class BaseSource
}
}
- public void fileMetricIncDetailStats(String eventKey) {
+ public void fileMetricIncWithDetailStats(String eventKey, String
detailInfoKey) {
if (enableFileMetric) {
- monitorStats.incDetailStats(eventKey);
+ monitorStats.incSumStats(eventKey);
+ monitorStats.incDetailStats(eventKey + "#" + detailInfoKey);
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 61788496ae..37b48121b2 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -308,12 +308,13 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
source.addMetric(false, event.getBody().length, event);
if (msgCodec.isNeedResp()) {
msgCodec.setFailureInfo(DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
- strBuff.append("Put event to channel failure:
").append(ex.getMessage()).toString());
+ strBuff.append("Put msg event to channel failure:
").append(ex.getMessage()).toString());
strBuff.delete(0, strBuff.length());
responseV0Msg(channel, msgCodec, strBuff);
}
if (logCounter.shouldPrint()) {
- logger.error("Error writing event to channel failure.", ex);
+ logger.error("Error writing msg event to channel failure,
attrs={}",
+ msgCodec.getAttr(), ex);
}
}
}
@@ -428,8 +429,9 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
for (ProxyEvent event : events) {
// get configured topic name
String topic =
configManager.getTopicName(event.getInlongGroupId(), event.getInlongStreamId());
- if (StringUtils.isBlank(topic)) {
-
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+ if (StringUtils.isEmpty(topic)) {
+ source.fileMetricIncWithDetailStats(
+ StatConstants.EVENT_SOURCE_TOPIC_MISSING,
event.getInlongGroupId());
source.addMetric(false, event.getBody().length, event);
this.responsePackage(ctx, ProxySdk.ResultCode.ERR_ID_ERROR,
packObject);
return;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
index 98cf98b677..67758db01c 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
@@ -33,8 +33,6 @@ public class SourceConstants {
public static final String SRCCXT_MSG_FACTORY_NAME = "msg-factory-name";
// message handler name
public static final String SRCCXT_MESSAGE_HANDLER_NAME =
"message-handler-name";
- // default attributes
- public static final String SRCCXT_DEF_ATTR = "attr";
// max message length
public static final String SRCCXT_MAX_MSG_LENGTH = "max-msg-length";
// allowed max message length
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
index bf7d92a190..7d98d199f3 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
@@ -275,7 +275,7 @@ public class HttpMessageHandler extends
SimpleChannelInboundHandler<FullHttpRequ
// get and check streamId
String streamId = reqAttrs.get(HttpAttrConst.KEY_STREAM_ID);
if (StringUtils.isBlank(streamId)) {
-
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_STREAMID_MISSING);
+
source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_STREAMID_MISSING,
groupId);
sendResponse(ctx,
DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
strBuff.append("Field
").append(HttpAttrConst.KEY_STREAM_ID)
.append(" must exist and not blank!").toString(),
@@ -284,8 +284,8 @@ public class HttpMessageHandler extends
SimpleChannelInboundHandler<FullHttpRequ
}
// get and check topicName
String topicName = ConfigManager.getInstance().getTopicName(groupId,
streamId);
- if (StringUtils.isBlank(topicName)) {
-
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+ if (StringUtils.isEmpty(topicName)) {
+
source.fileMetricIncWithDetailStats(StatConstants.EVENT_SOURCE_TOPIC_MISSING,
groupId);
sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
strBuff.append("Topic not configured for
").append(HttpAttrConst.KEY_GROUP_ID)
.append("(").append(groupId).append("),")
@@ -308,13 +308,13 @@ public class HttpMessageHandler extends
SimpleChannelInboundHandler<FullHttpRequ
String body = reqAttrs.get(HttpAttrConst.KEY_BODY);
if (StringUtils.isBlank(body)) {
if (body == null) {
-
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_MISSING);
+
source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_MISSING,
groupId);
sendResponse(ctx,
DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
.append(" is not exist!").toString(),
isCloseCon, callback);
} else {
-
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_BLANK);
+
source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_BLANK,
groupId);
sendResponse(ctx, DataProxyErrCode.EMPTY_MSG.getErrCode(),
strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
.append(" is Blank!").toString(),
@@ -323,7 +323,7 @@ public class HttpMessageHandler extends
SimpleChannelInboundHandler<FullHttpRequ
return;
}
if (body.length() > source.getMaxMsgLength()) {
- source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_OVERMAX);
+
source.fileMetricIncWithDetailStats(StatConstants.EVENT_MSG_BODY_OVERMAX,
groupId);
sendResponse(ctx,
DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
strBuff.append("Error msg, the
").append(HttpAttrConst.KEY_BODY)
.append(" length(").append(body.length())
@@ -377,7 +377,7 @@ public class HttpMessageHandler extends
SimpleChannelInboundHandler<FullHttpRequ
"b2b", dataTime, pkgTime, 1);
source.addMetric(false, event.getBody().length, event);
sendErrorMsg(ctx, DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
- strBuff.append("Put event to channel failure:
").append(ex.getMessage()).toString(), callback);
+ strBuff.append("Put HTTP event to channel failure:
").append(ex.getMessage()).toString(), callback);
if (logCounter.shouldPrint()) {
logger.error("Error writing HTTP event to channel failure.",
ex);
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
index 017191dc67..71014d3058 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
@@ -259,7 +259,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
String confStreamId;
String strGroupIdNum = String.valueOf(this.groupIdNum);
confGroupId = configManager.getGroupIdNameByNum(strGroupIdNum);
- if (StringUtils.isBlank(confGroupId)) {
+ if (StringUtils.isEmpty(confGroupId)) {
if (configManager.isGroupIdNumConfigEmpty()) {
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_IDNUM_EMPTY);
this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
@@ -291,7 +291,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
} else {
String strStreamIdNum = String.valueOf(this.streamIdNum);
confStreamId =
configManager.getStreamIdNameByIdNum(strGroupIdNum, strStreamIdNum);
- if (StringUtils.isBlank(confStreamId)) {
+ if (StringUtils.isEmpty(confStreamId)) {
if (configManager.isStreamIdNumConfigEmpty()) {
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_IDNUM_EMPTY);
this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
@@ -327,8 +327,8 @@ public class CodecBinMsg extends AbsV0MsgCodec {
}
// get and check topic configure
this.topicName = configManager.getTopicName(this.groupId,
this.streamId);
- if (StringUtils.isBlank(this.topicName)) {
-
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+ if (StringUtils.isEmpty(this.topicName)) {
+
source.fileMetricIncWithDetailStats(StatConstants.EVENT_SOURCE_TOPIC_MISSING,
this.groupId);
this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
this.errMsg = String.format("Topic not configured for
groupId=(%s), streamId=(%s)",
this.groupId, this.streamId);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
index c3bf9281ac..1b06d7e2f0 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
@@ -152,8 +152,8 @@ public class CodecTextMsg extends AbsV0MsgCodec {
}
// get and check topic configure
String tmpTopicName =
ConfigManager.getInstance().getTopicName(tmpGroupId, tmpStreamId);
- if (StringUtils.isBlank(tmpTopicName)) {
-
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+ if (StringUtils.isEmpty(tmpTopicName)) {
+
source.fileMetricIncWithDetailStats(StatConstants.EVENT_SOURCE_TOPIC_MISSING,
tmpGroupId);
this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
this.errMsg = String.format(
"Topic not configured for groupId=(%s), streamId=(%s)",
tmpGroupId, tmpStreamId);
diff --git
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java
index ca349e9247..0060d96fdc 100644
---
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java
+++
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java
@@ -35,7 +35,6 @@ public class UdpSourceTest {
map.put(ConfigConstants.CONFIG_PORT, "8080");
map.put(ConfigConstants.CONFIG_HOST, "127.0.0.1");
map.put(ConfigConstants.TOPIC, "topic");
- map.put(ConfigConstants.ATTR, "{}");
Context context = new Context(map);
SimpleUdpSource udpSource = new SimpleUdpSource();
udpSource.configure(context);