This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4150f47 [Pulsar IO]: Add a source connector for debezium postgres
(#3924)
4150f47 is described below
commit 4150f472860b907e6c6a47039f110fbec496a684
Author: Jia Zhai <[email protected]>
AuthorDate: Sat Mar 30 05:39:20 2019 +0800
[Pulsar IO]: Add a source connector for debezium postgres (#3924)
This PR adds a source connector for debezium postgres.
changes:
- add debezium postgres project;
- add DebeziumSource, and make mysql and postgres both inherit from it.
---
pom.xml | 2 +-
.../apache/pulsar/io/debezium/DebeziumSource.java} | 35 ++++-----
.../io/debezium/mysql/DebeziumMysqlSource.java | 82 ++--------------------
pulsar-io/debezium/pom.xml | 1 +
pulsar-io/debezium/{ => postgres}/pom.xml | 37 +++++++---
.../debezium/postgres/DebeziumPostgresSource.java | 37 ++++++++++
.../resources/META-INF/services/pulsar-io.yaml | 22 ++++++
.../resources/debezium-postgres-source-config.yaml | 41 +++++++++++
8 files changed, 148 insertions(+), 109 deletions(-)
diff --git a/pom.xml b/pom.xml
index 539b5b8..d7aa9fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -187,7 +187,7 @@ flexible messaging model and an intuitive client
API.</description>
<presto.version>0.206</presto.version>
<flink.version>1.6.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
- <debezium.version>0.8.2</debezium.version>
+ <debezium.version>0.9.2.Final</debezium.version>
<jsonwebtoken.version>0.10.5</jsonwebtoken.version>
<opencensus.version>0.18.0</opencensus.version>
<zstd.version>1.3.7-3</zstd.version>
diff --git
a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
similarity index 78%
copy from
pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
copy to
pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index e8fe0c3..c207a95 100644
---
a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
+++
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -16,32 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.debezium.mysql;
+package org.apache.pulsar.io.debezium;
import java.util.Map;
-import io.debezium.connector.mysql.MySqlConnectorConfig;
-import lombok.extern.slf4j.Slf4j;
+import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.io.core.SourceContext;
-import org.apache.pulsar.io.debezium.PulsarDatabaseHistory;
import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
-/**
- * A pulsar source that runs
- */
-@Slf4j
-public class DebeziumMysqlSource extends KafkaConnectSource {
- static private final String DEFAULT_TASK =
"io.debezium.connector.mysql.MySqlConnectorTask";
+public abstract class DebeziumSource extends KafkaConnectSource {
static private final String DEFAULT_CONVERTER =
"org.apache.kafka.connect.json.JsonConverter";
static private final String DEFAULT_HISTORY =
"org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
- static private final String DEFAULT_OFFSET_TOPIC =
"debezium-mysql-offset-topic";
- static private final String DEFAULT_HISTORY_TOPIC =
"debezium-mysql-history-topic";
+ static private final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic";
+ static private final String DEFAULT_HISTORY_TOPIC =
"debezium-history-topic";
- private static void throwExceptionIfConfigNotMatch(Map<String, Object>
config,
+ public static void throwExceptionIfConfigNotMatch(Map<String, Object>
config,
String key,
String value) throws
IllegalArgumentException {
Object orig = config.get(key);
@@ -56,15 +48,15 @@ public class DebeziumMysqlSource extends KafkaConnectSource
{
}
}
- private static void setConfigIfNull(Map<String, Object> config, String
key, String value) {
+ public static void setConfigIfNull(Map<String, Object> config, String key,
String value) {
Object orig = config.get(key);
if (orig == null) {
config.put(key, value);
}
}
- // namespace: tenant/namespace
- private static String topicNamespace(SourceContext sourceContext) {
+ // namespace for output topics, default value is "tenant/namespace"
+ public static String topicNamespace(SourceContext sourceContext) {
String tenant = sourceContext.getTenant();
String namespace = sourceContext.getNamespace();
@@ -72,18 +64,19 @@ public class DebeziumMysqlSource extends KafkaConnectSource
{
(StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE :
namespace);
}
+ public abstract void setDbConnectorTask(Map<String, Object> config) throws
Exception;
+
@Override
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
- // connector task
- throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG,
DEFAULT_TASK);
+ setDbConnectorTask(config);
// key.converter
setConfigIfNull(config,
PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
// value.converter
setConfigIfNull(config,
PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
- // database.history implementation class
- setConfigIfNull(config, MySqlConnectorConfig.DATABASE_HISTORY.name(),
DEFAULT_HISTORY);
+ // database.history : implementation class for database history.
+ setConfigIfNull(config,
HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(),
DEFAULT_HISTORY);
// database.history.pulsar.service.url, this is set as the value of
pulsar.service.url if null.
String serviceUrl = (String)
config.get(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
diff --git
a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
index e8fe0c3..316abe3 100644
---
a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
+++
b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
@@ -20,91 +20,17 @@ package org.apache.pulsar.io.debezium.mysql;
import java.util.Map;
-import io.debezium.connector.mysql.MySqlConnectorConfig;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.runtime.TaskConfig;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.io.core.SourceContext;
-import org.apache.pulsar.io.debezium.PulsarDatabaseHistory;
-import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;
-import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
+import org.apache.pulsar.io.debezium.DebeziumSource;
/**
- * A pulsar source that runs
+ * A pulsar source that runs debezium mysql source
*/
-@Slf4j
-public class DebeziumMysqlSource extends KafkaConnectSource {
+public class DebeziumMysqlSource extends DebeziumSource {
static private final String DEFAULT_TASK =
"io.debezium.connector.mysql.MySqlConnectorTask";
- static private final String DEFAULT_CONVERTER =
"org.apache.kafka.connect.json.JsonConverter";
- static private final String DEFAULT_HISTORY =
"org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
- static private final String DEFAULT_OFFSET_TOPIC =
"debezium-mysql-offset-topic";
- static private final String DEFAULT_HISTORY_TOPIC =
"debezium-mysql-history-topic";
-
- private static void throwExceptionIfConfigNotMatch(Map<String, Object>
config,
- String key,
- String value) throws
IllegalArgumentException {
- Object orig = config.get(key);
- if (orig == null) {
- config.put(key, value);
- return;
- }
-
- // throw exception if value not match
- if (!orig.equals(value)) {
- throw new IllegalArgumentException("Expected " + value + " but has
" + orig);
- }
- }
-
- private static void setConfigIfNull(Map<String, Object> config, String
key, String value) {
- Object orig = config.get(key);
- if (orig == null) {
- config.put(key, value);
- }
- }
-
- // namespace: tenant/namespace
- private static String topicNamespace(SourceContext sourceContext) {
- String tenant = sourceContext.getTenant();
- String namespace = sourceContext.getNamespace();
-
- return (StringUtils.isEmpty(tenant) ? TopicName.PUBLIC_TENANT :
tenant) + "/" +
- (StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE :
namespace);
- }
@Override
- public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
- // connector task
+ public void setDbConnectorTask(Map<String, Object> config) throws
Exception {
throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG,
DEFAULT_TASK);
-
- // key.converter
- setConfigIfNull(config,
PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
- // value.converter
- setConfigIfNull(config,
PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
-
- // database.history implementation class
- setConfigIfNull(config, MySqlConnectorConfig.DATABASE_HISTORY.name(),
DEFAULT_HISTORY);
-
- // database.history.pulsar.service.url, this is set as the value of
pulsar.service.url if null.
- String serviceUrl = (String)
config.get(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
- if (serviceUrl == null) {
- throw new IllegalArgumentException("Pulsar service URL not
provided.");
- }
- setConfigIfNull(config, PulsarDatabaseHistory.SERVICE_URL.name(),
serviceUrl);
-
- String topicNamespace = topicNamespace(sourceContext);
- // topic.namespace
- setConfigIfNull(config,
PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace);
-
- String sourceName = sourceContext.getSourceName();
- // database.history.pulsar.topic: history topic name
- setConfigIfNull(config, PulsarDatabaseHistory.TOPIC.name(),
- topicNamespace + "/" + sourceName + "-" + DEFAULT_HISTORY_TOPIC);
- // offset.storage.topic: offset topic name
- setConfigIfNull(config,
PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
- topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);
-
- super.open(config, sourceContext);
}
-
}
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index 37a50cc..5d6b3a1 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -34,6 +34,7 @@
<modules>
<module>core</module>
<module>mysql</module>
+ <module>postgres</module>
</modules>
</project>
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/postgres/pom.xml
similarity index 56%
copy from pulsar-io/debezium/pom.xml
copy to pulsar-io/debezium/postgres/pom.xml
index 37a50cc..927db6e 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/postgres/pom.xml
@@ -19,21 +19,40 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
<parent>
<groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-io</artifactId>
+ <artifactId>pulsar-io-debezium</artifactId>
<version>2.4.0-SNAPSHOT</version>
</parent>
- <artifactId>pulsar-io-debezium</artifactId>
- <name>Pulsar IO :: Debezium</name>
+ <artifactId>pulsar-io-debezium-postgres</artifactId>
+ <name>Pulsar IO :: Debezium :: postgres</name>
- <modules>
- <module>core</module>
- <module>mysql</module>
- </modules>
+ <dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-debezium-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-postgres</artifactId>
+ <version>${debezium.version}</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git
a/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
b/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
new file mode 100644
index 0000000..fcd539e
--- /dev/null
+++
b/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium.postgres;
+
+import java.util.Map;
+
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.pulsar.io.debezium.DebeziumSource;
+
+
+/**
+ * A pulsar source that runs debezium postgres source
+ */
+public class DebeziumPostgresSource extends DebeziumSource {
+ static private final String DEFAULT_TASK =
"io.debezium.connector.postgresql.PostgresConnectorTask";
+
+ @Override
+ public void setDbConnectorTask(Map<String, Object> config) throws
Exception {
+ throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG,
DEFAULT_TASK);
+ }
+}
diff --git
a/pulsar-io/debezium/postgres/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/debezium/postgres/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..6577d7d
--- /dev/null
+++
b/pulsar-io/debezium/postgres/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: debezium-postgres
+description: Debezium Postgres Source
+sourceClass: org.apache.pulsar.io.debezium.postgres.DebeziumPostgresSource
diff --git
a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
new file mode 100644
index 0000000..e24f2e1
--- /dev/null
+++
b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+tenant: "public"
+namespace: "default"
+name: "debezium-postgres-source"
+topicName: "debezium-postgres-topic"
+archive: "connectors/pulsar-io-debezium-postgres-2.4.0-SNAPSHOT.nar"
+
+parallelism: 1
+
+configs:
+ ## config for pg, docker image: debezium/example-postgres:0.8
+ database.hostname: "localhost"
+ database.port: "5432"
+ database.user: "postgres"
+ database.password: "postgres"
+ database.dbname: "postgres"
+ database.server.name: "dbserver1"
+ schema.whitelist: "inventory"
+
+ ## PULSAR_SERVICE_URL_CONFIG
+ pulsar.service.url: "pulsar://127.0.0.1:6650"
+
+