This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f1ccde411e [INLONG-11156][SDK] SortSDK support that the token
configuration of pulsar cluster is null (#11158)
f1ccde411e is described below
commit f1ccde411e4693830c9ac7d7799f1941fbe560b0
Author: ChunLiang Lu <[email protected]>
AuthorDate: Fri Sep 20 15:35:57 2024 +0800
[INLONG-11156][SDK] SortSDK support that the token configuration of pulsar
cluster is null (#11158)
---
.../inlong/sdk/sort/manager/InlongMultiTopicManager.java | 12 +++++++++++-
.../sdk/sort/manager/InlongSingleTopicManager.java | 16 +++++++++++-----
.../inlong/sdk/sort/manager/InlongTopicManager.java | 12 +++++++++++-
3 files changed, 33 insertions(+), 7 deletions(-)
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
index 744b38d223..2d098345ca 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
@@ -35,6 +35,7 @@ import
org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -213,10 +214,19 @@ public class InlongMultiTopicManager extends TopicManager
{
topic.getInLongCluster().getBootstraps(), consumerSize);
for (int i = 0; i < consumerSize; i++) {
try {
+ String token = topic.getInLongCluster().getToken();
+ Authentication auth = null;
+ if (StringUtils.isNoneBlank(token)) {
+ auth = AuthenticationFactory.token(token);
+ }
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(topic.getInLongCluster().getBootstraps())
-
.authentication(AuthenticationFactory.token(topic.getInLongCluster().getToken()))
+ .authentication(auth)
.build();
+ LOGGER.info("create pulsar client succ cluster:{}, topic:{},
token:{}",
+ topic.getInLongCluster().getClusterId(),
+ topic.getTopic(),
+ topic.getInLongCluster().getToken());
TopicFetcher fetcher = TopicFetcherBuilder.newPulsarBuilder()
.pulsarClient(pulsarClient)
.topic(topics)
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
index 503304c77f..d137a903d2 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
@@ -33,6 +33,7 @@ import
org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
@@ -363,15 +364,20 @@ public class InlongSingleTopicManager extends
TopicManager {
if
(!pulsarClients.containsKey(topic.getInLongCluster().getClusterId())) {
if (topic.getInLongCluster().getBootstraps() != null) {
try {
+ String token = topic.getInLongCluster().getToken();
+ Authentication auth = null;
+ if (StringUtils.isNoneBlank(token)) {
+ auth = AuthenticationFactory.token(token);
+ }
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(topic.getInLongCluster().getBootstraps())
-
.authentication(AuthenticationFactory.token(topic.getInLongCluster().getToken()))
+ .authentication(auth)
.build();
pulsarClients.put(topic.getInLongCluster().getClusterId(),
pulsarClient);
- LOGGER.debug("create pulsar client succ {}",
- new
String[]{topic.getInLongCluster().getClusterId(),
- topic.getInLongCluster().getBootstraps(),
- topic.getInLongCluster().getToken()});
+ LOGGER.info("create pulsar client succ cluster:{},
topic:{}, token:{}",
+ topic.getInLongCluster().getClusterId(),
+ topic.getTopic(),
+ topic.getInLongCluster().getToken());
} catch (Exception e) {
LOGGER.error("create pulsar client error {}", topic);
LOGGER.error(e.getMessage(), e);
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
index f594126152..6d3343293b 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
@@ -32,6 +32,8 @@ import
org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
@@ -359,10 +361,18 @@ public class InlongTopicManager extends TopicManager {
LOGGER.info("start to init pulsar client for cluster={}", cluster);
if (cluster.getBootstraps() != null) {
try {
+ String token = cluster.getToken();
+ Authentication auth = null;
+ if (StringUtils.isNoneBlank(token)) {
+ auth = AuthenticationFactory.token(token);
+ }
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(cluster.getBootstraps())
-
.authentication(AuthenticationFactory.token(cluster.getToken()))
+ .authentication(auth)
.build();
+ LOGGER.info("create pulsar client succ cluster:{}, token:{}",
+ cluster.getClusterId(),
+ cluster.getToken());
PulsarClient oldClient =
pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient);
if (oldClient != null && !oldClient.isClosed()) {
LOGGER.warn("close new pulsar client for cluster={}",
cluster);