This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b10872e58f [INLONG-11692][SDK] The metadata update function abstracted
to ConfigHolder (#11693)
b10872e58f is described below
commit b10872e58fdc08a30670be1a5da2f6361d58529c
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Jan 21 10:03:59 2025 +0800
[INLONG-11692][SDK] The metadata update function abstracted to ConfigHolder
(#11693)
Co-authored-by: gosonzhang <[email protected]>
---
.../apache/inlong/agent/core/HeartbeatManager.java | 2 +-
.../plugin/sinks/filecollect/SenderManager.java | 2 +-
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 1 +
.../inlong/sdk/dataproxy/config/ConfigHolder.java | 32 ++++++++++++++++++++++
.../sdk/dataproxy/config/ProxyConfigManager.java | 22 +++++++++------
.../sdk/dataproxy/example/HttpClientExample.java | 2 +-
.../sdk/dataproxy/example/TcpClientExample.java | 2 +-
.../sdk/dataproxy/http/InternalHttpSender.java | 1 +
.../sdk/dataproxy/network/ClientHandler.java | 4 +--
.../dataproxy/network/ClientPipelineFactory.java | 4 +--
.../network/{ClientMgr.java => DefClientMgr.java} | 19 +++++++++----
.../sdk/dataproxy/network/HttpProxySender.java | 2 +-
.../inlong/sdk/dataproxy/network/NettyClient.java | 2 +-
.../inlong/sdk/dataproxy/network/Sender.java | 8 +++---
.../{ => sender}/http/HttpMsgSenderConfig.java | 2 +-
.../sdk/dataproxy/sender/tcp/TcpEventInfo.java | 2 +-
.../{ => sender/tcp}/TcpMsgSenderConfig.java | 2 +-
.../sdk/dataproxy/threads/MetricWorkerThread.java | 2 +-
.../sdk/dataproxy/threads/TimeoutScanThread.java | 2 +-
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 2 +-
.../sdk/dataproxy/ProxyClientConfigTest.java | 2 +-
.../sdk/dataproxy/ProxyConfigManagerTest.java | 5 ++--
.../inlong/sdk/dirtydata/InlongSdkDirtySender.java | 2 +-
23 files changed, 86 insertions(+), 38 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 22ce0f0f3a..870993a6f2 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -29,7 +29,7 @@ import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.commons.lang3.StringUtils;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index aff97981e7..b2303feafe 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -31,10 +31,10 @@ import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 290b856ebe..52f2116e88 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -29,6 +29,7 @@ import
org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ConfigHolder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ConfigHolder.java
new file mode 100644
index 0000000000..8489a3996c
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ConfigHolder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.config;
+
+import java.util.List;
+
+/**
+ * Configure Holder:
+ *
+ * Used to hold DataProxy meta configures
+ */
+public interface ConfigHolder {
+
+ void updateAllowedMaxPkgLength(int maxPkgLength);
+
+ void updateProxyNodes(boolean nodeChanged, List<HostInfo> newProxyNodes);
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index b587bfff0d..ff6878755b 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -24,7 +24,6 @@ import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
-import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
@@ -101,7 +100,7 @@ public class ProxyConfigManager extends Thread {
private final String callerId;
private final Gson gson = new Gson();
- private final ClientMgr clientManager;
+ private final ConfigHolder configHolder;
private final ThreadLocalRandom random = ThreadLocalRandom.current();
private final AtomicBoolean shutDown = new AtomicBoolean(false);
// proxy configure info
@@ -125,13 +124,13 @@ public class ProxyConfigManager extends Thread {
this("MetaQuery", configure, null);
}
- public ProxyConfigManager(String callerId, ProxyClientConfig configure,
ClientMgr clientManager) {
+ public ProxyConfigManager(String callerId, ProxyClientConfig configure,
ConfigHolder configHolder) {
this.callerId = callerId;
- this.clientManager = clientManager;
+ this.configHolder = configHolder;
if (configure != null) {
this.storeAndBuildMetaConfigure(configure);
}
- if (this.clientManager != null) {
+ if (this.configHolder != null) {
this.setName("ConfigManager-" + this.callerId);
logger.info("ConfigManager({}) started, groupId={}",
this.callerId, mgrConfig.getInlongGroupId());
@@ -148,7 +147,7 @@ public class ProxyConfigManager extends Thread {
if (this.shutDown.get()) {
return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
- if (this.clientManager != null) {
+ if (this.configHolder != null) {
return procResult.setFailResult(ErrorCode.ILLEGAL_CALL_STATE);
}
this.storeAndBuildMetaConfigure(configure);
@@ -156,7 +155,7 @@ public class ProxyConfigManager extends Thread {
}
public void shutDown() {
- if (clientManager == null) {
+ if (this.configHolder == null) {
return;
}
if (shutDown.compareAndSet(false, true)) {
@@ -479,6 +478,7 @@ public class ProxyConfigManager extends Thread {
newProxyNodeList.addAll(proxyInfoList);
} else {
this.proxyConfigEntry = proxyEntry;
+
configHolder.updateAllowedMaxPkgLength(proxyEntry.getMaxPacketLength());
newSwitchStat = proxyEntry.getSwitchStat();
newProxyNodeList = new ArrayList<>(proxyEntry.getSize());
for (Map.Entry<String, HostInfo> entry :
proxyEntry.getHostMap().entrySet()) {
@@ -491,7 +491,7 @@ public class ProxyConfigManager extends Thread {
if (nodeChanged || newSwitchStat != oldStat
|| (System.currentTimeMillis() - lstUpdateTime) >=
mgrConfig.getForceReChooseInrMs()) {
proxyInfoList = newProxyNodeList;
- clientManager.updateProxyInfoList(nodeChanged, proxyInfoList);
+ configHolder.updateProxyNodes(nodeChanged, proxyInfoList);
lstUpdateTime = System.currentTimeMillis();
oldStat = newSwitchStat;
}
@@ -813,6 +813,12 @@ public class ProxyConfigManager extends Thread {
private void storeAndBuildMetaConfigure(ProxyClientConfig config) {
this.mgrConfig = config;
+ this.proxyConfigEntry = null;
+ this.proxyInfoList.clear();
+ this.oldStat = 0;
+ this.localMd5 = null;
+ this.lstUpdateTime = 0;
+ this.userEncryptConfigEntry = null;
StringBuilder strBuff = new StringBuilder(512);
this.proxyConfigVisitUrl = strBuff
.append(mgrConfig.isVisitMgrByHttps() ? SdkConsts.PREFIX_HTTPS
: SdkConsts.PREFIX_HTTP)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
index 8db8fdab6c..502932d17d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
@@ -18,8 +18,8 @@
package org.apache.inlong.sdk.dataproxy.example;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
-import org.apache.inlong.sdk.dataproxy.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.network.HttpProxySender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import java.util.ArrayList;
import java.util.List;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
index 098ae51536..ab7e674f80 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
@@ -18,8 +18,8 @@
package org.apache.inlong.sdk.dataproxy.example;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
index 8a558ddc13..51452d528a 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
@@ -21,6 +21,7 @@ import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.network.HttpMessage;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientHandler.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientHandler.java
index 4ded287c8c..f00e209f7d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientHandler.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientHandler.java
@@ -32,9 +32,9 @@ public class ClientHandler extends
SimpleChannelInboundHandler<EncodeObject> {
private static final LogCounter thrownCnt = new LogCounter(10, 100000, 60
* 1000L);
private final Sender sender;
- private final ClientMgr clientMgr;
+ private final DefClientMgr clientMgr;
- public ClientHandler(Sender sender, ClientMgr clientMgr) {
+ public ClientHandler(Sender sender, DefClientMgr clientMgr) {
this.sender = sender;
this.clientMgr = clientMgr;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientPipelineFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientPipelineFactory.java
index a23a06d894..44afa5f3a1 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientPipelineFactory.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientPipelineFactory.java
@@ -26,10 +26,10 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
public class ClientPipelineFactory extends ChannelInitializer<SocketChannel> {
- private final ClientMgr clientMgr;
+ private final DefClientMgr clientMgr;
private final Sender sender;
- public ClientPipelineFactory(ClientMgr clientMgr, Sender sender) {
+ public ClientPipelineFactory(DefClientMgr clientMgr, Sender sender) {
this.clientMgr = clientMgr;
this.sender = sender;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/DefClientMgr.java
similarity index 97%
rename from
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
rename to
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/DefClientMgr.java
index 6fb1caf59f..ad54864659 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/DefClientMgr.java
@@ -17,14 +17,15 @@
package org.apache.inlong.sdk.dataproxy.network;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.config.ConfigHolder;
import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
@@ -51,9 +52,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class ClientMgr {
+public class DefClientMgr implements ConfigHolder {
- private static final Logger logger =
LoggerFactory.getLogger(ClientMgr.class);
+ private static final Logger logger =
LoggerFactory.getLogger(DefClientMgr.class);
private static final LogCounter logCounter = new LogCounter(10, 100000, 60
* 1000L);
private static final LogCounter updConExptCnt = new LogCounter(10, 100000,
60 * 1000L);
private static final LogCounter exptCounter = new LogCounter(10, 100000,
60 * 1000L);
@@ -83,14 +84,14 @@ public class ClientMgr {
/**
* Build up the connection between the server and client.
*/
- public ClientMgr(TcpMsgSenderConfig tcpConfig, Sender sender) {
+ public DefClientMgr(TcpMsgSenderConfig tcpConfig, Sender sender) {
this(tcpConfig, sender, null);
}
/**
* Build up the connection between the server and client.
*/
- public ClientMgr(TcpMsgSenderConfig tcpConfig, Sender sender,
ThreadFactory selfDefineFactory) {
+ public DefClientMgr(TcpMsgSenderConfig tcpConfig, Sender sender,
ThreadFactory selfDefineFactory) {
this.tcpConfig = tcpConfig;
this.sender = sender;
// Initialize the bootstrap
@@ -301,7 +302,13 @@ public class ClientMgr {
}
}
- public void updateProxyInfoList(boolean nodeChanged, List<HostInfo>
newNodes) {
+ @Override
+ public void updateAllowedMaxPkgLength(int maxPkgLength) {
+ //
+ }
+
+ @Override
+ public void updateProxyNodes(boolean nodeChanged, List<HostInfo> newNodes)
{
if (newNodes == null || newNodes.isEmpty() || !this.started.get()) {
return;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
index eec1326747..1b5653bc46 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
@@ -23,8 +23,8 @@ import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
-import org.apache.inlong.sdk.dataproxy.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.http.InternalHttpSender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
import org.slf4j.Logger;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
index 30cb4c6567..070e36b09c 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
@@ -17,9 +17,9 @@
package org.apache.inlong.sdk.dataproxy.network;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import io.netty.bootstrap.Bootstrap;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 0d8115dc4f..1873ae979f 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -17,12 +17,12 @@
package org.apache.inlong.sdk.dataproxy.network;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
import org.apache.inlong.sdk.dataproxy.threads.TimeoutScanThread;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
@@ -64,7 +64,7 @@ public class Sender {
private final AtomicInteger currentBufferSize = new AtomicInteger(0);
private final TimeoutScanThread scanThread;
private final AtomicBoolean started = new AtomicBoolean(false);
- private final ClientMgr clientMgr;
+ private final DefClientMgr clientMgr;
private final String instanceId;
private final TcpMsgSenderConfig tcpConfig;
private MetricWorkerThread metricWorker = null;
@@ -82,7 +82,7 @@ public class Sender {
this.instanceId = "sender-" + senderIdGen.incrementAndGet();
this.asyncCallbackMaxSize = tcpConfig.getTotalAsyncCallbackSize();
this.threadPool = Executors.newCachedThreadPool();
- this.clientMgr = new ClientMgr(tcpConfig, this, selfDefineFactory);
+ this.clientMgr = new DefClientMgr(tcpConfig, this, selfDefineFactory);
this.scanThread = new TimeoutScanThread(this, tcpConfig);
if (tcpConfig.isEnableMetric()) {
metricWorker = new MetricWorkerThread(tcpConfig, this);
@@ -553,7 +553,7 @@ public class Sender {
return callbacks;
}
- public ClientMgr getClientMgr() {
+ public DefClientMgr getClientMgr() {
return clientMgr;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/HttpMsgSenderConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
similarity index 99%
rename from
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/HttpMsgSenderConfig.java
rename to
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
index 4833968d18..6c7e5ac8b0 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/HttpMsgSenderConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.dataproxy.http;
+package org.apache.inlong.sdk.dataproxy.sender.http;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.dataproxy.common.HttpContentType;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
index dfa255aab6..a51f41a105 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
@@ -27,7 +27,7 @@ import java.util.List;
import java.util.Map;
/**
- * HTTP Event Information class
+ * TCP Event Information class
*
* Used to encapsulate the data information reported by TCP
*/
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/TcpMsgSenderConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java
similarity index 99%
rename from
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/TcpMsgSenderConfig.java
rename to
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java
index b9054c4084..c30207347e 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/TcpMsgSenderConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.dataproxy;
+package org.apache.inlong.sdk.dataproxy.sender.tcp;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
index 6ed3f55c91..122cda2eff 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
@@ -17,7 +17,6 @@
package org.apache.inlong.sdk.dataproxy.threads;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
@@ -26,6 +25,7 @@ import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
import org.apache.inlong.sdk.dataproxy.metric.MetricTimeNumSummary;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.slf4j.Logger;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
index f7d4064896..5cb5131a83 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
@@ -17,11 +17,11 @@
package org.apache.inlong.sdk.dataproxy.threads;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.network.QueueObject;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.TimeScanObject;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import io.netty.channel.Channel;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index 61dbb263df..bc229b744b 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -19,8 +19,8 @@ package org.apache.inlong.sdk.dataproxy.utils;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.MsgType;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
diff --git
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
index 7165b2c8ad..0173651169 100644
---
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
+++
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.junit.Assert;
import org.junit.Test;
diff --git
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
index da34136449..22ac070e7b 100644
---
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
+++
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
@@ -20,7 +20,8 @@ package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
-import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.network.DefClientMgr;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.junit.Assert;
import org.junit.Test;
@@ -36,7 +37,7 @@ public class ProxyConfigManagerTest {
Objects.requireNonNull(this.getClass().getClassLoader().getResource("proxylist.json")).toURI())
.toString();
private final TcpMsgSenderConfig clientConfig =
PowerMockito.mock(TcpMsgSenderConfig.class);
- private final ClientMgr clientMgr = PowerMockito.mock(ClientMgr.class);
+ private final DefClientMgr clientMgr =
PowerMockito.mock(DefClientMgr.class);
private final ProxyConfigManager proxyConfigManager;
public ProxyConfigManagerTest() throws URISyntaxException {
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 9bf6be2014..af7a62fb6d 100644
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -18,9 +18,9 @@
package org.apache.inlong.sdk.dirtydata;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import com.google.common.base.Preconditions;
import lombok.Builder;