This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 65a1acd73340c2124a9d6ea06cff23f08dc45b82
Author: Ming Luo <itestmyc...@gmail.com>
AuthorDate: Wed May 27 23:38:34 2020 -0400

    auth token for debezium and kafka connect adaptor
---
 .../pulsar/io/debezium/PulsarDatabaseHistory.java  | 22 +++++++++++++++++++---
 .../io/kafka/connect/PulsarKafkaWorkerConfig.java  | 10 ++++++++++
 .../io/kafka/connect/PulsarOffsetBackingStore.java | 14 +++++++++++---
 3 files changed, 40 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
 
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index ed7de1f..81459f3 100644
--- 
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ 
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -37,6 +37,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -69,14 +71,24 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
         .withDescription("Pulsar service url")
         .withValidation(Field::isRequired);
 
+    public static final Field PULSAR_TOKEN = 
Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.token")
+        .withDisplayName("Pulsar auth token")
+        .withType(Type.STRING)
+        .withWidth(Width.LONG)
+        .withImportance(Importance.HIGH)
+        .withDescription("Pulsar authentication token")
+        .withValidation(Field::isOptional);
+
     public static Field.Set ALL_FIELDS = Field.setOf(
         TOPIC,
         SERVICE_URL,
+        PULSAR_TOKEN,
         DatabaseHistory.NAME);
 
     private final DocumentReader reader = DocumentReader.defaultReader();
     private String topicName;
     private String serviceUrl;
+    private String token;
     private String dbHistoryName;
     private volatile PulsarClient pulsarClient;
     private volatile Producer<String> producer;
@@ -95,6 +107,7 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
         }
         this.topicName = config.getString(TOPIC);
         this.serviceUrl = config.getString(SERVICE_URL);
+        this.token = config.getString(PULSAR_TOKEN);
         // Copy the relevant portions of the configuration and add useful 
defaults ...
         this.dbHistoryName = config.getString(DatabaseHistory.NAME, 
UUID.randomUUID().toString());
 
@@ -118,9 +131,12 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
     void setupClientIfNeeded() {
         if (null == this.pulsarClient) {
             try {
-                pulsarClient = PulsarClient.builder()
-                    .serviceUrl(serviceUrl)
-                    .build();
+                ClientBuilder builder = 
PulsarClient.builder().serviceUrl(serviceUrl);
+
+                if (token != null && token != "") {
+                    builder = 
builder.authentication(AuthenticationFactory.token(token));
+                }
+                pulsarClient = builder.build();
             } catch (PulsarClientException e) {
                 throw new RuntimeException("Failed to create pulsar client to 
pulsar cluster at "
                     + serviceUrl, e);
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
index 624c59a..92092741 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
@@ -45,6 +45,12 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
     private static final String PULSAR_SERVICE_URL_CONFIG_DOC = "pulsar 
service url";
 
     /**
+     * <code>pulsar.auth.token</code>
+     */
+    public static final String PULSAR_AUTH_TOKEN_CONFIG = "pulsar.auth.token";
+    private static final String PULSAR_AUTH_TOKEN_CONFIG_DOC = "pulsar auth 
token";
+
+    /**
      * <code>topic.namespace</code>
      */
     public static final String TOPIC_NAMESPACE_CONFIG = "topic.namespace";
@@ -60,6 +66,10 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
                 Type.STRING,
                 Importance.HIGH,
                 PULSAR_SERVICE_URL_CONFIG_DOC)
+            .define(PULSAR_AUTH_TOKEN_CONFIG,
+                Type.STRING,
+                Importance.HIGH,
+                PULSAR_AUTH_TOKEN_CONFIG_DOC)
             .define(TOPIC_NAMESPACE_CONFIG,
                 Type.STRING,
                 "public/default",
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index d7daf82..99303b8 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -36,6 +36,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -54,6 +56,7 @@ public class PulsarOffsetBackingStore implements 
OffsetBackingStore {
     private PulsarClient client;
     private String serviceUrl;
     private String topic;
+    private String token;
     private Producer<byte[]> producer;
     private Reader<byte[]> reader;
     private volatile CompletableFuture<Void> outstandingReadToEnd = null;
@@ -63,6 +66,7 @@ public class PulsarOffsetBackingStore implements 
OffsetBackingStore {
         this.topic = 
workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG);
         checkArgument(!isBlank(topic), "Offset storage topic must be 
specified");
         this.serviceUrl = 
workerConfig.getString(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
+        this.token = 
workerConfig.getString(PulsarKafkaWorkerConfig.PULSAR_AUTH_TOKEN_CONFIG);
         checkArgument(!isBlank(serviceUrl), "Pulsar service url must be 
specified at `"
             + WorkerConfig.BOOTSTRAP_SERVERS_CONFIG + "`");
         this.data = new HashMap<>();
@@ -137,9 +141,13 @@ public class PulsarOffsetBackingStore implements 
OffsetBackingStore {
     @Override
     public void start() {
         try {
-            client = PulsarClient.builder()
-                .serviceUrl(serviceUrl)
-                .build();
+            ClientBuilder builder = 
PulsarClient.builder().serviceUrl(serviceUrl);
+
+            if (token != null && token != "") {
+                builder = 
builder.authentication(AuthenticationFactory.token(token));
+            }
+            client = builder.build();
+
             log.info("Successfully created pulsar client to {}", serviceUrl);
             producer = client.newProducer(Schema.BYTES)
                 .topic(topic)

Reply via email to