This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 16084cc958 [INLONG-11700][SDK] Optimize TCP message reporting Sender
implementation (#11701)
16084cc958 is described below
commit 16084cc958095c56468216eb6461d9fa020ba8ba
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Jan 21 21:36:27 2025 +0800
[INLONG-11700][SDK] Optimize TCP message reporting Sender implementation
(#11701)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/common/ErrorCode.java | 15 +-
.../inlong/sdk/dataproxy/common/SdkConsts.java | 3 +
.../sdk/dataproxy/network/tcp/ClientHandler.java | 101 ++++
.../network/tcp/ClientPipelineFactory.java | 52 ++
.../inlong/sdk/dataproxy/network/tcp/SendQos.java | 30 +
.../sdk/dataproxy/network/tcp/TcpClientMgr.java | 673 +++++++++++++++++++++
.../network/tcp/codec/ProtocolEncoder.java | 7 +
.../inlong/sdk/dataproxy/sender/BaseSender.java | 222 +++++++
.../dataproxy/sender/tcp/InLongTcpMsgSender.java | 355 +++++++++++
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 11 +-
10 files changed, 1462 insertions(+), 7 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
index d2cb671a58..187a1b56b9 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
@@ -60,7 +60,20 @@ public enum ErrorCode {
PARSE_ENCRYPT_META_EXCEPTION(52, "Parse encrypt content failure"),
META_REQUIRED_FIELD_NOT_EXIST(53, "Required meta field not exist"),
META_FIELD_VALUE_ILLEGAL(54, "Meta field value illegal"),
-
+ //
+ FETCH_PROXY_META_FAILURE(59, "Fetch dataproxy meta info failure"),
+ FETCH_ENCRYPT_META_FAILURE(60, "Fetch encrypt meta info failure"),
+ //
+ NO_NODE_META_INFOS(81, "No proxy node metadata info in local"),
+ EMPTY_ACTIVE_NODE_SET(82, "Empty active node set"),
+ EMPTY_WRITABLE_NODE_SET(83, "Empty writable node set"),
+ NO_VALID_REMOTE_NODE(84, "No valid remote node set"),
+ //
+ REPORT_INFO_EXCEED_MAX_LEN(91, "Report info exceed max allowed length"),
+ ENCODE_BODY_EXCEPTION(92, "Encode body exception"),
+ COMPRESS_BODY_EXCEPTION(93, "Compress body exception"),
+ ENCRYPT_BODY_EXCEPTION(94, "Encrypt body exception"),
+ GENERATE_SIGNATURE_EXCEPTION(95, "Generate signature exception"),
//
CONNECTION_UNAVAILABLE(111, "Connection unavailable"),
CONNECTION_BREAK(112, "Connection break"),
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
index 1449a5f0d8..cf2d006742 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
@@ -148,6 +148,9 @@ public class SdkConsts {
public static final int FLAG_ALLOW_ENCRYPT = 1 << 6;
public static final int FLAG_ALLOW_COMPRESS = 1 << 5;
+ public static final int EXT_FIELD_FLAG_DISABLE_ID2NUM = 1 << 2;
+ public static final int EXT_FIELD_FLAG_SEP_BY_LF = 1 << 5;
+
public static int DEFAULT_SENDER_MAX_ATTEMPT = 1;
/* Reserved attribute data size(bytes). */
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/ClientHandler.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/ClientHandler.java
new file mode 100644
index 0000000000..3ad82919cf
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/ClientHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp;
+
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.DecodeObject;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TCP client handler class
+ *
+ * Used to process TCP response message.
+ */
+public class ClientHandler extends SimpleChannelInboundHandler<DecodeObject> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClientHandler.class);
+ private static final LogCounter exceptCnt = new LogCounter(10, 100000, 60
* 1000L);
+ private static final LogCounter thrownCnt = new LogCounter(10, 100000, 60
* 1000L);
+
+ private final TcpClientMgr tcpClientMgr;
+
+ public ClientHandler(TcpClientMgr tcpClientMgr) {
+ this.tcpClientMgr = tcpClientMgr;
+ }
+
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx, DecodeObject
decObject) {
+ if (decObject.getMsgType() != MsgType.MSG_BIN_HEARTBEAT) {
+ tcpClientMgr.feedbackMsgResponse(ctx.channel().toString(),
decObject);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
+ if (exceptCnt.shouldPrint()) {
+ logger.warn("ClientHandler({})'s channel {} has error!",
+ tcpClientMgr.getSenderId(), ctx.channel(), e);
+ }
+ try {
+ tcpClientMgr.setChannelFrozen(ctx.channel().toString());
+ } catch (Throwable ex) {
+ if (thrownCnt.shouldPrint()) {
+ logger.warn("ClientHandler({}) exceptionCaught throw
exception",
+ tcpClientMgr.getSenderId(), ex);
+ }
+ }
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ ctx.fireChannelInactive();
+ if (logger.isDebugEnabled()) {
+ logger.debug("ClientHandler({}) channelDisconnected {}",
+ tcpClientMgr.getSenderId(), ctx.channel());
+ }
+ try {
+ tcpClientMgr.notifyChannelDisconnected(ctx.channel().toString());
+ } catch (Throwable ex) {
+ if (thrownCnt.shouldPrint()) {
+ logger.warn("ClientHandler({}) channelInactive throw
exception",
+ tcpClientMgr.getSenderId(), ex);
+ }
+ }
+ }
+
+ @Override
+ public void channelUnregistered(ChannelHandlerContext ctx) throws
Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ClientHandler({}) channelUnregistered {}",
+ tcpClientMgr.getSenderId(), ctx.channel());
+ }
+ try {
+ tcpClientMgr.notifyChannelDisconnected(ctx.channel().toString());
+ } catch (Throwable ex) {
+ if (thrownCnt.shouldPrint()) {
+ logger.warn("ClientHandler({}) channelUnregistered throw
exception",
+ tcpClientMgr.getSenderId(), ex);
+ }
+ }
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/ClientPipelineFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/ClientPipelineFactory.java
new file mode 100644
index 0000000000..ced8882ab2
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/ClientPipelineFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp;
+
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.ProtocolDecoder;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.ProtocolEncoder;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+/**
+ * TCP client Pipeline Factory class
+ *
+ * Used to build TCP pipeline
+ */
+public class ClientPipelineFactory extends ChannelInitializer<SocketChannel> {
+
+ private final TcpClientMgr tcpClientMgr;
+
+ public ClientPipelineFactory(TcpClientMgr tcpClientMgr) {
+ this.tcpClientMgr = tcpClientMgr;
+ }
+
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+
+ // Setup channel except for the SsHandler for TLS enabled connections
+
+ ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+ 65536, 0, 4, 0, 0));
+
+ ch.pipeline().addLast("contentDecoder", new ProtocolDecoder());
+ ch.pipeline().addLast("contentEncoder", new ProtocolEncoder());
+ ch.pipeline().addLast("handler", new ClientHandler(tcpClientMgr));
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/SendQos.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/SendQos.java
new file mode 100644
index 0000000000..85b022a8c0
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/SendQos.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp;
+
+/**
+ * Send Qos class
+ *
+ * Used to identify different send type
+ */
+public enum SendQos {
+
+ NO_ACK, // only request, without response
+ SOURCE_ACK, // request and return a response upon receipt by DataProxy,
default
+ SINK_ACK // request, and return a response after successful forwarding of
the request by DataProxy
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
new file mode 100644
index 0000000000..47dec283c4
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
@@ -0,0 +1,673 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp;
+
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.network.SequentialID;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.DecodeObject;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * TCP Client Manager class
+ *
+ * Used to manage TCP clients, including periodically selecting proxy nodes,
+ * finding available nodes when reporting messages, maintaining inflight
message
+ * sending status, finding responses to corresponding requests, etc.
+ */
+public class TcpClientMgr implements ClientMgr {
+
+ private static final Logger logger =
LoggerFactory.getLogger(TcpClientMgr.class);
+ private static final LogCounter sendExceptCnt = new LogCounter(10, 100000,
60 * 1000L);
+ private static final LogCounter updConExptCnt = new LogCounter(10, 100000,
60 * 1000L);
+ private static final LogCounter exptCounter = new LogCounter(10, 100000,
60 * 1000L);
+ private static final LogCounter callbackExceptCnt = new LogCounter(10,
100000, 60 * 1000L);
+ private static final AtomicLong timerRefCnt = new AtomicLong(0);
+ private static Timer timerObj;
+ private final String senderId;
+ private final TcpMsgSenderConfig tcpConfig;
+ private final Bootstrap bootstrap;
+ private final SequentialID messageIdGen = new SequentialID();
+ private ConcurrentHashMap<String, TcpNettyClient> usingClientMaps = new
ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, TcpNettyClient> deletingClientMaps = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, HostInfo> connFailNodeMap = new
ConcurrentHashMap<>();
+ // current using nodes
+ private List<String> activeNodes = new ArrayList<>();
+ private volatile long lastUpdateTime = -1;
+
+ private final MaintThread maintThread;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicLong channelTermGen = new AtomicLong(0);
+ // request cache
+ private final ConcurrentHashMap<Integer, TcpCallFuture> reqObjects =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Integer, Timeout> reqTimeouts =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, ConcurrentHashMap<Integer,
Boolean>> channelMsgIdMap =
+ new ConcurrentHashMap<>();
+ // node select index
+ private final AtomicInteger reqSendIndex = new AtomicInteger(0);
+
+ /**
+ * Build up the connection between the server and client.
+ */
+ public TcpClientMgr(String senderId, TcpMsgSenderConfig tcpConfig,
ThreadFactory selfDefineFactory) {
+ this.senderId = senderId;
+ this.tcpConfig = tcpConfig;
+ // Initialize the bootstrap
+ this.bootstrap = buildBootstrap(selfDefineFactory);
+ this.maintThread = new MaintThread();
+ }
+
+ @Override
+ public boolean start(ProcessResult procResult) {
+ if (!started.compareAndSet(false, true)) {
+ return procResult.setSuccess();
+ }
+ if (timerRefCnt.incrementAndGet() == 1) {
+ timerObj = new HashedWheelTimer();
+ }
+ // start hb thread
+ this.maintThread.start();
+ logger.info("ClientMgr({}) started", senderId);
+ return procResult.setSuccess();
+ }
+
+ @Override
+ public void stop() {
+ if (!started.compareAndSet(true, false)) {
+ return;
+ }
+ if (timerRefCnt.decrementAndGet() == 0) {
+ timerObj.stop();
+ }
+ this.bootstrap.config().group().shutdownGracefully();
+
+ this.maintThread.shutDown();
+ if (!channelMsgIdMap.isEmpty()) {
+ long startTime = System.currentTimeMillis();
+ while (!channelMsgIdMap.isEmpty()) {
+ if (System.currentTimeMillis() - startTime >=
tcpConfig.getConCloseWaitPeriodMs()) {
+ break;
+ }
+ ProxyUtils.sleepSomeTime(100L);
+ }
+ }
+ this.activeNodes.clear();
+ logger.info("ClientMgr({}) stopped!", senderId);
+ }
+
+ @Override
+ public int getInflightMsgCnt() {
+ return this.reqTimeouts.size();
+ }
+
+ @Override
+ public int getActiveNodeCnt() {
+ return activeNodes.size();
+ }
+
+ @Override
+ public void updateProxyInfoList(boolean nodeChanged,
ConcurrentHashMap<String, HostInfo> hostInfoMap) {
+ if (hostInfoMap.isEmpty() || !this.started.get()) {
+ return;
+ }
+ long curTime = System.currentTimeMillis();
+ try {
+ // shuffle candidate nodes
+ List<HostInfo> candidateNodes = new
ArrayList<>(hostInfoMap.size());
+ candidateNodes.addAll(hostInfoMap.values());
+ Collections.sort(candidateNodes);
+ Collections.shuffle(candidateNodes);
+ int curTotalCnt = candidateNodes.size();
+ int needActiveCnt = Math.min(this.tcpConfig.getAliveConnections(),
curTotalCnt);
+ // build next step nodes
+ TcpNettyClient client;
+ int maxCycleCnt = 3;
+ this.connFailNodeMap.clear();
+ List<String> realHosts = new ArrayList<>();
+ ConcurrentHashMap<String, TcpNettyClient> tmpClientMaps = new
ConcurrentHashMap<>();
+ do {
+ int selectCnt = 0;
+ for (HostInfo hostInfo : candidateNodes) {
+ if (realHosts.contains(hostInfo.getReferenceName())) {
+ continue;
+ }
+ try {
+ client = new TcpNettyClient(this.senderId,
+ this.bootstrap, hostInfo, this.tcpConfig);
+ if (!client.connect(false,
channelTermGen.incrementAndGet())) {
+
this.connFailNodeMap.put(hostInfo.getReferenceName(), hostInfo);
+ client.close(false);
+ continue;
+ }
+ realHosts.add(hostInfo.getReferenceName());
+ tmpClientMaps.put(hostInfo.getReferenceName(), client);
+ if (++selectCnt >= needActiveCnt) {
+ break;
+ }
+ } catch (Throwable ex) {
+ if (updConExptCnt.shouldPrint()) {
+ logger.warn("ClientMgr({}) build client {}
exception",
+ senderId, hostInfo.getReferenceName(), ex);
+ }
+ }
+ }
+ if (!realHosts.isEmpty()) {
+ break;
+ }
+ ProxyUtils.sleepSomeTime(1000L);
+ } while (--maxCycleCnt > 0);
+ // update active nodes
+ if (realHosts.isEmpty()) {
+ if (nodeChanged) {
+ logger.error("ClientMgr({}) changed nodes, but all connect
failure, nodes={}!",
+ this.senderId, candidateNodes);
+ } else {
+ logger.error("ClientMgr({}) re-choose nodes, but all
connect failure, nodes={}!",
+ this.senderId, candidateNodes);
+ }
+ } else {
+ this.lastUpdateTime = System.currentTimeMillis();
+ this.deletingClientMaps = this.usingClientMaps;
+ this.usingClientMaps = tmpClientMaps;
+ this.activeNodes = realHosts;
+ if (nodeChanged) {
+ logger.info("ClientMgr({}) changed nodes, wast {}ms,
nodeCnt=(r:{}-a:{}), actives={}, fail={}",
+ senderId, (System.currentTimeMillis() - curTime),
+ needActiveCnt, realHosts.size(), realHosts,
connFailNodeMap.keySet());
+ } else {
+ logger.info("ClientMgr({}) re-choose nodes, wast {}ms,
nodeCnt=(r:{}-a:{}), actives={}, fail={}",
+ senderId, (System.currentTimeMillis() - curTime),
+ needActiveCnt, realHosts.size(), realHosts,
connFailNodeMap.keySet());
+ }
+ }
+ } catch (Throwable ex) {
+ if (updConExptCnt.shouldPrint()) {
+ logger.warn("ClientMgr({}) update nodes throw exception",
senderId, ex);
+ }
+ }
+ }
+
+ public boolean reportEvent(SendQos sendQos, TcpNettyClient client,
+ EncodeObject encObject, MsgSendCallback callback, ProcessResult
procResult) {
+ if (!this.started.get()) {
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+ }
+ long clientTerm = client.getChanTermId();
+ if (sendQos == SendQos.NO_ACK) {
+ // process no ack report
+ if (client.write(clientTerm, encObject, procResult)) {
+ client.decInFlightMsgCnt(clientTerm);
+ }
+ return procResult.isSuccess();
+ }
+ TcpCallFuture newFuture = new TcpCallFuture(encObject,
+ client.getClientAddr(), clientTerm, client.getChanStr(),
callback);
+ TcpCallFuture curFuture =
reqObjects.putIfAbsent(encObject.getMessageId(), newFuture);
+ if (curFuture != null) {
+ if (sendExceptCnt.shouldPrint()) {
+ logger.warn("ClientMgr({}) found message id {} has existed.",
+ senderId, encObject.getMessageId());
+ }
+ return procResult.setFailResult(ErrorCode.DUPLICATED_MESSAGE_ID);
+ }
+ // build channel -- messageId map
+ ConcurrentHashMap<Integer, Boolean> msgIdMap =
channelMsgIdMap.get(client.getChanStr());
+ if (msgIdMap == null) {
+ ConcurrentHashMap<Integer, Boolean> tmpMsgIdMap = new
ConcurrentHashMap<>();
+ msgIdMap = channelMsgIdMap.putIfAbsent(client.getChanStr(),
tmpMsgIdMap);
+ if (msgIdMap == null) {
+ msgIdMap = tmpMsgIdMap;
+ }
+ }
+ msgIdMap.put(encObject.getMessageId(), newFuture.isAsyncCall());
+ // send message
+ if (newFuture.isAsyncCall()) {
+ // process async report
+ reqTimeouts.put(encObject.getMessageId(),
+ timerObj.newTimeout(new
TimeoutTask(encObject.getMessageId()),
+ tcpConfig.getRequestTimeoutMs(),
TimeUnit.MILLISECONDS));
+ if (!client.write(clientTerm, encObject, procResult)) {
+ Timeout timeout = reqTimeouts.remove(encObject.getMessageId());
+ if (timeout != null) {
+ timeout.cancel();
+ }
+ rmvMsgStubInfo(encObject.getMessageId());
+ }
+ return procResult.setSuccess();
+ } else {
+ // process sync report
+ if (!client.write(clientTerm, encObject, procResult)) {
+ rmvMsgStubInfo(encObject.getMessageId());
+ return false;
+ }
+ boolean retValue = newFuture.get(procResult,
+ tcpConfig.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
+ if (rmvMsgStubInfo(encObject.getMessageId())) {
+ if (procResult.getErrCode() ==
ErrorCode.SEND_WAIT_TIMEOUT.getErrCode()) {
+ client.setBusy(clientTerm);
+ }
+ client.decInFlightMsgCnt(clientTerm);
+ }
+ return retValue;
+ }
+ }
+
+ private boolean rmvMsgStubInfo(int messageId) {
+ TcpCallFuture curFuture = reqObjects.remove(messageId);
+ if (curFuture == null) {
+ return false;
+ }
+ ConcurrentHashMap<Integer, Boolean> msgIdMap =
+ channelMsgIdMap.get(curFuture.getChanStr());
+ if (msgIdMap != null) {
+ msgIdMap.remove(curFuture.getMessageId());
+ }
+ return true;
+ }
+
+ public boolean getClientByRoundRobin(ProcessResult procResult) {
+ if (!this.started.get()) {
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+ }
+ List<String> curNodes = this.activeNodes;
+ int curNodeSize = curNodes.size();
+ if (curNodeSize == 0) {
+ return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
+ }
+ String curNode;
+ TcpNettyClient client;
+ TcpNettyClient back1thClient = null;
+ int nullClientCnt = 0;
+ int unWritableCnt = 0;
+ int startPos = reqSendIndex.getAndIncrement();
+ for (int step = 0; step < curNodeSize; step++) {
+ curNode = curNodes.get(Math.abs(startPos++) % curNodeSize);
+ client = usingClientMaps.get(curNode);
+ if (client == null) {
+ nullClientCnt++;
+ continue;
+ }
+ if (client.isActive()) {
+ if (client.getChannel().isWritable()) {
+ if (tcpConfig.getMaxMsgInFlightPerConn() > 0
+ && client.getMsgInflightCnt() >
tcpConfig.getMaxMsgInFlightPerConn()) {
+ back1thClient = client;
+ } else {
+ client.incClientUsingCnt();
+ return procResult.setSuccess(client);
+ }
+ } else {
+ unWritableCnt++;
+ }
+ }
+ }
+ if (nullClientCnt == curNodeSize) {
+ return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
+ } else if (unWritableCnt + nullClientCnt == curNodeSize) {
+ return procResult.setFailResult(ErrorCode.EMPTY_WRITABLE_NODE_SET);
+ }
+ if (back1thClient != null) {
+ back1thClient.incClientUsingCnt();
+ return procResult.setSuccess(back1thClient);
+ }
+ return procResult.setFailResult(ErrorCode.NO_VALID_REMOTE_NODE);
+ }
+
+ public void feedbackMsgResponse(String channelStr, DecodeObject decObject)
{
+ Timeout timeoutTask =
this.reqTimeouts.remove(decObject.getMessageId());
+ if (timeoutTask != null) {
+ timeoutTask.cancel();
+ }
+ TcpCallFuture callFuture =
this.reqObjects.remove(decObject.getMessageId());
+ if (callFuture == null) {
+ return;
+ }
+ long curTime = System.currentTimeMillis();
+ ConcurrentHashMap<Integer, Boolean> inflightMsgIds =
channelMsgIdMap.get(channelStr);
+ if (inflightMsgIds != null) {
+ inflightMsgIds.remove(decObject.getMessageId());
+ }
+ try {
+ callFuture.onMessageAck(decObject.getSendResult());
+ } catch (Throwable ex) {
+ if (callbackExceptCnt.shouldPrint()) {
+ logger.info("ClientMgr({}) response come, callback exception!",
+ senderId, ex);
+ }
+ } finally {
+ this.descInflightMsgCnt(callFuture);
+ }
+ }
+
+ public void setChannelFrozen(String channelStr) {
+ Map<Integer, Boolean> inflightMsgIds = channelMsgIdMap.get(channelStr);
+ if (inflightMsgIds == null) {
+ return;
+ }
+ TcpCallFuture callFuture;
+ TcpNettyClient nettyTcpClient;
+ for (Integer messageId : inflightMsgIds.keySet()) {
+ if (messageId == null) {
+ continue;
+ }
+ callFuture = this.reqObjects.get(messageId);
+ if (callFuture == null) {
+ continue;
+ }
+ // find and process in using clients
+ nettyTcpClient = usingClientMaps.get(callFuture.getClientAddr());
+ if (nettyTcpClient != null
+ && nettyTcpClient.getChanTermId() ==
callFuture.getChanTerm()) {
+ nettyTcpClient.setFrozen(callFuture.getChanTerm());
+ return;
+ }
+ // find and process in deleting clients
+ nettyTcpClient =
deletingClientMaps.get(callFuture.getClientAddr());
+ if (nettyTcpClient != null
+ && nettyTcpClient.getChanTermId() ==
callFuture.getChanTerm()) {
+ nettyTcpClient.setFrozen(callFuture.getChanTerm());
+ return;
+ }
+ break;
+ }
+ }
+
+ public void notifyChannelDisconnected(String channelStr) {
+ Map<Integer, Boolean> inflightMsgIds =
+ channelMsgIdMap.remove(channelStr);
+ if (inflightMsgIds == null || inflightMsgIds.isEmpty()) {
+ return;
+ }
+ Timeout timeoutTask;
+ TcpNettyClient nettyTcpClient;
+ long curTime;
+ for (Integer messageId : inflightMsgIds.keySet()) {
+ if (messageId == null) {
+ continue;
+ }
+ timeoutTask = this.reqTimeouts.remove(messageId);
+ if (timeoutTask != null) {
+ timeoutTask.cancel();
+ }
+ TcpCallFuture callFuture = this.reqObjects.remove(messageId);
+ if (callFuture == null) {
+ continue;
+ }
+ curTime = System.currentTimeMillis();
+ // find and process in using clients
+ nettyTcpClient = usingClientMaps.get(callFuture.getClientAddr());
+ if (nettyTcpClient != null
+ && nettyTcpClient.getChanTermId() ==
callFuture.getChanTerm()) {
+ try {
+ nettyTcpClient.getChannel().eventLoop().execute(
+ () -> callFuture.onMessageAck(new
ProcessResult(ErrorCode.CONNECTION_BREAK)));
+ } catch (Throwable ex) {
+ if (callbackExceptCnt.shouldPrint()) {
+ logger.info("ClientMgr({}) disconnected, callback
exception!",
+ senderId, ex);
+ }
+ } finally {
+ nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+ }
+ return;
+ }
+ // find and process in deleting clients
+ nettyTcpClient =
deletingClientMaps.get(callFuture.getClientAddr());
+ if (nettyTcpClient != null
+ && nettyTcpClient.getChanTermId() ==
callFuture.getChanTerm()) {
+ try {
+ nettyTcpClient.getChannel().eventLoop().execute(
+ () -> callFuture.onMessageAck(new
ProcessResult(ErrorCode.CONNECTION_BREAK)));
+ } catch (Throwable ex) {
+ if (callbackExceptCnt.shouldPrint()) {
+ logger.info("ClientMgr({}) disconnected, callback2
exception!",
+ senderId, ex);
+ }
+ } finally {
+ nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+ }
+ }
+ }
+ }
+
+ public String getSenderId() {
+ return this.senderId;
+ }
+
+ public int getNextMsgId() {
+ return messageIdGen.getNextInt();
+ }
+
+ private void descInflightMsgCnt(TcpCallFuture callFuture) {
+ TcpNettyClient nettyTcpClient =
usingClientMaps.get(callFuture.getClientAddr());
+ if (nettyTcpClient != null
+ && nettyTcpClient.getChanTermId() == callFuture.getChanTerm())
{
+ nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+ return;
+ }
+ nettyTcpClient = deletingClientMaps.get(callFuture.getClientAddr());
+ if (nettyTcpClient != null
+ && nettyTcpClient.getChanTermId() == callFuture.getChanTerm())
{
+ nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+ }
+ }
+
+ private Bootstrap buildBootstrap(ThreadFactory selfFactory) {
+ if (selfFactory == null) {
+ selfFactory = new DefaultThreadFactory(
+ "sdk-netty-workers", Thread.currentThread().isDaemon());
+ }
+ EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(
+ tcpConfig.getNettyWorkerThreadNum(),
tcpConfig.isEnableEpollBusyWait(), selfFactory);
+ Bootstrap tmpBootstrap = new Bootstrap();
+ tmpBootstrap.group(eventLoopGroup);
+ tmpBootstrap.option(ChannelOption.TCP_NODELAY, true);
+ tmpBootstrap.option(ChannelOption.SO_REUSEADDR, true);
+
tmpBootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
+ if (tcpConfig.getRcvBufferSize() > 0) {
+ tmpBootstrap.option(ChannelOption.SO_RCVBUF,
tcpConfig.getRcvBufferSize());
+ }
+ if (tcpConfig.getSendBufferSize() > 0) {
+ tmpBootstrap.option(ChannelOption.SO_SNDBUF,
tcpConfig.getSendBufferSize());
+ }
+ tmpBootstrap.handler(new ClientPipelineFactory(this));
+ return tmpBootstrap;
+ }
+
+ private class MaintThread extends Thread {
+
+ private volatile boolean bShutDown;
+
+ public MaintThread() {
+ bShutDown = false;
+ this.setName("ClientMgrMaint-" + senderId);
+ }
+
+ public void shutDown() {
+ logger.info("ClientMgr({}) shutdown MaintThread!", senderId);
+ bShutDown = true;
+ this.interrupt();
+ }
+
+ @Override
+ public void run() {
+ logger.info("ClientMgr({}) start MaintThread!", senderId);
+ long curTime;
+ long checkRound = 0L;
+ boolean printFailNodes;
+ ProcessResult procResult = new ProcessResult();
+ Set<String> failNodes = new HashSet<>();
+ while (!bShutDown) {
+ printFailNodes = ((Math.abs(checkRound++) % 90L) == 0L);
+ try {
+ curTime = System.currentTimeMillis();
+ // clean deleting nodes
+ if (deletingClientMaps != null &&
!deletingClientMaps.isEmpty()) {
+ if (lastUpdateTime > 0
+ && curTime - lastUpdateTime >
tcpConfig.getConCloseWaitPeriodMs()) {
+ for (TcpNettyClient client :
deletingClientMaps.values()) {
+ if (client == null) {
+ continue;
+ }
+ client.close(false);
+ }
+ deletingClientMaps.clear();
+ }
+ }
+ // check and keepalive using nodes
+ for (TcpNettyClient nettyTcpClient :
usingClientMaps.values()) {
+ if (nettyTcpClient == null) {
+ continue;
+ }
+ if (nettyTcpClient.isActive()) {
+ if (nettyTcpClient.isIdleClient(curTime)) {
+ nettyTcpClient.sendHeartBeatMsg(procResult);
+ }
+ } else {
+ if ((nettyTcpClient.getClientUsingCnt() <= 0)
+ && (nettyTcpClient.getMsgInflightCnt() <=
0)
+ || (curTime -
nettyTcpClient.getChanInvalidTime() > tcpConfig
+ .getConCloseWaitPeriodMs())) {
+ nettyTcpClient.reconnect(false,
channelTermGen.incrementAndGet());
+ }
+ }
+ if (printFailNodes) {
+ if (nettyTcpClient.isConFailNodes()) {
+ failNodes.add(nettyTcpClient.getClientAddr());
+ }
+ }
+ }
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("ClientMgr({}) MaintThread throw
exception", senderId, ex);
+ }
+ }
+ if (printFailNodes && !failNodes.isEmpty()) {
+ logger.warn("ClientMgr({}) found continue connect fail
nodes: {}",
+ senderId, failNodes);
+ failNodes.clear();
+ }
+ if (bShutDown) {
+ break;
+ }
+ ProxyUtils.sleepSomeTime(2000L);
+ }
+ logger.info("ClientMgr({}) exit MaintThread!", senderId);
+ }
+ }
+
+ /**
+ * Time out task call back handle
+ */
+ public class TimeoutTask implements TimerTask {
+
+ private final int messageId;
+
+ public TimeoutTask(int messageId) {
+ this.messageId = messageId;
+ }
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ Timeout timeoutMsg = reqTimeouts.remove(messageId);
+ if (timeoutMsg != null) {
+ timeoutMsg.cancel();
+ }
+ TcpCallFuture future = reqObjects.remove(messageId);
+ if (future == null) {
+ return;
+ }
+ long curTime = System.currentTimeMillis();
+ ConcurrentHashMap<Integer, Boolean> messageIdMap;
+ // find and process in using clients
+ TcpNettyClient nettyTcpClient =
usingClientMaps.get(future.getClientAddr());
+ if (nettyTcpClient != null
+ && nettyTcpClient.getChanTermId() == future.getChanTerm())
{
+ messageIdMap = channelMsgIdMap.get(future.getChanStr());
+ if (messageIdMap != null) {
+ messageIdMap.remove(messageId);
+ }
+ try {
+ nettyTcpClient.getChannel().eventLoop().execute(
+ () -> future.onMessageAck(new
ProcessResult(ErrorCode.SEND_WAIT_TIMEOUT)));
+ } catch (Throwable ex) {
+ if (callbackExceptCnt.shouldPrint()) {
+ logger.info("ClientMgr({}) msg timeout, callback
exception!",
+ senderId, ex);
+ }
+ } finally {
+ nettyTcpClient.decInFlightMsgCnt(future.getChanTerm());
+ }
+ return;
+ }
+ // find and process in deleting clients
+ nettyTcpClient = deletingClientMaps.get(future.getClientAddr());
+ if (nettyTcpClient != null
+ && nettyTcpClient.getChanTermId() == future.getChanTerm())
{
+ messageIdMap = channelMsgIdMap.get(future.getChanStr());
+ if (messageIdMap != null) {
+ messageIdMap.remove(messageId);
+ }
+ try {
+ nettyTcpClient.getChannel().eventLoop().execute(
+ () -> future.onMessageAck(new
ProcessResult(ErrorCode.SEND_WAIT_TIMEOUT)));
+ } catch (Throwable ex) {
+ if (callbackExceptCnt.shouldPrint()) {
+ logger.info("ClientMgr({}) msg timeout, callback2
exception!",
+ senderId, ex);
+ }
+ } finally {
+ nettyTcpClient.decInFlightMsgCnt(future.getChanTerm());
+ }
+ }
+ }
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
index d6cafdb85b..f0763b9a76 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
@@ -46,6 +46,7 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
int totalLength;
try {
if (encObject.getMsgType() == MsgType.MSG_ACK_SERVICE) {
+ // msgType(1) + bodyLength(4) + attrsLength(4)
totalLength = 1 + 4 + 4 + encObject.getMsgSize();
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
@@ -59,6 +60,7 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
buf.writeBytes(encObject.getAttrData());
}
} else if (encObject.getMsgType() == MsgType.MSG_MULTI_BODY) {
+ // msgType(1) + bodyLength(4) + attrsLength(4)
totalLength = 1 + 4 + 4 + encObject.getMsgSize();
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
@@ -72,6 +74,9 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
buf.writeBytes(encObject.getAttrData());
}
} else if (encObject.getMsgType() == MsgType.MSG_BIN_MULTI_BODY) {
+ // msgType(1) + groupNum(2) + streamNum(2) + extField(2)
+ // + dataTime(4) + msgCnt(2) + uniqueId(4) + bodyLength(4)
+ // + attrsLength(2) + magic(2)
totalLength = 1 + 2 + 2 + 2 + 4 + 2 + 4 + 4 + 2 + 2 +
encObject.getMsgSize();
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
@@ -92,6 +97,8 @@ public class ProtocolEncoder extends
MessageToMessageEncoder<EncodeObject> {
}
buf.writeShort(0xee01);
} else if (encObject.getMsgType() == MsgType.MSG_BIN_HEARTBEAT) {
+ // msgType(1) + dataTime(4) + version(1) + bodyLength(4)
+ // + attrsLength(2) + magic(2)
totalLength = 1 + 4 + 1 + 4 + 2 +
encObject.getAttrDataLength() + 2;
buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
buf.writeInt(totalLength);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
new file mode 100644
index 0000000000..a5f44f17ee
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender;
+
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.config.ConfigHolder;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
+import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Base Sender class:
+ *
+ * Used to manage Sender metadata information, including Sender ID,
+ * status, network interaction, metadata query object, and
+ * Proxy node metadata information obtained from Manager, etc.
+ */
+public abstract class BaseSender implements ConfigHolder {
+
+ private final int SENDER_STATUS_UNINITIALIZED = -2;
+ private final int SENDER_STATUS_INITIALIZING = -1;
+ private final int SENDER_STATUS_STARTED = 0;
+ private final int SENDER_STATUS_CLOSED = 1;
+
+ protected static final Logger logger =
LoggerFactory.getLogger(BaseSender.class);
+ protected static final LogCounter exceptCnt = new LogCounter(10, 100000,
60 * 1000L);
+ // sender id generator
+ private static final AtomicLong senderIdGen = new AtomicLong(0L);
+ //
+ protected final AtomicInteger senderStatus = new
AtomicInteger(SENDER_STATUS_UNINITIALIZED);
+ protected final String senderId;
+ protected final ProxyClientConfig baseConfig;
+ protected ClientMgr clientMgr;
+ protected ProxyConfigManager configManager;
+ private final ReentrantReadWriteLock fsLock = new
ReentrantReadWriteLock(true);
+ // proxy node meta infos
+ private final ConcurrentHashMap<String, HostInfo> proxyNodeInfos = new
ConcurrentHashMap<>();
+ // groupId and streamId num info
+ private volatile int allowedPkgLength = -1;
+ protected volatile boolean idTransNum = false;
+ protected volatile int groupIdNum = 0;
+ private Map<String, Integer> streamIdMap = new HashMap<>();
+
+ protected BaseSender(ProxyClientConfig configure) {
+ this.baseConfig = configure.clone();
+ this.senderId = configure.getDataRptProtocol() + "-" +
senderIdGen.incrementAndGet();
+ }
+
+ public boolean start(ProcessResult procResult) {
+ if (!this.senderStatus.compareAndSet(
+ SENDER_STATUS_UNINITIALIZED, SENDER_STATUS_INITIALIZING)) {
+ return procResult.setFailResult(ErrorCode.OK);
+ }
+ // start client manager
+ if (!this.clientMgr.start(procResult)) {
+ return false;
+ }
+ // query meta info from manager
+ if (!this.configManager.doProxyEntryQueryWork(procResult)) {
+ this.clientMgr.stop();
+ String errInfo = "queryCode=" + procResult.getErrCode()
+ + ", detail=" + procResult.getErrMsg();
+ return
procResult.setFailResult(ErrorCode.FETCH_PROXY_META_FAILURE, errInfo);
+ }
+ if (this.baseConfig.isEnableReportEncrypt()
+ &&
!this.configManager.doEncryptConfigEntryQueryWork(procResult)) {
+ this.clientMgr.stop();
+ String errInfo = "queryCode=" + procResult.getErrCode()
+ + ", detail=" + procResult.getErrMsg();
+ return
procResult.setFailResult(ErrorCode.FETCH_ENCRYPT_META_FAILURE, errInfo);
+ }
+ // start configure manager
+ this.configManager.start();
+ this.senderStatus.set(SENDER_STATUS_STARTED);
+ logger.info("Sender({}) instance started!", senderId);
+ return procResult.setFailResult(ErrorCode.OK);
+ }
+
+ public void close() {
+ int currentStatus = senderStatus.get();
+ if (currentStatus == SENDER_STATUS_CLOSED) {
+ return;
+ }
+ if (!senderStatus.compareAndSet(currentStatus, SENDER_STATUS_CLOSED)) {
+ return;
+ }
+ configManager.shutDown();
+ clientMgr.stop();
+ logger.info("Sender({}) instance stopped!", senderId);
+ }
+
+ @Override
+ public void updateAllowedMaxPkgLength(int maxPkgLength) {
+ this.allowedPkgLength = maxPkgLength;
+ }
+
+ @Override
+ public void updateProxyNodes(boolean nodeChanged, List<HostInfo>
newProxyNodes) {
+ if (this.senderStatus.get() == SENDER_STATUS_CLOSED
+ || newProxyNodes == null || newProxyNodes.isEmpty()) {
+ return;
+ }
+ this.fsLock.writeLock().lock();
+ try {
+ boolean found;
+ List<String> rmvNodes = new ArrayList<>();
+ for (String hostRefName : this.proxyNodeInfos.keySet()) {
+ found = false;
+ for (HostInfo hostInfo : newProxyNodes) {
+ if (hostRefName.equals(hostInfo.getReferenceName())) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ rmvNodes.add(hostRefName);
+ }
+ }
+ for (HostInfo hostInfo : newProxyNodes) {
+ if
(this.proxyNodeInfos.containsKey(hostInfo.getReferenceName())) {
+ continue;
+ }
+ this.proxyNodeInfos.put(hostInfo.getReferenceName(), hostInfo);
+ }
+ for (String rmvNode : rmvNodes) {
+ this.proxyNodeInfos.remove(rmvNode);
+ }
+ clientMgr.updateProxyInfoList(nodeChanged, this.proxyNodeInfos);
+ } finally {
+ this.fsLock.writeLock().unlock();
+ }
+ }
+
+ public boolean isStarted() {
+ return senderStatus.get() == SENDER_STATUS_STARTED;
+ }
+
+ public String getMetaConfigKey() {
+ return this.baseConfig.getGroupMetaConfigKey();
+ }
+
+ public String getSenderId() {
+ return senderId;
+ }
+
+ public ProxyClientConfig getConfigure() {
+ return baseConfig;
+ }
+
+ public int getAllowedPkgLength() {
+ return allowedPkgLength;
+ }
+
+ public String getGroupId() {
+ return baseConfig.getInlongGroupId();
+ }
+
+ public boolean isMetaInfoUnReady() {
+ return this.proxyNodeInfos.isEmpty();
+ }
+
+ public Map<String, HostInfo> getProxyNodeInfos() {
+ return proxyNodeInfos;
+ }
+
+ public int getProxyNodeCnt() {
+ return proxyNodeInfos.size();
+ }
+
+ public abstract int getActiveNodeCnt();
+
+ public abstract int getInflightMsgCnt();
+
+ public void updateGroupIdAndStreamIdNumInfo(
+ int groupIdNum, Map<String, Integer> streamIdMap) {
+ this.groupIdNum = groupIdNum;
+ this.streamIdMap = streamIdMap;
+ if (groupIdNum != 0 && streamIdMap != null && !streamIdMap.isEmpty()) {
+ this.idTransNum = true;
+ }
+ }
+
+ protected int getStreamIdNum(String streamId) {
+ if (idTransNum) {
+ Integer tmpNum = streamIdMap.get(streamId);
+ if (tmpNum != null) {
+ return tmpNum;
+ }
+ }
+ return 0;
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
new file mode 100644
index 0000000000..97076dd977
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender.tcp;
+
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
+import org.apache.inlong.sdk.dataproxy.network.tcp.SendQos;
+import org.apache.inlong.sdk.dataproxy.network.tcp.TcpClientMgr;
+import org.apache.inlong.sdk.dataproxy.network.tcp.TcpNettyClient;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import org.xerial.snappy.Snappy;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * TCP Message Sender class
+ *
+ * Used to define the TCP sender common methods
+ */
+public class InLongTcpMsgSender extends BaseSender implements TcpMsgSender {
+
+ protected static final LogCounter tcpExceptCnt = new LogCounter(10,
100000, 60 * 1000L);
+ private final TcpMsgSenderConfig tcpConfig;
+ private final TcpClientMgr tcpClientMgr;
+
+ protected InLongTcpMsgSender(TcpMsgSenderConfig configure, ThreadFactory
selfDefineFactory) {
+ super(configure);
+ this.tcpConfig = (TcpMsgSenderConfig) baseConfig;
+ this.clientMgr = new TcpClientMgr(this.getSenderId(), this.tcpConfig,
selfDefineFactory);
+ this.tcpClientMgr = (TcpClientMgr) clientMgr;
+ }
+
+ @Override
+ public boolean sendMessageWithoutAck(TcpEventInfo eventInfo, ProcessResult
procResult) {
+ if (eventInfo == null) {
+ throw new NullPointerException("eventInfo is null");
+ }
+ if (procResult == null) {
+ throw new NullPointerException("procResult is null");
+ }
+ if (!this.isStarted()) {
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+ }
+ return processEvent(SendQos.NO_ACK, eventInfo, null, procResult);
+ }
+
+ @Override
+ public boolean syncSendMessage(boolean sendInB2B,
+ TcpEventInfo eventInfo, ProcessResult procResult) {
+ if (eventInfo == null) {
+ throw new NullPointerException("eventInfo is null");
+ }
+ if (procResult == null) {
+ throw new NullPointerException("procResult is null");
+ }
+ if (!this.isStarted()) {
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+ }
+ if (sendInB2B) {
+ return processEvent(SendQos.SOURCE_ACK, eventInfo, null,
procResult);
+ } else {
+ return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult);
+ }
+ }
+
+ @Override
+ public boolean asyncSendMessage(boolean sendInB2B,
+ TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult
procResult) {
+ if (eventInfo == null) {
+ throw new NullPointerException("eventInfo is null");
+ }
+ if (callback == null) {
+ throw new NullPointerException("callback is null");
+ }
+ if (procResult == null) {
+ throw new NullPointerException("procResult is null");
+ }
+ if (!this.isStarted()) {
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+ }
+ if (sendInB2B) {
+ return processEvent(SendQos.SOURCE_ACK, eventInfo, callback,
procResult);
+ } else {
+ return processEvent(SendQos.SINK_ACK, eventInfo, callback,
procResult);
+ }
+ }
+
+ @Override
+ public int getActiveNodeCnt() {
+ return tcpClientMgr.getActiveNodeCnt();
+ }
+
+ @Override
+ public int getInflightMsgCnt() {
+ return tcpClientMgr.getInflightMsgCnt();
+ }
+
+ private boolean processEvent(SendQos sendQos,
+ TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult
procResult) {
+ if (this.isMetaInfoUnReady()) {
+ return procResult.setFailResult(ErrorCode.NO_NODE_META_INFOS);
+ }
+ EncodeObject encObject = new EncodeObject(eventInfo.getGroupId(),
+ eventInfo.getStreamId(), tcpConfig.getSdkMsgType(),
eventInfo.getDtMs());
+ // pre-process attributes
+ processEventAttrsInfo(sendQos, eventInfo, encObject);
+ // check package length
+ if (!isValidPkgLength(encObject.getAttrDataLength(),
+ eventInfo.getBodySize(), this.getAllowedPkgLength(),
procResult)) {
+ return false;
+ }
+ // process body
+ if (!procEventBodyInfo(eventInfo, procResult, encObject)) {
+ return false;
+ }
+ // get client object
+ if (!tcpClientMgr.getClientByRoundRobin(procResult)) {
+ return false;
+ }
+ TcpNettyClient client = (TcpNettyClient) procResult.getRetData();
+ encObject.setMessageIdInfo(tcpClientMgr.getNextMsgId());
+ try {
+ return tcpClientMgr.reportEvent(sendQos, client, encObject,
callback, procResult);
+ } finally {
+ client.decClientUsingCnt();
+ }
+ }
+
+ private boolean isValidPkgLength(
+ int attrLength, int bodyLength, int allowedLen, ProcessResult
procResult) {
+ // Not valid if the maximum limit is less than or equal to 0
+ if (allowedLen < 0) {
+ return true;
+ }
+ // Reserve space for attribute
+ if (bodyLength + attrLength > allowedLen -
SdkConsts.RESERVED_ATTRIBUTE_LENGTH) {
+ String errMsg = String.format("OverMaxLen: attrLen(%d) +
bodyLen(%d) > allowedLen(%d) - fixedLen(%d)",
+ attrLength, bodyLength, allowedLen,
SdkConsts.RESERVED_ATTRIBUTE_LENGTH);
+ if (tcpExceptCnt.shouldPrint()) {
+ logger.warn(errMsg);
+ }
+ return
procResult.setFailResult(ErrorCode.REPORT_INFO_EXCEED_MAX_LEN, errMsg);
+ }
+ return true;
+ }
+
+ private void processEventAttrsInfo(
+ SendQos sendQos, TcpEventInfo eventInfo, EncodeObject
encodeObject) {
+ // get msgType
+ int intMsgType = encodeObject.getMsgType().getValue();
+ boolean enableDataComp = tcpConfig.isEnableDataCompress()
+ && eventInfo.getBodySize() >=
tcpConfig.getMinCompEnableLength();
+ // add fixed attributes
+ Map<String, String> newAttrs = new HashMap<>(eventInfo.getAttrs());
+ newAttrs.put(AttributeConstants.MSG_RPT_TIME,
String.valueOf(encodeObject.getRtms()));
+ newAttrs.put(AttributeConstants.PROXY_SDK_VERSION,
ProxyUtils.getJarVersion());
+
+ if (sendQos == SendQos.NO_ACK) {
+ newAttrs.put(AttributeConstants.MESSAGE_IS_ACK,
String.valueOf(true));
+ } else if (sendQos == SendQos.SINK_ACK) {
+ newAttrs.put(AttributeConstants.MESSAGE_PROXY_SEND,
String.valueOf(true));
+ }
+ if (tcpConfig.getSdkMsgType() == MsgType.MSG_ACK_SERVICE
+ || tcpConfig.getSdkMsgType() == MsgType.MSG_MULTI_BODY) {
+ // add msgType 3/5 attributes
+ newAttrs.put(AttributeConstants.GROUP_ID, eventInfo.getGroupId());
+ newAttrs.put(AttributeConstants.STREAM_ID,
eventInfo.getStreamId());
+ newAttrs.put(AttributeConstants.DATA_TIME,
String.valueOf(eventInfo.getDtMs()));
+ newAttrs.put(AttributeConstants.MESSAGE_COUNT,
String.valueOf(eventInfo.getMsgCnt()));
+ if (enableDataComp) {
+ newAttrs.put(AttributeConstants.COMPRESS_TYPE, "snappy");
+ }
+ } else {
+ // add msgType 7 attributes
+ if (enableDataComp) {
+ intMsgType |= SdkConsts.FLAG_ALLOW_COMPRESS;
+ }
+ int extField = 0;
+ if (tcpConfig.isSeparateEventByLF()) {
+ extField |= SdkConsts.EXT_FIELD_FLAG_SEP_BY_LF;
+ }
+ boolean id2Num = false;
+ int streamIdNum = 0;
+ if (this.idTransNum && (this.groupIdNum != 0)
+ &&
eventInfo.getGroupId().equals(tcpConfig.getInlongGroupId())) {
+ streamIdNum = getStreamIdNum(eventInfo.getStreamId());
+ id2Num = streamIdNum != 0;
+ }
+ if (id2Num) {
+ encodeObject.setGroupAndStreamId2Num(this.groupIdNum,
streamIdNum);
+ } else {
+ extField |= SdkConsts.EXT_FIELD_FLAG_DISABLE_ID2NUM;
+ newAttrs.put(AttributeConstants.GROUP_ID,
eventInfo.getGroupId());
+ newAttrs.put(AttributeConstants.STREAM_ID,
eventInfo.getStreamId());
+ }
+ encodeObject.setExtField(extField);
+ }
+ byte[] aesKey = null;
+ // set encrypt attributes
+ if (tcpConfig.isEnableReportEncrypt()) {
+ EncryptConfigEntry encryptEntry =
configManager.getUserEncryptConfigEntry();
+ newAttrs.put("_userName", tcpConfig.getRptUserName());
+ newAttrs.put("_encyVersion", encryptEntry.getVersion());
+ newAttrs.put("_encyAesKey", encryptEntry.getRsaEncryptedKey());
+ aesKey = encryptEntry.getAesKey();
+ intMsgType |= SdkConsts.FLAG_ALLOW_ENCRYPT;
+ }
+ encodeObject.setAttrInfo(intMsgType, enableDataComp, aesKey, newAttrs);
+ }
+
+ private boolean procEventBodyInfo(TcpEventInfo eventInfo, ProcessResult
procResult, EncodeObject encObject) {
+ // encode message body
+ byte[] body = encBodyList(senderId, encObject.getMsgType(),
+ tcpConfig.isSeparateEventByLF(), eventInfo, procResult);
+ if (body == null) {
+ return false;
+ }
+ // compress body
+ if (encObject.isCompress()) {
+ body = compressBodyInfo(senderId, body, procResult);
+ if (body == null) {
+ return false;
+ }
+ }
+ // encrypt body
+ if (tcpConfig.isEnableReportEncrypt()) {
+ body = aesEncryptBodyInfo(senderId, body, encObject.getAesKey(),
procResult);
+ if (body == null) {
+ return false;
+ }
+ }
+ encObject.setBodyData(eventInfo.getMsgCnt(), body);
+ return true;
+ }
+
+ private byte[] encBodyList(String senderId,
+ MsgType msgType, boolean sepByLF, TcpEventInfo eventInfo,
ProcessResult procResult) {
+ try {
+ int totalCnt = 0;
+ ByteArrayOutputStream bodyOut = new ByteArrayOutputStream();
+ if (msgType == MsgType.MSG_ACK_SERVICE) {
+ for (byte[] entry : eventInfo.getBodyList()) {
+ if (totalCnt++ > 0) {
+
bodyOut.write(AttributeConstants.LINE_FEED_SEP.getBytes(StandardCharsets.UTF_8));
+ }
+ bodyOut.write(entry);
+ }
+ } else if (msgType == MsgType.MSG_MULTI_BODY) {
+ for (byte[] entry : eventInfo.getBodyList()) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(4);
+ byteBuffer.putInt(entry.length);
+ bodyOut.write(byteBuffer.array());
+ bodyOut.write(entry);
+ }
+ } else {
+ if (sepByLF) {
+ ByteArrayOutputStream data = new ByteArrayOutputStream();
+ for (byte[] entry : eventInfo.getBodyList()) {
+ if (totalCnt++ > 0) {
+
data.write(AttributeConstants.LINE_FEED_SEP.getBytes(StandardCharsets.UTF_8));
+ }
+ data.write(entry);
+ }
+ ByteBuffer dataBuffer = ByteBuffer.allocate(4);
+ dataBuffer.putInt(data.toByteArray().length);
+ bodyOut.write(dataBuffer.array());
+ bodyOut.write(data.toByteArray());
+ } else {
+ for (byte[] entry : eventInfo.getBodyList()) {
+ ByteBuffer dataBuffer = ByteBuffer.allocate(4);
+ dataBuffer.putInt(entry.length);
+ bodyOut.write(dataBuffer.array());
+ bodyOut.write(entry);
+ }
+ }
+ }
+ return bodyOut.toByteArray();
+ } catch (Throwable ex) {
+ procResult.setFailResult(ErrorCode.ENCODE_BODY_EXCEPTION,
ex.getMessage());
+ if (tcpExceptCnt.shouldPrint()) {
+ logger.warn("Sender({}) encode body exception", senderId, ex);
+ }
+ return null;
+ }
+ }
+
+ private byte[] compressBodyInfo(String senderId, byte[] body,
ProcessResult procResult) {
+ try {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ out.write(body);
+ int guessLen = Snappy.maxCompressedLength(out.size());
+ byte[] tmpData = new byte[guessLen];
+ int len = Snappy.compress(out.toByteArray(), 0, out.size(),
+ tmpData, 0);
+ body = new byte[len];
+ System.arraycopy(tmpData, 0, body, 0, len);
+ return body;
+ } catch (Throwable ex) {
+ procResult.setFailResult(ErrorCode.COMPRESS_BODY_EXCEPTION,
ex.getMessage());
+ if (tcpExceptCnt.shouldPrint()) {
+ logger.warn("Sender({}) compress body exception", senderId,
ex);
+ }
+ return null;
+ }
+ }
+
+ private byte[] aesEncryptBodyInfo(String senderId,
+ byte[] plainText, byte[] aesKey, ProcessResult procResult) {
+ try {
+ SecretKeySpec secretKeySpec = new SecretKeySpec(aesKey,
EncryptUtil.AES);
+ Cipher cipher = Cipher.getInstance(EncryptUtil.AES);
+ cipher.init(Cipher.ENCRYPT_MODE, secretKeySpec);
+ return cipher.doFinal(plainText);
+ } catch (Throwable ex) {
+ procResult.setFailResult(ErrorCode.ENCRYPT_BODY_EXCEPTION,
ex.getMessage());
+ if (tcpExceptCnt.shouldPrint()) {
+ logger.warn("Sender({}) aesEncrypt body exception", senderId,
ex);
+ }
+ return null;
+ }
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index bc229b744b..2eace638a2 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -57,7 +57,7 @@ public class ProxyUtils {
private static String sdkVersion;
static {
- localHost = getLocalIp();
+ getLocalIp();
getJarVersion();
Collections.addAll(SdkReservedWords,
AttributeConstants.GROUP_ID, AttributeConstants.STREAM_ID,
@@ -68,13 +68,12 @@ public class ProxyUtils {
AttributeConstants.NODE_IP, AttributeConstants.MESSAGE_ID,
AttributeConstants.MESSAGE_IS_ACK,
AttributeConstants.MESSAGE_PROXY_SEND,
AttributeConstants.MESSAGE_PROCESS_ERRCODE,
AttributeConstants.MESSAGE_PROCESS_ERRMSG,
- AttributeConstants.MSG_RPT_TIME,
AttributeConstants.AUDIT_VERSION,
- AttributeConstants.PROXY_SDK_VERSION, KEY_FILE_STATUS_CHECK,
- KEY_SECRET_ID, KEY_SIGNATURE, KEY_TIME_STAMP, KEY_NONCE,
KEY_USERNAME,
- KEY_CLIENT_IP, KEY_ENCY_VERSION, KEY_ENCY_AES_KEY);
+ AttributeConstants.MSG_RPT_TIME,
AttributeConstants.PROXY_SDK_VERSION,
+ KEY_FILE_STATUS_CHECK, KEY_SECRET_ID, KEY_SIGNATURE,
KEY_TIME_STAMP,
+ KEY_NONCE, KEY_USERNAME, KEY_CLIENT_IP, KEY_ENCY_VERSION,
KEY_ENCY_AES_KEY);
/*
* Collections.addAll(SdkReservedWords, "groupId", "streamId", "dt",
"msgUUID", "cp", "cnt", "mt", "m", "sid",
- * "t", "NodeIP", "messageId", "isAck", "proxySend", "errCode",
"errMsg", "rtms", "sdkVersion", "auditVersion",
+ * "t", "NodeIP", "messageId", "isAck", "proxySend", "errCode",
"errMsg", "rtms", "sdkVersion",
* "_file_status_check", "_secretId", "_signature", "_timeStamp",
"_nonce", "_userName", "_clientIP",
* "_encyVersion", "_encyAesKey");
*/