This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b1ebf9357 [INLONG-5604][Manager] Add token info for Pulsar source 
(#5609)
b1ebf9357 is described below

commit b1ebf9357cb8810ded71e1bce3e9ffa9134ed5e1
Author: healchow <[email protected]>
AuthorDate: Fri Aug 19 17:36:49 2022 +0800

    [INLONG-5604][Manager] Add token info for Pulsar source (#5609)
---
 .../manager/service/source/pulsar/PulsarSourceOperator.java | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index b6b89b0b9..03fb7f96b 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -56,6 +56,12 @@ import java.util.Map;
 @Service
 public class PulsarSourceOperator extends AbstractSourceOperator {
 
+    private static final String AUTH_CLASSNAME_KEY = 
"properties.auth-plugin-classname";
+    private static final String AUTH_CLASSNAME_VALUE = 
"org.apache.pulsar.client.impl.auth.AuthenticationToken";
+    private static final String AUTH_PARAMS_KEY = "properties.auth-params";
+    // the %s must be replaced by the actual value
+    private static final String AUTH_PARAMS_VALUE = "token:%s";
+
     @Autowired
     private ObjectMapper objectMapper;
     @Autowired
@@ -121,6 +127,13 @@ public class PulsarSourceOperator extends 
AbstractSourceOperator {
             pulsarSource.setServiceUrl(serviceUrl);
             pulsarSource.setInlongComponent(true);
 
+            // set the token info
+            if (StringUtils.isNotBlank(pulsarCluster.getToken())) {
+                Map<String, Object> properties = pulsarSource.getProperties();
+                properties.putIfAbsent(AUTH_CLASSNAME_KEY, 
AUTH_CLASSNAME_VALUE);
+                properties.putIfAbsent(AUTH_PARAMS_KEY, 
String.format(AUTH_PARAMS_VALUE, pulsarCluster.getToken()));
+            }
+
             for (StreamSource sourceInfo : streamSources) {
                 if (!Objects.equal(streamId, sourceInfo.getInlongStreamId())) {
                     continue;

Reply via email to