This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dda08970c7 [INLONG-11410][DataProxy] Fixed the bug of excessive packet length when Go SDK receives DataProxy response (#11411)
dda08970c7 is described below

commit dda08970c775075173995c19f9dbe2ccfa568be6
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Oct 25 12:58:11 2024 +0800

    [INLONG-11410][DataProxy] Fixed the bug of excessive packet length when Go SDK receives DataProxy response (#11411)
    
    
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/dataproxy/consts/StatConstants.java     |  1 +
 .../dataproxy/source/ServerMessageHandler.java     | 47 +++++++++++++---------
 .../dataproxy/source/v0msg/CodecTextMsg.java       |  8 +++-
 3 files changed, 34 insertions(+), 22 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 01db0527e9..dc39f6c5a7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -77,6 +77,7 @@ public class StatConstants {
     public static final java.lang.String EVENT_MSG_TXT_LEN_MALFORMED = "msg.txt.len.malformed";
     public static final java.lang.String EVENT_MSG_ITEM_LEN_MALFORMED = "msg.item.len.malformed";
     public static final java.lang.String EVENT_MSG_TYPE_5_LEN_MALFORMED = "msg.type5.len.malformed";
+    public static final java.lang.String EVENT_MSG_TYPE_5_CNT_UNEQUAL = "msg.type5.cnt.unequal";
     public static final java.lang.String EVENT_MSG_ATTR_INVALID = "msg.attr.invalid";
     public static final java.lang.String EVENT_MSG_ORDER_ACK_INVALID = "msg.attr.order.noack";
     public static final java.lang.String EVENT_MSG_PROXY_ACK_INVALID = "msg.attr.proxy.noack";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index fcba591e65..3117890922 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -558,11 +558,14 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
      */
     public static ByteBuf buildBinMsgRspPackage(String attrs, long uniqVal) {
         // calculate total length
-        // binTotalLen = mstType + uniq + attrsLen + attrs + magic
-        int binTotalLen = 1 + 4 + 2 + 2;
+        int attrsLen = 0;
+        byte[] byTeAttrs = null;
         if (null != attrs) {
-            binTotalLen += attrs.length();
+            byTeAttrs = attrs.getBytes(StandardCharsets.UTF_8);
+            attrsLen = byTeAttrs.length;
         }
+        // binTotalLen = mstType + uniq + attrsLen + attrs + magic
+        int binTotalLen = 1 + 4 + 2 + attrsLen + 2;
         // allocate buffer and write fields
         ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
         binBuffer.writeInt(binTotalLen);
@@ -573,11 +576,9 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         uniq[2] = (byte) ((uniqVal >> 8) & 0xFF);
         uniq[3] = (byte) (uniqVal & 0xFF);
         binBuffer.writeBytes(uniq);
-        if (null != attrs) {
-            binBuffer.writeShort(attrs.length());
-            binBuffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
-        } else {
-            binBuffer.writeShort(0x0);
+        binBuffer.writeShort(attrsLen);
+        if (attrsLen > 0) {
+            binBuffer.writeBytes(byTeAttrs);
         }
         binBuffer.writeShort(0xee01);
         return binBuffer;
@@ -593,8 +594,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
     public static ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs) {
         int attrsLen = 0;
         int bodyLen = 0;
+        byte[] byTeAttrs = null;
         if (attrs != null) {
-            attrsLen = attrs.length();
+            byTeAttrs = attrs.getBytes(StandardCharsets.UTF_8);
+            attrsLen = byTeAttrs.length;
         }
         // backTotalLen = mstType + bodyLen + body + attrsLen + attrs
         int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen;
@@ -604,7 +607,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         buffer.writeInt(bodyLen);
         buffer.writeInt(attrsLen);
         if (attrsLen > 0) {
-            buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+            buffer.writeBytes(byTeAttrs);
         }
         return buffer;
     }
@@ -620,15 +623,20 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
     private ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs, AbsV0MsgCodec msgObj) {
         int attrsLen = 0;
         int bodyLen = 0;
-        byte[] backBody = null;
+        byte[] backBody;
+        byte[] byTeAttrs = null;
         if (attrs != null) {
-            attrsLen = attrs.length();
+            byTeAttrs = attrs.getBytes(StandardCharsets.UTF_8);
+            attrsLen = byTeAttrs.length;
         }
         if (MsgType.MSG_ORIGINAL_RETURN.equals(msgType)) {
             backBody = msgObj.getOrigBody();
             if (backBody != null) {
                 bodyLen = backBody.length;
             }
+        } else {
+            backBody = new byte[]{50};
+            bodyLen = backBody.length;
         }
         // backTotalLen = mstType + bodyLen + body + attrsLen + attrs
         int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen;
@@ -641,7 +649,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         }
         buffer.writeInt(attrsLen);
         if (attrsLen > 0) {
-            buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+            buffer.writeBytes(byTeAttrs);
         }
         return buffer;
     }
@@ -656,15 +664,16 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
      */
     private ByteBuf buildHBRspPackage(byte[] attrData, byte version, int loadValue) {
         // calculate total length
-        // binTotalLen = mstType + dataTime + body_ver + bodyLen + body + attrsLen + attrs + magic
-        int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + 2;
+        int attrsLen = 0;
         if (null != attrData) {
-            binTotalLen += attrData.length;
+            attrsLen = attrData.length;
         }
         // check load value
         if (loadValue == 0 || loadValue == (-1)) {
             loadValue = 0xffff;
         }
+        // binTotalLen = mstType + dataTime + version + bodyLen + body + attrsLen + attrs + magic
+        int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + attrsLen + 2;
         // allocate buffer and write fields
         ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
         binBuffer.writeInt(binTotalLen);
@@ -673,11 +682,9 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         binBuffer.writeByte(version);
         binBuffer.writeInt(2);
         binBuffer.writeShort(loadValue);
-        if (null != attrData) {
-            binBuffer.writeShort(attrData.length);
+        binBuffer.writeShort(attrsLen);
+        if (attrsLen > 0) {
             binBuffer.writeBytes(attrData);
-        } else {
-            binBuffer.writeShort(0x0);
         }
         binBuffer.writeShort(0xee01);
         return binBuffer;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
index 746cac9ca0..68fb139957 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
@@ -259,8 +259,12 @@ public class CodecTextMsg extends AbsV0MsgCodec {
                 inLongMsg.addMsg(mapJoiner.join(attrMap), bodyBuffer);
                 calcCnt++;
             }
-            attrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(calcCnt));
-            this.msgCount = calcCnt;
+            if (calcCnt != this.msgCount) {
+                this.msgCount = calcCnt;
+                source.fileMetricIncWithDetailStats(
+                        StatConstants.EVENT_MSG_TYPE_5_CNT_UNEQUAL, groupId);
+            }
+            attrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(this.msgCount));
         } else if (MsgType.MSG_MULTI_BODY_ATTR.equals(MsgType.valueOf(msgType))) {
             attrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
             inLongMsg.addMsg(mapJoiner.join(attrMap), bodyData);

Reply via email to