This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch 2.7.2_ds_tmp in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 65a1acd73340c2124a9d6ea06cff23f08dc45b82
Author: Ming Luo <itestmyc...@gmail.com>
AuthorDate: Wed May 27 23:38:34 2020 -0400

    auth token for debezium and kafka connect adaptor
---
 .../pulsar/io/debezium/PulsarDatabaseHistory.java  | 22 +++++++++++++++++++---
 .../io/kafka/connect/PulsarKafkaWorkerConfig.java  | 10 ++++++++++
 .../io/kafka/connect/PulsarOffsetBackingStore.java | 14 +++++++++++---
 3 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index ed7de1f..81459f3 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -37,6 +37,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -69,14 +71,24 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
         .withDescription("Pulsar service url")
         .withValidation(Field::isRequired);
 
+    public static final Field PULSAR_TOKEN = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.token")
+        .withDisplayName("Pulsar auth token")
+        .withType(Type.STRING)
+        .withWidth(Width.LONG)
+        .withImportance(Importance.HIGH)
+        .withDescription("Pulsar authentication token")
+        .withValidation(Field::isOptional);
+
     public static Field.Set ALL_FIELDS = Field.setOf(
         TOPIC,
         SERVICE_URL,
+        PULSAR_TOKEN,
         DatabaseHistory.NAME);
 
     private final DocumentReader reader = DocumentReader.defaultReader();
     private String topicName;
     private String serviceUrl;
+    private String token;
     private String dbHistoryName;
     private volatile PulsarClient pulsarClient;
     private volatile Producer<String> producer;
@@ -95,6 +107,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
         }
         this.topicName = config.getString(TOPIC);
         this.serviceUrl = config.getString(SERVICE_URL);
+        this.token = config.getString(PULSAR_TOKEN);
         // Copy the relevant portions of the configuration and add useful defaults ...
         this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
 
@@ -118,9 +131,12 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
     void setupClientIfNeeded() {
         if (null == this.pulsarClient) {
             try {
-                pulsarClient = PulsarClient.builder()
-                    .serviceUrl(serviceUrl)
-                    .build();
+                ClientBuilder builder = PulsarClient.builder().serviceUrl(serviceUrl);
+
+                if (token != null && token != "") {
+                    builder = builder.authentication(AuthenticationFactory.token(token));
+                }
+                pulsarClient = builder.build();
             } catch (PulsarClientException e) {
                 throw new RuntimeException("Failed to create pulsar client to pulsar cluster at "
                     + serviceUrl, e);
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
index 624c59a..92092741 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
@@ -45,6 +45,12 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
     private static final String PULSAR_SERVICE_URL_CONFIG_DOC = "pulsar service url";
 
     /**
+     * <code>pulsar.auth.token</code>
+     */
+    public static final String PULSAR_AUTH_TOKEN_CONFIG = "pulsar.auth.token";
+    private static final String PULSAR_AUTH_TOKEN_CONFIG_DOC = "pulsar auth token";
+
+    /**
      * <code>topic.namespace</code>
      */
     public static final String TOPIC_NAMESPACE_CONFIG = "topic.namespace";
@@ -60,6 +66,10 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
                 Type.STRING,
                 Importance.HIGH,
                 PULSAR_SERVICE_URL_CONFIG_DOC)
+            .define(PULSAR_AUTH_TOKEN_CONFIG,
+                Type.STRING,
+                Importance.HIGH,
+                PULSAR_AUTH_TOKEN_CONFIG_DOC)
             .define(TOPIC_NAMESPACE_CONFIG,
                 Type.STRING,
                 "public/default",
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index d7daf82..99303b8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -36,6 +36,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -54,6 +56,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
     private PulsarClient client;
     private String serviceUrl;
     private String topic;
+    private String token;
     private Producer<byte[]> producer;
     private Reader<byte[]> reader;
     private volatile CompletableFuture<Void> outstandingReadToEnd = null;
@@ -63,6 +66,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
         this.topic = workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG);
         checkArgument(!isBlank(topic), "Offset storage topic must be specified");
         this.serviceUrl = workerConfig.getString(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
+        this.token = workerConfig.getString(PulsarKafkaWorkerConfig.PULSAR_AUTH_TOKEN_CONFIG);
         checkArgument(!isBlank(serviceUrl), "Pulsar service url must be specified at `"
             + WorkerConfig.BOOTSTRAP_SERVERS_CONFIG + "`");
         this.data = new HashMap<>();
@@ -137,9 +141,13 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
     @Override
     public void start() {
         try {
-            client = PulsarClient.builder()
-                .serviceUrl(serviceUrl)
-                .build();
+            ClientBuilder builder = PulsarClient.builder().serviceUrl(serviceUrl);
+
+            if (token != null && token != "") {
+                builder = builder.authentication(AuthenticationFactory.token(token));
+            }
+            client = builder.build();
+
             log.info("Successfully created pulsar client to {}", serviceUrl);
             producer = client.newProducer(Schema.BYTES)
                 .topic(topic)