This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 16084cc958 [INLONG-11700][SDK] Optimize TCP message reporting Sender 
implementation (#11701)
16084cc958 is described below

commit 16084cc958095c56468216eb6461d9fa020ba8ba
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Jan 21 21:36:27 2025 +0800

    [INLONG-11700][SDK] Optimize TCP message reporting Sender implementation 
(#11701)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/sdk/dataproxy/common/ErrorCode.java     |  15 +-
 .../inlong/sdk/dataproxy/common/SdkConsts.java     |   3 +
 .../sdk/dataproxy/network/tcp/ClientHandler.java   | 101 ++++
 .../network/tcp/ClientPipelineFactory.java         |  52 ++
 .../inlong/sdk/dataproxy/network/tcp/SendQos.java  |  30 +
 .../sdk/dataproxy/network/tcp/TcpClientMgr.java    | 673 +++++++++++++++++++++
 .../network/tcp/codec/ProtocolEncoder.java         |   7 +
 .../inlong/sdk/dataproxy/sender/BaseSender.java    | 222 +++++++
 .../dataproxy/sender/tcp/InLongTcpMsgSender.java   | 355 +++++++++++
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     |  11 +-
 10 files changed, 1462 insertions(+), 7 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
index d2cb671a58..187a1b56b9 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
@@ -60,7 +60,20 @@ public enum ErrorCode {
     PARSE_ENCRYPT_META_EXCEPTION(52, "Parse encrypt content failure"),
     META_REQUIRED_FIELD_NOT_EXIST(53, "Required meta field not exist"),
     META_FIELD_VALUE_ILLEGAL(54, "Meta field value illegal"),
-
+    //
+    FETCH_PROXY_META_FAILURE(59, "Fetch dataproxy meta info failure"),
+    FETCH_ENCRYPT_META_FAILURE(60, "Fetch encrypt meta info failure"),
+    //
+    NO_NODE_META_INFOS(81, "No proxy node metadata info in local"),
+    EMPTY_ACTIVE_NODE_SET(82, "Empty active node set"),
+    EMPTY_WRITABLE_NODE_SET(83, "Empty writable node set"),
+    NO_VALID_REMOTE_NODE(84, "No valid remote node set"),
+    //
+    REPORT_INFO_EXCEED_MAX_LEN(91, "Report info exceed max allowed length"),
+    ENCODE_BODY_EXCEPTION(92, "Encode body exception"),
+    COMPRESS_BODY_EXCEPTION(93, "Compress body exception"),
+    ENCRYPT_BODY_EXCEPTION(94, "Encrypt body exception"),
+    GENERATE_SIGNATURE_EXCEPTION(95, "Generate signature exception"),
     //
     CONNECTION_UNAVAILABLE(111, "Connection unavailable"),
     CONNECTION_BREAK(112, "Connection break"),
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
index 1449a5f0d8..cf2d006742 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
@@ -148,6 +148,9 @@ public class SdkConsts {
     public static final int FLAG_ALLOW_ENCRYPT = 1 << 6;
     public static final int FLAG_ALLOW_COMPRESS = 1 << 5;
 
+    public static final int EXT_FIELD_FLAG_DISABLE_ID2NUM = 1 << 2;
+    public static final int EXT_FIELD_FLAG_SEP_BY_LF = 1 << 5;
+
     public static int DEFAULT_SENDER_MAX_ATTEMPT = 1;
 
     /* Reserved attribute data size(bytes). */
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/ClientHandler.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/ClientHandler.java
new file mode 100644
index 0000000000..3ad82919cf
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/ClientHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp;
+
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.DecodeObject;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TCP client handler class
+ *
+ * Used to process TCP response message.
+ */
+public class ClientHandler extends SimpleChannelInboundHandler<DecodeObject> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ClientHandler.class);
+    private static final LogCounter exceptCnt = new LogCounter(10, 100000, 60 
* 1000L);
+    private static final LogCounter thrownCnt = new LogCounter(10, 100000, 60 
* 1000L);
+
+    private final TcpClientMgr tcpClientMgr;
+
+    public ClientHandler(TcpClientMgr tcpClientMgr) {
+        this.tcpClientMgr = tcpClientMgr;
+    }
+
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, DecodeObject 
decObject) {
+        if (decObject.getMsgType() != MsgType.MSG_BIN_HEARTBEAT) {
+            tcpClientMgr.feedbackMsgResponse(ctx.channel().toString(), 
decObject);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
+        if (exceptCnt.shouldPrint()) {
+            logger.warn("ClientHandler({})'s channel {} has error!",
+                    tcpClientMgr.getSenderId(), ctx.channel(), e);
+        }
+        try {
+            tcpClientMgr.setChannelFrozen(ctx.channel().toString());
+        } catch (Throwable ex) {
+            if (thrownCnt.shouldPrint()) {
+                logger.warn("ClientHandler({}) exceptionCaught throw 
exception",
+                        tcpClientMgr.getSenderId(), ex);
+            }
+        }
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        ctx.fireChannelInactive();
+        if (logger.isDebugEnabled()) {
+            logger.debug("ClientHandler({}) channelDisconnected {}",
+                    tcpClientMgr.getSenderId(), ctx.channel());
+        }
+        try {
+            tcpClientMgr.notifyChannelDisconnected(ctx.channel().toString());
+        } catch (Throwable ex) {
+            if (thrownCnt.shouldPrint()) {
+                logger.warn("ClientHandler({}) channelInactive throw 
exception",
+                        tcpClientMgr.getSenderId(), ex);
+            }
+        }
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws 
Exception {
+        if (logger.isDebugEnabled()) {
+            logger.debug("ClientHandler({}) channelUnregistered {}",
+                    tcpClientMgr.getSenderId(), ctx.channel());
+        }
+        try {
+            tcpClientMgr.notifyChannelDisconnected(ctx.channel().toString());
+        } catch (Throwable ex) {
+            if (thrownCnt.shouldPrint()) {
+                logger.warn("ClientHandler({}) channelUnregistered throw 
exception",
+                        tcpClientMgr.getSenderId(), ex);
+            }
+        }
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/ClientPipelineFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/ClientPipelineFactory.java
new file mode 100644
index 0000000000..ced8882ab2
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/ClientPipelineFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp;
+
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.ProtocolDecoder;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.ProtocolEncoder;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+/**
+ * TCP client Pipeline Factory class
+ *
+ * Used to build TCP pipeline
+ */
+public class ClientPipelineFactory extends ChannelInitializer<SocketChannel> {
+
+    private final TcpClientMgr tcpClientMgr;
+
+    public ClientPipelineFactory(TcpClientMgr tcpClientMgr) {
+        this.tcpClientMgr = tcpClientMgr;
+    }
+
+    @Override
+    public void initChannel(SocketChannel ch) throws Exception {
+
+        // Setup channel except for the SsHandler for TLS enabled connections
+
+        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+                65536, 0, 4, 0, 0));
+
+        ch.pipeline().addLast("contentDecoder", new ProtocolDecoder());
+        ch.pipeline().addLast("contentEncoder", new ProtocolEncoder());
+        ch.pipeline().addLast("handler", new ClientHandler(tcpClientMgr));
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/SendQos.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/SendQos.java
new file mode 100644
index 0000000000..85b022a8c0
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/SendQos.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp;
+
+/**
+ * Send Qos class
+ *
+ * Used to identify different send type
+ */
+public enum SendQos {
+
+    NO_ACK, // only request, without response
+    SOURCE_ACK, // request and return a response upon receipt by DataProxy, 
default
+    SINK_ACK // request, and return a response after successful forwarding of 
the request by DataProxy
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
new file mode 100644
index 0000000000..47dec283c4
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
@@ -0,0 +1,673 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp;
+
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.network.SequentialID;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.DecodeObject;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * TCP Client Manager class
+ *
+ * Used to manage TCP clients, including periodically selecting proxy nodes,
+ *  finding available nodes when reporting messages, maintaining inflight 
message
+ *  sending status, finding responses to corresponding requests, etc.
+ */
+public class TcpClientMgr implements ClientMgr {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(TcpClientMgr.class);
+    private static final LogCounter sendExceptCnt = new LogCounter(10, 100000, 
60 * 1000L);
+    private static final LogCounter updConExptCnt = new LogCounter(10, 100000, 
60 * 1000L);
+    private static final LogCounter exptCounter = new LogCounter(10, 100000, 
60 * 1000L);
+    private static final LogCounter callbackExceptCnt = new LogCounter(10, 
100000, 60 * 1000L);
+    private static final AtomicLong timerRefCnt = new AtomicLong(0);
+    private static Timer timerObj;
+    private final String senderId;
+    private final TcpMsgSenderConfig tcpConfig;
+    private final Bootstrap bootstrap;
+    private final SequentialID messageIdGen = new SequentialID();
+    private ConcurrentHashMap<String, TcpNettyClient> usingClientMaps = new 
ConcurrentHashMap<>();
+    private ConcurrentHashMap<String, TcpNettyClient> deletingClientMaps = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, HostInfo> connFailNodeMap = new 
ConcurrentHashMap<>();
+    // current using nodes
+    private List<String> activeNodes = new ArrayList<>();
+    private volatile long lastUpdateTime = -1;
+
+    private final MaintThread maintThread;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicLong channelTermGen = new AtomicLong(0);
+    // request cache
+    private final ConcurrentHashMap<Integer, TcpCallFuture> reqObjects =
+            new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Integer, Timeout> reqTimeouts =
+            new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, 
Boolean>> channelMsgIdMap =
+            new ConcurrentHashMap<>();
+    // node select index
+    private final AtomicInteger reqSendIndex = new AtomicInteger(0);
+
+    /**
+     * Build up the connection between the server and client.
+     */
+    public TcpClientMgr(String senderId, TcpMsgSenderConfig tcpConfig, 
ThreadFactory selfDefineFactory) {
+        this.senderId = senderId;
+        this.tcpConfig = tcpConfig;
+        // Initialize the bootstrap
+        this.bootstrap = buildBootstrap(selfDefineFactory);
+        this.maintThread = new MaintThread();
+    }
+
+    @Override
+    public boolean start(ProcessResult procResult) {
+        if (!started.compareAndSet(false, true)) {
+            return procResult.setSuccess();
+        }
+        if (timerRefCnt.incrementAndGet() == 1) {
+            timerObj = new HashedWheelTimer();
+        }
+        // start hb thread
+        this.maintThread.start();
+        logger.info("ClientMgr({}) started", senderId);
+        return procResult.setSuccess();
+    }
+
+    @Override
+    public void stop() {
+        if (!started.compareAndSet(true, false)) {
+            return;
+        }
+        if (timerRefCnt.decrementAndGet() == 0) {
+            timerObj.stop();
+        }
+        this.bootstrap.config().group().shutdownGracefully();
+
+        this.maintThread.shutDown();
+        if (!channelMsgIdMap.isEmpty()) {
+            long startTime = System.currentTimeMillis();
+            while (!channelMsgIdMap.isEmpty()) {
+                if (System.currentTimeMillis() - startTime >= 
tcpConfig.getConCloseWaitPeriodMs()) {
+                    break;
+                }
+                ProxyUtils.sleepSomeTime(100L);
+            }
+        }
+        this.activeNodes.clear();
+        logger.info("ClientMgr({}) stopped!", senderId);
+    }
+
+    @Override
+    public int getInflightMsgCnt() {
+        return this.reqTimeouts.size();
+    }
+
+    @Override
+    public int getActiveNodeCnt() {
+        return activeNodes.size();
+    }
+
+    @Override
+    public void updateProxyInfoList(boolean nodeChanged, 
ConcurrentHashMap<String, HostInfo> hostInfoMap) {
+        if (hostInfoMap.isEmpty() || !this.started.get()) {
+            return;
+        }
+        long curTime = System.currentTimeMillis();
+        try {
+            // shuffle candidate nodes
+            List<HostInfo> candidateNodes = new 
ArrayList<>(hostInfoMap.size());
+            candidateNodes.addAll(hostInfoMap.values());
+            Collections.sort(candidateNodes);
+            Collections.shuffle(candidateNodes);
+            int curTotalCnt = candidateNodes.size();
+            int needActiveCnt = Math.min(this.tcpConfig.getAliveConnections(), 
curTotalCnt);
+            // build next step nodes
+            TcpNettyClient client;
+            int maxCycleCnt = 3;
+            this.connFailNodeMap.clear();
+            List<String> realHosts = new ArrayList<>();
+            ConcurrentHashMap<String, TcpNettyClient> tmpClientMaps = new 
ConcurrentHashMap<>();
+            do {
+                int selectCnt = 0;
+                for (HostInfo hostInfo : candidateNodes) {
+                    if (realHosts.contains(hostInfo.getReferenceName())) {
+                        continue;
+                    }
+                    try {
+                        client = new TcpNettyClient(this.senderId,
+                                this.bootstrap, hostInfo, this.tcpConfig);
+                        if (!client.connect(false, 
channelTermGen.incrementAndGet())) {
+                            
this.connFailNodeMap.put(hostInfo.getReferenceName(), hostInfo);
+                            client.close(false);
+                            continue;
+                        }
+                        realHosts.add(hostInfo.getReferenceName());
+                        tmpClientMaps.put(hostInfo.getReferenceName(), client);
+                        if (++selectCnt >= needActiveCnt) {
+                            break;
+                        }
+                    } catch (Throwable ex) {
+                        if (updConExptCnt.shouldPrint()) {
+                            logger.warn("ClientMgr({}) build client {} 
exception",
+                                    senderId, hostInfo.getReferenceName(), ex);
+                        }
+                    }
+                }
+                if (!realHosts.isEmpty()) {
+                    break;
+                }
+                ProxyUtils.sleepSomeTime(1000L);
+            } while (--maxCycleCnt > 0);
+            // update active nodes
+            if (realHosts.isEmpty()) {
+                if (nodeChanged) {
+                    logger.error("ClientMgr({}) changed nodes, but all connect 
failure, nodes={}!",
+                            this.senderId, candidateNodes);
+                } else {
+                    logger.error("ClientMgr({}) re-choose nodes, but all 
connect failure, nodes={}!",
+                            this.senderId, candidateNodes);
+                }
+            } else {
+                this.lastUpdateTime = System.currentTimeMillis();
+                this.deletingClientMaps = this.usingClientMaps;
+                this.usingClientMaps = tmpClientMaps;
+                this.activeNodes = realHosts;
+                if (nodeChanged) {
+                    logger.info("ClientMgr({}) changed nodes, wast {}ms, 
nodeCnt=(r:{}-a:{}), actives={}, fail={}",
+                            senderId, (System.currentTimeMillis() - curTime),
+                            needActiveCnt, realHosts.size(), realHosts, 
connFailNodeMap.keySet());
+                } else {
+                    logger.info("ClientMgr({}) re-choose nodes, wast {}ms, 
nodeCnt=(r:{}-a:{}), actives={}, fail={}",
+                            senderId, (System.currentTimeMillis() - curTime),
+                            needActiveCnt, realHosts.size(), realHosts, 
connFailNodeMap.keySet());
+                }
+            }
+        } catch (Throwable ex) {
+            if (updConExptCnt.shouldPrint()) {
+                logger.warn("ClientMgr({}) update nodes throw exception", 
senderId, ex);
+            }
+        }
+    }
+
+    public boolean reportEvent(SendQos sendQos, TcpNettyClient client,
+            EncodeObject encObject, MsgSendCallback callback, ProcessResult 
procResult) {
+        if (!this.started.get()) {
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+        }
+        long clientTerm = client.getChanTermId();
+        if (sendQos == SendQos.NO_ACK) {
+            // process no ack report
+            if (client.write(clientTerm, encObject, procResult)) {
+                client.decInFlightMsgCnt(clientTerm);
+            }
+            return procResult.isSuccess();
+        }
+        TcpCallFuture newFuture = new TcpCallFuture(encObject,
+                client.getClientAddr(), clientTerm, client.getChanStr(), 
callback);
+        TcpCallFuture curFuture = 
reqObjects.putIfAbsent(encObject.getMessageId(), newFuture);
+        if (curFuture != null) {
+            if (sendExceptCnt.shouldPrint()) {
+                logger.warn("ClientMgr({}) found message id {} has existed.",
+                        senderId, encObject.getMessageId());
+            }
+            return procResult.setFailResult(ErrorCode.DUPLICATED_MESSAGE_ID);
+        }
+        // build channel -- messageId map
+        ConcurrentHashMap<Integer, Boolean> msgIdMap = 
channelMsgIdMap.get(client.getChanStr());
+        if (msgIdMap == null) {
+            ConcurrentHashMap<Integer, Boolean> tmpMsgIdMap = new 
ConcurrentHashMap<>();
+            msgIdMap = channelMsgIdMap.putIfAbsent(client.getChanStr(), 
tmpMsgIdMap);
+            if (msgIdMap == null) {
+                msgIdMap = tmpMsgIdMap;
+            }
+        }
+        msgIdMap.put(encObject.getMessageId(), newFuture.isAsyncCall());
+        // send message
+        if (newFuture.isAsyncCall()) {
+            // process async report
+            reqTimeouts.put(encObject.getMessageId(),
+                    timerObj.newTimeout(new 
TimeoutTask(encObject.getMessageId()),
+                            tcpConfig.getRequestTimeoutMs(), 
TimeUnit.MILLISECONDS));
+            if (!client.write(clientTerm, encObject, procResult)) {
+                Timeout timeout = reqTimeouts.remove(encObject.getMessageId());
+                if (timeout != null) {
+                    timeout.cancel();
+                }
+                rmvMsgStubInfo(encObject.getMessageId());
+            }
+            return procResult.setSuccess();
+        } else {
+            // process sync report
+            if (!client.write(clientTerm, encObject, procResult)) {
+                rmvMsgStubInfo(encObject.getMessageId());
+                return false;
+            }
+            boolean retValue = newFuture.get(procResult,
+                    tcpConfig.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
+            if (rmvMsgStubInfo(encObject.getMessageId())) {
+                if (procResult.getErrCode() == 
ErrorCode.SEND_WAIT_TIMEOUT.getErrCode()) {
+                    client.setBusy(clientTerm);
+                }
+                client.decInFlightMsgCnt(clientTerm);
+            }
+            return retValue;
+        }
+    }
+
+    private boolean rmvMsgStubInfo(int messageId) {
+        TcpCallFuture curFuture = reqObjects.remove(messageId);
+        if (curFuture == null) {
+            return false;
+        }
+        ConcurrentHashMap<Integer, Boolean> msgIdMap =
+                channelMsgIdMap.get(curFuture.getChanStr());
+        if (msgIdMap != null) {
+            msgIdMap.remove(curFuture.getMessageId());
+        }
+        return true;
+    }
+
+    public boolean getClientByRoundRobin(ProcessResult procResult) {
+        if (!this.started.get()) {
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+        }
+        List<String> curNodes = this.activeNodes;
+        int curNodeSize = curNodes.size();
+        if (curNodeSize == 0) {
+            return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
+        }
+        String curNode;
+        TcpNettyClient client;
+        TcpNettyClient back1thClient = null;
+        int nullClientCnt = 0;
+        int unWritableCnt = 0;
+        int startPos = reqSendIndex.getAndIncrement();
+        for (int step = 0; step < curNodeSize; step++) {
+            curNode = curNodes.get(Math.abs(startPos++) % curNodeSize);
+            client = usingClientMaps.get(curNode);
+            if (client == null) {
+                nullClientCnt++;
+                continue;
+            }
+            if (client.isActive()) {
+                if (client.getChannel().isWritable()) {
+                    if (tcpConfig.getMaxMsgInFlightPerConn() > 0
+                            && client.getMsgInflightCnt() > 
tcpConfig.getMaxMsgInFlightPerConn()) {
+                        back1thClient = client;
+                    } else {
+                        client.incClientUsingCnt();
+                        return procResult.setSuccess(client);
+                    }
+                } else {
+                    unWritableCnt++;
+                }
+            }
+        }
+        if (nullClientCnt == curNodeSize) {
+            return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
+        } else if (unWritableCnt + nullClientCnt == curNodeSize) {
+            return procResult.setFailResult(ErrorCode.EMPTY_WRITABLE_NODE_SET);
+        }
+        if (back1thClient != null) {
+            back1thClient.incClientUsingCnt();
+            return procResult.setSuccess(back1thClient);
+        }
+        return procResult.setFailResult(ErrorCode.NO_VALID_REMOTE_NODE);
+    }
+
+    public void feedbackMsgResponse(String channelStr, DecodeObject decObject) 
{
+        Timeout timeoutTask = 
this.reqTimeouts.remove(decObject.getMessageId());
+        if (timeoutTask != null) {
+            timeoutTask.cancel();
+        }
+        TcpCallFuture callFuture = 
this.reqObjects.remove(decObject.getMessageId());
+        if (callFuture == null) {
+            return;
+        }
+        long curTime = System.currentTimeMillis();
+        ConcurrentHashMap<Integer, Boolean> inflightMsgIds = 
channelMsgIdMap.get(channelStr);
+        if (inflightMsgIds != null) {
+            inflightMsgIds.remove(decObject.getMessageId());
+        }
+        try {
+            callFuture.onMessageAck(decObject.getSendResult());
+        } catch (Throwable ex) {
+            if (callbackExceptCnt.shouldPrint()) {
+                logger.info("ClientMgr({}) response come, callback exception!",
+                        senderId, ex);
+            }
+        } finally {
+            this.descInflightMsgCnt(callFuture);
+        }
+    }
+
+    public void setChannelFrozen(String channelStr) {
+        Map<Integer, Boolean> inflightMsgIds = channelMsgIdMap.get(channelStr);
+        if (inflightMsgIds == null) {
+            return;
+        }
+        TcpCallFuture callFuture;
+        TcpNettyClient nettyTcpClient;
+        for (Integer messageId : inflightMsgIds.keySet()) {
+            if (messageId == null) {
+                continue;
+            }
+            callFuture = this.reqObjects.get(messageId);
+            if (callFuture == null) {
+                continue;
+            }
+            // find and process in using clients
+            nettyTcpClient = usingClientMaps.get(callFuture.getClientAddr());
+            if (nettyTcpClient != null
+                    && nettyTcpClient.getChanTermId() == 
callFuture.getChanTerm()) {
+                nettyTcpClient.setFrozen(callFuture.getChanTerm());
+                return;
+            }
+            // find and process in deleting clients
+            nettyTcpClient = 
deletingClientMaps.get(callFuture.getClientAddr());
+            if (nettyTcpClient != null
+                    && nettyTcpClient.getChanTermId() == 
callFuture.getChanTerm()) {
+                nettyTcpClient.setFrozen(callFuture.getChanTerm());
+                return;
+            }
+            break;
+        }
+    }
+
+    public void notifyChannelDisconnected(String channelStr) {
+        Map<Integer, Boolean> inflightMsgIds =
+                channelMsgIdMap.remove(channelStr);
+        if (inflightMsgIds == null || inflightMsgIds.isEmpty()) {
+            return;
+        }
+        Timeout timeoutTask;
+        TcpNettyClient nettyTcpClient;
+        long curTime;
+        for (Integer messageId : inflightMsgIds.keySet()) {
+            if (messageId == null) {
+                continue;
+            }
+            timeoutTask = this.reqTimeouts.remove(messageId);
+            if (timeoutTask != null) {
+                timeoutTask.cancel();
+            }
+            TcpCallFuture callFuture = this.reqObjects.remove(messageId);
+            if (callFuture == null) {
+                continue;
+            }
+            curTime = System.currentTimeMillis();
+            // find and process in using clients
+            nettyTcpClient = usingClientMaps.get(callFuture.getClientAddr());
+            if (nettyTcpClient != null
+                    && nettyTcpClient.getChanTermId() == 
callFuture.getChanTerm()) {
+                try {
+                    nettyTcpClient.getChannel().eventLoop().execute(
+                            () -> callFuture.onMessageAck(new 
ProcessResult(ErrorCode.CONNECTION_BREAK)));
+                } catch (Throwable ex) {
+                    if (callbackExceptCnt.shouldPrint()) {
+                        logger.info("ClientMgr({}) disconnected, callback 
exception!",
+                                senderId, ex);
+                    }
+                } finally {
+                    nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+                }
+                return;
+            }
+            // find and process in deleting clients
+            nettyTcpClient = 
deletingClientMaps.get(callFuture.getClientAddr());
+            if (nettyTcpClient != null
+                    && nettyTcpClient.getChanTermId() == 
callFuture.getChanTerm()) {
+                try {
+                    nettyTcpClient.getChannel().eventLoop().execute(
+                            () -> callFuture.onMessageAck(new 
ProcessResult(ErrorCode.CONNECTION_BREAK)));
+                } catch (Throwable ex) {
+                    if (callbackExceptCnt.shouldPrint()) {
+                        logger.info("ClientMgr({}) disconnected, callback2 
exception!",
+                                senderId, ex);
+                    }
+                } finally {
+                    nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+                }
+            }
+        }
+    }
+
+    public String getSenderId() {
+        return this.senderId;
+    }
+
+    public int getNextMsgId() {
+        return messageIdGen.getNextInt();
+    }
+
+    private void descInflightMsgCnt(TcpCallFuture callFuture) {
+        TcpNettyClient nettyTcpClient = 
usingClientMaps.get(callFuture.getClientAddr());
+        if (nettyTcpClient != null
+                && nettyTcpClient.getChanTermId() == callFuture.getChanTerm()) 
{
+            nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+            return;
+        }
+        nettyTcpClient = deletingClientMaps.get(callFuture.getClientAddr());
+        if (nettyTcpClient != null
+                && nettyTcpClient.getChanTermId() == callFuture.getChanTerm()) 
{
+            nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+        }
+    }
+
+    private Bootstrap buildBootstrap(ThreadFactory selfFactory) {
+        if (selfFactory == null) {
+            selfFactory = new DefaultThreadFactory(
+                    "sdk-netty-workers", Thread.currentThread().isDaemon());
+        }
+        EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(
+                tcpConfig.getNettyWorkerThreadNum(), 
tcpConfig.isEnableEpollBusyWait(), selfFactory);
+        Bootstrap tmpBootstrap = new Bootstrap();
+        tmpBootstrap.group(eventLoopGroup);
+        tmpBootstrap.option(ChannelOption.TCP_NODELAY, true);
+        tmpBootstrap.option(ChannelOption.SO_REUSEADDR, true);
+        
tmpBootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
+        if (tcpConfig.getRcvBufferSize() > 0) {
+            tmpBootstrap.option(ChannelOption.SO_RCVBUF, 
tcpConfig.getRcvBufferSize());
+        }
+        if (tcpConfig.getSendBufferSize() > 0) {
+            tmpBootstrap.option(ChannelOption.SO_SNDBUF, 
tcpConfig.getSendBufferSize());
+        }
+        tmpBootstrap.handler(new ClientPipelineFactory(this));
+        return tmpBootstrap;
+    }
+
+    private class MaintThread extends Thread {
+
+        private volatile boolean bShutDown;
+
+        public MaintThread() {
+            bShutDown = false;
+            this.setName("ClientMgrMaint-" + senderId);
+        }
+
+        public void shutDown() {
+            logger.info("ClientMgr({}) shutdown MaintThread!", senderId);
+            bShutDown = true;
+            this.interrupt();
+        }
+
+        @Override
+        public void run() {
+            logger.info("ClientMgr({}) start MaintThread!", senderId);
+            long curTime;
+            long checkRound = 0L;
+            boolean printFailNodes;
+            ProcessResult procResult = new ProcessResult();
+            Set<String> failNodes = new HashSet<>();
+            while (!bShutDown) {
+                printFailNodes = ((Math.abs(checkRound++) % 90L) == 0L);
+                try {
+                    curTime = System.currentTimeMillis();
+                    // clean deleting nodes
+                    if (deletingClientMaps != null && 
!deletingClientMaps.isEmpty()) {
+                        if (lastUpdateTime > 0
+                                && curTime - lastUpdateTime > 
tcpConfig.getConCloseWaitPeriodMs()) {
+                            for (TcpNettyClient client : 
deletingClientMaps.values()) {
+                                if (client == null) {
+                                    continue;
+                                }
+                                client.close(false);
+                            }
+                            deletingClientMaps.clear();
+                        }
+                    }
+                    // check and keepalive using nodes
+                    for (TcpNettyClient nettyTcpClient : 
usingClientMaps.values()) {
+                        if (nettyTcpClient == null) {
+                            continue;
+                        }
+                        if (nettyTcpClient.isActive()) {
+                            if (nettyTcpClient.isIdleClient(curTime)) {
+                                nettyTcpClient.sendHeartBeatMsg(procResult);
+                            }
+                        } else {
+                            if ((nettyTcpClient.getClientUsingCnt() <= 0)
+                                    && (nettyTcpClient.getMsgInflightCnt() <= 
0)
+                                    || (curTime - 
nettyTcpClient.getChanInvalidTime() > tcpConfig
+                                            .getConCloseWaitPeriodMs())) {
+                                nettyTcpClient.reconnect(false, 
channelTermGen.incrementAndGet());
+                            }
+                        }
+                        if (printFailNodes) {
+                            if (nettyTcpClient.isConFailNodes()) {
+                                failNodes.add(nettyTcpClient.getClientAddr());
+                            }
+                        }
+                    }
+                } catch (Throwable ex) {
+                    if (exptCounter.shouldPrint()) {
+                        logger.warn("ClientMgr({}) MaintThread throw 
exception", senderId, ex);
+                    }
+                }
+                if (printFailNodes && !failNodes.isEmpty()) {
+                    logger.warn("ClientMgr({}) found continue connect fail 
nodes: {}",
+                            senderId, failNodes);
+                    failNodes.clear();
+                }
+                if (bShutDown) {
+                    break;
+                }
+                ProxyUtils.sleepSomeTime(2000L);
+            }
+            logger.info("ClientMgr({}) exit MaintThread!", senderId);
+        }
+    }
+
+    /**
+     * Time out task call back handle
+     */
+    public class TimeoutTask implements TimerTask {
+
+        private final int messageId;
+
+        public TimeoutTask(int messageId) {
+            this.messageId = messageId;
+        }
+
+        @Override
+        public void run(Timeout timeout) throws Exception {
+            Timeout timeoutMsg = reqTimeouts.remove(messageId);
+            if (timeoutMsg != null) {
+                timeoutMsg.cancel();
+            }
+            TcpCallFuture future = reqObjects.remove(messageId);
+            if (future == null) {
+                return;
+            }
+            long curTime = System.currentTimeMillis();
+            ConcurrentHashMap<Integer, Boolean> messageIdMap;
+            // find and process in using clients
+            TcpNettyClient nettyTcpClient = 
usingClientMaps.get(future.getClientAddr());
+            if (nettyTcpClient != null
+                    && nettyTcpClient.getChanTermId() == future.getChanTerm()) 
{
+                messageIdMap = channelMsgIdMap.get(future.getChanStr());
+                if (messageIdMap != null) {
+                    messageIdMap.remove(messageId);
+                }
+                try {
+                    nettyTcpClient.getChannel().eventLoop().execute(
+                            () -> future.onMessageAck(new 
ProcessResult(ErrorCode.SEND_WAIT_TIMEOUT)));
+                } catch (Throwable ex) {
+                    if (callbackExceptCnt.shouldPrint()) {
+                        logger.info("ClientMgr({}) msg timeout, callback 
exception!",
+                                senderId, ex);
+                    }
+                } finally {
+                    nettyTcpClient.decInFlightMsgCnt(future.getChanTerm());
+                }
+                return;
+            }
+            // find and process in deleting clients
+            nettyTcpClient = deletingClientMaps.get(future.getClientAddr());
+            if (nettyTcpClient != null
+                    && nettyTcpClient.getChanTermId() == future.getChanTerm()) 
{
+                messageIdMap = channelMsgIdMap.get(future.getChanStr());
+                if (messageIdMap != null) {
+                    messageIdMap.remove(messageId);
+                }
+                try {
+                    nettyTcpClient.getChannel().eventLoop().execute(
+                            () -> future.onMessageAck(new 
ProcessResult(ErrorCode.SEND_WAIT_TIMEOUT)));
+                } catch (Throwable ex) {
+                    if (callbackExceptCnt.shouldPrint()) {
+                        logger.info("ClientMgr({}) msg timeout, callback2 
exception!",
+                                senderId, ex);
+                    }
+                } finally {
+                    nettyTcpClient.decInFlightMsgCnt(future.getChanTerm());
+                }
+            }
+        }
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
index d6cafdb85b..f0763b9a76 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
@@ -46,6 +46,7 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
         int totalLength;
         try {
             if (encObject.getMsgType() == MsgType.MSG_ACK_SERVICE) {
+                // msgType(1) + bodyLength(4) + attrsLength(4)
                 totalLength = 1 + 4 + 4 + encObject.getMsgSize();
                 buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
                 buf.writeInt(totalLength);
@@ -59,6 +60,7 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
                     buf.writeBytes(encObject.getAttrData());
                 }
             } else if (encObject.getMsgType() == MsgType.MSG_MULTI_BODY) {
+                // msgType(1) + bodyLength(4) + attrsLength(4)
                 totalLength = 1 + 4 + 4 + encObject.getMsgSize();
                 buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
                 buf.writeInt(totalLength);
@@ -72,6 +74,9 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
                     buf.writeBytes(encObject.getAttrData());
                 }
             } else if (encObject.getMsgType() == MsgType.MSG_BIN_MULTI_BODY) {
+                // msgType(1) + groupNum(2) + streamNum(2) + extField(2)
+                // + dataTime(4) + msgCnt(2) + uniqueId(4) + bodyLength(4)
+                // + attrsLength(2) + magic(2)
                 totalLength = 1 + 2 + 2 + 2 + 4 + 2 + 4 + 4 + 2 + 2 + 
encObject.getMsgSize();
                 buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
                 buf.writeInt(totalLength);
@@ -92,6 +97,8 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
                 }
                 buf.writeShort(0xee01);
             } else if (encObject.getMsgType() == MsgType.MSG_BIN_HEARTBEAT) {
+                // msgType(1) + dataTime(4) + version(1) + bodyLength(4)
+                // + attrsLength(2) + magic(2)
                 totalLength = 1 + 4 + 1 + 4 + 2 + 
encObject.getAttrDataLength() + 2;
                 buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
                 buf.writeInt(totalLength);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
new file mode 100644
index 0000000000..a5f44f17ee
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender;
+
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.config.ConfigHolder;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
+import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Base Sender class:
+ *
+ * Used to manage Sender metadata information, including Sender ID,
+ *  status, network interaction, metadata query object, and
+ *  Proxy node metadata information obtained from Manager, etc.
+ */
+public abstract class BaseSender implements ConfigHolder {
+
+    private final int SENDER_STATUS_UNINITIALIZED = -2;
+    private final int SENDER_STATUS_INITIALIZING = -1;
+    private final int SENDER_STATUS_STARTED = 0;
+    private final int SENDER_STATUS_CLOSED = 1;
+
+    protected static final Logger logger = 
LoggerFactory.getLogger(BaseSender.class);
+    protected static final LogCounter exceptCnt = new LogCounter(10, 100000, 
60 * 1000L);
+    // sender id generator
+    private static final AtomicLong senderIdGen = new AtomicLong(0L);
+    //
+    protected final AtomicInteger senderStatus = new 
AtomicInteger(SENDER_STATUS_UNINITIALIZED);
+    protected final String senderId;
+    protected final ProxyClientConfig baseConfig;
+    protected ClientMgr clientMgr;
+    protected ProxyConfigManager configManager;
+    private final ReentrantReadWriteLock fsLock = new 
ReentrantReadWriteLock(true);
+    // proxy node meta infos
+    private final ConcurrentHashMap<String, HostInfo> proxyNodeInfos = new 
ConcurrentHashMap<>();
+    // groupId and streamId num info
+    private volatile int allowedPkgLength = -1;
+    protected volatile boolean idTransNum = false;
+    protected volatile int groupIdNum = 0;
+    private Map<String, Integer> streamIdMap = new HashMap<>();
+
+    protected BaseSender(ProxyClientConfig configure) {
+        this.baseConfig = configure.clone();
+        this.senderId = configure.getDataRptProtocol() + "-" + 
senderIdGen.incrementAndGet();
+    }
+
+    public boolean start(ProcessResult procResult) {
+        if (!this.senderStatus.compareAndSet(
+                SENDER_STATUS_UNINITIALIZED, SENDER_STATUS_INITIALIZING)) {
+            return procResult.setFailResult(ErrorCode.OK);
+        }
+        // start client manager
+        if (!this.clientMgr.start(procResult)) {
+            return false;
+        }
+        // query meta info from manager
+        if (!this.configManager.doProxyEntryQueryWork(procResult)) {
+            this.clientMgr.stop();
+            String errInfo = "queryCode=" + procResult.getErrCode()
+                    + ", detail=" + procResult.getErrMsg();
+            return 
procResult.setFailResult(ErrorCode.FETCH_PROXY_META_FAILURE, errInfo);
+        }
+        if (this.baseConfig.isEnableReportEncrypt()
+                && 
!this.configManager.doEncryptConfigEntryQueryWork(procResult)) {
+            this.clientMgr.stop();
+            String errInfo = "queryCode=" + procResult.getErrCode()
+                    + ", detail=" + procResult.getErrMsg();
+            return 
procResult.setFailResult(ErrorCode.FETCH_ENCRYPT_META_FAILURE, errInfo);
+        }
+        // start configure manager
+        this.configManager.start();
+        this.senderStatus.set(SENDER_STATUS_STARTED);
+        logger.info("Sender({}) instance started!", senderId);
+        return procResult.setFailResult(ErrorCode.OK);
+    }
+
+    public void close() {
+        int currentStatus = senderStatus.get();
+        if (currentStatus == SENDER_STATUS_CLOSED) {
+            return;
+        }
+        if (!senderStatus.compareAndSet(currentStatus, SENDER_STATUS_CLOSED)) {
+            return;
+        }
+        configManager.shutDown();
+        clientMgr.stop();
+        logger.info("Sender({}) instance stopped!", senderId);
+    }
+
+    @Override
+    public void updateAllowedMaxPkgLength(int maxPkgLength) {
+        this.allowedPkgLength = maxPkgLength;
+    }
+
+    @Override
+    public void updateProxyNodes(boolean nodeChanged, List<HostInfo> 
newProxyNodes) {
+        if (this.senderStatus.get() == SENDER_STATUS_CLOSED
+                || newProxyNodes == null || newProxyNodes.isEmpty()) {
+            return;
+        }
+        this.fsLock.writeLock().lock();
+        try {
+            boolean found;
+            List<String> rmvNodes = new ArrayList<>();
+            for (String hostRefName : this.proxyNodeInfos.keySet()) {
+                found = false;
+                for (HostInfo hostInfo : newProxyNodes) {
+                    if (hostRefName.equals(hostInfo.getReferenceName())) {
+                        found = true;
+                        break;
+                    }
+                }
+                if (!found) {
+                    rmvNodes.add(hostRefName);
+                }
+            }
+            for (HostInfo hostInfo : newProxyNodes) {
+                if 
(this.proxyNodeInfos.containsKey(hostInfo.getReferenceName())) {
+                    continue;
+                }
+                this.proxyNodeInfos.put(hostInfo.getReferenceName(), hostInfo);
+            }
+            for (String rmvNode : rmvNodes) {
+                this.proxyNodeInfos.remove(rmvNode);
+            }
+            clientMgr.updateProxyInfoList(nodeChanged, this.proxyNodeInfos);
+        } finally {
+            this.fsLock.writeLock().unlock();
+        }
+    }
+
+    public boolean isStarted() {
+        return senderStatus.get() == SENDER_STATUS_STARTED;
+    }
+
+    public String getMetaConfigKey() {
+        return this.baseConfig.getGroupMetaConfigKey();
+    }
+
+    public String getSenderId() {
+        return senderId;
+    }
+
+    public ProxyClientConfig getConfigure() {
+        return baseConfig;
+    }
+
+    public int getAllowedPkgLength() {
+        return allowedPkgLength;
+    }
+
+    public String getGroupId() {
+        return baseConfig.getInlongGroupId();
+    }
+
+    public boolean isMetaInfoUnReady() {
+        return this.proxyNodeInfos.isEmpty();
+    }
+
+    public Map<String, HostInfo> getProxyNodeInfos() {
+        return proxyNodeInfos;
+    }
+
+    public int getProxyNodeCnt() {
+        return proxyNodeInfos.size();
+    }
+
+    public abstract int getActiveNodeCnt();
+
+    public abstract int getInflightMsgCnt();
+
+    public void updateGroupIdAndStreamIdNumInfo(
+            int groupIdNum, Map<String, Integer> streamIdMap) {
+        this.groupIdNum = groupIdNum;
+        this.streamIdMap = streamIdMap;
+        if (groupIdNum != 0 && streamIdMap != null && !streamIdMap.isEmpty()) {
+            this.idTransNum = true;
+        }
+    }
+
+    protected int getStreamIdNum(String streamId) {
+        if (idTransNum) {
+            Integer tmpNum = streamIdMap.get(streamId);
+            if (tmpNum != null) {
+                return tmpNum;
+            }
+        }
+        return 0;
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
new file mode 100644
index 0000000000..97076dd977
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender.tcp;
+
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
+import org.apache.inlong.sdk.dataproxy.network.tcp.SendQos;
+import org.apache.inlong.sdk.dataproxy.network.tcp.TcpClientMgr;
+import org.apache.inlong.sdk.dataproxy.network.tcp.TcpNettyClient;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import org.xerial.snappy.Snappy;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * TCP Message Sender class
+ *
+ * Used to define the TCP sender common methods
+ */
+public class InLongTcpMsgSender extends BaseSender implements TcpMsgSender {
+
+    protected static final LogCounter tcpExceptCnt = new LogCounter(10, 
100000, 60 * 1000L);
+    private final TcpMsgSenderConfig tcpConfig;
+    private final TcpClientMgr tcpClientMgr;
+
+    protected InLongTcpMsgSender(TcpMsgSenderConfig configure, ThreadFactory 
selfDefineFactory) {
+        super(configure);
+        this.tcpConfig = (TcpMsgSenderConfig) baseConfig;
+        this.clientMgr = new TcpClientMgr(this.getSenderId(), this.tcpConfig, 
selfDefineFactory);
+        this.tcpClientMgr = (TcpClientMgr) clientMgr;
+    }
+
+    @Override
+    public boolean sendMessageWithoutAck(TcpEventInfo eventInfo, ProcessResult 
procResult) {
+        if (eventInfo == null) {
+            throw new NullPointerException("eventInfo is null");
+        }
+        if (procResult == null) {
+            throw new NullPointerException("procResult is null");
+        }
+        if (!this.isStarted()) {
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+        }
+        return processEvent(SendQos.NO_ACK, eventInfo, null, procResult);
+    }
+
+    @Override
+    public boolean syncSendMessage(boolean sendInB2B,
+            TcpEventInfo eventInfo, ProcessResult procResult) {
+        if (eventInfo == null) {
+            throw new NullPointerException("eventInfo is null");
+        }
+        if (procResult == null) {
+            throw new NullPointerException("procResult is null");
+        }
+        if (!this.isStarted()) {
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+        }
+        if (sendInB2B) {
+            return processEvent(SendQos.SOURCE_ACK, eventInfo, null, 
procResult);
+        } else {
+            return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult);
+        }
+    }
+
+    @Override
+    public boolean asyncSendMessage(boolean sendInB2B,
+            TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult 
procResult) {
+        if (eventInfo == null) {
+            throw new NullPointerException("eventInfo is null");
+        }
+        if (callback == null) {
+            throw new NullPointerException("callback is null");
+        }
+        if (procResult == null) {
+            throw new NullPointerException("procResult is null");
+        }
+        if (!this.isStarted()) {
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+        }
+        if (sendInB2B) {
+            return processEvent(SendQos.SOURCE_ACK, eventInfo, callback, 
procResult);
+        } else {
+            return processEvent(SendQos.SINK_ACK, eventInfo, callback, 
procResult);
+        }
+    }
+
+    @Override
+    public int getActiveNodeCnt() {
+        return tcpClientMgr.getActiveNodeCnt();
+    }
+
+    @Override
+    public int getInflightMsgCnt() {
+        return tcpClientMgr.getInflightMsgCnt();
+    }
+
+    private boolean processEvent(SendQos sendQos,
+            TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult 
procResult) {
+        if (this.isMetaInfoUnReady()) {
+            return procResult.setFailResult(ErrorCode.NO_NODE_META_INFOS);
+        }
+        EncodeObject encObject = new EncodeObject(eventInfo.getGroupId(),
+                eventInfo.getStreamId(), tcpConfig.getSdkMsgType(), 
eventInfo.getDtMs());
+        // pre-process attributes
+        processEventAttrsInfo(sendQos, eventInfo, encObject);
+        // check package length
+        if (!isValidPkgLength(encObject.getAttrDataLength(),
+                eventInfo.getBodySize(), this.getAllowedPkgLength(), 
procResult)) {
+            return false;
+        }
+        // process body
+        if (!procEventBodyInfo(eventInfo, procResult, encObject)) {
+            return false;
+        }
+        // get client object
+        if (!tcpClientMgr.getClientByRoundRobin(procResult)) {
+            return false;
+        }
+        TcpNettyClient client = (TcpNettyClient) procResult.getRetData();
+        encObject.setMessageIdInfo(tcpClientMgr.getNextMsgId());
+        try {
+            return tcpClientMgr.reportEvent(sendQos, client, encObject, 
callback, procResult);
+        } finally {
+            client.decClientUsingCnt();
+        }
+    }
+
+    private boolean isValidPkgLength(
+            int attrLength, int bodyLength, int allowedLen, ProcessResult 
procResult) {
+        // Not valid if the maximum limit is less than or equal to 0
+        if (allowedLen < 0) {
+            return true;
+        }
+        // Reserve space for attribute
+        if (bodyLength + attrLength > allowedLen - 
SdkConsts.RESERVED_ATTRIBUTE_LENGTH) {
+            String errMsg = String.format("OverMaxLen: attrLen(%d) + 
bodyLen(%d) > allowedLen(%d) - fixedLen(%d)",
+                    attrLength, bodyLength, allowedLen, 
SdkConsts.RESERVED_ATTRIBUTE_LENGTH);
+            if (tcpExceptCnt.shouldPrint()) {
+                logger.warn(errMsg);
+            }
+            return 
procResult.setFailResult(ErrorCode.REPORT_INFO_EXCEED_MAX_LEN, errMsg);
+        }
+        return true;
+    }
+
+    private void processEventAttrsInfo(
+            SendQos sendQos, TcpEventInfo eventInfo, EncodeObject 
encodeObject) {
+        // get msgType
+        int intMsgType = encodeObject.getMsgType().getValue();
+        boolean enableDataComp = tcpConfig.isEnableDataCompress()
+                && eventInfo.getBodySize() >= 
tcpConfig.getMinCompEnableLength();
+        // add fixed attributes
+        Map<String, String> newAttrs = new HashMap<>(eventInfo.getAttrs());
+        newAttrs.put(AttributeConstants.MSG_RPT_TIME, 
String.valueOf(encodeObject.getRtms()));
+        newAttrs.put(AttributeConstants.PROXY_SDK_VERSION, 
ProxyUtils.getJarVersion());
+
+        if (sendQos == SendQos.NO_ACK) {
+            newAttrs.put(AttributeConstants.MESSAGE_IS_ACK, 
String.valueOf(true));
+        } else if (sendQos == SendQos.SINK_ACK) {
+            newAttrs.put(AttributeConstants.MESSAGE_PROXY_SEND, 
String.valueOf(true));
+        }
+        if (tcpConfig.getSdkMsgType() == MsgType.MSG_ACK_SERVICE
+                || tcpConfig.getSdkMsgType() == MsgType.MSG_MULTI_BODY) {
+            // add msgType 3/5 attributes
+            newAttrs.put(AttributeConstants.GROUP_ID, eventInfo.getGroupId());
+            newAttrs.put(AttributeConstants.STREAM_ID, 
eventInfo.getStreamId());
+            newAttrs.put(AttributeConstants.DATA_TIME, 
String.valueOf(eventInfo.getDtMs()));
+            newAttrs.put(AttributeConstants.MESSAGE_COUNT, 
String.valueOf(eventInfo.getMsgCnt()));
+            if (enableDataComp) {
+                newAttrs.put(AttributeConstants.COMPRESS_TYPE, "snappy");
+            }
+        } else {
+            // add msgType 7 attributes
+            if (enableDataComp) {
+                intMsgType |= SdkConsts.FLAG_ALLOW_COMPRESS;
+            }
+            int extField = 0;
+            if (tcpConfig.isSeparateEventByLF()) {
+                extField |= SdkConsts.EXT_FIELD_FLAG_SEP_BY_LF;
+            }
+            boolean id2Num = false;
+            int streamIdNum = 0;
+            if (this.idTransNum && (this.groupIdNum != 0)
+                    && 
eventInfo.getGroupId().equals(tcpConfig.getInlongGroupId())) {
+                streamIdNum = getStreamIdNum(eventInfo.getStreamId());
+                id2Num = streamIdNum != 0;
+            }
+            if (id2Num) {
+                encodeObject.setGroupAndStreamId2Num(this.groupIdNum, 
streamIdNum);
+            } else {
+                extField |= SdkConsts.EXT_FIELD_FLAG_DISABLE_ID2NUM;
+                newAttrs.put(AttributeConstants.GROUP_ID, 
eventInfo.getGroupId());
+                newAttrs.put(AttributeConstants.STREAM_ID, 
eventInfo.getStreamId());
+            }
+            encodeObject.setExtField(extField);
+        }
+        byte[] aesKey = null;
+        // set encrypt attributes
+        if (tcpConfig.isEnableReportEncrypt()) {
+            EncryptConfigEntry encryptEntry = 
configManager.getUserEncryptConfigEntry();
+            newAttrs.put("_userName", tcpConfig.getRptUserName());
+            newAttrs.put("_encyVersion", encryptEntry.getVersion());
+            newAttrs.put("_encyAesKey", encryptEntry.getRsaEncryptedKey());
+            aesKey = encryptEntry.getAesKey();
+            intMsgType |= SdkConsts.FLAG_ALLOW_ENCRYPT;
+        }
+        encodeObject.setAttrInfo(intMsgType, enableDataComp, aesKey, newAttrs);
+    }
+
+    private boolean procEventBodyInfo(TcpEventInfo eventInfo, ProcessResult 
procResult, EncodeObject encObject) {
+        // encode message body
+        byte[] body = encBodyList(senderId, encObject.getMsgType(),
+                tcpConfig.isSeparateEventByLF(), eventInfo, procResult);
+        if (body == null) {
+            return false;
+        }
+        // compress body
+        if (encObject.isCompress()) {
+            body = compressBodyInfo(senderId, body, procResult);
+            if (body == null) {
+                return false;
+            }
+        }
+        // encrypt body
+        if (tcpConfig.isEnableReportEncrypt()) {
+            body = aesEncryptBodyInfo(senderId, body, encObject.getAesKey(), 
procResult);
+            if (body == null) {
+                return false;
+            }
+        }
+        encObject.setBodyData(eventInfo.getMsgCnt(), body);
+        return true;
+    }
+
+    private byte[] encBodyList(String senderId,
+            MsgType msgType, boolean sepByLF, TcpEventInfo eventInfo, 
ProcessResult procResult) {
+        try {
+            int totalCnt = 0;
+            ByteArrayOutputStream bodyOut = new ByteArrayOutputStream();
+            if (msgType == MsgType.MSG_ACK_SERVICE) {
+                for (byte[] entry : eventInfo.getBodyList()) {
+                    if (totalCnt++ > 0) {
+                        
bodyOut.write(AttributeConstants.LINE_FEED_SEP.getBytes(StandardCharsets.UTF_8));
+                    }
+                    bodyOut.write(entry);
+                }
+            } else if (msgType == MsgType.MSG_MULTI_BODY) {
+                for (byte[] entry : eventInfo.getBodyList()) {
+                    ByteBuffer byteBuffer = ByteBuffer.allocate(4);
+                    byteBuffer.putInt(entry.length);
+                    bodyOut.write(byteBuffer.array());
+                    bodyOut.write(entry);
+                }
+            } else {
+                if (sepByLF) {
+                    ByteArrayOutputStream data = new ByteArrayOutputStream();
+                    for (byte[] entry : eventInfo.getBodyList()) {
+                        if (totalCnt++ > 0) {
+                            
data.write(AttributeConstants.LINE_FEED_SEP.getBytes(StandardCharsets.UTF_8));
+                        }
+                        data.write(entry);
+                    }
+                    ByteBuffer dataBuffer = ByteBuffer.allocate(4);
+                    dataBuffer.putInt(data.toByteArray().length);
+                    bodyOut.write(dataBuffer.array());
+                    bodyOut.write(data.toByteArray());
+                } else {
+                    for (byte[] entry : eventInfo.getBodyList()) {
+                        ByteBuffer dataBuffer = ByteBuffer.allocate(4);
+                        dataBuffer.putInt(entry.length);
+                        bodyOut.write(dataBuffer.array());
+                        bodyOut.write(entry);
+                    }
+                }
+            }
+            return bodyOut.toByteArray();
+        } catch (Throwable ex) {
+            procResult.setFailResult(ErrorCode.ENCODE_BODY_EXCEPTION, 
ex.getMessage());
+            if (tcpExceptCnt.shouldPrint()) {
+                logger.warn("Sender({}) encode body exception", senderId, ex);
+            }
+            return null;
+        }
+    }
+
+    private byte[] compressBodyInfo(String senderId, byte[] body, 
ProcessResult procResult) {
+        try {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            out.write(body);
+            int guessLen = Snappy.maxCompressedLength(out.size());
+            byte[] tmpData = new byte[guessLen];
+            int len = Snappy.compress(out.toByteArray(), 0, out.size(),
+                    tmpData, 0);
+            body = new byte[len];
+            System.arraycopy(tmpData, 0, body, 0, len);
+            return body;
+        } catch (Throwable ex) {
+            procResult.setFailResult(ErrorCode.COMPRESS_BODY_EXCEPTION, 
ex.getMessage());
+            if (tcpExceptCnt.shouldPrint()) {
+                logger.warn("Sender({}) compress body exception", senderId, 
ex);
+            }
+            return null;
+        }
+    }
+
+    private byte[] aesEncryptBodyInfo(String senderId,
+            byte[] plainText, byte[] aesKey, ProcessResult procResult) {
+        try {
+            SecretKeySpec secretKeySpec = new SecretKeySpec(aesKey, 
EncryptUtil.AES);
+            Cipher cipher = Cipher.getInstance(EncryptUtil.AES);
+            cipher.init(Cipher.ENCRYPT_MODE, secretKeySpec);
+            return cipher.doFinal(plainText);
+        } catch (Throwable ex) {
+            procResult.setFailResult(ErrorCode.ENCRYPT_BODY_EXCEPTION, 
ex.getMessage());
+            if (tcpExceptCnt.shouldPrint()) {
+                logger.warn("Sender({}) aesEncrypt body exception", senderId, 
ex);
+            }
+            return null;
+        }
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index bc229b744b..2eace638a2 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -57,7 +57,7 @@ public class ProxyUtils {
     private static String sdkVersion;
 
     static {
-        localHost = getLocalIp();
+        getLocalIp();
         getJarVersion();
         Collections.addAll(SdkReservedWords,
                 AttributeConstants.GROUP_ID, AttributeConstants.STREAM_ID,
@@ -68,13 +68,12 @@ public class ProxyUtils {
                 AttributeConstants.NODE_IP, AttributeConstants.MESSAGE_ID,
                 AttributeConstants.MESSAGE_IS_ACK, 
AttributeConstants.MESSAGE_PROXY_SEND,
                 AttributeConstants.MESSAGE_PROCESS_ERRCODE, 
AttributeConstants.MESSAGE_PROCESS_ERRMSG,
-                AttributeConstants.MSG_RPT_TIME, 
AttributeConstants.AUDIT_VERSION,
-                AttributeConstants.PROXY_SDK_VERSION, KEY_FILE_STATUS_CHECK,
-                KEY_SECRET_ID, KEY_SIGNATURE, KEY_TIME_STAMP, KEY_NONCE, 
KEY_USERNAME,
-                KEY_CLIENT_IP, KEY_ENCY_VERSION, KEY_ENCY_AES_KEY);
+                AttributeConstants.MSG_RPT_TIME, 
AttributeConstants.PROXY_SDK_VERSION,
+                KEY_FILE_STATUS_CHECK, KEY_SECRET_ID, KEY_SIGNATURE, 
KEY_TIME_STAMP,
+                KEY_NONCE, KEY_USERNAME, KEY_CLIENT_IP, KEY_ENCY_VERSION, 
KEY_ENCY_AES_KEY);
         /*
          * Collections.addAll(SdkReservedWords, "groupId", "streamId", "dt", 
"msgUUID", "cp", "cnt", "mt", "m", "sid",
-         * "t", "NodeIP", "messageId", "isAck", "proxySend", "errCode", 
"errMsg", "rtms", "sdkVersion", "auditVersion",
+         * "t", "NodeIP", "messageId", "isAck", "proxySend", "errCode", 
"errMsg", "rtms", "sdkVersion",
          * "_file_status_check", "_secretId", "_signature", "_timeStamp", 
"_nonce", "_userName", "_clientIP",
          * "_encyVersion", "_encyAesKey");
          */

Reply via email to