This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 7792fac  [pulsar-io] pass client builder if no service url provided to debezium connector  (#12145) (#14041)
7792fac is described below

commit 7792facfbff68e04273cc0aa32c3d25ea5f2d1d9
Author: Rui Fu <[email protected]>
AuthorDate: Thu Feb 24 21:00:01 2022 +0800

    [pulsar-io] pass client builder if no service url provided to debezium connector  (#12145) (#14041)
---
 .../org/apache/pulsar/io/debezium/DebeziumSource.java  | 18 +++++++-----------
 .../pulsar/io/debezium/PulsarDatabaseHistory.java      |  6 +++---
 .../io/sources/debezium/DebeziumMySqlSourceTester.java |  7 +++++--
 .../io/sources/debezium/PulsarDebeziumSourcesTest.java | 15 +++++++++++----
 4 files changed, 26 insertions(+), 20 deletions(-)

diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index b9074b9..eeb216b 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -18,9 +18,8 @@
  */
 package org.apache.pulsar.io.debezium;
 
-import io.debezium.relational.history.DatabaseHistory;
 import java.util.Map;
-
+import io.debezium.relational.history.DatabaseHistory;
 import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.naming.TopicName;
@@ -50,10 +49,7 @@ public abstract class DebeziumSource extends KafkaConnectSource {
     }
 
     public static void setConfigIfNull(Map<String, Object> config, String key, String value) {
-        Object orig = config.get(key);
-        if (orig == null) {
-            config.put(key, value);
-        }
+        config.putIfAbsent(key, value);
     }
 
     // namespace for output topics, default value is "tenant/namespace"
@@ -81,9 +77,6 @@ public abstract class DebeziumSource extends KafkaConnectSource {
 
         // database.history.pulsar.service.url
         String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());
-        if (StringUtils.isEmpty(pulsarUrl)) {
-            throw new IllegalArgumentException("Pulsar service URL for History Database not provided.");
-        }
 
         String topicNamespace = topicNamespace(sourceContext);
         // topic.namespace
@@ -97,8 +90,11 @@ public abstract class DebeziumSource extends KafkaConnectSource {
         setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
             topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);
 
-        config.put(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.client.builder",
-                SerDeUtils.serialize(sourceContext.getPulsarClientBuilder()));
+        // pass pulsar.client.builder if database.history.pulsar.service.url is not provided
+        if (StringUtils.isEmpty(pulsarUrl)) {
+            String pulsarClientBuilder = SerDeUtils.serialize(sourceContext.getPulsarClientBuilder());
+            config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(), pulsarClientBuilder);
+        }
 
         super.open(config, sourceContext);
     }
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index be152a6..c97e101 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -103,12 +103,12 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
         }
         this.topicName = config.getString(TOPIC);
 
-        if (config.getString(CLIENT_BUILDER) == null && config.getString(SERVICE_URL) == null) {
+        String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
+        if (isBlank(clientBuilderBase64Encoded) && isBlank(config.getString(SERVICE_URL))) {
             throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided.");
         }
-        String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
         this.clientBuilder = PulsarClient.builder();
-        if (null != clientBuilderBase64Encoded) {
+        if (!isBlank(clientBuilderBase64Encoded)) {
             // deserialize the client builder to the same classloader
             this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded, this.clientBuilder.getClass().getClassLoader());
         } else {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
index 3cb64db..7958fa0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
@@ -49,7 +49,8 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
 
     private final PulsarCluster pulsarCluster;
 
-    public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassName) {
+    public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassName,
+                                     boolean testWithClientBuilder) {
         super(NAME);
         this.pulsarCluster = cluster;
         pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
@@ -61,7 +62,9 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
         sourceConfig.put("database.server.id", "184054");
         sourceConfig.put("database.server.name", "dbserver1");
         sourceConfig.put("database.whitelist", "inventory");
-        sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
+        if (!testWithClientBuilder) {
+            sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
+        }
         sourceConfig.put("key.converter", converterClassName);
         sourceConfig.put("value.converter", converterClassName);
         sourceConfig.put("topic.namespace", "debezium/mysql-" +
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
index 9836319..246b52b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
@@ -45,13 +45,18 @@ public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {
 
     @Test(groups = "source")
     public void testDebeziumMySqlSourceJson() throws Exception {
-        testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
+        testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true, false);
+    }
+
+    @Test(groups = "source")
+    public void testDebeziumMySqlSourceJsonWithClientBuilder() throws Exception {
+        testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true, true);
     }
 
     @Test(groups = "source")
     public void testDebeziumMySqlSourceAvro() throws Exception {
         testDebeziumMySqlConnect(
-                "org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter", false);
+                "org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter", false, false);
     }
 
     @Test(groups = "source")
@@ -59,6 +64,7 @@ public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {
         testDebeziumPostgreSqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
     }
 
+
     @Test(groups = "source")
     public void testDebeziumMongoDbSource() throws Exception{
         testDebeziumMongoDbConnect("org.apache.kafka.connect.json.JsonConverter", true);
@@ -69,7 +75,8 @@ public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {
         testDebeziumMsSqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
     }
 
-    private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
+    private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWithEnvelope,
+                                          boolean testWithClientBuilder) throws Exception {
 
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
@@ -104,7 +111,7 @@ public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {
         admin.topics().createNonPartitionedTopic(outputTopicName);
 
         @Cleanup
-        DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster, converterClassName);
+        DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster, converterClassName, testWithClientBuilder);
         sourceTester.getSourceConfig().put("json-with-envelope", jsonWithEnvelope);
 
         // setup debezium mysql server

Reply via email to