This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ac5d258 Beam transform that uses DebeziumIO connector to support CDC
new 32a0ff1 Merge pull request #13983 from [BEAM-11818] DebeziumIO
connector to support CDC
ac5d258 is described below
commit ac5d258dcc598a2ef13f0609debefae38530d14a
Author: Juan Sandoval <[email protected]>
AuthorDate: Tue Feb 16 20:01:40 2021 -0800
Beam transform that uses DebeziumIO connector to support CDC
Debeziumio PoC (#7)
* New DebeziumIO class.
* Merge connector code
* DebeziumIO and MySqlConnector integrated.
* Added FormatFunction param to Read builder on DebeziumIO.
* Added arguments checker to DebeziumIO.
* Add simple JSON mapper object (#1)
* Add simple JSON mapper object
* Fixed Mapper.
* Add SqlServer connector test
* Added PostgreSql Connector Test
PostgreSql now works with Json mapper
* Added PostgreSql Connector Test
PostgreSql now works with Json mapper
* Fixing MySQL schema DataException
Using file instead of schema should fix it
* MySQL Connector updated from 1.3.0 to 1.3.1
Co-authored-by: osvaldo-salinas <[email protected]>
Co-authored-by: Carlos Dominguez <[email protected]>
Co-authored-by: Carlos Domínguez <[email protected]>
* Add debeziumio tests
* Debeziumio testing json mapper (#3)
* Some code refactors. Use a default DBHistory if not provided
* Add basic tests for Json mapper
* Debeziumio time restriction (#5)
* Add simple JSON mapper object
* Fixed Mapper.
* Add SqlServer connector test
* Added PostgreSql Connector Test
PostgreSql now works with Json mapper
* Added PostgreSql Connector Test
PostgreSql now works with Json mapper
* Fixing MySQL schema DataException
Using file instead of schema should fix it
* MySQL Connector updated from 1.3.0 to 1.3.1
* Some code refactors. Use a default DBHistory if not provided
* Adding based-time restriction
Stop polling after specified amount of time
* Add basic tests for Json mapper
* Adding new restriction
Uses a time-based restriction
* Adding optional restriction
Uses an optional time-based restriction
Co-authored-by: juanitodread <[email protected]>
Co-authored-by: osvaldo-salinas <[email protected]>
* Upgrade DebeziumIO connector (#4)
* Address comments (Change dependencies to testCompile, Set
JsonMapper/Coder as default, refactors) (#8)
* Revert file
* Change dependencies to testCompile
* Move Counter sample to unit test
* Set JsonMapper as default mapper function
* Set String Coder as default coder when using JsonMapper
* Change logs from info to debug
* Debeziumio javadoc (#9)
* Adding javadoc
* Added some titles and examples
* Added SourceRecordJson doc
* Added Basic Connector doc
* Added KafkaSourceConsumer doc
* Javadoc cleanup
* Removing BasicConnector
No usages of this class were found overall
* Editing documentation
* Debeziumio fetched records restriction (#10)
* Adding javadoc
* Adding restriction by number of fetched records
Also adding a quick-fix for null value within SourceRecords
Minor fix on both MySQL and PostgreSQL Connectors Tests
* Run either by time or by number of records
* Added DebeziumOffsetTrackerTest
Tests both restrictions: By amount of time and by Number of records
* Removing comment
* DebeziumIO test for DB2. (#11)
* DebeziumIO test for DB2.
* DebeziumIO javadoc.
* Clean code: removed commented code lines on DebeziumIOConnectorTest.java
* Clean code: removing unused imports and using readAsJson().
Co-authored-by: Carlos Domínguez
<[email protected]>
* Debezium limit records (now configurable) (#12)
* Adding javadoc
* Records Limit is now configurable
(It was fixed before)
* Debeziumio dockerize (#13)
* Add mysql docker container to tests
* Move debezium mysql integration test to its own file
* Add assertion to verify that the results contains a record.
* Debeziumio readme (#15)
* Adding javadoc
* Adding README file
* Add number of records configuration to the DebeziumIO component (#16)
* Code refactors (#17)
* Remove/ignore null warnings
* Remove DB2 code
* Remove docker dependency in DebeziumIO unit test and max number of records
to MySql integration test
* Change access modifiers accordingly
* Remove incomplete integration tests (Postgres and SqlServer)
* Add experimental tag
* Debezium testing stoppable consumer (#18)
* Add try-catch-finally, stop SourceTask at finally.
* Fix warnings
* stopConsumer and processedRecords local variables removed. UT for task
stop use case added
* Fix minor code style issue
Co-authored-by: juanitodread <[email protected]>
* Fix style issues (check, spotlessApply) (#19)
Co-authored-by: Osvaldo Salinas <[email protected]>
Co-authored-by: alejandro.maguey <[email protected]>
Co-authored-by: osvaldo-salinas <[email protected]>
Co-authored-by: Carlos Dominguez <[email protected]>
Co-authored-by: Carlos Domínguez <[email protected]>
Co-authored-by: Carlos Domínguez
<[email protected]>
Co-authored-by: Alejandro Maguey <[email protected]>
Co-authored-by: Hassan Reyes <[email protected]>
Add missing apache license to README.md
Enabling integration test for DebeziumIO (#20)
Rename connector package cdc=>debezium. Update doc references (#21)
Fix code style on DebeziumIOMySqlConnectorIT
---
build.gradle.kts | 1 +
sdks/java/io/debezium/build.gradle | 83 ++++
sdks/java/io/debezium/src/README.md | 178 +++++++
.../org/apache/beam/io/debezium/DebeziumIO.java | 515 +++++++++++++++++++++
.../beam/io/debezium/KafkaSourceConsumerFn.java | 394 ++++++++++++++++
.../apache/beam/io/debezium/SourceRecordJson.java | 287 ++++++++++++
.../beam/io/debezium/SourceRecordMapper.java | 31 ++
.../org/apache/beam/io/debezium/package-info.java | 28 ++
.../io/debezium/DebeziumIOMySqlConnectorIT.java | 108 +++++
.../apache/beam/io/debezium/DebeziumIOTest.java | 101 ++++
.../io/debezium/KafkaSourceConsumerFnTest.java | 264 +++++++++++
.../apache/beam/io/debezium/OffsetTrackerTest.java | 71 +++
.../beam/io/debezium/SourceRecordJsonTest.java | 113 +++++
settings.gradle.kts | 1 +
14 files changed, 2175 insertions(+)
diff --git a/build.gradle.kts b/build.gradle.kts
index c2af480..5b51c77 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -163,6 +163,7 @@ task("javaPostCommit") {
dependsOn(":runners:google-cloud-dataflow-java:postCommitRunnerV2")
dependsOn(":sdks:java:extensions:google-cloud-platform-core:postCommit")
dependsOn(":sdks:java:extensions:zetasketch:postCommit")
+ dependsOn(":sdks:java:io:debezium:integrationTest")
dependsOn(":sdks:java:io:google-cloud-platform:postCommit")
dependsOn(":sdks:java:io:kinesis:integrationTest")
dependsOn(":sdks:java:extensions:ml:postCommit")
diff --git a/sdks/java/io/debezium/build.gradle
b/sdks/java/io/debezium/build.gradle
new file mode 100644
index 0000000..3c43f98
--- /dev/null
+++ b/sdks/java/io/debezium/build.gradle
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import groovy.json.JsonOutput
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+ automaticModuleName: 'org.apache.beam.sdk.io.debezium',
+ mavenRepositories: [
+        [id: 'io.confluent', url: 'https://packages.confluent.io/maven/']
+ ],
+ checkerTooSlowOnTests: true,
+ enableSpotbugs: false,
+)
+provideIntegrationTestingDependencies()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Debezium"
+ext.summary = "Library to work with Debezium data."
+
+dependencies {
+ compile library.java.vendored_guava_26_0_jre
+ compile project(path: ":sdks:java:core", configuration: "shadow")
+ compile library.java.slf4j_api
+ compile library.java.joda_time
+ provided library.java.jackson_dataformat_csv
+ testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
+  testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime")
+
+ // Test dependencies
+ testCompile library.java.junit
+ testRuntimeOnly library.java.slf4j_jdk14
+  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+ testCompile project(":runners:google-cloud-dataflow-java")
+ testCompile "org.testcontainers:testcontainers:1.15.1"
+ testCompile "org.testcontainers:mysql:1.15.1"
+
+ // Kafka connect dependencies
+ compile "org.apache.kafka:connect-api:2.5.0"
+ compile "org.apache.kafka:connect-json:2.5.0"
+
+ // Debezium dependencies
+ compile group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final'
+  testCompile group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.1.Final'
+}
+
+test {
+ testLogging {
+ outputs.upToDateWhen {false}
+ showStandardStreams = true
+ }
+}
+
+
+task integrationTest(type: Test, dependsOn: processTestResources) {
+ group = "Verification"
+ systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+ "--runner=DirectRunner",
+ ])
+
+  // Disable Gradle cache: these ITs interact with a live service that should always be considered "out of date"
+ outputs.upToDateWhen { false }
+
+ include '**/*IT.class'
+ classpath = sourceSets.test.runtimeClasspath
+ testClassesDirs = sourceSets.test.output.classesDirs
+
+ useJUnit {
+ }
+}
diff --git a/sdks/java/io/debezium/src/README.md
b/sdks/java/io/debezium/src/README.md
new file mode 100644
index 0000000..4cf9be8
--- /dev/null
+++ b/sdks/java/io/debezium/src/README.md
@@ -0,0 +1,178 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# DebeziumIO
+## Connect your Debezium Databases to Apache Beam easily.
+
+### What is DebeziumIO?
+DebeziumIO is an Apache Beam connector that lets users connect their event-driven databases on [Debezium](https://debezium.io) to [Apache Beam](https://beam.apache.org/) without the need to set up a [Kafka](https://kafka.apache.org/) instance.
+
+### Getting Started
+
+DebeziumIO uses [Debezium Connectors v1.3](https://debezium.io/documentation/reference/1.3/connectors/) to connect your database to Apache Beam. All you need to do is choose the Debezium Connector that suits your setup and pick a [Serializable Function](https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/SerializableFunction.html); then you can connect to Apache Beam and start building your own Pipelines.
+
+These connectors have been successfully tested and are known to work fine:
+* MySQL Connector
+* PostgreSQL Connector
+* SQLServer Connector
+* DB2 Connector
+
+Other connectors might also work.
+
+
+Setting up a connector and running a Pipeline should be as simple as:
+```
+Pipeline p = Pipeline.create();           // Create a Pipeline
+p.apply(DebeziumIO.<String>read()
+    .withConnectorConfiguration(...)      // Debezium Connector setup
+    .withFormatFunction(...)              // Serializable Function to use
+).setCoder(StringUtf8Coder.of());
+p.run().waitUntilFinish();                // Run your pipeline!
+```
+
+### Setting up a Debezium Connector
+
+DebeziumIO comes with a handy ConnectorConfiguration builder, which lets you
provide all the configuration needed to access your Debezium Database.
+
+A basic configuration such as **username**, **password**, **port number**, and **host name** must be specified, along with the **Debezium Connector class** you will use, via these methods:
+
+|Method|Param|Description|
+|-|-|-|
+|`.withConnectorClass(connectorClass)`|_Class_|Debezium Connector|
+|`.withUsername(username)`|_String_|Database Username|
+|`.withPassword(password)`|_String_|Database Password|
+|`.withHostName(hostname)`|_String_|Database Hostname|
+|`.withPort(portNumber)`|_String_|Database Port number|
+
+You can also add more configuration, such as Connector-specific properties, with the `withConnectionProperty` method:
+
+|Method|Params|Description|
+|-|-|-|
+|`.withConnectionProperty(propName, propValue)`|_String_, _String_|Adds a custom property to the connector.|
+> **Note:** For more information on custom properties, see your [Debezium
Connector](https://debezium.io/documentation/reference/1.3/connectors/)
specific documentation.
+
+Example of a MySQL Debezium Connector setup:
+```
+DebeziumIO.ConnectorConfiguration.create()
+ .withUsername("dbUsername")
+ .withPassword("dbPassword")
+ .withConnectorClass(MySqlConnector.class)
+ .withHostName("127.0.0.1")
+ .withPort("3306")
+ .withConnectionProperty("database.server.id", "serverId")
+ .withConnectionProperty("database.server.name", "serverName")
+ .withConnectionProperty("database.include.list", "dbName")
+ .withConnectionProperty("include.schema.changes", "false")
+```
+
+### Setting a Serializable Function
+
+A serializable function is required to map each `SourceRecord` fetched from the Database.
+
+DebeziumIO comes with a built-in JSON Mapper that you can optionally use to map every `SourceRecord` fetched from the Database to a JSON object. This helps users visualize and access their data in a simple way.
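+
+A minimal sketch of the mapping itself (a hypothetical `sourceRecord` variable; normally DebeziumIO invokes the mapper for you):
+```
+// Maps a fetched SourceRecord to a JSON string with "metadata", "before" and "after" fields.
+String json = new SourceRecordJson.SourceRecordJsonMapper().mapSourceRecord(sourceRecord);
+```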
+
+If you want to use this built-in JSON Mapper, set an instance of **SourceRecordJsonMapper** as the Serializable Function for the DebeziumIO:
+```
+.withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
+```
+> **Note:** `SourceRecordJsonMapper` comes out of the box, but you may use any Format Function you prefer.
+
+## Quick Example
+
+The following example shows how an actual setup looks, using a **MySQL Debezium Connector** and **SourceRecordJsonMapper** as the Serializable Function.
+```
+PipelineOptions options = PipelineOptionsFactory.create();
+Pipeline p = Pipeline.create(options);
+p.apply(DebeziumIO.<String>read()
+    .withConnectorConfiguration(                  // Debezium Connector setup
+        DebeziumIO.ConnectorConfiguration.create()
+            .withUsername("debezium")
+            .withPassword("dbz")
+            .withConnectorClass(MySqlConnector.class)
+            .withHostName("127.0.0.1")
+            .withPort("3306")
+            .withConnectionProperty("database.server.id", "184054")
+            .withConnectionProperty("database.server.name", "dbserver1")
+            .withConnectionProperty("database.include.list", "inventory")
+            .withConnectionProperty("include.schema.changes", "false")
+    ).withFormatFunction(
+        new SourceRecordJson.SourceRecordJsonMapper() // Serializable Function
+    )
+).setCoder(StringUtf8Coder.of());
+
+p.run().waitUntilFinish();
+```
+
+## Shortcut!
+
+If you will be using the built-in **SourceRecordJsonMapper** as your
Serializable Function for all your pipelines, you should use **readAsJson()**.
+
+DebeziumIO comes with a method called `readAsJson`, which automatically sets the `SourceRecordJsonMapper` as the Serializable Function for your pipeline. This way, you only need to set up your connector before running your pipeline, without explicitly setting a Serializable Function.
+
+Example of using **readAsJson**:
+```
+PipelineOptions options = PipelineOptionsFactory.create();
+Pipeline p = Pipeline.create(options);
+p.apply(DebeziumIO.readAsJson()
+    .withConnectorConfiguration(                  // Debezium Connector setup
+        DebeziumIO.ConnectorConfiguration.create()
+            .withUsername("debezium")
+            .withPassword("dbz")
+            .withConnectorClass(MySqlConnector.class)
+            .withHostName("127.0.0.1")
+            .withPort("3306")
+            .withConnectionProperty("database.server.id", "184054")
+            .withConnectionProperty("database.server.name", "dbserver1")
+            .withConnectionProperty("database.include.list", "inventory")
+            .withConnectionProperty("include.schema.changes", "false")));
+
+p.run().waitUntilFinish();
+```
+
+## Under the hood
+
+### KafkaSourceConsumerFn and Restrictions
+
+KafkaSourceConsumerFn (KSC onwards) is a [DoFn](https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/transforms/DoFn.html) in charge of the Database replication and CDC.
+
+There are two ways of initializing KSC:
+* Restricted by number of records
+* Restricted by amount of time (minutes)
+
+By default, DebeziumIO initializes it with the former, though users may choose the latter by passing the number of minutes as a parameter:
+
+|Function|Param|Description|
+|-|-|-|
+|`KafkaSourceConsumerFn(connectorClass, recordMapper, maxRecords)`|_Class, SourceRecordMapper, Integer_|Restrict run by number of records (Default).|
+|`KafkaSourceConsumerFn(connectorClass, recordMapper, timeToRun)`|_Class, SourceRecordMapper, long_|Restrict run by amount of time (in minutes).|
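+
+A minimal sketch of both initializations (hypothetical direct construction; in practice DebeziumIO builds this DoFn internally, e.g. via `withMaxNumberOfRecords` for the record-based form):
+```
+// Stop after 100 fetched records (Integer overload; the default mode).
+// Integer.valueOf avoids Java picking the long (time-based) overload.
+KafkaSourceConsumerFn<String> byRecords = new KafkaSourceConsumerFn<>(
+    MySqlConnector.class, new SourceRecordJson.SourceRecordJsonMapper(), Integer.valueOf(100));
+
+// Stop after 5 minutes (long overload).
+KafkaSourceConsumerFn<String> byTime = new KafkaSourceConsumerFn<>(
+    MySqlConnector.class, new SourceRecordJson.SourceRecordJsonMapper(), 5L);
+```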
+
+### Requirements and Supported versions
+
+- JDK v8
+- Debezium Connectors v1.3
+- Apache Beam 2.25
+
+## Running Integration Tests
+
+You can run Integration Tests using **gradlew**.
+
+Example of running the MySQL Connector Integration Test:
+```
+./gradlew integrationTest -p sdks/java/io/debezium/ --tests org.apache.beam.io.debezium.DebeziumIOMySqlConnectorIT -DintegrationTestRunner=direct
+```
\ No newline at end of file
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
new file mode 100644
index 0000000..b7c084f
--- /dev/null
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class which exposes a {@link #read} implementation and a Debezium configuration.
+ *
+ * <h3>Quick Overview</h3>
+ *
+ * <p>This class lets Beam users connect to their existing Debezium
implementations in an easy way.
+ *
+ * <p>Any Kafka connector supported by Debezium should work fine with this IO.
+ *
+ * <p>The following connectors were tested and worked well in some simple
scenarios:
+ *
+ * <ul>
+ * <li>MySQL
+ * <li>PostgreSQL
+ * <li>SQLServer
+ * <li>DB2
+ * </ul>
+ *
+ * <h3>Usage example</h3>
+ *
+ * <p>Connect to a MySQL database via Debezium and run a Pipeline:
+ *
+ * <pre>
+ * private static final ConnectorConfiguration mySqlConnectorConfig =
ConnectorConfiguration
+ * .create()
+ * .withUsername("uname")
+ * .withPassword("pwd123")
+ * .withHostName("127.0.0.1")
+ * .withPort("3306")
+ * .withConnectorClass(MySqlConnector.class)
+ * .withConnectionProperty("database.server.id", "184054")
+ * .withConnectionProperty("database.server.name", "serverid")
+ * .withConnectionProperty("database.include.list", "dbname")
+ * .withConnectionProperty("database.history",
DebeziumSDFDatabaseHistory.class.getName())
+ * .withConnectionProperty("include.schema.changes", "false");
+ *
+ * PipelineOptions options = PipelineOptionsFactory.create();
+ * Pipeline p = Pipeline.create(options);
+ * p.apply(DebeziumIO.read()
+ * .withConnectorConfiguration(mySqlConnectorConfig)
+ * .withFormatFunction(new
SourceRecordJson.SourceRecordJsonMapper())
+ * ).setCoder(StringUtf8Coder.of());
+ * p.run().waitUntilFinish();
+ * </pre>
+ *
+ * <p>In this example we are using {@link
KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory} to
+ * handle the Database history.
+ *
+ * <h3>Dependencies</h3>
+ *
+ * <p>Users may work with any of the supported Debezium Connectors mentioned above.
+ *
+ * <p>See <a
href="https://debezium.io/documentation/reference/1.3/connectors/index.html">Debezium
+ * Connectors</a> for more info.
+ */
+@Experimental(Kind.SOURCE_SINK)
+@SuppressWarnings({"nullness"})
+public class DebeziumIO {
+ private static final Logger LOG = LoggerFactory.getLogger(DebeziumIO.class);
+
+ /**
+ * Read data from a Debezium source.
+ *
+ * @param <T> Type of the data to be read.
+ */
+ public static <T> Read<T> read() {
+ return new AutoValue_DebeziumIO_Read.Builder<T>().build();
+ }
+
+ /**
+   * Read data from a Debezium source and convert each Kafka {@link
+   * org.apache.kafka.connect.source.SourceRecord} into a JSON string, using {@link
+   * org.apache.beam.io.debezium.SourceRecordJson.SourceRecordJsonMapper} as the default
+   * mapper function.
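+   *
+   * <p>A minimal sketch (assuming {@code mySqlConnectorConfig} is built as in the class-level
+   * example above):
+   *
+   * <pre>
+   * p.apply(DebeziumIO.readAsJson().withConnectorConfiguration(mySqlConnectorConfig));
+   * </pre>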
+ *
+ * @return Reader object of String.
+ */
+ public static Read<String> readAsJson() {
+ return new AutoValue_DebeziumIO_Read.Builder<String>()
+ .setFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
+ .setCoder(StringUtf8Coder.of())
+ .build();
+ }
+
+ /** Disallow construction of utility class. */
+ private DebeziumIO() {}
+
+ /** Implementation of {@link #read}. */
+ @AutoValue
+ public abstract static class Read<T> extends PTransform<PBegin,
PCollection<T>> {
+
+ private static final long serialVersionUID = 1L;
+
+ abstract @Nullable ConnectorConfiguration getConnectorConfiguration();
+
+ abstract @Nullable SourceRecordMapper<T> getFormatFunction();
+
+ abstract @Nullable Integer getMaxNumberOfRecords();
+
+ abstract @Nullable Coder<T> getCoder();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setConnectorConfiguration(ConnectorConfiguration
config);
+
+ abstract Builder<T> setCoder(Coder<T> coder);
+
+ abstract Builder<T> setFormatFunction(SourceRecordMapper<T> mapperFn);
+
+ abstract Builder<T> setMaxNumberOfRecords(Integer maxNumberOfRecords);
+
+ abstract Read<T> build();
+ }
+
+ /**
+ * Applies the given configuration to the connector. It cannot be null.
+ *
+ * @param config Configuration to be used within the connector.
+ * @return PTransform {@link #read}
+ */
+ public Read<T> withConnectorConfiguration(final ConnectorConfiguration
config) {
+ checkArgument(config != null, "config can not be null");
+ return toBuilder().setConnectorConfiguration(config).build();
+ }
+
+ /**
+ * Applies a {@link SourceRecordMapper} to the connector. It cannot be
null.
+ *
+ * @param mapperFn the mapper function to be used on each {@link
+ * org.apache.kafka.connect.source.SourceRecord}.
+ * @return PTransform {@link #read}
+ */
+ public Read<T> withFormatFunction(SourceRecordMapper<T> mapperFn) {
+ checkArgument(mapperFn != null, "mapperFn can not be null");
+ return toBuilder().setFormatFunction(mapperFn).build();
+ }
+
+ /**
+ * Applies a {@link Coder} to the connector. It cannot be null
+ *
+ * @param coder The Coder to be used over the data.
+ * @return PTransform {@link #read}
+ */
+ public Read<T> withCoder(Coder<T> coder) {
+ checkArgument(coder != null, "coder can not be null");
+ return toBuilder().setCoder(coder).build();
+ }
+
+ /**
+     * Once the specified number of records has been reached, the connector will stop fetching
+     * them. The value can be null (default), which means it will not stop.
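+     *
+     * <p>For example, {@code DebeziumIO.<String>read().withMaxNumberOfRecords(100)} stops
+     * after fetching 100 records (an illustrative value).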
+ *
+ * @param maxNumberOfRecords The maximum number of records to be fetched
before stop.
+ * @return PTransform {@link #read}
+ */
+ public Read<T> withMaxNumberOfRecords(Integer maxNumberOfRecords) {
+ return toBuilder().setMaxNumberOfRecords(maxNumberOfRecords).build();
+ }
+
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ return input
+ .apply(
+
Create.of(Lists.newArrayList(getConnectorConfiguration().getConfigurationMap()))
+ .withCoder(MapCoder.of(StringUtf8Coder.of(),
StringUtf8Coder.of())))
+ .apply(
+ ParDo.of(
+ new KafkaSourceConsumerFn<>(
+ getConnectorConfiguration().getConnectorClass().get(),
+ getFormatFunction(),
+ getMaxNumberOfRecords())))
+ .setCoder(getCoder());
+ }
+ }
+
+ /** A POJO describing a Debezium configuration. */
+ @AutoValue
+ public abstract static class ConnectorConfiguration implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ abstract @Nullable ValueProvider<Class<?>> getConnectorClass();
+
+ abstract @Nullable ValueProvider<String> getHostName();
+
+ abstract @Nullable ValueProvider<String> getPort();
+
+ abstract @Nullable ValueProvider<String> getUsername();
+
+ abstract @Nullable ValueProvider<String> getPassword();
+
+ abstract @Nullable ValueProvider<SourceConnector> getSourceConnector();
+
+ abstract @Nullable ValueProvider<Map<String, String>>
getConnectionProperties();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setConnectorClass(ValueProvider<Class<?>>
connectorClass);
+
+ abstract Builder setHostName(ValueProvider<String> hostname);
+
+ abstract Builder setPort(ValueProvider<String> port);
+
+ abstract Builder setUsername(ValueProvider<String> username);
+
+ abstract Builder setPassword(ValueProvider<String> password);
+
+ abstract Builder setConnectionProperties(
+ ValueProvider<Map<String, String>> connectionProperties);
+
+ abstract Builder setSourceConnector(ValueProvider<SourceConnector>
sourceConnector);
+
+ abstract ConnectorConfiguration build();
+ }
+
+ /**
+ * Creates a ConnectorConfiguration.
+ *
+ * @return {@link ConnectorConfiguration}
+ */
+ public static ConnectorConfiguration create() {
+ return new AutoValue_DebeziumIO_ConnectorConfiguration.Builder()
+ .setConnectionProperties(ValueProvider.StaticValueProvider.of(new
HashMap<>()))
+ .build();
+ }
+
+ /**
+ * Applies the connectorClass to be used to connect to your database.
+ *
+ * <p>Currently supported connectors are:
+ *
+ * <ul>
+ * <li>{@link io.debezium.connector.mysql.MySqlConnector}
+ * <li>{@link io.debezium.connector.postgresql.PostgresConnector}
+ * <li>{@link io.debezium.connector.sqlserver.SqlServerConnector }
+ * </ul>
+ *
+ * @param connectorClass Any of the supported connectors.
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withConnectorClass(Class<?> connectorClass) {
+ checkArgument(connectorClass != null, "connectorClass can not be null");
+ return
withConnectorClass(ValueProvider.StaticValueProvider.of(connectorClass));
+ }
+
+ /**
+ * Sets the connectorClass to be used to connect to your database. It
cannot be null.
+ *
+ * <p>Currently supported connectors are:
+ *
+ * <ul>
+ * <li>{@link io.debezium.connector.mysql.MySqlConnector}
+ * <li>{@link io.debezium.connector.postgresql.PostgresConnector}
+ * <li>{@link io.debezium.connector.sqlserver.SqlServerConnector }
+ * </ul>
+ *
+ * @param connectorClass (as ValueProvider)
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withConnectorClass(ValueProvider<Class<?>>
connectorClass) {
+ checkArgument(connectorClass != null, "connectorClass can not be null");
+ return builder().setConnectorClass(connectorClass).build();
+ }
+
+ /**
+ * Sets the host name to be used on the database. It cannot be null.
+ *
+ * @param hostName The hostname of your database.
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withHostName(String hostName) {
+ checkArgument(hostName != null, "hostName can not be null");
+ return withHostName(ValueProvider.StaticValueProvider.of(hostName));
+ }
+
+ /**
+ * Sets the host name to be used on the database. It cannot be null.
+ *
+ * @param hostName The hostname of your database (as ValueProvider).
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withHostName(ValueProvider<String> hostName)
{
+ checkArgument(hostName != null, "hostName can not be null");
+ return builder().setHostName(hostName).build();
+ }
+
+ /**
+ * Sets the port on which your database is listening. It cannot be null.
+ *
+ * @param port The port to be used to connect to your database (as
ValueProvider).
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withPort(String port) {
+ checkArgument(port != null, "port can not be null");
+ return withPort(ValueProvider.StaticValueProvider.of(port));
+ }
+
+ /**
+ * Sets the port on which your database is listening. It cannot be null.
+ *
+ * @param port The port to be used to connect to your database.
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withPort(ValueProvider<String> port) {
+ checkArgument(port != null, "port can not be null");
+ return builder().setPort(port).build();
+ }
+
+ /**
+ * Sets the username to connect to your database. It cannot be null.
+ *
+ * @param username Database username
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withUsername(String username) {
+ checkArgument(username != null, "username can not be null");
+ return withUsername(ValueProvider.StaticValueProvider.of(username));
+ }
+
+ /**
+ * Sets the username to connect to your database. It cannot be null.
+ *
+ * @param username (as ValueProvider).
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withUsername(ValueProvider<String> username)
{
+ checkArgument(username != null, "username can not be null");
+ return builder().setUsername(username).build();
+ }
+
+ /**
+ * Sets the password to connect to your database. It cannot be null.
+ *
+ * @param password Database password
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withPassword(String password) {
+ checkArgument(password != null, "password can not be null");
+ return withPassword(ValueProvider.StaticValueProvider.of(password));
+ }
+
+ /**
+ * Sets the password to connect to your database. It cannot be null.
+ *
+ * @param password (as ValueProvider).
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withPassword(ValueProvider<String> password)
{
+ checkArgument(password != null, "password can not be null");
+ return builder().setPassword(password).build();
+ }
+
+ /**
+ * Sets a custom property to be used within the connection to your
database.
+ *
+ * <p>You may use this to set special configurations such as:
+ *
+ * <ul>
+ * <li>slot.name
+ * <li>database.dbname
+ * <li>database.server.id
+ * <li>...
+ * </ul>
+ *
+ * @param connectionProperties Properties (Key, Value) Map
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withConnectionProperties(
+ Map<String, String> connectionProperties) {
+ checkArgument(connectionProperties != null, "connectionProperties can
not be null");
+ return
withConnectionProperties(ValueProvider.StaticValueProvider.of(connectionProperties));
+ }
+
+ /**
+ * Sets a custom property to be used within the connection to your
database.
+ *
+ * <p>You may use this to set special configurations such as:
+ *
+ * <ul>
+ * <li>slot.name
+ * <li>database.dbname
+ * <li>database.server.id
+ * <li>...
+ * </ul>
+ *
+ * @param connectionProperties (as ValueProvider).
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withConnectionProperties(
+ ValueProvider<Map<String, String>> connectionProperties) {
+ checkArgument(connectionProperties != null, "connectionProperties can
not be null");
+ return builder().setConnectionProperties(connectionProperties).build();
+ }
+
+ /**
+ * Sets a custom property to be used within the connection to your
database.
+ *
+ * <p>You may use this to set special configurations such as:
+ *
+ * <ul>
+ * <li>slot.name
+ * <li>database.dbname
+ * <li>database.server.id
+ * <li>...
+ * </ul>
+ *
+ * @param key Property name
+ * @param value Property value
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withConnectionProperty(String key, String
value) {
+ checkArgument(key != null, "key can not be null");
+ checkArgument(value != null, "value can not be null");
+ checkArgument(
+ getConnectionProperties().get() != null, "connectionProperties can
not be null");
+
+ ConnectorConfiguration config = builder().build();
+ config.getConnectionProperties().get().putIfAbsent(key, value);
+ return config;
+ }
+
+ /**
+ * Sets the {@link SourceConnector} to be used. It cannot be null.
+ *
+ * @param sourceConnector Any supported connector
+ * @return {@link ConnectorConfiguration}
+ */
+ public ConnectorConfiguration withSourceConnector(SourceConnector
sourceConnector) {
+ checkArgument(sourceConnector != null, "sourceConnector can not be
null");
+ return
withSourceConnector(ValueProvider.StaticValueProvider.of(sourceConnector));
+ }
+
+ public ConnectorConfiguration withSourceConnector(
+ ValueProvider<SourceConnector> sourceConnector) {
+ checkArgument(sourceConnector != null, "sourceConnector can not be
null");
+ return builder().setSourceConnector(sourceConnector).build();
+ }
+
+ /**
+ * Configuration Map Getter.
+ *
+ * @return Configuration Map.
+ */
+ public Map<String, String> getConfigurationMap() {
+ HashMap<String, String> configuration = new HashMap<>();
+
+ configuration.computeIfAbsent(
+ "connector.class", k ->
getConnectorClass().get().getCanonicalName());
+ configuration.computeIfAbsent("database.hostname", k ->
getHostName().get());
+ configuration.computeIfAbsent("database.port", k -> getPort().get());
+ configuration.computeIfAbsent("database.user", k -> getUsername().get());
+ configuration.computeIfAbsent("database.password", k ->
getPassword().get());
+
+ for (Map.Entry<String, String> entry :
getConnectionProperties().get().entrySet()) {
+ configuration.computeIfAbsent(entry.getKey(), k -> entry.getValue());
+ }
+
+ // Set default Database History impl. if not provided
+ configuration.computeIfAbsent(
+ "database.history",
+ k ->
KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName());
+
+ String stringProperties = Joiner.on('\n').withKeyValueSeparator(" ->
").join(configuration);
+ LOG.debug("---------------- Connector configuration: {}",
stringProperties);
+
+ return configuration;
+ }
+ }
+}
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
new file mode 100644
index 0000000..c5a5b4f
--- /dev/null
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import io.debezium.document.Document;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.HistoryRecord;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ *
+ * <h3>Quick Overview</h3>
+ *
+ * SDF used to process records fetched from supported Debezium Connectors.
+ *
+ * <p>Currently it has a time limiter (see {@link OffsetTracker}) which, if set, stops the
+ * consumer automatically after the specified number of minutes has elapsed. Otherwise, it will
+ * keep running until the user explicitly interrupts it.
+ *
+ * <p>It might be initialized either as:
+ *
+ * <pre>KafkaSourceConsumerFn(connectorClass, SourceRecordMapper)</pre>
+ *
+ * Or with a time limiter:
+ *
+ * <pre>KafkaSourceConsumerFn(connectorClass, SourceRecordMapper,
minutesToRun)</pre>
+ */
+@SuppressWarnings({"nullness"})
+public class KafkaSourceConsumerFn<T> extends DoFn<Map<String, String>, T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaSourceConsumerFn.class);
+ public static final String BEAM_INSTANCE_PROPERTY = "beam.parent.instance";
+
+ private final Class<? extends SourceConnector> connectorClass;
+ private final SourceRecordMapper<T> fn;
+
+ private static long minutesToRun = -1;
+ private static Integer maxRecords;
+ private static DateTime startTime;
+ private static final Map<String, RestrictionTracker<OffsetHolder,
Map<String, Object>>>
+ restrictionTrackers = new ConcurrentHashMap<>();
+
+ /**
+ * Initializes the SDF with a time limit.
+ *
+ * @param connectorClass Supported Debezium connector class
+ * @param fn a SourceRecordMapper
+ * @param minutesToRun Maximum time to run (in minutes)
+ */
+ public KafkaSourceConsumerFn(
+ Class<?> connectorClass, SourceRecordMapper<T> fn, long minutesToRun) {
+ this.connectorClass = (Class<? extends SourceConnector>) connectorClass;
+ this.fn = fn;
+ KafkaSourceConsumerFn.minutesToRun = minutesToRun;
+ }
+
+  /**
+   * Initializes the SDF with a limit on the number of records to fetch.
+   *
+   * @param connectorClass Supported Debezium connector class
+   * @param fn a SourceRecordMapper
+   * @param maxRecords Maximum number of records to fetch before stopping
+   */
+ public KafkaSourceConsumerFn(
+ Class<?> connectorClass, SourceRecordMapper<T> fn, Integer maxRecords) {
+ this.connectorClass = (Class<? extends SourceConnector>) connectorClass;
+ this.fn = fn;
+ KafkaSourceConsumerFn.maxRecords = maxRecords;
+ }
+
+ @GetInitialRestriction
+ public OffsetHolder getInitialRestriction(@Element Map<String, String>
unused)
+ throws IOException {
+ KafkaSourceConsumerFn.startTime = new DateTime();
+ return new OffsetHolder(null, null, null);
+ }
+
+ @NewTracker
+ public RestrictionTracker<OffsetHolder, Map<String, Object>> newTracker(
+ @Restriction OffsetHolder restriction) {
+ return new OffsetTracker(restriction);
+ }
+
+ @GetRestrictionCoder
+ public Coder<OffsetHolder> getRestrictionCoder() {
+ return SerializableCoder.of(OffsetHolder.class);
+ }
+
+  /**
+   * Processes the retrieved element: polls the connector for records, maps each {@link
+   * SourceRecord} with the configured mapper, and emits the result to the receiver.
+   *
+   * @param element Connector configuration map
+   * @param tracker Restriction Tracker
+   * @param receiver Output Receiver
+   * @return A {@link ProcessContinuation}: stop if no records were pulled or an offset could not
+   *     be claimed, resume after a short delay otherwise
+   * @throws Exception if the connector or its task cannot be instantiated or started
+   */
+ @DoFn.ProcessElement
+ public ProcessContinuation process(
+ @Element Map<String, String> element,
+ RestrictionTracker<OffsetHolder, Map<String, Object>> tracker,
+ OutputReceiver<T> receiver)
+ throws Exception {
+ Map<String, String> configuration = new HashMap<>(element);
+
+ // Adding the current restriction to the class object to be found by the
database history
+ restrictionTrackers.put(this.getHashCode(), tracker);
+ configuration.put(BEAM_INSTANCE_PROPERTY, this.getHashCode());
+
+ SourceConnector connector =
connectorClass.getDeclaredConstructor().newInstance();
+ connector.start(configuration);
+
+ SourceTask task = (SourceTask)
connector.taskClass().getDeclaredConstructor().newInstance();
+
+ try {
+ Map<String, ?> consumerOffset = tracker.currentRestriction().offset;
+ LOG.debug("--------- Consumer offset from Debezium Tracker: {}",
consumerOffset);
+
+ task.initialize(new
BeamSourceTaskContext(tracker.currentRestriction().offset));
+ task.start(connector.taskConfigs(1).get(0));
+
+ List<SourceRecord> records = task.poll();
+
+ if (records == null) {
+ LOG.debug("-------- Pulled records null");
+ return ProcessContinuation.stop();
+ }
+
+ LOG.debug("-------- {} records found", records.size());
+ if (!records.isEmpty()) {
+ for (SourceRecord record : records) {
+ LOG.debug("-------- Record found: {}", record);
+
+ Map<String, Object> offset = (Map<String, Object>)
record.sourceOffset();
+
+ if (offset == null || !tracker.tryClaim(offset)) {
+ LOG.debug("-------- Offset null or could not be claimed");
+ return ProcessContinuation.stop();
+ }
+
+ T json = this.fn.mapSourceRecord(record);
+ LOG.debug("****************** RECEIVED SOURCE AS JSON: {}", json);
+
+ receiver.output(json);
+ }
+
+ task.commit();
+ }
+ } catch (Exception ex) {
+ LOG.error(
+ "-------- Error on consumer: {}. with stacktrace: {}",
+ ex.getMessage(),
+ ex.getStackTrace());
+ } finally {
+ restrictionTrackers.remove(this.getHashCode());
+
+ LOG.debug("------- Stopping SourceTask");
+ task.stop();
+ }
+
+ return
ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1));
+ }
+
+ public String getHashCode() {
+ return Integer.toString(System.identityHashCode(this));
+ }
+
+ private static class BeamSourceTaskContext implements SourceTaskContext {
+ private final @Nullable Map<String, ?> initialOffset;
+
+ BeamSourceTaskContext(@Nullable Map<String, ?> initialOffset) {
+ this.initialOffset = initialOffset;
+ }
+
+ @Override
+ public Map<String, String> configs() {
+ // TODO(pabloem): Do we need to implement this?
+ throw new UnsupportedOperationException("unimplemented");
+ }
+
+ @Override
+ public OffsetStorageReader offsetStorageReader() {
+ LOG.debug("------------- Creating an offset storage reader");
+ return new DebeziumSourceOffsetStorageReader(initialOffset);
+ }
+ }
+
+ private static class DebeziumSourceOffsetStorageReader implements
OffsetStorageReader {
+ private final Map<String, ?> offset;
+
+ DebeziumSourceOffsetStorageReader(Map<String, ?> initialOffset) {
+ this.offset = initialOffset;
+ }
+
+ @Override
+ public <V> Map<String, Object> offset(Map<String, V> partition) {
+ return offsets(Collections.singletonList(partition))
+ .getOrDefault(partition, ImmutableMap.of());
+ }
+
+ @Override
+ public <T> Map<Map<String, T>, Map<String, Object>> offsets(
+ Collection<Map<String, T>> partitions) {
+ LOG.debug("-------------- GETTING OFFSETS!");
+
+ Map<Map<String, T>, Map<String, Object>> map = new HashMap<>();
+ for (Map<String, T> partition : partitions) {
+ map.put(partition, (Map<String, Object>) offset);
+ }
+
+ LOG.debug("-------------- OFFSETS: {}", map);
+ return map;
+ }
+ }
+
+ static class OffsetHolder implements Serializable {
+ public final @Nullable Map<String, ?> offset;
+ public final @Nullable List<?> history;
+ public final @Nullable Integer fetchedRecords;
+
+ OffsetHolder(
+ @Nullable Map<String, ?> offset,
+ @Nullable List<?> history,
+ @Nullable Integer fetchedRecords) {
+ this.offset = offset;
+ this.history = history == null ? new ArrayList<>() : history;
+ this.fetchedRecords = fetchedRecords;
+ }
+ }
+
+ /** {@link RestrictionTracker} for Debezium connectors. */
+ static class OffsetTracker extends RestrictionTracker<OffsetHolder,
Map<String, Object>> {
+ private OffsetHolder restriction;
+ private static final long MILLIS = 60 * 1000;
+
+ OffsetTracker(OffsetHolder holder) {
+ this.restriction = holder;
+ }
+
+ /**
+ * Overriding {@link #tryClaim} in order to stop fetching records from the
database.
+ *
+     * <p>This works in two different ways:
+ *
+ * <h3>Number of records</h3>
+ *
+ * <p>This is the default behavior. Once the specified number of records
has been reached, it
+ * will stop fetching them.
+ *
+ * <h3>Time based</h3>
+ *
+     * <p>Users may specify the amount of time the connector should be kept alive. See {@link
+     * KafkaSourceConsumerFn} for more details on this.
+ *
+ * @param position Currently not used
+ * @return boolean
+ */
+ @Override
+ public boolean tryClaim(Map<String, Object> position) {
+ LOG.debug("-------------- Claiming {} used to have: {}", position,
restriction.offset);
+ long elapsedTime = System.currentTimeMillis() - startTime.getMillis();
+ int fetchedRecords =
+ this.restriction.fetchedRecords == null ? 0 :
this.restriction.fetchedRecords + 1;
+ LOG.debug("-------------- Time running: {} / {}", elapsedTime,
(minutesToRun * MILLIS));
+ this.restriction = new OffsetHolder(position, this.restriction.history,
fetchedRecords);
+ LOG.debug("-------------- History: {}", this.restriction.history);
+
+ if (maxRecords == null && minutesToRun == -1) {
+ return true;
+ }
+
+ if (maxRecords != null) {
+ return fetchedRecords < maxRecords;
+ }
+
+ return elapsedTime < minutesToRun * MILLIS;
+ }
+
+ @Override
+ public OffsetHolder currentRestriction() {
+ return restriction;
+ }
+
+ @Override
+ public SplitResult<OffsetHolder> trySplit(double fractionOfRemainder) {
+ LOG.debug("-------------- Trying to split: fractionOfRemainder={}",
fractionOfRemainder);
+ return SplitResult.of(new OffsetHolder(null, null, null), restriction);
+ }
+
+ @Override
+ public void checkDone() throws IllegalStateException {}
+
+ @Override
+ public IsBounded isBounded() {
+ return IsBounded.BOUNDED;
+ }
+ }
+
+ public static class DebeziumSDFDatabaseHistory extends
AbstractDatabaseHistory {
+ private List<byte[]> history;
+
+ public DebeziumSDFDatabaseHistory() {
+ this.history = new ArrayList<>();
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ LOG.debug(
+ "------------ STARTING THE DATABASE HISTORY! - trackers: {} -
config: {}",
+ restrictionTrackers,
+ config.asMap());
+
+ // We fetch the first key to get the first restriction tracker.
+ // TODO(BEAM-11737): This may cause issues with multiple trackers in the
future.
+ RestrictionTracker<OffsetHolder, ?> tracker =
+
restrictionTrackers.get(restrictionTrackers.keySet().iterator().next());
+ this.history = (List<byte[]>) tracker.currentRestriction().history;
+ }
+
+ @Override
+ protected void storeRecord(HistoryRecord record) throws
DatabaseHistoryException {
+ LOG.debug("------------- Adding history! {}", record);
+
+
history.add(DocumentWriter.defaultWriter().writeAsBytes(record.document()));
+ }
+
+ @Override
+ protected void recoverRecords(Consumer<HistoryRecord> consumer) {
+ LOG.debug("------------- Trying to recover!");
+
+ try {
+ for (byte[] record : history) {
+ Document doc = DocumentReader.defaultReader().read(record);
+ consumer.accept(new HistoryRecord(doc));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean exists() {
+ return history != null && !history.isEmpty();
+ }
+
+ @Override
+ public boolean storageExists() {
+ return history != null && !history.isEmpty();
+ }
+ }
+}
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordJson.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordJson.java
new file mode 100644
index 0000000..10bcb14
--- /dev/null
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordJson.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.Gson;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.GsonBuilder;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * This class can be used as a mapper for each {@link SourceRecord} retrieved.
+ *
+ * <h3>What it does</h3>
+ *
+ * <p>It maps any SourceRecord retrieved from any supported {@link
io.debezium.connector} to JSON
+ *
+ * <h3>How it works</h3>
+ *
+ * <p>It will extract valuable fields from any given SourceRecord:
+ *
+ * <ul>
+ * <li>before - {@link #loadBefore}
+ * <li>after - {@link #loadAfter}
+ * <li>metadata - {@link #loadMetadata}
+ * <ul>
+ * <li>schema - Database Schema
+ * <li>connector - Connector used
+ * <li>version - Connector version
+ * </ul>
+ * </ul>
+ *
+ * <h3>Usage Example</h3>
+ *
+ * <p>Map each SourceRecord to JSON
+ *
+ * <pre>
+ * DebeziumIO.read()
+ * .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper()
+ * </pre>
+ */
+@SuppressWarnings({"nullness"})
+public class SourceRecordJson {
+ private final @Nullable SourceRecord sourceRecord;
+ private final @Nullable Struct value;
+ private final @Nullable Event event;
+
+ /**
+ * Initializer.
+ *
+ * @param sourceRecord retrieved SourceRecord using a supported
SourceConnector
+ */
+ public SourceRecordJson(@Nullable SourceRecord sourceRecord) {
+ if (sourceRecord == null) {
+ throw new IllegalArgumentException();
+ }
+
+ this.sourceRecord = sourceRecord;
+ this.value = (Struct) sourceRecord.value();
+
+ if (this.value == null) {
+ this.event = new Event(null, null, null);
+ } else {
+ Event.Metadata metadata = this.loadMetadata();
+ Event.Before before = this.loadBefore();
+ Event.After after = this.loadAfter();
+
+ this.event = new Event(metadata, before, after);
+ }
+ }
+
+ /**
+ * Extracts metadata from the SourceRecord.
+ *
+ * @return Metadata
+ */
+ private Event.Metadata loadMetadata() {
+ @Nullable Struct source;
+ try {
+ source = (Struct) this.value.get("source");
+ } catch (RuntimeException e) {
+ throw new IllegalArgumentException();
+ }
+ @Nullable String schema;
+
+ if (source == null) {
+ return null;
+ }
+
+ try {
+ // PostgreSQL and SQL server use Schema
+ schema = source.getString("schema");
+ } catch (DataException e) {
+ // MySQL uses file instead
+ schema = source.getString("file");
+ }
+
+ return new Event.Metadata(
+ source.getString("connector"),
+ source.getString("version"),
+ source.getString("name"),
+ source.getString("db"),
+ schema,
+ source.getString("table"));
+ }
+
+ /**
+ * Extracts the before field within SourceRecord.
+ *
+ * @return Before
+ */
+ private Event.Before loadBefore() {
+ @Nullable Struct before;
+ try {
+ before = (Struct) this.value.get("before");
+ } catch (DataException e) {
+ return null;
+ }
+ if (before == null) {
+ return null;
+ }
+
+ Map<String, Object> fields = new HashMap<>();
+ for (Field field : before.schema().fields()) {
+ fields.put(field.name(), before.get(field));
+ }
+
+ return new Event.Before(fields);
+ }
+
+ /**
+ * Extracts the after field within SourceRecord.
+ *
+ * @return After
+ */
+ private Event.After loadAfter() {
+ @Nullable Struct after;
+ try {
+ after = (Struct) this.value.get("after");
+ } catch (DataException e) {
+ return null;
+ }
+ if (after == null) {
+ return null;
+ }
+
+ Map<String, Object> fields = new HashMap<>();
+ for (Field field : after.schema().fields()) {
+ fields.put(field.name(), after.get(field));
+ }
+
+ return new Event.After(fields);
+ }
+
+ /**
+ * Transforms the extracted data to a JSON string.
+ *
+ * @return JSON String
+ */
+ public String toJson() {
+ return this.event.toJson();
+ }
+
+ /** {@link SourceRecordJson} implementation. */
+ public static class SourceRecordJsonMapper implements
SourceRecordMapper<String> {
+ @Override
+ public String mapSourceRecord(SourceRecord sourceRecord) throws Exception {
+ return new SourceRecordJson(sourceRecord).toJson();
+ }
+ }
+
+ /** Depicts a SourceRecord as an Event in order for it to be mapped as JSON.
*/
+ static class Event implements Serializable {
+ private final SourceRecordJson.Event.Metadata metadata;
+ private final SourceRecordJson.Event.Before before;
+ private final SourceRecordJson.Event.After after;
+
+ /**
+ * Event Initializer.
+ *
+ * @param metadata Metadata retrieved from SourceRecord
+ * @param before Before data retrieved from SourceRecord
+ * @param after After data retrieved from SourceRecord
+ */
+ public Event(
+ SourceRecordJson.Event.Metadata metadata,
+ SourceRecordJson.Event.Before before,
+ SourceRecordJson.Event.After after) {
+ this.metadata = metadata;
+ this.before = before;
+ this.after = after;
+ }
+
+ /**
+ * Transforms the Event to a JSON string.
+ *
+ * @return JSON String
+ */
+ public String toJson() {
+ Gson gson = new GsonBuilder().serializeNulls().create();
+ return gson.toJson(this);
+ }
+
+ /** Depicts the metadata within a SourceRecord. It has valuable fields. */
+ static class Metadata implements Serializable {
+ private final @Nullable String connector;
+ private final @Nullable String version;
+ private final @Nullable String name;
+ private final @Nullable String database;
+ private final @Nullable String schema;
+ private final @Nullable String table;
+
+ /**
+ * Metadata Initializer.
+ *
+ * @param connector Connector used
+ * @param version Connector version
+ * @param name Connector name
+ * @param database DB name
+ * @param schema Schema name
+ * @param table Table name
+ */
+ public Metadata(
+ @Nullable String connector,
+ @Nullable String version,
+ @Nullable String name,
+ @Nullable String database,
+ @Nullable String schema,
+ @Nullable String table) {
+ this.connector = connector;
+ this.version = version;
+ this.name = name;
+ this.database = database;
+ this.schema = schema;
+ this.table = table;
+ }
+ }
+
+ /** Depicts the before field within SourceRecord. */
+ static class Before implements Serializable {
+ private final @Nullable Map<String, Object> fields;
+
+ /**
+ * Before Initializer.
+ *
+       * @param fields Key-value map holding the row state before the change
+ */
+ public Before(@Nullable Map<String, Object> fields) {
+ this.fields = fields;
+ }
+ }
+
+ /** Depicts the after field within SourceRecord. */
+ static class After implements Serializable {
+ private final @Nullable Map<String, Object> fields;
+
+ /**
+ * After Initializer.
+ *
+       * @param fields Key-value map holding the row state after the change
+ */
+ public After(@Nullable Map<String, Object> fields) {
+ this.fields = fields;
+ }
+ }
+ }
+}
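For orientation, a minimal sketch of the mapper above in use; here "record" is a
placeholder for any Kafka Connect SourceRecord emitted by a connector, and the
checked Exception declared by mapSourceRecord is left to the caller:

    SourceRecordMapper<String> mapper = new SourceRecordJson.SourceRecordJsonMapper();
    String json = mapper.mapSourceRecord(record);
    // json has the shape:
    // {"metadata":{"connector":...,"version":...,"name":...,"database":...,"schema":...,"table":...},
    //  "before":{"fields":{...}},"after":{"fields":{...}}}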
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordMapper.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordMapper.java
new file mode 100644
index 0000000..65e42e9
--- /dev/null
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordMapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import java.io.Serializable;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Interface used to map a Kafka source record.
+ *
+ * @param <T> The type to which the Kafka source record is mapped
+ */
+@FunctionalInterface
+public interface SourceRecordMapper<T> extends Serializable {
+ T mapSourceRecord(SourceRecord sourceRecord) throws Exception;
+}
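Because SourceRecordMapper is a @FunctionalInterface, a mapper can also be supplied
as a lambda rather than a named class, as KafkaSourceConsumerFnTest below does:

    SourceRecordMapper<Integer> valueMapper = sourceRecord -> (Integer) sourceRecord.value();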
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/package-info.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/package-info.java
new file mode 100644
index 0000000..86ba1f5
--- /dev/null
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading from DebeziumIO.
+ *
+ * @see org.apache.beam.io.debezium.DebeziumIO
+ */
+@Experimental(Kind.SOURCE_SINK)
+package org.apache.beam.io.debezium;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
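As a quick sketch of the package in use (host, port, and credentials here are
placeholders; see DebeziumIOMySqlConnectorIT below for a complete, working
configuration):

    PCollection<String> records =
        pipeline.apply(
            DebeziumIO.<String>read()
                .withConnectorConfiguration(
                    DebeziumIO.ConnectorConfiguration.create()
                        .withConnectorClass(MySqlConnector.class)
                        .withHostName("127.0.0.1")
                        .withPort("3306")
                        .withUsername("debezium")
                        .withPassword("dbz"))
                .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
                .withCoder(StringUtf8Coder.of()));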
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
new file mode 100644
index 0000000..6056ca0
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.apache.beam.sdk.testing.SerializableMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import io.debezium.connector.mysql.MySqlConnector;
+import java.time.Duration;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+@RunWith(JUnit4.class)
+public class DebeziumIOMySqlConnectorIT {
+  /**
+   * Debezium - MySqlContainer.
+   *
+   * <p>Creates a Docker container using the image from the Debezium tutorial.
+   */
+ @ClassRule
+ public static final MySQLContainer<?> MY_SQL_CONTAINER =
+ new MySQLContainer<>(
+ DockerImageName.parse("debezium/example-mysql:1.4")
+ .asCompatibleSubstituteFor("mysql"))
+ .withPassword("debezium")
+ .withUsername("mysqluser")
+ .withExposedPorts(3306)
+          .waitingFor(
+              // MySQL does not serve HTTP, so wait for the mapped database port to
+              // accept connections rather than for an HTTP 200 response.
+              Wait.forListeningPort()
+                  .withStartupTimeout(Duration.ofMinutes(2)));
+
+  /**
+   * Debezium - MySQL connector Test.
+   *
+   * <p>Tests that the connector can actually connect to the database.
+   */
+ @Test
+ public void testDebeziumIOMySql() {
+ MY_SQL_CONTAINER.start();
+
+ String host = MY_SQL_CONTAINER.getContainerIpAddress();
+ String port = MY_SQL_CONTAINER.getMappedPort(3306).toString();
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ Pipeline p = Pipeline.create(options);
+ PCollection<String> results =
+ p.apply(
+ DebeziumIO.<String>read()
+ .withConnectorConfiguration(
+ DebeziumIO.ConnectorConfiguration.create()
+ .withUsername("debezium")
+ .withPassword("dbz")
+ .withConnectorClass(MySqlConnector.class)
+ .withHostName(host)
+ .withPort(port)
+ .withConnectionProperty("database.server.id", "184054")
+ .withConnectionProperty("database.server.name",
"dbserver1")
+ .withConnectionProperty("database.include.list",
"inventory")
+ .withConnectionProperty("include.schema.changes",
"false"))
+ .withFormatFunction(new
SourceRecordJson.SourceRecordJsonMapper())
+ .withMaxNumberOfRecords(30)
+ .withCoder(StringUtf8Coder.of()));
+    String expected =
+        "{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\","
+            + "\"database\":\"inventory\",\"schema\":\"mysql-bin.000003\",\"table\":\"addresses\"},\"before\":null,"
+            + "\"after\":{\"fields\":{\"zip\":\"76036\",\"city\":\"Euless\","
+            + "\"street\":\"3183 Moore Avenue\",\"id\":10,\"state\":\"Texas\",\"customer_id\":1001,"
+            + "\"type\":\"SHIPPING\"}}}";
+
+ PAssert.that(results)
+ .satisfies(
+ (Iterable<String> res) -> {
+ assertThat(res, hasItem(expected));
+ return null;
+ });
+
+ p.run().waitUntilFinish();
+ MY_SQL_CONTAINER.stop();
+ }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
new file mode 100644
index 0000000..ccf57b6
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnector;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.beam.io.debezium.DebeziumIO.ConnectorConfiguration;
+import org.apache.kafka.common.config.ConfigValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link DebeziumIO}. */
+@RunWith(JUnit4.class)
+public class DebeziumIOTest implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(DebeziumIOTest.class);
+ private static final ConnectorConfiguration MYSQL_CONNECTOR_CONFIGURATION =
+ ConnectorConfiguration.create()
+ .withUsername("debezium")
+ .withPassword("dbz")
+ .withHostName("127.0.0.1")
+ .withPort("3306")
+ .withConnectorClass(MySqlConnector.class)
+ .withConnectionProperty("database.server.id", "184054")
+ .withConnectionProperty("database.server.name", "dbserver1")
+ .withConnectionProperty("database.include.list", "inventory")
+ .withConnectionProperty(
+ "database.history",
KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName())
+ .withConnectionProperty("include.schema.changes", "false");
+
+ @Test
+ public void testSourceMySqlConnectorValidConfiguration() {
+    Map<String, String> configurationMap = MYSQL_CONNECTOR_CONFIGURATION.getConfigurationMap();
+
+ Configuration debeziumConf = Configuration.from(configurationMap);
+    Map<String, ConfigValue> validConfig = debeziumConf.validate(MySqlConnectorConfig.ALL_FIELDS);
+
+ for (ConfigValue configValue : validConfig.values()) {
+ assertTrue(configValue.errorMessages().isEmpty());
+ }
+ }
+
+ @Test
+ public void testSourceConnectorUsernamePassword() {
+ String username = "debezium";
+ String password = "dbz";
+    ConnectorConfiguration configuration =
+        MYSQL_CONNECTOR_CONFIGURATION.withUsername(username).withPassword(password);
+ Map<String, String> configurationMap = configuration.getConfigurationMap();
+
+ Configuration debeziumConf = Configuration.from(configurationMap);
+    Map<String, ConfigValue> validConfig = debeziumConf.validate(MySqlConnectorConfig.ALL_FIELDS);
+
+ for (ConfigValue configValue : validConfig.values()) {
+ assertTrue(configValue.errorMessages().isEmpty());
+ }
+ }
+
+ @Test
+ public void testSourceConnectorNullPassword() {
+ String username = "debezium";
+ String password = null;
+
+ assertThrows(
+ IllegalArgumentException.class,
+        () -> MYSQL_CONNECTOR_CONFIGURATION.withUsername(username).withPassword(password));
+ }
+
+ @Test
+ public void testSourceConnectorNullUsernameAndPassword() {
+ String username = null;
+ String password = null;
+
+ assertThrows(
+ IllegalArgumentException.class,
+        () -> MYSQL_CONNECTOR_CONFIGURATION.withUsername(username).withPassword(password));
+ }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java
new file mode 100644
index 0000000..c22f8a3
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class KafkaSourceConsumerFnTest implements Serializable {
+ @Test
+ public void testKafkaSourceConsumerFn() {
+ Map<String, String> config =
+ ImmutableMap.of(
+ "from", "1",
+ "to", "10",
+ "delay", "0.4",
+ "topic", "any");
+
+ Pipeline pipeline = Pipeline.create();
+
+ PCollection<Integer> counts =
+ pipeline
+ .apply(
+ Create.of(Lists.newArrayList(config))
+                    .withCoder(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+ .apply(
+ ParDo.of(
+ new KafkaSourceConsumerFn<>(
+ CounterSourceConnector.class,
+ sourceRecord -> (Integer) sourceRecord.value(),
+ 10)))
+ .setCoder(VarIntCoder.of());
+
+ PAssert.that(counts).containsInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testStoppableKafkaSourceConsumerFn() {
+ Map<String, String> config =
+ ImmutableMap.of(
+ "from", "1",
+ "to", "3",
+ "delay", "0.2",
+ "topic", "any");
+
+ Pipeline pipeline = Pipeline.create();
+
+ PCollection<Integer> counts =
+ pipeline
+ .apply(
+ Create.of(Lists.newArrayList(config))
+                    .withCoder(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+ .apply(
+ ParDo.of(
+ new KafkaSourceConsumerFn<>(
+ CounterSourceConnector.class,
+ sourceRecord -> (Integer) sourceRecord.value(),
+ 1)))
+ .setCoder(VarIntCoder.of());
+
+ pipeline.run().waitUntilFinish();
+ Assert.assertEquals(3, CounterTask.getCountTasks());
+ }
+}
+
+class CounterSourceConnector extends SourceConnector {
+ public static class CounterSourceConnectorConfig extends AbstractConfig {
+ final Map<String, String> props;
+
+ CounterSourceConnectorConfig(Map<String, String> props) {
+ super(configDef(), props);
+ this.props = props;
+ }
+
+ protected static ConfigDef configDef() {
+ return new ConfigDef()
+ .define("from", ConfigDef.Type.INT, ConfigDef.Importance.HIGH,
"Number to start from")
+ .define("to", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "Number
to go to")
+ .define(
+ "delay", ConfigDef.Type.DOUBLE, ConfigDef.Importance.HIGH, "Time
between each event")
+ .define(
+ "topic",
+ ConfigDef.Type.STRING,
+ ConfigDef.Importance.HIGH,
+ "Name of Kafka topic to produce to");
+ }
+ }
+
+ @Nullable private CounterSourceConnectorConfig connectorConfig;
+
+ @Override
+ public void start(Map<String, String> props) {
+ this.connectorConfig = new CounterSourceConnectorConfig(props);
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return CounterTask.class;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ if (this.connectorConfig == null || this.connectorConfig.props == null) {
+ return Collections.emptyList();
+ }
+
+ return Collections.singletonList(
+ ImmutableMap.of(
+ "from", this.connectorConfig.props.get("from"),
+ "to", this.connectorConfig.props.get("to"),
+ "delay", this.connectorConfig.props.get("delay"),
+ "topic", this.connectorConfig.props.get("topic")));
+ }
+
+ @Override
+ public void stop() {}
+
+ @Override
+ public ConfigDef config() {
+ return CounterSourceConnectorConfig.configDef();
+ }
+
+ @Override
+ public String version() {
+ return "ONE";
+ }
+}
+
+class CounterTask extends SourceTask {
+ private static int countStopTasks = 0;
+ private String topic = "";
+ private Integer from = 0;
+ private Integer to = 0;
+ private Double delay = 0.0;
+
+ private Long start = System.currentTimeMillis();
+ private Integer last = 0;
+ private Object lastOffset = null;
+
+ private static final String PARTITION_FIELD = "mod";
+ private static final Integer PARTITION_NAME = 1;
+
+ @Override
+ public String version() {
+ return "ONE";
+ }
+
+ @Override
+ public void initialize(SourceTaskContext context) {
+ super.initialize(context);
+
+ Map<String, Object> offset =
+ context
+ .offsetStorageReader()
+ .offset(Collections.singletonMap(PARTITION_FIELD, PARTITION_NAME));
+
+ if (offset == null) {
+ this.start = System.currentTimeMillis();
+ this.last = 0;
+ } else {
+ this.start = (Long) offset.get("start");
+      this.last = ((Long) offset.getOrDefault("last", 0L)).intValue(); // default must be a Long for the cast
+ }
+ this.lastOffset = offset;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ this.topic = props.getOrDefault("topic", "");
+ this.from = Integer.parseInt(props.getOrDefault("from", "0"));
+ this.to = Integer.parseInt(props.getOrDefault("to", "0"));
+ this.delay = Double.parseDouble(props.getOrDefault("delay", "0"));
+
+ if (this.lastOffset != null) {
+ return;
+ }
+
+ this.start =
+ props.containsKey("start")
+ ? Long.parseLong(props.get("start"))
+ : System.currentTimeMillis();
+ this.last = this.from - 1;
+ }
+
+ @Override
+ public List<SourceRecord> poll() throws InterruptedException {
+ if (this.last.equals(to)) {
+ return null;
+ }
+
+ List<SourceRecord> records = new ArrayList<>();
+ Long callTime = System.currentTimeMillis();
+ Long secondsSinceStart = (callTime - this.start) / 1000;
+    Long recordsToOutput = Math.round(Math.floor(secondsSinceStart / this.delay));
+
+ while (this.last < this.to) {
+ this.last = this.last + 1;
+      Map<String, Integer> sourcePartition = Collections.singletonMap(PARTITION_FIELD, 1);
+ Map<String, Long> sourceOffset =
+ ImmutableMap.of("last", this.last.longValue(), "start", this.start);
+
+ records.add(
+ new SourceRecord(
+              sourcePartition, sourceOffset, this.topic, Schema.INT64_SCHEMA, this.last));
+
+ if (records.size() >= recordsToOutput) {
+ break;
+ }
+ }
+
+ return records;
+ }
+
+ @Override
+ public void stop() {
+ CounterTask.countStopTasks++;
+ }
+
+ public static int getCountTasks() {
+ return CounterTask.countStopTasks;
+ }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java
new file mode 100644
index 0000000..b8a6c9b
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.debezium.connector.mysql.MySqlConnector;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class OffsetTrackerTest implements Serializable {
+ @Test
+ public void testRestrictByNumberOfRecords() throws IOException {
+ Integer maxNumRecords = 10;
+ Map<String, Object> position = new HashMap<>();
+ KafkaSourceConsumerFn<String> kafkaSourceConsumerFn =
+ new KafkaSourceConsumerFn<String>(
+            MySqlConnector.class, new SourceRecordJson.SourceRecordJsonMapper(), maxNumRecords);
+ KafkaSourceConsumerFn.OffsetHolder restriction =
+ kafkaSourceConsumerFn.getInitialRestriction(new HashMap<>());
+ KafkaSourceConsumerFn.OffsetTracker tracker =
+ new KafkaSourceConsumerFn.OffsetTracker(restriction);
+
+ for (int records = 0; records < maxNumRecords; records++) {
+ assertTrue("OffsetTracker should continue", tracker.tryClaim(position));
+ }
+ assertFalse("OffsetTracker should stop", tracker.tryClaim(position));
+ }
+
+ @Test
+  public void testRestrictByAmountOfTime() throws IOException, InterruptedException {
+ long millis = 60 * 1000;
+ long minutesToRun = 1;
+ Map<String, Object> position = new HashMap<>();
+ KafkaSourceConsumerFn<String> kafkaSourceConsumerFn =
+ new KafkaSourceConsumerFn<String>(
+            MySqlConnector.class, new SourceRecordJson.SourceRecordJsonMapper(), minutesToRun);
+ KafkaSourceConsumerFn.OffsetHolder restriction =
+ kafkaSourceConsumerFn.getInitialRestriction(new HashMap<>());
+ KafkaSourceConsumerFn.OffsetTracker tracker =
+ new KafkaSourceConsumerFn.OffsetTracker(restriction);
+
+ assertTrue(tracker.tryClaim(position));
+
+ Thread.sleep(minutesToRun * millis + 100);
+
+ assertFalse(tracker.tryClaim(position));
+ }
+}
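The two tests above exercise the two stop conditions KafkaSourceConsumerFn supports.
A sketch of the corresponding constructor calls, assuming the overloads used in
these tests (an Integer record limit versus a long time limit in minutes):

    Integer maxNumRecords = 10; // stop after claiming 10 records
    long minutesToRun = 1;      // or stop one minute after the first claim
    new KafkaSourceConsumerFn<>(MySqlConnector.class, mapper, maxNumRecords);
    new KafkaSourceConsumerFn<>(MySqlConnector.class, mapper, minutesToRun);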
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java
new file mode 100644
index 0000000..badd01e
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.Serializable;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SourceRecordJsonTest implements Serializable {
+ @Test
+ public void testSourceRecordJson() {
+ SourceRecord record = this.buildSourceRecord();
+ SourceRecordJson json = new SourceRecordJson(record);
+
+ String jsonString = json.toJson();
+
+ String expectedJson =
+ "{\"metadata\":"
+ + "{\"connector\":\"test-connector\","
+ + "\"version\":\"version-connector\","
+ + "\"name\":\"test-connector-sql\","
+ + "\"database\":\"test-db\","
+ + "\"schema\":\"test-schema\","
+ + "\"table\":\"test-table\"},"
+ + "\"before\":{\"fields\":{\"column1\":\"before-name\"}},"
+ + "\"after\":{\"fields\":{\"column1\":\"after-name\"}}}";
+
+ assertEquals(expectedJson, jsonString);
+ }
+
+ @Test
+ public void testSourceRecordJsonWhenSourceRecordIsNull() {
+    assertThrows(IllegalArgumentException.class, () -> new SourceRecordJson(null));
+ }
+
+ private Schema buildSourceSchema() {
+ return SchemaBuilder.struct()
+ .field("connector", Schema.STRING_SCHEMA)
+ .field("version", Schema.STRING_SCHEMA)
+ .field("name", Schema.STRING_SCHEMA)
+ .field("db", Schema.STRING_SCHEMA)
+ .field("schema", Schema.STRING_SCHEMA)
+ .field("table", Schema.STRING_SCHEMA)
+ .build();
+ }
+
+ private Schema buildBeforeSchema() {
+ return SchemaBuilder.struct().field("column1",
Schema.STRING_SCHEMA).build();
+ }
+
+ private Schema buildAfterSchema() {
+ return SchemaBuilder.struct().field("column1",
Schema.STRING_SCHEMA).build();
+ }
+
+ private SourceRecord buildSourceRecord() {
+ final Schema sourceSchema = this.buildSourceSchema();
+ final Schema beforeSchema = this.buildBeforeSchema();
+ final Schema afterSchema = this.buildAfterSchema();
+
+ final Schema schema =
+ SchemaBuilder.struct()
+ .name("test")
+ .field("source", sourceSchema)
+ .field("before", beforeSchema)
+ .field("after", afterSchema)
+ .build();
+
+ final Struct source = new Struct(sourceSchema);
+ final Struct before = new Struct(beforeSchema);
+ final Struct after = new Struct(afterSchema);
+ final Struct value = new Struct(schema);
+
+ source.put("connector", "test-connector");
+ source.put("version", "version-connector");
+ source.put("name", "test-connector-sql");
+ source.put("db", "test-db");
+ source.put("schema", "test-schema");
+ source.put("table", "test-table");
+
+ before.put("column1", "before-name");
+ after.put("column1", "after-name");
+
+ value.put("source", source);
+ value.put("before", before);
+ value.put("after", after);
+
+ return new SourceRecord(null, null, null, schema, value);
+ }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index e6f037e..c01df18 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -140,6 +140,7 @@ include(":sdks:java:io:cassandra")
include(":sdks:java:io:clickhouse")
include(":sdks:java:io:common")
include(":sdks:java:io:contextualtextio")
+include(":sdks:java:io:debezium")
include(":sdks:java:io:elasticsearch")
include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-2")
include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-5")