This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 66ed237238 [INLONG-11325][SDK] DataProxy SDK supports elegant metadata 
updates (#11326)
66ed237238 is described below

commit 66ed23723853f0333573e61e648043243b7db82b
Author: castor <[email protected]>
AuthorDate: Sat Oct 12 14:07:43 2024 +0800

    [INLONG-11325][SDK] DataProxy SDK supports elegant metadata updates (#11326)
---
 .../inlong/sdk/dataproxy/network/ClientMgr.java    | 123 ++++++++++++++++++++-
 .../inlong/sdk/dataproxy/network/Sender.java       |   4 +
 2 files changed, 123 insertions(+), 4 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index cba5367806..abb204517b 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -45,6 +45,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
@@ -316,6 +317,46 @@ public class ClientMgr {
         return bSuccess;
     }
 
+    /**
+     * create conn, as DataConn or HBConn
+     *
+     * @param host
+     * @return
+     */
+    private void initConnection(HostInfo host, NettyClient client) {
+        if (clientMapData.size() < aliveConnections) {
+            // create data channel
+            clientMapData.put(host, client);
+            clientList.add(client);
+            clientMap.put(host, client);
+            logger.info("build a connection success! {},channel {}", 
host.getHostName(), client.getChannel());
+            logger.info("client map size {},client list size {}", 
clientMapData.size(), clientList.size());
+        } else {
+            // data channel list is enough, create hb channel
+            clientMapHB.put(host, client);
+            clientMap.put(host, client);
+            logger.info("build a HBconnection success! {},channel {}", 
host.getHostName(), client.getChannel());
+        }
+    }
+
+    /**
+     * create conn list, as DataConn or HBConn
+     *
+     * @param host
+     * @return
+     */
+    private Map<HostInfo, NettyClient> initConnectionList(List<HostInfo> host) 
{
+        Map<HostInfo, NettyClient> hostInfoNettyClientMap = new HashMap<>();
+        for (HostInfo hostInfo : host) {
+            NettyClient client = new NettyClient(bootstrap, 
hostInfo.getHostName(),
+                    hostInfo.getPortNumber(), configure);
+            if (client.connect()) {
+                hostInfoNettyClientMap.put(hostInfo, client);
+            }
+        }
+        return hostInfoNettyClientMap;
+    }
+
     public void resetClient(Channel channel) {
         if (channel == null) {
             return;
@@ -523,10 +564,84 @@ public class ClientMgr {
     }
 
     private void updateAllConnection(List<HostInfo> hostInfos) {
-        closeAllConnection();
-        /* Build new channels */
-        for (HostInfo hostInfo : hostInfos) {
-            initConnection(hostInfo);
+
+        List<HostInfo> unHealthyHostList = findUnHealthyHostList(hostInfos);
+        List<HostInfo> newlyAddList = findNewlyAddList(hostInfos);
+        Map<HostInfo, NettyClient> hostInfoNettyClientMap = 
initConnectionList(newlyAddList);
+        logger.info("unhealthyHostList = {},newlyAddList = {}", 
unHealthyHostList, newlyAddList);
+        try {
+            writeLock();
+            replaceUnHealthyHostList(unHealthyHostList, 
hostInfoNettyClientMap);
+        } catch (Exception e) {
+            logger.error("update  Connection error", e);
+        } finally {
+            writeUnlock();
+        }
+
+    }
+
+    private List<HostInfo> findUnHealthyHostList(List<HostInfo> hostInfos) {
+        List<HostInfo> unHealthyHostList = new ArrayList<>();
+        if (!clientMap.isEmpty()) {
+            for (HostInfo hostInfo : clientMap.keySet()) {
+                if (hostInfo == null) {
+                    continue;
+                }
+                Optional<HostInfo> optionalHostInfo =
+                        hostInfos.stream().filter(hostInfo1 -> 
hostInfo1.equals(hostInfo))
+                                .findFirst();
+                NettyClient client = clientMap.get(hostInfo);
+                if (optionalHostInfo.isPresent() && client.isActive()) {
+                    continue;
+                }
+                unHealthyHostList.add(hostInfo);
+            }
+        }
+        return unHealthyHostList;
+    }
+
+    private List<HostInfo> findNewlyAddList(List<HostInfo> hostInfos) {
+        List<HostInfo> newlyAddList = new ArrayList<>();
+        if (!clientMap.isEmpty()) {
+            for (HostInfo hostInfo : hostInfos) {
+                if (hostInfo == null) {
+                    continue;
+                }
+                Optional<HostInfo> optionalHostInfo =
+                        clientMap.keySet().stream().filter(hostInfo1 -> 
hostInfo1.equals(hostInfo))
+                                .findFirst();
+                if (optionalHostInfo.isPresent()) {
+                    continue;
+                }
+                newlyAddList.add(hostInfo);
+            }
+        }
+        return newlyAddList;
+    }
+
+    private void replaceUnHealthyHostList(List<HostInfo> unHealthyHostList,
+            Map<HostInfo, NettyClient> hostInfoNettyClientMap) {
+        int index = 0;
+        List<HostInfo> hostInfos = new 
ArrayList<>(hostInfoNettyClientMap.keySet());
+        for (HostInfo unHealthyHost : unHealthyHostList) {
+            NettyClient client = clientMap.get(unHealthyHost);
+            logger.info("ready to close not in new HostInfo connections!");
+            if (client != null && client.isActive()) {
+                sender.waitForAckForChannel(client.getChannel());
+                sender.clearCallBackByChannel(client.getChannel());
+                boolean close = client.close();
+                clientList.remove(client);
+                logger.info("close connections! = {} for host = {}", close, 
unHealthyHost);
+            }
+            clientMap.remove(unHealthyHost);
+            clientMapData.remove(unHealthyHost);
+            clientMapHB.remove(unHealthyHost);
+            channelLoadMapData.remove(unHealthyHost);
+            channelLoadMapHB.remove(unHealthyHost);
+            if (index < hostInfos.size()) {
+                HostInfo hostInfo = hostInfos.get(index++);
+                initConnection(hostInfo, hostInfoNettyClientMap.get(hostInfo));
+            }
         }
     }
 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index bae96e2e4c..d68a0c2330 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -689,6 +689,10 @@ public class Sender {
         callbacks.clear();
     }
 
+    public void clearCallBackByChannel(Channel channel) {
+        callbacks.remove(channel);
+    }
+
     public int getClusterId() {
         return clusterId;
     }

Reply via email to