This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dda08970c7 [INLONG-11410][DataProxy] Fixed the bug of excessive packet
length when Go SDK receives DataProxy response (#11411)
dda08970c7 is described below
commit dda08970c775075173995c19f9dbe2ccfa568be6
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Oct 25 12:58:11 2024 +0800
[INLONG-11410][DataProxy] Fixed the bug of excessive packet length when Go
SDK receives DataProxy response (#11411)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/dataproxy/consts/StatConstants.java | 1 +
.../dataproxy/source/ServerMessageHandler.java | 47 +++++++++++++---------
.../dataproxy/source/v0msg/CodecTextMsg.java | 8 +++-
3 files changed, 34 insertions(+), 22 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 01db0527e9..dc39f6c5a7 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -77,6 +77,7 @@ public class StatConstants {
public static final java.lang.String EVENT_MSG_TXT_LEN_MALFORMED =
"msg.txt.len.malformed";
public static final java.lang.String EVENT_MSG_ITEM_LEN_MALFORMED =
"msg.item.len.malformed";
public static final java.lang.String EVENT_MSG_TYPE_5_LEN_MALFORMED =
"msg.type5.len.malformed";
+ public static final java.lang.String EVENT_MSG_TYPE_5_CNT_UNEQUAL =
"msg.type5.cnt.unequal";
public static final java.lang.String EVENT_MSG_ATTR_INVALID =
"msg.attr.invalid";
public static final java.lang.String EVENT_MSG_ORDER_ACK_INVALID =
"msg.attr.order.noack";
public static final java.lang.String EVENT_MSG_PROXY_ACK_INVALID =
"msg.attr.proxy.noack";
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index fcba591e65..3117890922 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -558,11 +558,14 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
*/
public static ByteBuf buildBinMsgRspPackage(String attrs, long uniqVal) {
// calculate total length
- // binTotalLen = mstType + uniq + attrsLen + attrs + magic
- int binTotalLen = 1 + 4 + 2 + 2;
+ int attrsLen = 0;
+ byte[] byTeAttrs = null;
if (null != attrs) {
- binTotalLen += attrs.length();
+ byTeAttrs = attrs.getBytes(StandardCharsets.UTF_8);
+ attrsLen = byTeAttrs.length;
}
+ // binTotalLen = mstType + uniq + attrsLen + attrs + magic
+ int binTotalLen = 1 + 4 + 2 + attrsLen + 2;
// allocate buffer and write fields
ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
binBuffer.writeInt(binTotalLen);
@@ -573,11 +576,9 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
uniq[2] = (byte) ((uniqVal >> 8) & 0xFF);
uniq[3] = (byte) (uniqVal & 0xFF);
binBuffer.writeBytes(uniq);
- if (null != attrs) {
- binBuffer.writeShort(attrs.length());
- binBuffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
- } else {
- binBuffer.writeShort(0x0);
+ binBuffer.writeShort(attrsLen);
+ if (attrsLen > 0) {
+ binBuffer.writeBytes(byTeAttrs);
}
binBuffer.writeShort(0xee01);
return binBuffer;
@@ -593,8 +594,10 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
public static ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs)
{
int attrsLen = 0;
int bodyLen = 0;
+ byte[] byTeAttrs = null;
if (attrs != null) {
- attrsLen = attrs.length();
+ byTeAttrs = attrs.getBytes(StandardCharsets.UTF_8);
+ attrsLen = byTeAttrs.length;
}
// backTotalLen = mstType + bodyLen + body + attrsLen + attrs
int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen;
@@ -604,7 +607,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
buffer.writeInt(bodyLen);
buffer.writeInt(attrsLen);
if (attrsLen > 0) {
- buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+ buffer.writeBytes(byTeAttrs);
}
return buffer;
}
@@ -620,15 +623,20 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
private ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs,
AbsV0MsgCodec msgObj) {
int attrsLen = 0;
int bodyLen = 0;
- byte[] backBody = null;
+ byte[] backBody;
+ byte[] byTeAttrs = null;
if (attrs != null) {
- attrsLen = attrs.length();
+ byTeAttrs = attrs.getBytes(StandardCharsets.UTF_8);
+ attrsLen = byTeAttrs.length;
}
if (MsgType.MSG_ORIGINAL_RETURN.equals(msgType)) {
backBody = msgObj.getOrigBody();
if (backBody != null) {
bodyLen = backBody.length;
}
+ } else {
+ backBody = new byte[]{50};
+ bodyLen = backBody.length;
}
// backTotalLen = mstType + bodyLen + body + attrsLen + attrs
int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen;
@@ -641,7 +649,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
}
buffer.writeInt(attrsLen);
if (attrsLen > 0) {
- buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+ buffer.writeBytes(byTeAttrs);
}
return buffer;
}
@@ -656,15 +664,16 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
*/
private ByteBuf buildHBRspPackage(byte[] attrData, byte version, int
loadValue) {
// calculate total length
- // binTotalLen = mstType + dataTime + body_ver + bodyLen + body +
attrsLen + attrs + magic
- int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + 2;
+ int attrsLen = 0;
if (null != attrData) {
- binTotalLen += attrData.length;
+ attrsLen = attrData.length;
}
// check load value
if (loadValue == 0 || loadValue == (-1)) {
loadValue = 0xffff;
}
+ // binTotalLen = mstType + dataTime + version + bodyLen + body +
attrsLen + attrs + magic
+ int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + attrsLen + 2;
// allocate buffer and write fields
ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
binBuffer.writeInt(binTotalLen);
@@ -673,11 +682,9 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
binBuffer.writeByte(version);
binBuffer.writeInt(2);
binBuffer.writeShort(loadValue);
- if (null != attrData) {
- binBuffer.writeShort(attrData.length);
+ binBuffer.writeShort(attrsLen);
+ if (attrsLen > 0) {
binBuffer.writeBytes(attrData);
- } else {
- binBuffer.writeShort(0x0);
}
binBuffer.writeShort(0xee01);
return binBuffer;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
index 746cac9ca0..68fb139957 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
@@ -259,8 +259,12 @@ public class CodecTextMsg extends AbsV0MsgCodec {
inLongMsg.addMsg(mapJoiner.join(attrMap), bodyBuffer);
calcCnt++;
}
- attrMap.put(AttributeConstants.MESSAGE_COUNT,
String.valueOf(calcCnt));
- this.msgCount = calcCnt;
+ if (calcCnt != this.msgCount) {
+ this.msgCount = calcCnt;
+ source.fileMetricIncWithDetailStats(
+ StatConstants.EVENT_MSG_TYPE_5_CNT_UNEQUAL, groupId);
+ }
+ attrMap.put(AttributeConstants.MESSAGE_COUNT,
String.valueOf(this.msgCount));
} else if
(MsgType.MSG_MULTI_BODY_ATTR.equals(MsgType.valueOf(msgType))) {
attrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
inLongMsg.addMsg(mapJoiner.join(attrMap), bodyData);