This is an automated email from the ASF dual-hosted git repository. rfu pushed a commit to branch freeznet/fix-debezium-client-auth in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5ae04fb4f6b1634800b79593a55ba570880deaed Author: Rui Fu <[email protected]> AuthorDate: Thu Sep 23 11:04:23 2021 +0800 stash --- .../java/org/apache/pulsar/io/debezium/DebeziumSource.java | 10 +++++----- .../org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java | 7 ++++--- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java index b9074b9..6f75233 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java @@ -81,9 +81,6 @@ public abstract class DebeziumSource extends KafkaConnectSource { // database.history.pulsar.service.url String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name()); - if (StringUtils.isEmpty(pulsarUrl)) { - throw new IllegalArgumentException("Pulsar service URL for History Database not provided."); - } String topicNamespace = topicNamespace(sourceContext); // topic.namespace @@ -97,8 +94,11 @@ public abstract class DebeziumSource extends KafkaConnectSource { setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC); - config.put(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.client.builder", - SerDeUtils.serialize(sourceContext.getPulsarClientBuilder())); + // pass pulsar.client.builder if database.history.pulsar.service.url is not provided + if (StringUtils.isEmpty(pulsarUrl)) { + String pulsarClientBuilder = SerDeUtils.serialize(sourceContext.getPulsarClientBuilder()); + config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(), pulsarClientBuilder); + } super.open(config, sourceContext); } diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java index be152a6..a98e817 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java @@ -103,14 +103,15 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { } this.topicName = config.getString(TOPIC); - if (config.getString(CLIENT_BUILDER) == null && config.getString(SERVICE_URL) == null) { + if (StringUtils.isEmpty(config.getString(CLIENT_BUILDER)) && StringUtils.isEmpty(config.getString(SERVICE_URL))) { throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided."); } String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER); this.clientBuilder = PulsarClient.builder(); - if (null != clientBuilderBase64Encoded) { + if (!StringUtils.isEmpty(clientBuilderBase64Encoded)) { // deserialize the client builder to the same classloader - this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded, this.clientBuilder.getClass().getClassLoader()); + this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded, + this.clientBuilder.getClass().getClassLoader()); } else { this.clientBuilder.serviceUrl(config.getString(SERVICE_URL)); }
