This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 22ee4c3d5b [INLONG-11711][SDK] SortSDK shares the same PulsarClient among different SortTasks to avoid performance bottlenecks caused by too many PulsarClients (#11712)
22ee4c3d5b is described below

commit 22ee4c3d5bfdf2e2f511655be3b7b4323eedd99a
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Feb 10 09:39:55 2025 +0800

    [INLONG-11711][SDK] SortSDK shares the same PulsarClient among different SortTasks to avoid performance bottlenecks caused by too many PulsarClients (#11712)
---
 .../sdk/sort/manager/InlongSingleTopicManager.java |  4 +-
 .../sdk/sort/manager/InlongTopicManager.java       | 59 ++++++++++++----------
 2 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
index d137a903d2..68d83c0286 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
@@ -66,7 +66,7 @@ public class InlongSingleTopicManager extends TopicManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongSingleTopicManager.class);
 
     private final ConcurrentHashMap<String, TopicFetcher> fetchers = new 
ConcurrentHashMap<>();
-    private final ConcurrentHashMap<String, PulsarClient> pulsarClients = new 
ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<String, PulsarClient> pulsarClients 
= new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, TubeConsumerCreator> tubeFactories 
= new ConcurrentHashMap<>();
 
     private final PeriodicTask updateMetaDataWorker;
@@ -199,7 +199,7 @@ public class InlongSingleTopicManager extends TopicManager {
             }
 
             closeFetcher();
-            closePulsarClient();
+            // closePulsarClient();
             closeTubeSessionFactory();
             LOGGER.info("close finished {}", sortTaskId);
             return true;
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
index 6d3343293b..0d7086de81 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
@@ -65,7 +65,7 @@ public class InlongTopicManager extends TopicManager {
 
     private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
     private final Map<String, TopicFetcher> fetchers = new 
ConcurrentHashMap<>();
-    private final Map<String, PulsarClient> pulsarClients = new 
ConcurrentHashMap<>();
+    private static final Map<String, PulsarClient> pulsarClients = new 
ConcurrentHashMap<>();
     private final Map<String, TubeConsumerCreator> tubeFactories = new 
ConcurrentHashMap<>();
 
     protected final ForkJoinPool pool;
@@ -88,7 +88,7 @@ public class InlongTopicManager extends TopicManager {
             LOGGER.info("start to clean topic manager, sortTaskId={}", 
sortTaskId);
             stopAssign = true;
             closeAllFetchers();
-            closeAllPulsarClients();
+            // closeAllPulsarClients();
             closeAllTubeFactories();
             LOGGER.info("success to clean topic manager, sortTaskId={}", 
sortTaskId);
             return true;
@@ -359,33 +359,38 @@ public class InlongTopicManager extends TopicManager {
 
     private void createPulsarClient(CacheZoneCluster cluster) {
         LOGGER.info("start to init pulsar client for cluster={}", cluster);
-        if (cluster.getBootstraps() != null) {
-            try {
-                String token = cluster.getToken();
-                Authentication auth = null;
-                if (StringUtils.isNoneBlank(token)) {
-                    auth = AuthenticationFactory.token(token);
-                }
-                PulsarClient pulsarClient = PulsarClient.builder()
-                        .serviceUrl(cluster.getBootstraps())
-                        .authentication(auth)
-                        .build();
-                LOGGER.info("create pulsar client succ cluster:{}, token:{}",
-                        cluster.getClusterId(),
-                        cluster.getToken());
-                PulsarClient oldClient = 
pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient);
-                if (oldClient != null && !oldClient.isClosed()) {
-                    LOGGER.warn("close new pulsar client for cluster={}", 
cluster);
-                    pulsarClient.close();
-                }
-            } catch (Exception e) {
-                LOGGER.error("create pulsar client error for cluster={}", 
cluster, e);
-                return;
-            }
-            LOGGER.info("success to init pulsar client for cluster={}", 
cluster);
-        } else {
+        String clientKey = cluster.getBootstraps();
+        if (clientKey == null) {
             LOGGER.error("bootstrap is null for cluster={}", cluster);
+            return;
+        }
+        if (pulsarClients.containsKey(clientKey)) {
+            LOGGER.info("Repeat to init pulsar client for cluster={}", 
cluster);
+            return;
+        }
+        try {
+            String token = cluster.getToken();
+            Authentication auth = null;
+            if (StringUtils.isNoneBlank(token)) {
+                auth = AuthenticationFactory.token(token);
+            }
+            PulsarClient pulsarClient = PulsarClient.builder()
+                    .serviceUrl(cluster.getBootstraps())
+                    .authentication(auth)
+                    .build();
+            LOGGER.info("create pulsar client succ cluster:{}, token:{}",
+                    cluster.getClusterId(),
+                    cluster.getToken());
+            PulsarClient oldClient = 
pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient);
+            if (oldClient != null && !oldClient.isClosed()) {
+                LOGGER.warn("close new pulsar client for cluster={}", cluster);
+                pulsarClient.close();
+            }
+        } catch (Exception e) {
+            LOGGER.error("create pulsar client error for cluster={}", cluster, 
e);
+            return;
         }
+        LOGGER.info("success to init pulsar client for cluster={}", cluster);
     }
 
     private List<CacheZoneCluster> getCacheZoneClusters(InlongTopicTypeEnum 
type) {

Reply via email to