This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8cde9f3b3 [INLONG-5941][DataProxy] Store the initial number of message
items in Map (#5942)
8cde9f3b3 is described below
commit 8cde9f3b30a20e43ab3c48f499fc32cc2856bee9
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Sep 20 12:57:03 2022 +0800
[INLONG-5941][DataProxy] Store the initial number of message items in Map (#5942)
---
.../inlong/dataproxy/source/ServerMessageHandler.java | 14 +++-----------
1 file changed, 3 insertions(+), 11 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index e4a436448..a381f8c1c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -362,14 +362,12 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
for (Map.Entry<String, List<ProxyMessage>> streamIdEntry :
topicEntry.getValue().entrySet()) {
// build InLongMsg
String groupId = null;
- int streamMsgCnt = 0;
InLongMsg inLongMsg =
InLongMsg.newInLongMsg(this.isCompressed, inLongMsgVer);
if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) ||
MsgType.MSG_MULTI_BODY.equals(msgType)) {
for (ProxyMessage message : streamIdEntry.getValue()) {
if (StringUtils.isEmpty(groupId)) {
groupId = message.getGroupId();
}
- streamMsgCnt++;
message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT,
String.valueOf(1));
inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
}
@@ -378,7 +376,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
if (StringUtils.isEmpty(groupId)) {
groupId = message.getGroupId();
}
- streamMsgCnt++;
inLongMsg.addMsg(message.getData());
}
} else {
@@ -386,15 +383,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
if (StringUtils.isEmpty(groupId)) {
groupId = message.getGroupId();
}
- streamMsgCnt++;
inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
}
}
- if (recordMsgCnt != streamMsgCnt) {
- logger.debug("Found message count not equal, record={},
calculate value = {}",
- recordMsgCnt, streamMsgCnt);
- }
- commonAttrMap.put(AttributeConstants.MESSAGE_COUNT,
String.valueOf(streamMsgCnt));
+ commonAttrMap.put(AttributeConstants.MESSAGE_COUNT,
String.valueOf(recordMsgCnt));
// build headers
Map<String, String> headers = new HashMap<>();
headers.put(AttributeConstants.GROUP_ID, groupId);
@@ -456,12 +448,12 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
this.addStatistics(true, data.length, event);
monitorIndex.addAndGet(strBuff.toString(),
- streamMsgCnt, 1, data.length, 0);
+ recordMsgCnt, 1, data.length, 0);
strBuff.delete(0, strBuff.length());
} catch (Throwable ex) {
logger.error("Error writting to channel,data will
discard.", ex);
monitorIndexExt.incrementAndGet("EVENT_DROPPED");
- monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0,
streamMsgCnt);
+ monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0,
recordMsgCnt);
this.addStatistics(false, data.length, event);
strBuff.delete(0, strBuff.length());
throw new ChannelException("ProcessEvent error can't write
event to channel.");