This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b10872e58f [INLONG-11692][SDK] The metadata update function abstracted to ConfigHolder (#11693)
b10872e58f is described below

commit b10872e58fdc08a30670be1a5da2f6361d58529c
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Jan 21 10:03:59 2025 +0800

    [INLONG-11692][SDK] The metadata update function abstracted to ConfigHolder (#11693)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../apache/inlong/agent/core/HeartbeatManager.java |  2 +-
 .../plugin/sinks/filecollect/SenderManager.java    |  2 +-
 .../inlong/sdk/dataproxy/DefaultMessageSender.java |  1 +
 .../inlong/sdk/dataproxy/config/ConfigHolder.java  | 32 ++++++++++++++++++++++
 .../sdk/dataproxy/config/ProxyConfigManager.java   | 22 +++++++++------
 .../sdk/dataproxy/example/HttpClientExample.java   |  2 +-
 .../sdk/dataproxy/example/TcpClientExample.java    |  2 +-
 .../sdk/dataproxy/http/InternalHttpSender.java     |  1 +
 .../sdk/dataproxy/network/ClientHandler.java       |  4 +--
 .../dataproxy/network/ClientPipelineFactory.java   |  4 +--
 .../network/{ClientMgr.java => DefClientMgr.java}  | 19 +++++++++----
 .../sdk/dataproxy/network/HttpProxySender.java     |  2 +-
 .../inlong/sdk/dataproxy/network/NettyClient.java  |  2 +-
 .../inlong/sdk/dataproxy/network/Sender.java       |  8 +++---
 .../{ => sender}/http/HttpMsgSenderConfig.java     |  2 +-
 .../sdk/dataproxy/sender/tcp/TcpEventInfo.java     |  2 +-
 .../{ => sender/tcp}/TcpMsgSenderConfig.java       |  2 +-
 .../sdk/dataproxy/threads/MetricWorkerThread.java  |  2 +-
 .../sdk/dataproxy/threads/TimeoutScanThread.java   |  2 +-
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     |  2 +-
 .../sdk/dataproxy/ProxyClientConfigTest.java       |  2 +-
 .../sdk/dataproxy/ProxyConfigManagerTest.java      |  5 ++--
 .../inlong/sdk/dirtydata/InlongSdkDirtySender.java |  2 +-
 23 files changed, 86 insertions(+), 38 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 22ce0f0f3a..870993a6f2 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -29,7 +29,7 @@ import org.apache.inlong.common.enums.NodeSrvStatus;
 import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
 import org.apache.inlong.common.heartbeat.HeartbeatMsg;
 import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.commons.lang3.StringUtils;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index aff97981e7..b2303feafe 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -31,10 +31,10 @@ import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 import org.slf4j.Logger;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 290b856ebe..52f2116e88 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -29,6 +29,7 @@ import 
org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
 import org.apache.inlong.sdk.dataproxy.network.Sender;
 import org.apache.inlong.sdk.dataproxy.network.SequentialID;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ConfigHolder.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ConfigHolder.java
new file mode 100644
index 0000000000..8489a3996c
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ConfigHolder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.config;
+
+import java.util.List;
+
+/**
+ * Configure Holder:
+ *
+ * Used to hold DataProxy meta configures
+ */
+public interface ConfigHolder {
+
+    void updateAllowedMaxPkgLength(int maxPkgLength);
+
+    void updateProxyNodes(boolean nodeChanged, List<HostInfo> newProxyNodes);
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index b587bfff0d..ff6878755b 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -24,7 +24,6 @@ import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
 import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
-import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
 import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
 import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
@@ -101,7 +100,7 @@ public class ProxyConfigManager extends Thread {
 
     private final String callerId;
     private final Gson gson = new Gson();
-    private final ClientMgr clientManager;
+    private final ConfigHolder configHolder;
     private final ThreadLocalRandom random = ThreadLocalRandom.current();
     private final AtomicBoolean shutDown = new AtomicBoolean(false);
     // proxy configure info
@@ -125,13 +124,13 @@ public class ProxyConfigManager extends Thread {
         this("MetaQuery", configure, null);
     }
 
-    public ProxyConfigManager(String callerId, ProxyClientConfig configure, ClientMgr clientManager) {
+    public ProxyConfigManager(String callerId, ProxyClientConfig configure, ConfigHolder configHolder) {
         this.callerId = callerId;
-        this.clientManager = clientManager;
+        this.configHolder = configHolder;
         if (configure != null) {
             this.storeAndBuildMetaConfigure(configure);
         }
-        if (this.clientManager != null) {
+        if (this.configHolder != null) {
             this.setName("ConfigManager-" + this.callerId);
             logger.info("ConfigManager({}) started, groupId={}",
                     this.callerId, mgrConfig.getInlongGroupId());
@@ -148,7 +147,7 @@ public class ProxyConfigManager extends Thread {
         if (this.shutDown.get()) {
             return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
-        if (this.clientManager != null) {
+        if (this.configHolder != null) {
             return procResult.setFailResult(ErrorCode.ILLEGAL_CALL_STATE);
         }
         this.storeAndBuildMetaConfigure(configure);
@@ -156,7 +155,7 @@ public class ProxyConfigManager extends Thread {
     }
 
     public void shutDown() {
-        if (clientManager == null) {
+        if (this.configHolder == null) {
             return;
         }
         if (shutDown.compareAndSet(false, true)) {
@@ -479,6 +478,7 @@ public class ProxyConfigManager extends Thread {
             newProxyNodeList.addAll(proxyInfoList);
         } else {
             this.proxyConfigEntry = proxyEntry;
+            configHolder.updateAllowedMaxPkgLength(proxyEntry.getMaxPacketLength());
             newSwitchStat = proxyEntry.getSwitchStat();
             newProxyNodeList = new ArrayList<>(proxyEntry.getSize());
             for (Map.Entry<String, HostInfo> entry : 
proxyEntry.getHostMap().entrySet()) {
@@ -491,7 +491,7 @@ public class ProxyConfigManager extends Thread {
         if (nodeChanged || newSwitchStat != oldStat
                 || (System.currentTimeMillis() - lstUpdateTime) >= 
mgrConfig.getForceReChooseInrMs()) {
             proxyInfoList = newProxyNodeList;
-            clientManager.updateProxyInfoList(nodeChanged, proxyInfoList);
+            configHolder.updateProxyNodes(nodeChanged, proxyInfoList);
             lstUpdateTime = System.currentTimeMillis();
             oldStat = newSwitchStat;
         }
@@ -813,6 +813,12 @@ public class ProxyConfigManager extends Thread {
 
     private void storeAndBuildMetaConfigure(ProxyClientConfig config) {
         this.mgrConfig = config;
+        this.proxyConfigEntry = null;
+        this.proxyInfoList.clear();
+        this.oldStat = 0;
+        this.localMd5 = null;
+        this.lstUpdateTime = 0;
+        this.userEncryptConfigEntry = null;
         StringBuilder strBuff = new StringBuilder(512);
         this.proxyConfigVisitUrl = strBuff
                 .append(mgrConfig.isVisitMgrByHttps() ? SdkConsts.PREFIX_HTTPS 
: SdkConsts.PREFIX_HTTP)
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
index 8db8fdab6c..502932d17d 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.sdk.dataproxy.example;
 
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
-import org.apache.inlong.sdk.dataproxy.http.HttpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.network.HttpProxySender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
index 098ae51536..ab7e674f80 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.sdk.dataproxy.example;
 
 import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
index 8a558ddc13..51452d528a 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
@@ -21,6 +21,7 @@ import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
 import org.apache.inlong.sdk.dataproxy.config.HostInfo;
 import org.apache.inlong.sdk.dataproxy.network.HttpMessage;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientHandler.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientHandler.java
index 4ded287c8c..f00e209f7d 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientHandler.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientHandler.java
@@ -32,9 +32,9 @@ public class ClientHandler extends 
SimpleChannelInboundHandler<EncodeObject> {
     private static final LogCounter thrownCnt = new LogCounter(10, 100000, 60 
* 1000L);
 
     private final Sender sender;
-    private final ClientMgr clientMgr;
+    private final DefClientMgr clientMgr;
 
-    public ClientHandler(Sender sender, ClientMgr clientMgr) {
+    public ClientHandler(Sender sender, DefClientMgr clientMgr) {
         this.sender = sender;
         this.clientMgr = clientMgr;
     }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientPipelineFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientPipelineFactory.java
index a23a06d894..44afa5f3a1 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientPipelineFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientPipelineFactory.java
@@ -26,10 +26,10 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
 public class ClientPipelineFactory extends ChannelInitializer<SocketChannel> {
 
-    private final ClientMgr clientMgr;
+    private final DefClientMgr clientMgr;
     private final Sender sender;
 
-    public ClientPipelineFactory(ClientMgr clientMgr, Sender sender) {
+    public ClientPipelineFactory(DefClientMgr clientMgr, Sender sender) {
         this.clientMgr = clientMgr;
         this.sender = sender;
     }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/DefClientMgr.java
similarity index 97%
rename from 
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
rename to 
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/DefClientMgr.java
index 6fb1caf59f..ad54864659 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/DefClientMgr.java
@@ -17,14 +17,15 @@
 
 package org.apache.inlong.sdk.dataproxy.network;
 
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.config.ConfigHolder;
 import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.HostInfo;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil;
 import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
@@ -51,9 +52,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class ClientMgr {
+public class DefClientMgr implements ConfigHolder {
 
-    private static final Logger logger = LoggerFactory.getLogger(ClientMgr.class);
+    private static final Logger logger = LoggerFactory.getLogger(DefClientMgr.class);
     private static final LogCounter logCounter = new LogCounter(10, 100000, 60 
* 1000L);
     private static final LogCounter updConExptCnt = new LogCounter(10, 100000, 
60 * 1000L);
     private static final LogCounter exptCounter = new LogCounter(10, 100000, 
60 * 1000L);
@@ -83,14 +84,14 @@ public class ClientMgr {
     /**
      * Build up the connection between the server and client.
      */
-    public ClientMgr(TcpMsgSenderConfig tcpConfig, Sender sender) {
+    public DefClientMgr(TcpMsgSenderConfig tcpConfig, Sender sender) {
         this(tcpConfig, sender, null);
     }
 
     /**
      * Build up the connection between the server and client.
      */
-    public ClientMgr(TcpMsgSenderConfig tcpConfig, Sender sender, ThreadFactory selfDefineFactory) {
+    public DefClientMgr(TcpMsgSenderConfig tcpConfig, Sender sender, ThreadFactory selfDefineFactory) {
         this.tcpConfig = tcpConfig;
         this.sender = sender;
         // Initialize the bootstrap
@@ -301,7 +302,13 @@ public class ClientMgr {
         }
     }
 
-    public void updateProxyInfoList(boolean nodeChanged, List<HostInfo> newNodes) {
+    @Override
+    public void updateAllowedMaxPkgLength(int maxPkgLength) {
+        //
+    }
+
+    @Override
+    public void updateProxyNodes(boolean nodeChanged, List<HostInfo> newNodes) {
         if (newNodes == null || newNodes.isEmpty() || !this.started.get()) {
             return;
         }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
index eec1326747..1b5653bc46 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
@@ -23,8 +23,8 @@ import org.apache.inlong.sdk.dataproxy.common.SendResult;
 import org.apache.inlong.sdk.dataproxy.config.HostInfo;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
-import org.apache.inlong.sdk.dataproxy.http.HttpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.http.InternalHttpSender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
 
 import org.slf4j.Logger;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
index 30cb4c6567..070e36b09c 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
@@ -17,9 +17,9 @@
 
 package org.apache.inlong.sdk.dataproxy.network;
 
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
 
 import io.netty.bootstrap.Bootstrap;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 0d8115dc4f..1873ae979f 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -17,12 +17,12 @@
 
 package org.apache.inlong.sdk.dataproxy.network;
 
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
 import org.apache.inlong.sdk.dataproxy.threads.TimeoutScanThread;
 import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
@@ -64,7 +64,7 @@ public class Sender {
     private final AtomicInteger currentBufferSize = new AtomicInteger(0);
     private final TimeoutScanThread scanThread;
     private final AtomicBoolean started = new AtomicBoolean(false);
-    private final ClientMgr clientMgr;
+    private final DefClientMgr clientMgr;
     private final String instanceId;
     private final TcpMsgSenderConfig tcpConfig;
     private MetricWorkerThread metricWorker = null;
@@ -82,7 +82,7 @@ public class Sender {
         this.instanceId = "sender-" + senderIdGen.incrementAndGet();
         this.asyncCallbackMaxSize = tcpConfig.getTotalAsyncCallbackSize();
         this.threadPool = Executors.newCachedThreadPool();
-        this.clientMgr = new ClientMgr(tcpConfig, this, selfDefineFactory);
+        this.clientMgr = new DefClientMgr(tcpConfig, this, selfDefineFactory);
         this.scanThread = new TimeoutScanThread(this, tcpConfig);
         if (tcpConfig.isEnableMetric()) {
             metricWorker = new MetricWorkerThread(tcpConfig, this);
@@ -553,7 +553,7 @@ public class Sender {
         return callbacks;
     }
 
-    public ClientMgr getClientMgr() {
+    public DefClientMgr getClientMgr() {
         return clientMgr;
     }
 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/HttpMsgSenderConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
similarity index 99%
rename from 
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/HttpMsgSenderConfig.java
rename to 
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
index 4833968d18..6c7e5ac8b0 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/HttpMsgSenderConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sdk.dataproxy.http;
+package org.apache.inlong.sdk.dataproxy.sender.http;
 
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.sdk.dataproxy.common.HttpContentType;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
index dfa255aab6..a51f41a105 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java
@@ -27,7 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * HTTP Event Information class
+ * TCP Event Information class
  *
  * Used to encapsulate the data information reported by TCP
  */
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/TcpMsgSenderConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java
similarity index 99%
rename from 
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/TcpMsgSenderConfig.java
rename to 
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java
index b9054c4084..c30207347e 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/TcpMsgSenderConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sdk.dataproxy;
+package org.apache.inlong.sdk.dataproxy.sender.tcp;
 
 import org.apache.inlong.common.msg.MsgType;
 import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
index 6ed3f55c91..122cda2eff 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sdk.dataproxy.threads;
 
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
@@ -26,6 +25,7 @@ import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
 import org.apache.inlong.sdk.dataproxy.metric.MetricTimeNumSummary;
 import org.apache.inlong.sdk.dataproxy.network.Sender;
 import org.apache.inlong.sdk.dataproxy.network.SequentialID;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
 
 import org.slf4j.Logger;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
index f7d4064896..5cb5131a83 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
@@ -17,11 +17,11 @@
 
 package org.apache.inlong.sdk.dataproxy.threads;
 
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
 import org.apache.inlong.sdk.dataproxy.network.QueueObject;
 import org.apache.inlong.sdk.dataproxy.network.Sender;
 import org.apache.inlong.sdk.dataproxy.network.TimeScanObject;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
 
 import io.netty.channel.Channel;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index 61dbb263df..bc229b744b 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -19,8 +19,8 @@ package org.apache.inlong.sdk.dataproxy.utils;
 
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.MsgType;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
index 7165b2c8ad..0173651169 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sdk.dataproxy;
 
 import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
 
 import org.junit.Assert;
 import org.junit.Test;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
index da34136449..22ac070e7b 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
@@ -20,7 +20,8 @@ package org.apache.inlong.sdk.dataproxy;
 import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
-import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.network.DefClientMgr;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,7 +37,7 @@ public class ProxyConfigManagerTest {
             
Objects.requireNonNull(this.getClass().getClassLoader().getResource("proxylist.json")).toURI())
             .toString();
     private final TcpMsgSenderConfig clientConfig = 
PowerMockito.mock(TcpMsgSenderConfig.class);
-    private final ClientMgr clientMgr = PowerMockito.mock(ClientMgr.class);
+    private final DefClientMgr clientMgr = PowerMockito.mock(DefClientMgr.class);
     private final ProxyConfigManager proxyConfigManager;
 
     public ProxyConfigManagerTest() throws URISyntaxException {
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 9bf6be2014..af7a62fb6d 100644
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -18,9 +18,9 @@
 package org.apache.inlong.sdk.dirtydata;
 
 import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 
 import com.google.common.base.Preconditions;
 import lombok.Builder;

Reply via email to