This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c3807e6fd [INLONG-6550][SDK] Fix the decode heartbeat response error (#6551)
c3807e6fd is described below
commit c3807e6fdcf9245d59f4ed5451cb09f16593d2cf
Author: xueyingzhang <[email protected]>
AuthorDate: Tue Nov 15 19:24:19 2022 +0800
[INLONG-6550][SDK] Fix the decode heartbeat response error (#6551)
---
.../inlong/sdk/dataproxy/codec/EncodeObject.java | 6 +-----
.../inlong/sdk/dataproxy/codec/ProtocolDecoder.java | 18 ++++++++++++++----
2 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
index 8634621cf..fd8ab8c8c 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
@@ -67,11 +67,7 @@ public class EncodeObject {
private String errMsg;
private String dpIp;
- /* Used by de_serialization. msgtype=8*/
- public EncodeObject() {
- }
-
- /* Used by de_serialization. msgtype=7*/
+ /* Used by de_serialization. msgtype=7/8 */
public EncodeObject(String attributes) {
handleAttr(attributes);
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
index 7bf9bd883..af829f8ef 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
@@ -93,11 +93,21 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
out.add(object);
} else if (msgType == 8) {
- int attrlen = buffer.getShort(4 + 1 + 4 + 1 + 4 + 2);
- buffer.skipBytes(13 + attrlen + 2);
- EncodeObject object = new EncodeObject();
+ // dataTime(4) + body_ver(1) + body_len(4) + body + attr_len(2) + attr + magic(2)
+ buffer.skipBytes(4 + 1 + 4); // skip datatime, body_ver and body_len
+ final short load = buffer.readShort(); // read from body
+ int attrLen = buffer.readShort();
+ byte[] attrBytes = null;
+ if (attrLen > 0) {
+ attrBytes = new byte[attrLen];
+ buffer.readBytes(attrBytes);
+ }
+ buffer.skipBytes(2); // skip magic
+
+ String attrs = (attrBytes == null ? "" : new String(attrBytes, StandardCharsets.UTF_8));
+ EncodeObject object = new EncodeObject(attrs);
object.setMsgtype(8);
- object.setLoad(buffer.getShort(4 + 1 + 4 + 1 + 4));
+ object.setLoad(load);
out.add(object);
}
}