This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 22ee4c3d5b [INLONG-11711][SDK] SortSDK shares the same PulsarClient
among different SortTasks to avoid performance bottlenecks caused by too many
PulsarClients (#11712)
22ee4c3d5b is described below
commit 22ee4c3d5bfdf2e2f511655be3b7b4323eedd99a
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Feb 10 09:39:55 2025 +0800
[INLONG-11711][SDK] SortSDK shares the same PulsarClient among different
SortTasks to avoid performance bottlenecks caused by too many PulsarClients
(#11712)
---
.../sdk/sort/manager/InlongSingleTopicManager.java | 4 +-
.../sdk/sort/manager/InlongTopicManager.java | 59 ++++++++++++----------
2 files changed, 34 insertions(+), 29 deletions(-)
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
index d137a903d2..68d83c0286 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
@@ -66,7 +66,7 @@ public class InlongSingleTopicManager extends TopicManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(InlongSingleTopicManager.class);
private final ConcurrentHashMap<String, TopicFetcher> fetchers = new
ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, PulsarClient> pulsarClients = new
ConcurrentHashMap<>();
+ private static final ConcurrentHashMap<String, PulsarClient> pulsarClients
= new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, TubeConsumerCreator> tubeFactories
= new ConcurrentHashMap<>();
private final PeriodicTask updateMetaDataWorker;
@@ -199,7 +199,7 @@ public class InlongSingleTopicManager extends TopicManager {
}
closeFetcher();
- closePulsarClient();
+ // closePulsarClient();
closeTubeSessionFactory();
LOGGER.info("close finished {}", sortTaskId);
return true;
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
index 6d3343293b..0d7086de81 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
@@ -65,7 +65,7 @@ public class InlongTopicManager extends TopicManager {
private final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
private final Map<String, TopicFetcher> fetchers = new
ConcurrentHashMap<>();
- private final Map<String, PulsarClient> pulsarClients = new
ConcurrentHashMap<>();
+ private static final Map<String, PulsarClient> pulsarClients = new
ConcurrentHashMap<>();
private final Map<String, TubeConsumerCreator> tubeFactories = new
ConcurrentHashMap<>();
protected final ForkJoinPool pool;
@@ -88,7 +88,7 @@ public class InlongTopicManager extends TopicManager {
LOGGER.info("start to clean topic manager, sortTaskId={}",
sortTaskId);
stopAssign = true;
closeAllFetchers();
- closeAllPulsarClients();
+ // closeAllPulsarClients();
closeAllTubeFactories();
LOGGER.info("success to clean topic manager, sortTaskId={}",
sortTaskId);
return true;
@@ -359,33 +359,38 @@ public class InlongTopicManager extends TopicManager {
private void createPulsarClient(CacheZoneCluster cluster) {
LOGGER.info("start to init pulsar client for cluster={}", cluster);
- if (cluster.getBootstraps() != null) {
- try {
- String token = cluster.getToken();
- Authentication auth = null;
- if (StringUtils.isNoneBlank(token)) {
- auth = AuthenticationFactory.token(token);
- }
- PulsarClient pulsarClient = PulsarClient.builder()
- .serviceUrl(cluster.getBootstraps())
- .authentication(auth)
- .build();
- LOGGER.info("create pulsar client succ cluster:{}, token:{}",
- cluster.getClusterId(),
- cluster.getToken());
- PulsarClient oldClient =
pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient);
- if (oldClient != null && !oldClient.isClosed()) {
- LOGGER.warn("close new pulsar client for cluster={}",
cluster);
- pulsarClient.close();
- }
- } catch (Exception e) {
- LOGGER.error("create pulsar client error for cluster={}",
cluster, e);
- return;
- }
- LOGGER.info("success to init pulsar client for cluster={}",
cluster);
- } else {
+ String clientKey = cluster.getBootstraps();
+ if (clientKey == null) {
LOGGER.error("bootstrap is null for cluster={}", cluster);
+ return;
+ }
+ if (pulsarClients.containsKey(clientKey)) {
+ LOGGER.info("Repeat to init pulsar client for cluster={}",
cluster);
+ return;
+ }
+ try {
+ String token = cluster.getToken();
+ Authentication auth = null;
+ if (StringUtils.isNoneBlank(token)) {
+ auth = AuthenticationFactory.token(token);
+ }
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .serviceUrl(cluster.getBootstraps())
+ .authentication(auth)
+ .build();
+ LOGGER.info("create pulsar client succ cluster:{}, token:{}",
+ cluster.getClusterId(),
+ cluster.getToken());
+ PulsarClient oldClient =
pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient);
+ if (oldClient != null && !oldClient.isClosed()) {
+ LOGGER.warn("close new pulsar client for cluster={}", cluster);
+ pulsarClient.close();
+ }
+ } catch (Exception e) {
+ LOGGER.error("create pulsar client error for cluster={}", cluster,
e);
+ return;
}
+ LOGGER.info("success to init pulsar client for cluster={}", cluster);
}
private List<CacheZoneCluster> getCacheZoneClusters(InlongTopicTypeEnum
type) {