This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 66ed237238 [INLONG-11325][SDK] DataProxy SDK supports elegant metadata
updates (#11326)
66ed237238 is described below
commit 66ed23723853f0333573e61e648043243b7db82b
Author: castor <[email protected]>
AuthorDate: Sat Oct 12 14:07:43 2024 +0800
[INLONG-11325][SDK] DataProxy SDK supports elegant metadata updates (#11326)
---
.../inlong/sdk/dataproxy/network/ClientMgr.java | 123 ++++++++++++++++++++-
.../inlong/sdk/dataproxy/network/Sender.java | 4 +
2 files changed, 123 insertions(+), 4 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index cba5367806..abb204517b 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -45,6 +45,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
@@ -316,6 +317,46 @@ public class ClientMgr {
return bSuccess;
}
+ /**
+ * create conn, as DataConn or HBConn
+ *
+ * @param host
+ * @return
+ */
+ private void initConnection(HostInfo host, NettyClient client) {
+ if (clientMapData.size() < aliveConnections) {
+ // create data channel
+ clientMapData.put(host, client);
+ clientList.add(client);
+ clientMap.put(host, client);
+ logger.info("build a connection success! {},channel {}",
host.getHostName(), client.getChannel());
+ logger.info("client map size {},client list size {}",
clientMapData.size(), clientList.size());
+ } else {
+ // data channel list is enough, create hb channel
+ clientMapHB.put(host, client);
+ clientMap.put(host, client);
+ logger.info("build a HBconnection success! {},channel {}",
host.getHostName(), client.getChannel());
+ }
+ }
+
+ /**
+ * create conn list, as DataConn or HBConn
+ *
+ * @param host
+ * @return
+ */
+ private Map<HostInfo, NettyClient> initConnectionList(List<HostInfo> host)
{
+ Map<HostInfo, NettyClient> hostInfoNettyClientMap = new HashMap<>();
+ for (HostInfo hostInfo : host) {
+ NettyClient client = new NettyClient(bootstrap,
hostInfo.getHostName(),
+ hostInfo.getPortNumber(), configure);
+ if (client.connect()) {
+ hostInfoNettyClientMap.put(hostInfo, client);
+ }
+ }
+ return hostInfoNettyClientMap;
+ }
+
public void resetClient(Channel channel) {
if (channel == null) {
return;
@@ -523,10 +564,84 @@ public class ClientMgr {
}
private void updateAllConnection(List<HostInfo> hostInfos) {
- closeAllConnection();
- /* Build new channels */
- for (HostInfo hostInfo : hostInfos) {
- initConnection(hostInfo);
+
+ List<HostInfo> unHealthyHostList = findUnHealthyHostList(hostInfos);
+ List<HostInfo> newlyAddList = findNewlyAddList(hostInfos);
+ Map<HostInfo, NettyClient> hostInfoNettyClientMap =
initConnectionList(newlyAddList);
+ logger.info("unhealthyHostList = {},newlyAddList = {}",
unHealthyHostList, newlyAddList);
+ try {
+ writeLock();
+ replaceUnHealthyHostList(unHealthyHostList,
hostInfoNettyClientMap);
+ } catch (Exception e) {
+ logger.error("update Connection error", e);
+ } finally {
+ writeUnlock();
+ }
+
+ }
+
+ private List<HostInfo> findUnHealthyHostList(List<HostInfo> hostInfos) {
+ List<HostInfo> unHealthyHostList = new ArrayList<>();
+ if (!clientMap.isEmpty()) {
+ for (HostInfo hostInfo : clientMap.keySet()) {
+ if (hostInfo == null) {
+ continue;
+ }
+ Optional<HostInfo> optionalHostInfo =
+ hostInfos.stream().filter(hostInfo1 ->
hostInfo1.equals(hostInfo))
+ .findFirst();
+ NettyClient client = clientMap.get(hostInfo);
+ if (optionalHostInfo.isPresent() && client.isActive()) {
+ continue;
+ }
+ unHealthyHostList.add(hostInfo);
+ }
+ }
+ return unHealthyHostList;
+ }
+
+ private List<HostInfo> findNewlyAddList(List<HostInfo> hostInfos) {
+ List<HostInfo> newlyAddList = new ArrayList<>();
+ if (!clientMap.isEmpty()) {
+ for (HostInfo hostInfo : hostInfos) {
+ if (hostInfo == null) {
+ continue;
+ }
+ Optional<HostInfo> optionalHostInfo =
+ clientMap.keySet().stream().filter(hostInfo1 ->
hostInfo1.equals(hostInfo))
+ .findFirst();
+ if (optionalHostInfo.isPresent()) {
+ continue;
+ }
+ newlyAddList.add(hostInfo);
+ }
+ }
+ return newlyAddList;
+ }
+
+ private void replaceUnHealthyHostList(List<HostInfo> unHealthyHostList,
+ Map<HostInfo, NettyClient> hostInfoNettyClientMap) {
+ int index = 0;
+ List<HostInfo> hostInfos = new
ArrayList<>(hostInfoNettyClientMap.keySet());
+ for (HostInfo unHealthyHost : unHealthyHostList) {
+ NettyClient client = clientMap.get(unHealthyHost);
+ logger.info("ready to close not in new HostInfo connections!");
+ if (client != null && client.isActive()) {
+ sender.waitForAckForChannel(client.getChannel());
+ sender.clearCallBackByChannel(client.getChannel());
+ boolean close = client.close();
+ clientList.remove(client);
+ logger.info("close connections! = {} for host = {}", close,
unHealthyHost);
+ }
+ clientMap.remove(unHealthyHost);
+ clientMapData.remove(unHealthyHost);
+ clientMapHB.remove(unHealthyHost);
+ channelLoadMapData.remove(unHealthyHost);
+ channelLoadMapHB.remove(unHealthyHost);
+ if (index < hostInfos.size()) {
+ HostInfo hostInfo = hostInfos.get(index++);
+ initConnection(hostInfo, hostInfoNettyClientMap.get(hostInfo));
+ }
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index bae96e2e4c..d68a0c2330 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -689,6 +689,10 @@ public class Sender {
callbacks.clear();
}
+ public void clearCallBackByChannel(Channel channel) {
+ callbacks.remove(channel);
+ }
+
public int getClusterId() {
return clusterId;
}