This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bfbeae27e1 [INLONG-11698][SDK] Optimize TCP encode and decode
implementation (#11699)
bfbeae27e1 is described below
commit bfbeae27e125edd8b5849ffbf4cfa415193be5a3
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Jan 21 16:39:09 2025 +0800
[INLONG-11698][SDK] Optimize TCP encode and decode implementation (#11699)
* [INLONG-11698][SDK] Optimize TCP encode and decode implementation
* [INLONG-11698][SDK] Optimize TCP encode and decode implementation
---------
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/common/ErrorCode.java | 19 +
.../sdk/dataproxy/network/tcp/TcpCallFuture.java | 135 ++++++
.../sdk/dataproxy/network/tcp/TcpNettyClient.java | 480 +++++++++++++++++++++
.../dataproxy/network/tcp/codec/DecodeObject.java | 144 +++++++
.../dataproxy/network/tcp/codec/EncodeObject.java | 188 ++++++++
.../network/tcp/codec/ProtocolDecoder.java | 114 +++++
.../network/tcp/codec/ProtocolEncoder.java | 121 ++++++
7 files changed, 1201 insertions(+)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
index 5f4cd0cdc8..d2cb671a58 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
@@ -62,6 +62,25 @@ public enum ErrorCode {
META_FIELD_VALUE_ILLEGAL(54, "Meta field value illegal"),
//
+ CONNECTION_UNAVAILABLE(111, "Connection unavailable"),
+ CONNECTION_BREAK(112, "Connection break"),
+ CONNECTION_UNWRITABLE(113, "Connection unwritable"),
+ CONNECTION_WRITE_EXCEPTION(114, "Connection write exception"),
+ DUPLICATED_MESSAGE_ID(115, "Duplicated message id"),
+ SEND_WAIT_INTERRUPT(116, "Send wait interrupted"),
+ //
+ SEND_WAIT_TIMEOUT(121, "Send wait timeout"),
+ SEND_ON_EXCEPTION(122, "Send on exception"),
+
+ // dataproxy return failure
+ DP_SINK_SERVICE_UNREADY(151, "DataProxy sink service unready"),
+ DP_INVALID_ATTRS(152, "DataProxy return invalid attributes"),
+ DP_EMPTY_BODY(153, "DataProxy return empty body"),
+ DP_BODY_EXCEED_MAX_LEN(154, "DataProxy return body length over max"),
+ DP_UNCONFIGURED_GROUPID_OR_STREAMID(155, "DataProxy return unconfigured
groupId or streamId"),
+ //
+ DP_RECEIVE_FAILURE(160, "DataProxy return message receive failure"),
+
UNKNOWN_ERROR(9999, "Unknown error");
public static ErrorCode valueOf(int value) {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
new file mode 100644
index 0000000000..90132018a7
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp;
+
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TCP Call Future class
+ *
+ * a future implementation for tcp RPCs.
+ */
+public class TcpCallFuture implements MsgSendCallback {
+
+ private final int messageId;
+ private final String groupId;
+ private final String streamId;
+ private final int msgCnt;
+ private final long rtTime;
+ private final String clientAddr;
+ private final long chanTerm;
+ private final String chanStr;
+ private final MsgSendCallback callback;
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private final boolean isAsyncCall;
+ private ProcessResult result = null;
+ private Throwable error = null;
+
+ public TcpCallFuture(EncodeObject encObject,
+ String clientAddr, long chanTerm, String chanStr, MsgSendCallback
callback) {
+ this.messageId = encObject.getMessageId();
+ this.groupId = encObject.getGroupId();
+ this.streamId = encObject.getStreamId();
+ this.rtTime = encObject.getRtms();
+ this.msgCnt = encObject.getMsgCnt();
+ this.clientAddr = clientAddr;
+ this.chanTerm = chanTerm;
+ this.chanStr = chanStr;
+ this.callback = callback;
+ this.isAsyncCall = (callback != null);
+ }
+
+ @Override
+ public void onMessageAck(ProcessResult result) {
+ this.result = result;
+ latch.countDown();
+ if (isAsyncCall) {
+ callback.onMessageAck(result);
+ }
+ }
+
+ @Override
+ public void onException(Throwable ex) {
+ this.error = ex;
+ latch.countDown();
+ if (isAsyncCall) {
+ callback.onException(error);
+ }
+ }
+
+ public boolean get(ProcessResult processResult, long timeout, TimeUnit
unit) {
+ try {
+ if (latch.await(timeout, unit)) {
+ if (error != null) {
+ return
processResult.setFailResult(ErrorCode.SEND_ON_EXCEPTION, error.getMessage());
+ }
+ return processResult.setFailResult(result);
+ } else {
+ return
processResult.setFailResult(ErrorCode.SEND_WAIT_TIMEOUT);
+ }
+ } catch (Throwable ex) {
+ if (ex instanceof InterruptedException) {
+ return
processResult.setFailResult(ErrorCode.SEND_WAIT_INTERRUPT);
+ } else {
+ return processResult.setFailResult(ErrorCode.UNKNOWN_ERROR,
ex.getMessage());
+ }
+ }
+ }
+
+ public int getMessageId() {
+ return messageId;
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public String getStreamId() {
+ return streamId;
+ }
+
+ public int getMsgCnt() {
+ return msgCnt;
+ }
+
+ public long getRtTime() {
+ return rtTime;
+ }
+
+ public String getClientAddr() {
+ return clientAddr;
+ }
+
+ public String getChanStr() {
+ return chanStr;
+ }
+
+ public long getChanTerm() {
+ return chanTerm;
+ }
+
+ public boolean isAsyncCall() {
+ return isAsyncCall;
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
new file mode 100644
index 0000000000..29899e6f58
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp;
+
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.AuthzUtils;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * TCP Netty client class
+ *
+ * Used to manage TCP netty client, including connection, closing, keep-alive,
sending status check, etc.
+ */
+public class TcpNettyClient {
+
+ private final static int CLIENT_FAIL_CONNECT_WAIT_CNT = 3;
+ private static final Logger logger =
LoggerFactory.getLogger(TcpNettyClient.class);
+ private static final LogCounter conExptCnt = new LogCounter(10, 100000, 60
* 1000L);
+ private static final LogCounter hbExptCnt = new LogCounter(10, 100000, 60
* 1000L);
+ private final static int CLIENT_STATUS_INIT = -1;
+ private final static int CLIENT_STATUS_READY = 0;
+ private final static int CLIENT_STATUS_FROZEN = 1;
+ private final static int CLIENT_STATUS_DEAD = 2;
+ private final static int CLIENT_STATUS_BUSY = 3;
+
+ private final String senderId;
+ private final TcpMsgSenderConfig tcpConfig;
+ private final Bootstrap bootstrap;
+ private final HostInfo hostInfo;
+ private final AtomicInteger conStatus = new
AtomicInteger(CLIENT_STATUS_INIT);
+ private final AtomicLong channelTermId = new AtomicLong(0);
+ private final AtomicInteger clientUsingCnt = new AtomicInteger(0);
+ private final AtomicInteger msgSentCnt = new AtomicInteger(0);
+ private final AtomicInteger msgInflightCnt = new AtomicInteger(0);
+ private final AtomicLong chanInvalidTime = new AtomicLong(0);
+ private final AtomicInteger conFailCnt = new AtomicInteger(0);
+ private final AtomicLong lstConFailTime = new AtomicLong(0);
+ private final AtomicInteger chanSyncTimeoutCnt = new AtomicInteger(0);
+ private final AtomicLong chanFstBusyTime = new AtomicLong(0);
+ private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
+ private Channel channel = null;
+ private String channelStr = "";
+ private int lstRoundSentCnt = -1;
+ private int clientIdleRounds = 0;
+ private long fstIdleTime = 0;
+
+ public TcpNettyClient(String senderId,
+ Bootstrap bootstrap, HostInfo hostInfo, TcpMsgSenderConfig
tcpConfig) {
+ this.conStatus.set(CLIENT_STATUS_INIT);
+ this.hostInfo = hostInfo;
+ this.tcpConfig = tcpConfig;
+ this.senderId = senderId;
+ this.bootstrap = bootstrap;
+ }
+
+ public boolean connect(boolean needPrint, long termId) {
+ // Initial status
+ this.conStatus.set(CLIENT_STATUS_INIT);
+ long curTime = System.currentTimeMillis();
+ final CountDownLatch awaitLatch = new CountDownLatch(1);
+ // Build connect to server
+ ChannelFuture future = bootstrap.connect(
+ new InetSocketAddress(hostInfo.getHostName(),
hostInfo.getPortNumber()));
+ future.addListener(new ChannelFutureListener() {
+
+ public void operationComplete(ChannelFuture arg0) throws Exception
{
+ awaitLatch.countDown();
+ }
+ });
+ try {
+ // Wait until the connection is built.
+ awaitLatch.await(tcpConfig.getConnectTimeoutMs(),
TimeUnit.MILLISECONDS);
+ } catch (Throwable ex) {
+ if (conExptCnt.shouldPrint()) {
+ logger.warn("NettyClient({}) connect to {} exception",
+ senderId, hostInfo.getReferenceName(), ex);
+ }
+ return false;
+ }
+ // Return if no connection is built.
+ if (!future.isSuccess()) {
+ this.conFailCnt.getAndIncrement();
+ this.lstConFailTime.set(System.currentTimeMillis());
+ if (conExptCnt.shouldPrint()) {
+ logger.warn("NettyClient({}) connect to {} failure, wast {}ms",
+ senderId, hostInfo.getReferenceName(),
(System.currentTimeMillis() - curTime));
+ }
+ return false;
+ }
+ this.channelTermId.set(termId);
+ this.channel = future.channel();
+ this.channelStr = this.channel.toString();
+ this.conFailCnt.set(0);
+ this.msgSentCnt.set(0);
+ this.chanSyncTimeoutCnt.set(0);
+ this.msgInflightCnt.set(0);
+ this.conStatus.set(CLIENT_STATUS_READY);
+ if (needPrint) {
+ logger.info("NettyClient({}) connect to {} success, wast {}ms",
+ senderId, channel.toString(), (System.currentTimeMillis()
- curTime));
+ }
+ return true;
+ }
+
+ public boolean close(boolean needPrint) {
+ this.conStatus.set(CLIENT_STATUS_DEAD);
+ long curTime = System.currentTimeMillis();
+ this.chanInvalidTime.set(curTime);
+ final CountDownLatch awaitLatch = new CountDownLatch(1);
+ boolean ret = true;
+ String channelStr = "";
+ try {
+ if (channel == null) {
+ channelStr = hostInfo.getReferenceName();
+ } else {
+ channelStr = channel.toString();
+ ChannelFuture future = channel.close();
+ future.addListener(new ChannelFutureListener() {
+
+ public void operationComplete(ChannelFuture arg0) throws
Exception {
+ awaitLatch.countDown();
+ }
+ });
+ // Wait until the connection is close.
+ awaitLatch.await(tcpConfig.getConCloseWaitPeriodMs(),
TimeUnit.MILLISECONDS);
+ // Return if close this connection fail.
+ if (!future.isSuccess()) {
+ ret = false;
+ }
+ }
+ } catch (Throwable ex) {
+ if (conExptCnt.shouldPrint()) {
+ logger.warn("NettyClient({}) close {} exception", senderId,
channelStr, ex);
+ }
+ ret = false;
+ } finally {
+ this.channel = null;
+ this.channelStr = "";
+ this.msgInflightCnt.set(0);
+ }
+ if (needPrint) {
+ if (ret) {
+ logger.info("NettyClient({}) close {} success, wast {}ms",
+ this.senderId, channelStr, (System.currentTimeMillis()
- curTime));
+ } else {
+ logger.info("NettyClient({}) close {} failure, wast {}ms",
+ this.senderId, channelStr, (System.currentTimeMillis()
- curTime));
+ }
+ }
+ return ret;
+ }
+
+ public boolean reconnect(boolean needPrint, long termId) {
+ long curTime = System.currentTimeMillis();
+ if ((this.conFailCnt.get() >= CLIENT_FAIL_CONNECT_WAIT_CNT)
+ && (curTime - lstConFailTime.get() <
this.tcpConfig.getReconFailWaitMs())) {
+ return false;
+ }
+ int curStatus = this.conStatus.get();
+ if (curStatus == CLIENT_STATUS_READY) {
+ return true;
+ } else if (curStatus == CLIENT_STATUS_BUSY) {
+ if (curTime - this.chanInvalidTime.get() <
this.tcpConfig.getBusyReconnectWaitMs()) {
+ return false;
+ }
+ } else if (curStatus == CLIENT_STATUS_FROZEN) {
+ if (curTime - this.chanInvalidTime.get() <
this.tcpConfig.getFrozenReconnectWaitMs()) {
+ return false;
+ }
+ }
+ rw.writeLock().lock();
+ try {
+ if (this.conStatus.get() == CLIENT_STATUS_READY) {
+ return true;
+ }
+ curTime = System.currentTimeMillis();
+ this.close(false);
+ if (this.connect(false, termId)) {
+ if (needPrint) {
+ logger.info("NettyClient({}) re-connect to {} success,
wast {}ms",
+ senderId, this.channel.toString(),
System.currentTimeMillis() - curTime);
+ }
+ return true;
+ } else {
+ if (needPrint) {
+ logger.info("NettyClient({}) re-connect to {} failure",
+ senderId, hostInfo.getReferenceName());
+ }
+ return false;
+ }
+ } finally {
+ rw.writeLock().unlock();
+ }
+ }
+
+ public int incClientUsingCnt() {
+ return clientUsingCnt.incrementAndGet();
+ }
+
+ public int getClientUsingCnt() {
+ return clientUsingCnt.get();
+ }
+
+ public int decClientUsingCnt() {
+ return clientUsingCnt.decrementAndGet();
+ }
+
+ public boolean write(long termId, EncodeObject encodeObject, ProcessResult
procResult) {
+ if (this.conStatus.get() != CLIENT_STATUS_READY) {
+ return procResult.setFailResult(ErrorCode.CONNECTION_UNAVAILABLE);
+ }
+ this.rw.readLock().lock();
+ if (this.conStatus.get() != CLIENT_STATUS_READY) {
+ return procResult.setFailResult(ErrorCode.CONNECTION_UNAVAILABLE);
+ }
+ if (this.channelTermId.get() != termId) {
+ return procResult.setFailResult(ErrorCode.CONNECTION_BREAK);
+ }
+ if (!(this.channel.isOpen()
+ && this.channel.isActive() && this.channel.isWritable())) {
+ return procResult.setFailResult(ErrorCode.CONNECTION_UNWRITABLE);
+ }
+ try {
+ this.msgSentCnt.incrementAndGet();
+ this.channel.writeAndFlush(encodeObject);
+ this.msgInflightCnt.incrementAndGet();
+ } catch (Throwable ex) {
+ if (conExptCnt.shouldPrint()) {
+ logger.warn("NettyClient({}) write {} exception",
+ this.senderId, this.channel.toString(), ex);
+ }
+ return
procResult.setFailResult(ErrorCode.CONNECTION_WRITE_EXCEPTION, ex.getMessage());
+ } finally {
+ this.rw.readLock().unlock();
+ }
+ return procResult.setSuccess();
+ }
+
+ public void setFrozen(long termId) {
+ if (this.channelTermId.get() != termId
+ || this.conStatus.get() != CLIENT_STATUS_READY) {
+ return;
+ }
+ boolean changed = false;
+ rw.readLock().lock();
+ try {
+ if (this.channelTermId.get() != termId) {
+ return;
+ }
+ int curStatus = this.conStatus.get();
+ if (curStatus == CLIENT_STATUS_READY) {
+ this.conStatus.compareAndSet(curStatus, CLIENT_STATUS_FROZEN);
+ this.chanInvalidTime.set(System.currentTimeMillis());
+ changed = true;
+ }
+ } finally {
+ rw.readLock().unlock();
+ }
+ if (changed) {
+ logger.warn("NettyClient({}) set {} frozen!", senderId,
hostInfo.getReferenceName());
+ }
+ }
+
+ public void setBusy(long termId) {
+ if (this.channelTermId.get() != termId
+ || this.conStatus.get() != CLIENT_STATUS_READY) {
+ return;
+ }
+ boolean changed = false;
+ long befTime;
+ int curTimeoutCnt;
+ long curTime = System.currentTimeMillis();
+ rw.readLock().lock();
+ try {
+ if (this.channelTermId.get() != termId) {
+ return;
+ }
+ befTime = this.chanFstBusyTime.get();
+ if (curTime - befTime >= tcpConfig.getSyncMsgTimeoutChkDurMs()) {
+ if (this.chanFstBusyTime.compareAndSet(befTime, curTime)) {
+ this.chanSyncTimeoutCnt.set(0);
+ }
+ }
+ curTimeoutCnt = this.chanSyncTimeoutCnt.incrementAndGet();
+ if (tcpConfig.getMaxAllowedSyncMsgTimeoutCnt() >= 0
+ && curTimeoutCnt <
tcpConfig.getMaxAllowedSyncMsgTimeoutCnt()) {
+ return;
+ }
+ int curStatus = this.conStatus.get();
+ if (curStatus == CLIENT_STATUS_READY) {
+ this.conStatus.compareAndSet(curStatus, CLIENT_STATUS_BUSY);
+ this.chanInvalidTime.set(System.currentTimeMillis());
+ changed = true;
+ }
+ } finally {
+ rw.readLock().unlock();
+ }
+ if (changed) {
+ logger.warn("NettyClient({}) set {} busy!", senderId,
hostInfo.getReferenceName());
+ }
+ }
+
+ public boolean isActive() {
+ if (this.conStatus.get() != CLIENT_STATUS_READY) {
+ return false;
+ }
+ rw.readLock().lock();
+ try {
+ return ((this.conStatus.get() == CLIENT_STATUS_READY)
+ && channel != null && channel.isOpen() &&
channel.isActive());
+ } finally {
+ rw.readLock().unlock();
+ }
+ }
+
+ public void sendHeartBeatMsg(ProcessResult procResult) {
+ if (!isActive()) {
+ logger.warn("NettyClient({}) to {} hb inActive",
+ this.senderId, hostInfo.getReferenceName());
+ return;
+ }
+ if (!channel.isWritable()) {
+ if (hbExptCnt.shouldPrint()) {
+ logger.warn("NettyClient({}) to {} hb write_over_water",
this.senderId, channelStr);
+ }
+ return;
+ }
+ EncodeObject encodeObject = buildHeartBeatMsg(this.senderId,
tcpConfig);
+ if (encodeObject == null) {
+ if (hbExptCnt.shouldPrint()) {
+ logger.warn("NettyClient({}) to {} hb failure:{}!",
+ this.senderId, channelStr, procResult.getErrMsg());
+ }
+ }
+ try {
+ write(channelTermId.get(), encodeObject, procResult);
+ } catch (Throwable ex) {
+ if (hbExptCnt.shouldPrint()) {
+ logger.warn("NettyClient({}) send to {} hb exception ",
+ this.senderId, channelStr, ex);
+ }
+ }
+ procResult.setSuccess();
+ }
+
+ public boolean isIdleClient(long curTime) {
+ int curSentCnt = this.msgSentCnt.get();
+ if (curSentCnt != this.lstRoundSentCnt) {
+ this.lstRoundSentCnt = curSentCnt;
+ this.clientIdleRounds = 0;
+ return false;
+ }
+ if (this.clientIdleRounds++ == 0) {
+ this.fstIdleTime = curTime;
+ return false;
+ }
+ return curTime - this.fstIdleTime >= 30000L;
+ }
+
+ public String getClientAddr() {
+ return hostInfo.getReferenceName();
+ }
+
+ public Channel getChannel() {
+ return channel;
+ }
+
+ public String getChanStr() {
+ return channelStr;
+ }
+
+ public void decInFlightMsgCnt(long termId) {
+ if (this.channelTermId.get() != termId) {
+ return;
+ }
+ this.msgInflightCnt.decrementAndGet();
+ }
+
+ public int getMsgInflightCnt() {
+ return msgInflightCnt.get();
+ }
+
+ public boolean isConFailNodes() {
+ return (conFailCnt.get() >= CLIENT_FAIL_CONNECT_WAIT_CNT);
+ }
+
+ public long getChanTermId() {
+ return channelTermId.get();
+ }
+
+ public long getChanInvalidTime() {
+ return chanInvalidTime.get();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ TcpNettyClient other = (TcpNettyClient) obj;
+ if (channel == null) {
+ return other.channel == null;
+ } else {
+ return channel.equals(other.channel);
+ }
+ }
+
+ private EncodeObject buildHeartBeatMsg(String senderId, ProxyClientConfig
configure) {
+ EncodeObject encObject = new EncodeObject(null, null,
+ MsgType.MSG_BIN_HEARTBEAT, System.currentTimeMillis());
+ encObject.setMessageIdInfo(0);
+ int intMsgType = encObject.getMsgType().getValue();
+ Map<String, String> newAttrs = new HashMap<>();
+ if (configure.isEnableReportAuthz()) {
+ intMsgType |= SdkConsts.FLAG_ALLOW_AUTH;
+ long timestamp = System.currentTimeMillis();
+ int nonce = new
SecureRandom(String.valueOf(timestamp).getBytes()).nextInt(Integer.MAX_VALUE);
+ String signature = AuthzUtils.generateSignature(
+ configure.getRptUserName(), timestamp, nonce,
configure.getRptSecretKey());
+ if (StringUtils.isBlank(signature)) {
+ return null;
+ }
+ newAttrs.put("_userName", configure.getRptUserName());
+ newAttrs.put("_clientIP", ProxyUtils.getLocalIp());
+ newAttrs.put("_signature", signature);
+ newAttrs.put("_timeStamp", String.valueOf(timestamp));
+ newAttrs.put("_nonce", String.valueOf(nonce));
+ }
+ encObject.setAttrInfo(intMsgType, false, null, newAttrs);
+ return encObject;
+ }
+
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
new file mode 100644
index 0000000000..669f8330de
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp.codec;
+
+import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+
+import com.google.common.base.Splitter;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Decode Object class
+ *
+ * Used to carry the decoded information of the response
+ */
+public class DecodeObject {
+
+ private static final Splitter.MapSplitter MAP_SPLITTER =
+ Splitter.on(AttributeConstants.SEPARATOR).trimResults()
+
.withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+
+ private final MsgType msgType;
+ private int messageId;
+ private String dpIp;
+ private ProcessResult procResult;
+ private String addErrMsg;
+ private Map<String, String> retAttr;
+
+ public DecodeObject(MsgType msgType, String attributes) {
+ this.msgType = msgType;
+ handleAttr(attributes);
+ }
+
+ public DecodeObject(MsgType msgType, int messageId, String attributes) {
+ this.msgType = msgType;
+ this.messageId = messageId;
+ handleAttr(attributes);
+ }
+
+ public MsgType getMsgType() {
+ return msgType;
+ }
+
+ public int getMessageId() {
+ return messageId;
+ }
+
+ public String getDpIp() {
+ return dpIp;
+ }
+
+ public ProcessResult getSendResult() {
+ return procResult;
+ }
+
+ public String getAddErrMsg() {
+ return addErrMsg;
+ }
+
+ public Map<String, String> getRetAttr() {
+ return retAttr;
+ }
+
+ private void handleAttr(String attributes) {
+ if (StringUtils.isBlank(attributes)) {
+ return;
+ }
+ retAttr = new HashMap<>(MAP_SPLITTER.split(attributes));
+ if (retAttr.containsKey(AttributeConstants.MESSAGE_ID)) {
+ this.messageId =
Integer.parseInt(retAttr.get(AttributeConstants.MESSAGE_ID));
+ }
+ dpIp = retAttr.get(AttributeConstants.MESSAGE_DP_IP);
+
+ String errCode =
retAttr.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
+ // errCode is empty or equals 0 -> success
+ if (StringUtils.isBlank(errCode) || Integer.parseInt(errCode) == 0) {
+ this.procResult = new ProcessResult(ErrorCode.OK);
+ } else {
+ // get errMsg
+ this.addErrMsg =
retAttr.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG);
+ if (StringUtils.isBlank(addErrMsg)) {
+ this.addErrMsg =
DataProxyErrCode.valueOf(Integer.parseInt(errCode)).getErrMsg();
+ }
+ // sendResult
+ this.procResult = convertToSendResult(Integer.parseInt(errCode));
+ }
+ }
+
+ private ProcessResult convertToSendResult(int errCode) {
+ DataProxyErrCode dpErrCode = DataProxyErrCode.valueOf(errCode);
+ switch (dpErrCode) {
+ case SINK_SERVICE_UNREADY:
+ return new ProcessResult(ErrorCode.DP_SINK_SERVICE_UNREADY);
+
+ case MISS_REQUIRED_GROUPID_ARGUMENT:
+ case MISS_REQUIRED_STREAMID_ARGUMENT:
+ case MISS_REQUIRED_DT_ARGUMENT:
+ case UNSUPPORTED_EXTEND_FIELD_VALUE:
+ return new ProcessResult(ErrorCode.DP_INVALID_ATTRS,
String.valueOf(dpErrCode));
+
+ case MISS_REQUIRED_BODY_ARGUMENT:
+ case EMPTY_MSG:
+ return new ProcessResult(ErrorCode.DP_EMPTY_BODY,
String.valueOf(dpErrCode));
+
+ case BODY_EXCEED_MAX_LEN:
+ return new ProcessResult(ErrorCode.DP_BODY_EXCEED_MAX_LEN);
+
+ case UNCONFIGURED_GROUPID_OR_STREAMID:
+ return new
ProcessResult(ErrorCode.DP_UNCONFIGURED_GROUPID_OR_STREAMID);
+
+ case PUT_EVENT_TO_CHANNEL_FAILURE:
+ case NO_AVAILABLE_PRODUCER:
+ case PRODUCER_IS_NULL:
+ case SEND_REQUEST_TO_MQ_FAILURE:
+ case MQ_RETURN_ERROR:
+ case DUPLICATED_MESSAGE:
+ return new ProcessResult(ErrorCode.DP_RECEIVE_FAILURE,
String.valueOf(dpErrCode));
+
+ default:
+ return new ProcessResult(ErrorCode.UNKNOWN_ERROR,
String.valueOf(dpErrCode));
+ }
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
new file mode 100644
index 0000000000..a32227c1af
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp.codec;
+
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.common.msg.MsgType;
+
+import com.google.common.base.Joiner;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Encode Object class
+ *
+ * Used to encapsulate the reported event information to be sent
+ */
+public class EncodeObject {
+
+ private static final Joiner.MapJoiner mapJoiner =
Joiner.on(AttributeConstants.SEPARATOR)
+ .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+
+ private final MsgType msgType;
+ private int intMsgType;
+ private final String groupId;
+ private final String streamId;
+ private final long dtMs;
+ private final long rtms;
+ private int messageId;
+ private int msgCnt = 0;
+ private int extField = 0;
+ private int attrDataLength = 0;
+ private byte[] attrData = null;
+ private int bodyDataLength = 0;
+ private byte[] bodyData = null;
+ private int groupIdNum = 0;
+ private int streamIdNum = 0;
+ //
+ private final Map<String, String> attrMap = new HashMap<>();
+ private boolean compress;
+ private byte[] aesKey;
+
+ public EncodeObject(String groupId, String streamId, MsgType msgType, long
dtMs) {
+ this.groupId = groupId;
+ this.streamId = streamId;
+ this.msgType = msgType;
+ this.intMsgType = this.msgType.getValue();
+ this.rtms = System.currentTimeMillis();
+ if (this.msgType == MsgType.MSG_BIN_MULTI_BODY) {
+ this.dtMs = dtMs / 1000;
+ } else {
+ this.dtMs = dtMs;
+ }
+ }
+
+ public void setGroupAndStreamId2Num(int groupIdNum, int streamIdNum) {
+ this.groupIdNum = groupIdNum;
+ this.streamIdNum = streamIdNum;
+ }
+
+ public void setExtField(int extField) {
+ this.extField = extField;
+ }
+
+ public void setAttrInfo(int intMsgType, boolean isCompress, byte[] aesKey,
Map<String, String> tgtAttrs) {
+ this.intMsgType = intMsgType;
+ this.compress = isCompress;
+ this.aesKey = aesKey;
+ if (tgtAttrs != null && !tgtAttrs.isEmpty()) {
+ for (Map.Entry<String, String> entry : tgtAttrs.entrySet()) {
+ if (entry == null || entry.getKey() == null) {
+ continue;
+ }
+ this.attrMap.put(entry.getKey(), entry.getValue());
+ }
+ String preAttrStr = mapJoiner.join(this.attrMap);
+ this.attrData = preAttrStr.getBytes(StandardCharsets.UTF_8);
+ this.attrDataLength = this.attrData.length;
+ }
+ }
+
+ public void setMessageIdInfo(int messageId) {
+ this.messageId = messageId;
+ if (msgType == MsgType.MSG_ACK_SERVICE
+ || msgType == MsgType.MSG_MULTI_BODY) {
+ this.attrMap.put(AttributeConstants.MESSAGE_ID,
String.valueOf(this.messageId));
+ String preAttrStr = mapJoiner.join(this.attrMap);
+ this.attrData = preAttrStr.getBytes(StandardCharsets.UTF_8);
+ this.attrDataLength = this.attrData.length;
+ }
+ }
+
+ public void setBodyData(int msgCnt, byte[] bodyBytes) {
+ this.msgCnt = msgCnt;
+ this.bodyData = bodyBytes;
+ if (this.bodyData != null) {
+ this.bodyDataLength = this.bodyData.length;
+ }
+ }
+
+ public MsgType getMsgType() {
+ return msgType;
+ }
+
+ public int getIntMsgType() {
+ return intMsgType;
+ }
+
+ public int getMessageId() {
+ return messageId;
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public String getStreamId() {
+ return streamId;
+ }
+
+ public long getRtms() {
+ return rtms;
+ }
+
+ public int getStreamIdNum() {
+ return streamIdNum;
+ }
+
+ public int getGroupIdNum() {
+ return groupIdNum;
+ }
+
+ public boolean isCompress() {
+ return compress;
+ }
+
+ public byte[] getAesKey() {
+ return aesKey;
+ }
+
+ public int getBodyDataLength() {
+ return bodyDataLength;
+ }
+
+ public byte[] getBodyData() {
+ return bodyData;
+ }
+
+ public int getAttrDataLength() {
+ return attrDataLength;
+ }
+
+ public byte[] getAttrData() {
+ return attrData;
+ }
+
+ public int getExtField() {
+ return extField;
+ }
+
+ public int getMsgSize() {
+ return attrDataLength + bodyDataLength;
+ }
+
+ public int getMsgCnt() {
+ return msgCnt;
+ }
+
+ public long getDtMs() {
+ return dtMs;
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolDecoder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolDecoder.java
new file mode 100644
index 0000000000..2132c72747
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolDecoder.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp.codec;
+
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * TCP protocol decoder class
+ *
+ * Used to decode the response package returned from DataProxy
+ */
+public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ProtocolDecoder.class);
+ private static final LogCounter decExptCounter = new LogCounter(10,
200000, 60 * 1000L);
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx,
+ ByteBuf buffer, List<Object> out) throws Exception {
+ buffer.markReaderIndex();
+ // totallen
+ int totalLen = buffer.readInt();
+ if (totalLen != buffer.readableBytes()) {
+ if (decExptCounter.shouldPrint()) {
+ logger.error("Length not equal,
totalLen={},readableBytes={},from={}",
+ totalLen, buffer.readableBytes(), ctx.channel());
+ }
+ buffer.resetReaderIndex();
+ throw new Exception("totalLen is not equal readableBytes.total");
+ }
+ // msgtype
+ int msgType = buffer.readByte() & 0x1f;
+
+ if (msgType == 4) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("debug decode");
+ }
+ } else if (msgType == 3 | msgType == 5) {
+ // bodylen
+ int bodyLength = buffer.readInt();
+ if (bodyLength >= totalLen) {
+ if (decExptCounter.shouldPrint()) {
+ logger.error("bodyLen greater than totalLen,
totalLen={},bodyLen={},from={}",
+ totalLen, bodyLength, ctx.channel());
+ }
+ buffer.resetReaderIndex();
+ throw new Exception("bodyLen is greater than
totalLen.totalLen");
+ }
+ byte[] bodyBytes;
+ if (bodyLength > 0) {
+ bodyBytes = new byte[bodyLength];
+ buffer.readBytes(bodyBytes);
+ }
+ // attrlen
+ String attrInfo = "";
+ int attrLength = buffer.readInt();
+ if (attrLength > 0) {
+ byte[] attrBytes = new byte[attrLength];
+ buffer.readBytes(attrBytes);
+ attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
+ }
+ out.add(new DecodeObject(MsgType.valueOf(msgType), attrInfo));
+ } else if (msgType == 7) {
+ int seqId = buffer.readInt();
+ int attrLen = buffer.readShort();
+ String attrInfo = "";
+ if (attrLen > 0) {
+ byte[] attrBytes = new byte[attrLen];
+ buffer.readBytes(attrBytes);
+ attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
+ }
+ buffer.readShort();
+ out.add(new DecodeObject(MsgType.valueOf(msgType), seqId,
attrInfo));
+ } else if (msgType == 8) {
+ // dataTime(4) + body_ver(1) + body_len(4) + body + attr_len(2) +
attr + magic(2)
+ buffer.skipBytes(4 + 1 + 4); // skip datatime, body_ver and
body_len
+ final short load = buffer.readShort(); // read from body
+ int attrLen = buffer.readShort();
+ String attrInfo = "";
+ if (attrLen > 0) {
+ byte[] attrBytes = new byte[attrLen];
+ buffer.readBytes(attrBytes);
+ attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
+ }
+ buffer.skipBytes(2); // skip magic
+ out.add(new DecodeObject(MsgType.MSG_BIN_HEARTBEAT, attrInfo));
+ }
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
new file mode 100644
index 0000000000..d6cafdb85b
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp.codec;
+
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * TCP protocol encoder class
+ *
+ * Used to encode the request package sent to DataProxy
+ */
+public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ProtocolEncoder.class);
+ private static final LogCounter exptCounter = new LogCounter(10, 100000,
60 * 1000L);
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx,
+ EncodeObject encObject, List<Object> out) throws Exception {
+ ByteBuf buf = null;
+ int totalLength;
+ try {
+ if (encObject.getMsgType() == MsgType.MSG_ACK_SERVICE) {
+ totalLength = 1 + 4 + 4 + encObject.getMsgSize();
+ buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
+ buf.writeInt(totalLength);
+ buf.writeByte(encObject.getIntMsgType());
+ buf.writeInt(encObject.getBodyDataLength());
+ if (encObject.getBodyDataLength() > 0) {
+ buf.writeBytes(encObject.getBodyData());
+ }
+ buf.writeInt(encObject.getAttrDataLength());
+ if (encObject.getAttrDataLength() > 0) {
+ buf.writeBytes(encObject.getAttrData());
+ }
+ } else if (encObject.getMsgType() == MsgType.MSG_MULTI_BODY) {
+ totalLength = 1 + 4 + 4 + encObject.getMsgSize();
+ buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
+ buf.writeInt(totalLength);
+ buf.writeByte(encObject.getIntMsgType());
+ buf.writeInt(encObject.getBodyDataLength());
+ if (encObject.getBodyDataLength() > 0) {
+ buf.writeBytes(encObject.getBodyData());
+ }
+ buf.writeInt(encObject.getAttrDataLength());
+ if (encObject.getAttrDataLength() > 0) {
+ buf.writeBytes(encObject.getAttrData());
+ }
+ } else if (encObject.getMsgType() == MsgType.MSG_BIN_MULTI_BODY) {
+ totalLength = 1 + 2 + 2 + 2 + 4 + 2 + 4 + 4 + 2 + 2 +
encObject.getMsgSize();
+ buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
+ buf.writeInt(totalLength);
+ buf.writeByte(encObject.getIntMsgType());
+ buf.writeShort(encObject.getGroupIdNum());
+ buf.writeShort(encObject.getStreamIdNum());
+ buf.writeShort(encObject.getExtField());
+ buf.writeInt((int) encObject.getDtMs());
+ buf.writeShort(encObject.getMsgCnt());
+ buf.writeInt(encObject.getMessageId());
+ buf.writeInt(encObject.getBodyDataLength());
+ if (encObject.getBodyDataLength() > 0) {
+ buf.writeBytes(encObject.getBodyData());
+ }
+ buf.writeShort(encObject.getAttrDataLength());
+ if (encObject.getAttrDataLength() > 0) {
+ buf.writeBytes(encObject.getAttrData());
+ }
+ buf.writeShort(0xee01);
+ } else if (encObject.getMsgType() == MsgType.MSG_BIN_HEARTBEAT) {
+ totalLength = 1 + 4 + 1 + 4 + 2 +
encObject.getAttrDataLength() + 2;
+ buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
+ buf.writeInt(totalLength);
+ buf.writeByte(encObject.getIntMsgType());
+ buf.writeInt((int) encObject.getDtMs());
+ buf.writeByte(2);
+ buf.writeInt(0);
+ buf.writeShort(encObject.getAttrDataLength());
+ if (encObject.getAttrDataLength() > 0) {
+ buf.writeBytes(encObject.getAttrData());
+ }
+ buf.writeShort(0xee01);
+ }
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ProtocolEncoder encode({}) message failure",
encObject.getMsgType(), ex);
+ }
+ }
+ if (buf != null) {
+ out.add(buf);
+ } else {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ProtocolEncoder write({}) buffer is null!",
encObject.getMsgType());
+ }
+ }
+ }
+}