This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cde9f3b3 [INLONG-5941][DataProxy] Store the initial number of message items in Map (#5942)
8cde9f3b3 is described below

commit 8cde9f3b30a20e43ab3c48f499fc32cc2856bee9
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Sep 20 12:57:03 2022 +0800

    [INLONG-5941][DataProxy] Store the initial number of message items in Map (#5942)
---
 .../inlong/dataproxy/source/ServerMessageHandler.java      | 14 +++-----------
 1 file changed, 3 insertions(+), 11 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index e4a436448..a381f8c1c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -362,14 +362,12 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
             for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : 
topicEntry.getValue().entrySet()) {
                 // build InLongMsg
                 String groupId = null;
-                int streamMsgCnt = 0;
                 InLongMsg inLongMsg = 
InLongMsg.newInLongMsg(this.isCompressed, inLongMsgVer);
                 if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || 
MsgType.MSG_MULTI_BODY.equals(msgType)) {
                     for (ProxyMessage message : streamIdEntry.getValue()) {
                         if (StringUtils.isEmpty(groupId)) {
                             groupId = message.getGroupId();
                         }
-                        streamMsgCnt++;
                         
message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, 
String.valueOf(1));
                         
inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     }
@@ -378,7 +376,6 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
                         if (StringUtils.isEmpty(groupId)) {
                             groupId = message.getGroupId();
                         }
-                        streamMsgCnt++;
                         inLongMsg.addMsg(message.getData());
                     }
                 } else {
@@ -386,15 +383,10 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
                         if (StringUtils.isEmpty(groupId)) {
                             groupId = message.getGroupId();
                         }
-                        streamMsgCnt++;
                         
inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
                     }
                 }
-                if (recordMsgCnt != streamMsgCnt) {
-                    logger.debug("Found message count not equal, record={}, 
calculate value = {}",
-                            recordMsgCnt, streamMsgCnt);
-                }
-                commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, 
String.valueOf(streamMsgCnt));
+                commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, 
String.valueOf(recordMsgCnt));
                 // build headers
                 Map<String, String> headers = new HashMap<>();
                 headers.put(AttributeConstants.GROUP_ID, groupId);
@@ -456,12 +448,12 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
                     monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
                     this.addStatistics(true, data.length, event);
                     monitorIndex.addAndGet(strBuff.toString(),
-                            streamMsgCnt, 1, data.length, 0);
+                            recordMsgCnt, 1, data.length, 0);
                     strBuff.delete(0, strBuff.length());
                 } catch (Throwable ex) {
                     logger.error("Error writting to channel,data will 
discard.", ex);
                     monitorIndexExt.incrementAndGet("EVENT_DROPPED");
-                    monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, 
streamMsgCnt);
+                    monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, 
recordMsgCnt);
                     this.addStatistics(false, data.length, event);
                     strBuff.delete(0, strBuff.length());
                     throw new ChannelException("ProcessEvent error can't write 
event to channel.");

Reply via email to