gosonzhang commented on code in PR #11326:
URL: https://github.com/apache/inlong/pull/11326#discussion_r1795010070


##########
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java:
##########
@@ -523,10 +524,71 @@ private void closeAllConnection() {
     }
 
     private void updateAllConnection(List<HostInfo> hostInfos) {
-        closeAllConnection();
-        /* Build new channels */
-        for (HostInfo hostInfo : hostInfos) {
-            initConnection(hostInfo);
+
+        try {
+            writeLock();
+            List<HostInfo> notExistHosts = new ArrayList<>();
+            if (!clientMap.isEmpty()) {
+                logger.info("ready to close not in new HostInfo connections!");
+                for (HostInfo hostInfo : clientMap.keySet()) {
+                    if (hostInfo == null) {
+                        continue;
+                    }
+                    Optional<HostInfo> optionalHostInfo =
+                            hostInfos.stream().filter(hostInfo1 -> 
hostInfo1.equals(hostInfo))
+                                    .findFirst();
+                    if (optionalHostInfo.isPresent()) {
+                        continue;

Review Comment:
   If a connection to the target host already exists, you need to check whether
   that connection is still usable. If it is not, it must be cleaned up as well.



##########
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java:
##########
@@ -523,10 +524,71 @@ private void closeAllConnection() {
     }
 
     private void updateAllConnection(List<HostInfo> hostInfos) {
-        closeAllConnection();
-        /* Build new channels */
-        for (HostInfo hostInfo : hostInfos) {
-            initConnection(hostInfo);
+
+        try {
+            writeLock();
+            List<HostInfo> notExistHosts = new ArrayList<>();
+            if (!clientMap.isEmpty()) {
+                logger.info("ready to close not in new HostInfo connections!");
+                for (HostInfo hostInfo : clientMap.keySet()) {
+                    if (hostInfo == null) {
+                        continue;
+                    }
+                    Optional<HostInfo> optionalHostInfo =
+                            hostInfos.stream().filter(hostInfo1 -> 
hostInfo1.equals(hostInfo))
+                                    .findFirst();
+                    if (optionalHostInfo.isPresent()) {
+                        continue;
+                    }
+                    NettyClient client = clientMap.get(hostInfo);
+                    if (client != null && client.isActive()) {
+                        sender.waitForAckForChannel(client.getChannel());
+                        client.close();
+                        clientList.remove(client);
+                        sender.clearCallBackByChannel(client.getChannel());
+                    }
+                    logger.info("ready to close not in new HostInfo 
connections!");
+                    notExistHosts.add(hostInfo);
+                }
+            }
+            removeNotExistHost(notExistHosts);
+
+            updateAndInitConnection(hostInfos);

Review Comment:
   updateAndInitConnection() should precede removeNotExistHost()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to