This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 7792fac [pulsar-io] pass client builder if no service url provided to
debezium connector (#12145) (#14041)
7792fac is described below
commit 7792facfbff68e04273cc0aa32c3d25ea5f2d1d9
Author: Rui Fu <[email protected]>
AuthorDate: Thu Feb 24 21:00:01 2022 +0800
[pulsar-io] pass client builder if no service url provided to debezium
connector (#12145) (#14041)
---
.../org/apache/pulsar/io/debezium/DebeziumSource.java | 18 +++++++-----------
.../pulsar/io/debezium/PulsarDatabaseHistory.java | 6 +++---
.../io/sources/debezium/DebeziumMySqlSourceTester.java | 7 +++++--
.../io/sources/debezium/PulsarDebeziumSourcesTest.java | 15 +++++++++++----
4 files changed, 26 insertions(+), 20 deletions(-)
diff --git
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index b9074b9..eeb216b 100644
---
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
+++
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -18,9 +18,8 @@
*/
package org.apache.pulsar.io.debezium;
-import io.debezium.relational.history.DatabaseHistory;
import java.util.Map;
-
+import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.naming.TopicName;
@@ -50,10 +49,7 @@ public abstract class DebeziumSource extends
KafkaConnectSource {
}
public static void setConfigIfNull(Map<String, Object> config, String key,
String value) {
- Object orig = config.get(key);
- if (orig == null) {
- config.put(key, value);
- }
+ config.putIfAbsent(key, value);
}
// namespace for output topics, default value is "tenant/namespace"
@@ -81,9 +77,6 @@ public abstract class DebeziumSource extends
KafkaConnectSource {
// database.history.pulsar.service.url
String pulsarUrl = (String)
config.get(PulsarDatabaseHistory.SERVICE_URL.name());
- if (StringUtils.isEmpty(pulsarUrl)) {
- throw new IllegalArgumentException("Pulsar service URL for History
Database not provided.");
- }
String topicNamespace = topicNamespace(sourceContext);
// topic.namespace
@@ -97,8 +90,11 @@ public abstract class DebeziumSource extends
KafkaConnectSource {
setConfigIfNull(config,
PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);
- config.put(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING +
"pulsar.client.builder",
- SerDeUtils.serialize(sourceContext.getPulsarClientBuilder()));
+ // pass pulsar.client.builder if database.history.pulsar.service.url
is not provided
+ if (StringUtils.isEmpty(pulsarUrl)) {
+ String pulsarClientBuilder =
SerDeUtils.serialize(sourceContext.getPulsarClientBuilder());
+ config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(),
pulsarClientBuilder);
+ }
super.open(config, sourceContext);
}
diff --git
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index be152a6..c97e101 100644
---
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -103,12 +103,12 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
}
this.topicName = config.getString(TOPIC);
- if (config.getString(CLIENT_BUILDER) == null &&
config.getString(SERVICE_URL) == null) {
+ String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
+ if (isBlank(clientBuilderBase64Encoded) &&
isBlank(config.getString(SERVICE_URL))) {
throw new IllegalArgumentException("Neither Pulsar Service URL nor
ClientBuilder provided.");
}
- String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
this.clientBuilder = PulsarClient.builder();
- if (null != clientBuilderBase64Encoded) {
+ if (!isBlank(clientBuilderBase64Encoded)) {
// deserialize the client builder to the same classloader
this.clientBuilder = (ClientBuilder)
SerDeUtils.deserialize(clientBuilderBase64Encoded,
this.clientBuilder.getClass().getClassLoader());
} else {
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
index 3cb64db..7958fa0 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
@@ -49,7 +49,8 @@ public class DebeziumMySqlSourceTester extends
SourceTester<DebeziumMySQLContain
private final PulsarCluster pulsarCluster;
- public DebeziumMySqlSourceTester(PulsarCluster cluster, String
converterClassName) {
+ public DebeziumMySqlSourceTester(PulsarCluster cluster, String
converterClassName,
+ boolean testWithClientBuilder) {
super(NAME);
this.pulsarCluster = cluster;
pulsarServiceUrl = "pulsar://pulsar-proxy:" +
PulsarContainer.BROKER_PORT;
@@ -61,7 +62,9 @@ public class DebeziumMySqlSourceTester extends
SourceTester<DebeziumMySQLContain
sourceConfig.put("database.server.id", "184054");
sourceConfig.put("database.server.name", "dbserver1");
sourceConfig.put("database.whitelist", "inventory");
- sourceConfig.put("database.history.pulsar.service.url",
pulsarServiceUrl);
+ if (!testWithClientBuilder) {
+ sourceConfig.put("database.history.pulsar.service.url",
pulsarServiceUrl);
+ }
sourceConfig.put("key.converter", converterClassName);
sourceConfig.put("value.converter", converterClassName);
sourceConfig.put("topic.namespace", "debezium/mysql-" +
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
index 9836319..246b52b 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
@@ -45,13 +45,18 @@ public class PulsarDebeziumSourcesTest extends
PulsarIOTestBase {
@Test(groups = "source")
public void testDebeziumMySqlSourceJson() throws Exception {
-
testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
+
testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true,
false);
+ }
+
+ @Test(groups = "source")
+ public void testDebeziumMySqlSourceJsonWithClientBuilder() throws
Exception {
+
testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true,
true);
}
@Test(groups = "source")
public void testDebeziumMySqlSourceAvro() throws Exception {
testDebeziumMySqlConnect(
-
"org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter", false);
+
"org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter", false,
false);
}
@Test(groups = "source")
@@ -59,6 +64,7 @@ public class PulsarDebeziumSourcesTest extends
PulsarIOTestBase {
testDebeziumPostgreSqlConnect("org.apache.kafka.connect.json.JsonConverter",
true);
}
+
@Test(groups = "source")
public void testDebeziumMongoDbSource() throws Exception{
testDebeziumMongoDbConnect("org.apache.kafka.connect.json.JsonConverter", true);
@@ -69,7 +75,8 @@ public class PulsarDebeziumSourcesTest extends
PulsarIOTestBase {
testDebeziumMsSqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
}
- private void testDebeziumMySqlConnect(String converterClassName, boolean
jsonWithEnvelope) throws Exception {
+ private void testDebeziumMySqlConnect(String converterClassName, boolean
jsonWithEnvelope,
+ boolean testWithClientBuilder)
throws Exception {
final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
@@ -104,7 +111,7 @@ public class PulsarDebeziumSourcesTest extends
PulsarIOTestBase {
admin.topics().createNonPartitionedTopic(outputTopicName);
@Cleanup
- DebeziumMySqlSourceTester sourceTester = new
DebeziumMySqlSourceTester(pulsarCluster, converterClassName);
+ DebeziumMySqlSourceTester sourceTester = new
DebeziumMySqlSourceTester(pulsarCluster, converterClassName,
testWithClientBuilder);
sourceTester.getSourceConfig().put("json-with-envelope",
jsonWithEnvelope);
// setup debezium mysql server