This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ac5d258  Beam transform that uses DebeziumIO connector to support CDC
     new 32a0ff1  Merge pull request #13983 from [BEAM-11818] DebeziumIO 
connector to support CDC
ac5d258 is described below

commit ac5d258dcc598a2ef13f0609debefae38530d14a
Author: Juan Sandoval <[email protected]>
AuthorDate: Tue Feb 16 20:01:40 2021 -0800

    Beam transform that uses DebeziumIO connector to support CDC
    
    Debeziumio PoC (#7)
    
    * New DebeziumIO class.
    
    * Merge connector code
    
    * DebeziumIO and MySqlConnector integrated.
    
    * Added FormatFunction param to Read builder on DebeziumIO.
    
    * Added arguments checker to DebeziumIO.
    
    * Add simple JSON mapper object (#1)
    
    * Add simple JSON mapper object
    
    * Fixed Mapper.
    
    * Add SqlServer connector test
    
    * Added PostgreSql Connector Test
    
    PostgreSql now works with Json mapper
    
    * Added PostgreSql Connector Test
    
    PostgreSql now works with Json mapper
    
    * Fixing MySQL schema DataException
    
    Using file instead of schema should fix it
    
    * MySQL Connector updated from 1.3.0 to 1.3.1
    
    Co-authored-by: osvaldo-salinas <[email protected]>
    Co-authored-by: Carlos Dominguez <[email protected]>
    Co-authored-by: Carlos Domínguez <[email protected]>
    
    * Add debeziumio tests
    
    * Debeziumio testing json mapper (#3)
    
    * Some code refactors. Use a default DBHistory if not provided
    
    * Add basic tests for Json mapper
    
    * Debeziumio time restriction (#5)
    
    * Add simple JSON mapper object
    
    * Fixed Mapper.
    
    * Add SqlServer connector test
    
    * Added PostgreSql Connector Test
    
    PostgreSql now works with Json mapper
    
    * Added PostgreSql Connector Test
    
    PostgreSql now works with Json mapper
    
    * Fixing MySQL schema DataException
    
    Using file instead of schema should fix it
    
    * MySQL Connector updated from 1.3.0 to 1.3.1
    
    * Some code refactors. Use a default DBHistory if not provided
    
    * Adding based-time restriction
    
    Stop polling after specified amount of time
    
    * Add basic tests for Json mapper
    
    * Adding new restriction
    
    Uses a time-based restriction
    
    * Adding optional restriction
    
    Uses an optional time-based restriction
    
    Co-authored-by: juanitodread <[email protected]>
    Co-authored-by: osvaldo-salinas <[email protected]>
    
    * Upgrade DebeziumIO connector (#4)
    
    * Address comments (Change dependencies to testCompile, Set 
JsonMapper/Coder as default, refactors) (#8)
    
    * Revert file
    
    * Change dependencies to testCompile
    * Move Counter sample to unit test
    
    * Set JsonMapper as default mapper function
    * Set String Coder as default coder when using JsonMapper
    * Change logs from info to debug
    
    * Debeziumio javadoc (#9)
    
    * Adding javadoc
    
    * Added some titles and examples
    
    * Added SourceRecordJson doc
    
    * Added Basic Connector doc
    
    * Added KafkaSourceConsumer doc
    
    * Javadoc cleanup
    
    * Removing BasicConnector
    
    No usages of this class were found overall
    
    * Editing documentation
    
    * Debeziumio fetched records restriction (#10)
    
    * Adding javadoc
    
    * Adding restriction by number of fetched records
    
    Also adding a quick-fix for null value within SourceRecords
    Minor fix on both MySQL and PostgreSQL Connectors Tests
    
    * Run either by time or by number of records
    
    * Added DebeziumOffsetTrackerTest
    
    Tests both restrictions: By amount of time and by Number of records
    
    * Removing comment
    
    * DebeziumIO test for DB2. (#11)
    
    * DebeziumIO test for DB2.
    
    * DebeziumIO javadoc.
    
    * Clean code:removed commented code lines on DebeziumIOConnectorTest.java
    
    * Clean code:removing unused imports and using readAsJson().
    
    Co-authored-by: Carlos Domínguez <[email protected]>
    
    * Debezium limit records (now configurable) (#12)
    
    * Adding javadoc
    
    * Records Limit is now configurable
    
    (It was fixed before)
    
    * Debeziumio dockerize (#13)
    
    * Add mysql docker container to tests
    
    * Move debezium mysql integration test to its own file
    
    * Add assertion to verify that the results contains a record.
    
    * Debeziumio readme (#15)
    
    * Adding javadoc
    
    * Adding README file
    
    * Add number of records configuration to the DebeziumIO component (#16)
    
    * Code refactors (#17)
    
    * Remove/ignore null warnings
    
    * Remove DB2 code
    
    * Remove docker dependency in DebeziumIO unit test and max number of records to MySql integration test
    
    * Change access modifiers accordingly
    
    * Remove incomplete integration tests (Postgres and SqlServer)
    
    * Add experimental tag
    
    * Debezium testing stoppable consumer (#18)
    
    * Add try-catch-finally, stop SourceTask at finally.
    
    * Fix warnings
    
    * stopConsumer and processedRecords local variables removed. UT for task 
stop use case added
    
    * Fix minor code style issue
    
    Co-authored-by: juanitodread <[email protected]>
    
    * Fix style issues (check, spotlessApply) (#19)
    
    Co-authored-by: Osvaldo Salinas <[email protected]>
    Co-authored-by: alejandro.maguey <[email protected]>
    Co-authored-by: osvaldo-salinas <[email protected]>
    Co-authored-by: Carlos Dominguez <[email protected]>
    Co-authored-by: Carlos Domínguez <[email protected]>
    Co-authored-by: Carlos Domínguez <[email protected]>
    Co-authored-by: Alejandro Maguey <[email protected]>
    Co-authored-by: Hassan Reyes <[email protected]>
    
    Add missing apache license to README.md
    
    Enabling integration test for DebeziumIO (#20)
    
    Rename connector package cdc=>debezium. Update doc references (#21)
    
    Fix code style on DebeziumIOMySqlConnectorIT
---
 build.gradle.kts                                   |   1 +
 sdks/java/io/debezium/build.gradle                 |  83 ++++
 sdks/java/io/debezium/src/README.md                | 178 +++++++
 .../org/apache/beam/io/debezium/DebeziumIO.java    | 515 +++++++++++++++++++++
 .../beam/io/debezium/KafkaSourceConsumerFn.java    | 394 ++++++++++++++++
 .../apache/beam/io/debezium/SourceRecordJson.java  | 287 ++++++++++++
 .../beam/io/debezium/SourceRecordMapper.java       |  31 ++
 .../org/apache/beam/io/debezium/package-info.java  |  28 ++
 .../io/debezium/DebeziumIOMySqlConnectorIT.java    | 108 +++++
 .../apache/beam/io/debezium/DebeziumIOTest.java    | 101 ++++
 .../io/debezium/KafkaSourceConsumerFnTest.java     | 264 +++++++++++
 .../apache/beam/io/debezium/OffsetTrackerTest.java |  71 +++
 .../beam/io/debezium/SourceRecordJsonTest.java     | 113 +++++
 settings.gradle.kts                                |   1 +
 14 files changed, 2175 insertions(+)

diff --git a/build.gradle.kts b/build.gradle.kts
index c2af480..5b51c77 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -163,6 +163,7 @@ task("javaPostCommit") {
   dependsOn(":runners:google-cloud-dataflow-java:postCommitRunnerV2")
   dependsOn(":sdks:java:extensions:google-cloud-platform-core:postCommit")
   dependsOn(":sdks:java:extensions:zetasketch:postCommit")
+  dependsOn(":sdks:java:io:debezium:integrationTest")
   dependsOn(":sdks:java:io:google-cloud-platform:postCommit")
   dependsOn(":sdks:java:io:kinesis:integrationTest")
   dependsOn(":sdks:java:extensions:ml:postCommit")
diff --git a/sdks/java/io/debezium/build.gradle 
b/sdks/java/io/debezium/build.gradle
new file mode 100644
index 0000000..3c43f98
--- /dev/null
+++ b/sdks/java/io/debezium/build.gradle
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import groovy.json.JsonOutput
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+        automaticModuleName: 'org.apache.beam.sdk.io.debezium',
+        mavenRepositories: [
+                [id: 'io.confluent', url: 
'https://packages.confluent.io/maven/']
+        ],
+        checkerTooSlowOnTests: true,
+        enableSpotbugs: false,
+)
+provideIntegrationTestingDependencies()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Debezium"
+ext.summary = "Library to work with Debezium data."
+
+dependencies {
+    compile library.java.vendored_guava_26_0_jre
+    compile project(path: ":sdks:java:core", configuration: "shadow")
+    compile library.java.slf4j_api
+    compile library.java.joda_time
+    provided library.java.jackson_dataformat_csv
+    testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
+    testCompile project(path: ":sdks:java:io:common", configuration: 
"testRuntime")
+
+    // Test dependencies
+    testCompile library.java.junit
+    testRuntimeOnly library.java.slf4j_jdk14
+    testRuntimeOnly project(path: ":runners:direct-java", configuration: 
"shadow")
+    testCompile project(":runners:google-cloud-dataflow-java")
+    testCompile "org.testcontainers:testcontainers:1.15.1"
+    testCompile "org.testcontainers:mysql:1.15.1"
+
+    // Kafka connect dependencies
+    compile "org.apache.kafka:connect-api:2.5.0"
+    compile "org.apache.kafka:connect-json:2.5.0"
+
+    // Debezium dependencies
+    compile group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final'
+    testCompile group: 'io.debezium', name: 'debezium-connector-mysql', 
version: '1.3.1.Final'
+}
+
+test {
+    testLogging {
+        outputs.upToDateWhen {false}
+        showStandardStreams = true
+    }
+}
+
+
+task integrationTest(type: Test, dependsOn: processTestResources) {
+  group = "Verification"
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+          "--runner=DirectRunner",
+  ])
+
+  // Disable Gradle cache: these ITs interact with a live service that should always be considered "out of date"
+  outputs.upToDateWhen { false }
+
+  include '**/*IT.class'
+  classpath = sourceSets.test.runtimeClasspath
+  testClassesDirs = sourceSets.test.output.classesDirs
+
+  useJUnit {
+  }
+}
diff --git a/sdks/java/io/debezium/src/README.md 
b/sdks/java/io/debezium/src/README.md
new file mode 100644
index 0000000..4cf9be8
--- /dev/null
+++ b/sdks/java/io/debezium/src/README.md
@@ -0,0 +1,178 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# DebeziumIO
+## Connect your Debezium Databases to Apache Beam easily.
+
+### What is DebeziumIO?
+DebeziumIO is an Apache Beam connector that lets users capture change events from databases supported by [Debezium](https://debezium.io) and consume them in [Apache Beam](https://beam.apache.org/) pipelines, without the need to set up a [Kafka](https://kafka.apache.org/) instance.
+
+### Getting Started
+
+DebeziumIO uses [Debezium Connectors v1.3](https://debezium.io/documentation/reference/1.3/connectors/) to connect your database to Apache Beam. All you need to do is choose the Debezium Connector that suits your setup and pick a [Serializable Function](https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/SerializableFunction.html); then you can start building your own pipelines.
+
+These connectors have been successfully tested and are known to work fine:
+*  MySQL Connector
+*  PostgreSQL Connector
+*  SQLServer Connector
+*  DB2 Connector
+
+Other connectors might also work.
+
+
+Setting up a connector and running a Pipeline should be as simple as:
+```
+Pipeline p = Pipeline.create();                   // Create a Pipeline
+        p.apply(DebeziumIO.<String>read()
+                .withConnectorConfiguration(...)  // Debezium Connector setup
+                .withFormatFunction(...)          // Serializable Function to 
use
+        ).setCoder(StringUtf8Coder.of());
+p.run().waitUntilFinish();                        // Run your pipeline!
+```
+
+### Setting up a Debezium Connector
+
+DebeziumIO comes with a handy ConnectorConfiguration builder, which lets you 
provide all the configuration needed to access your Debezium Database.
+
+A basic configuration, such as **username**, **password**, **port number**, and **host name**, must be specified, along with the **Debezium Connector class** you will use, via these methods:
+
+|Method|Param|Description|
+|-|-|-|
+|`.withConnectorClass(connectorClass)`|_Class_|Debezium Connector|
+|`.withUsername(username)`|_String_|Database Username|
+|`.withPassword(password)`|_String_|Database Password|
+|`.withHostName(hostname)`|_String_|Database Hostname|
+|`.withPort(portNumber)`|_String_|Database Port number|
+
+You can also add more configuration, such as connector-specific properties, with the `withConnectionProperty` method:
+
+|Method|Params|Description|
+|-|-|-|
+|`.withConnectionProperty(propName, propValue)`|_String_, _String_|Adds a 
custom property to the connector.|
+> **Note:** For more information on custom properties, see your [Debezium 
Connector](https://debezium.io/documentation/reference/1.3/connectors/) 
specific documentation.
+
+Example of a MySQL Debezium Connector setup:
+```
+DebeziumIO.ConnectorConfiguration.create()
+        .withUsername("dbUsername")
+        .withPassword("dbPassword")
+        .withConnectorClass(MySqlConnector.class)
+        .withHostName("127.0.0.1")
+        .withPort("3306")
+        .withConnectionProperty("database.server.id", "serverId")
+        .withConnectionProperty("database.server.name", "serverName")
+        .withConnectionProperty("database.include.list", "dbName")
+        .withConnectionProperty("include.schema.changes", "false")
+```
+
+### Setting a Serializable Function
+
+A serializable function is required to map each `SourceRecord` fetched from the database.
+
+DebeziumIO comes with a built-in JSON Mapper that you can optionally use to 
map every `SourceRecord` fetched from the Database to a JSON object. This helps 
users visualize and access their data in a simple way.
+
+If you want to use this built-in JSON Mapper, set an instance of **SourceRecordJsonMapper** as the Serializable Function on DebeziumIO:
+```
+.withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
+```
+> **Note:** `SourceRecordJsonMapper` comes out of the box, but you may use any Format Function you prefer.
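
Conceptually, a format function is just a serializable mapping from each fetched record to your pipeline's element type. The dependency-free sketch below illustrates the shape of such a mapper; `RecordMapper` and `TopicOnlyMapper` are hypothetical stand-ins for DebeziumIO's `SourceRecordMapper<T>` and a custom mapper, not the actual API:

```java
import java.io.Serializable;
import java.util.Map;

// Hypothetical stand-in for DebeziumIO's SourceRecordMapper<T>: a serializable
// function from a fetched record to the pipeline's element type.
interface RecordMapper<T> extends Serializable {
  T mapRecord(Map<String, Object> record) throws Exception;
}

// A custom mapper that keeps only the record's topic, analogous to a format
// function that extracts part of each change event instead of the full JSON.
class TopicOnlyMapper implements RecordMapper<String> {
  @Override
  public String mapRecord(Map<String, Object> record) {
    return String.valueOf(record.get("topic"));
  }
}
```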
+
+## Quick Example
+
+The following example shows how an actual setup looks, using a **MySQL Debezium Connector** and **SourceRecordJsonMapper** as the Serializable Function.
+```
+PipelineOptions options = PipelineOptionsFactory.create();
+Pipeline p = Pipeline.create(options);
+p.apply(DebeziumIO.<String>read().
+        withConnectorConfiguration(                     // Debezium Connector 
setup
+                DebeziumIO.ConnectorConfiguration.create()
+                        .withUsername("debezium")
+                        .withPassword("dbz")
+                        .withConnectorClass(MySqlConnector.class)
+                        .withHostName("127.0.0.1")
+                        .withPort("3306")
+                        .withConnectionProperty("database.server.id", "184054")
+                        .withConnectionProperty("database.server.name", 
"dbserver1")
+                        .withConnectionProperty("database.include.list", 
"inventory")
+                        .withConnectionProperty("include.schema.changes", 
"false")
+        ).withFormatFunction(
+                new SourceRecordJson.SourceRecordJsonMapper() // Serializable 
Function
+        )
+).setCoder(StringUtf8Coder.of());
+
+p.run().waitUntilFinish();
+```
+
+## Shortcut!
+
+If you will be using the built-in **SourceRecordJsonMapper** as your 
Serializable Function for all your pipelines, you should use **readAsJson()**.
+
+DebeziumIO comes with a method called `readAsJson`, which automatically sets the `SourceRecordJsonMapper` as the Serializable Function for your pipeline. This way, you only need to set up your connector before running your pipeline, without explicitly setting a Serializable Function.
+
+Example of using **readAsJson**:
+```
+PipelineOptions options = PipelineOptionsFactory.create();
+Pipeline p = Pipeline.create(options);
+p.apply(DebeziumIO.readAsJson().
+        withConnectorConfiguration(                     // Debezium Connector 
setup
+                DebeziumIO.ConnectorConfiguration.create()
+                        .withUsername("debezium")
+                        .withPassword("dbz")
+                        .withConnectorClass(MySqlConnector.class)
+                        .withHostName("127.0.0.1")
+                        .withPort("3306")
+                        .withConnectionProperty("database.server.id", "184054")
+                        .withConnectionProperty("database.server.name", 
"dbserver1")
+                        .withConnectionProperty("database.include.list", 
"inventory")
+                        .withConnectionProperty("include.schema.changes", "false")));
+
+p.run().waitUntilFinish();
+```
+
+## Under the hood
+
+### KafkaSourceConsumerFn and Restrictions
+
+KafkaSourceConsumerFn (KSC from here on) is a [DoFn](https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/transforms/DoFn.html) in charge of database replication and CDC.
+
+There are two ways of initializing KSC:
+*  Restricted by number of records
+*  Restricted by amount of time (minutes)
+
+By default, DebeziumIO initializes it with the former, though users may choose the latter by passing the number of minutes as a parameter:
+
+|Function|Param|Description|
+|-|-|-|
+|`KafkaSourceConsumerFn(connectorClass, recordMapper, maxRecords)`|_Class, 
SourceRecordMapper, Int_|Restrict run by number of records (Default).|
+|`KafkaSourceConsumerFn(connectorClass, recordMapper, timeToRun)`|_Class, 
SourceRecordMapper, Long_|Restrict run by amount of time (in minutes).|
+
+### Requirements and Supported versions
+
+-  JDK v8
+-  Debezium Connectors v1.3
+-  Apache Beam 2.25
+
+## Running Integration Tests
+
+You can run integration tests using **gradlew**.
+
+Example of running the MySQL Connector Integration Test:
+```
+./gradlew integrationTest -p sdks/java/io/debezium/ --tests 
org.apache.beam.io.debezium.DebeziumIOMySqlConnectorIT 
-DintegrationTestRunner=direct
+```
\ No newline at end of file
diff --git 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
new file mode 100644
index 0000000..b7c084f
--- /dev/null
+++ 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class which exposes an implementation of {@link #read} and a Debezium configuration.
+ *
+ * <h3>Quick Overview</h3>
+ *
+ * <p>This class lets Beam users connect to their existing Debezium 
implementations in an easy way.
+ *
+ * <p>Any Kafka connector supported by Debezium should work fine with this IO.
+ *
+ * <p>The following connectors were tested and worked well in some simple 
scenarios:
+ *
+ * <ul>
+ *   <li>MySQL
+ *   <li>PostgreSQL
+ *   <li>SQLServer
+ *   <li>DB2
+ * </ul>
+ *
+ * <h3>Usage example</h3>
+ *
+ * <p>Connect to a Debezium - MySQL database and run a Pipeline
+ *
+ * <pre>
+ *     private static final ConnectorConfiguration mySqlConnectorConfig = 
ConnectorConfiguration
+ *             .create()
+ *             .withUsername("uname")
+ *             .withPassword("pwd123")
+ *             .withHostName("127.0.0.1")
+ *             .withPort("3306")
+ *             .withConnectorClass(MySqlConnector.class)
+ *             .withConnectionProperty("database.server.id", "184054")
+ *             .withConnectionProperty("database.server.name", "serverid")
+ *             .withConnectionProperty("database.include.list", "dbname")
+ *             .withConnectionProperty("database.history", 
DebeziumSDFDatabaseHistory.class.getName())
+ *             .withConnectionProperty("include.schema.changes", "false");
+ *
+ *      PipelineOptions options = PipelineOptionsFactory.create();
+ *      Pipeline p = Pipeline.create(options);
+ *      p.apply(DebeziumIO.read()
+ *               .withConnectorConfiguration(mySqlConnectorConfig)
+ *               .withFormatFunction(new 
SourceRecordJson.SourceRecordJsonMapper())
+ *       ).setCoder(StringUtf8Coder.of());
+ *       p.run().waitUntilFinish();
+ * </pre>
+ *
+ * <p>In this example we are using {@link 
KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory} to
+ * handle the Database history.
+ *
+ * <h3>Dependencies</h3>
+ *
+ * <p>Users may work with any of the supported Debezium connectors mentioned above.
+ *
+ * <p>See <a href="https://debezium.io/documentation/reference/1.3/connectors/index.html">Debezium
+ * Connectors</a> for more info.
+ */
+@Experimental(Kind.SOURCE_SINK)
+@SuppressWarnings({"nullness"})
+public class DebeziumIO {
+  private static final Logger LOG = LoggerFactory.getLogger(DebeziumIO.class);
+
+  /**
+   * Read data from a Debezium source.
+   *
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read() {
+    return new AutoValue_DebeziumIO_Read.Builder<T>().build();
+  }
+
+  /**
+   * Read data from Debezium source and convert a Kafka {@link
+   * org.apache.kafka.connect.source.SourceRecord} into a JSON string using 
{@link
+   * org.apache.beam.io.debezium.SourceRecordJson.SourceRecordJsonMapper} as 
default function
+   * mapper.
+   *
+   * @return Reader object of String.
+   */
+  public static Read<String> readAsJson() {
+    return new AutoValue_DebeziumIO_Read.Builder<String>()
+        .setFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
+        .setCoder(StringUtf8Coder.of())
+        .build();
+  }
+
+  /** Disallow construction of utility class. */
+  private DebeziumIO() {}
+
+  /** Implementation of {@link #read}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
+
+    private static final long serialVersionUID = 1L;
+
+    abstract @Nullable ConnectorConfiguration getConnectorConfiguration();
+
+    abstract @Nullable SourceRecordMapper<T> getFormatFunction();
+
+    abstract @Nullable Integer getMaxNumberOfRecords();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setConnectorConfiguration(ConnectorConfiguration 
config);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setFormatFunction(SourceRecordMapper<T> mapperFn);
+
+      abstract Builder<T> setMaxNumberOfRecords(Integer maxNumberOfRecords);
+
+      abstract Read<T> build();
+    }
+
+    /**
+     * Applies the given configuration to the connector. It cannot be null.
+     *
+     * @param config Configuration to be used within the connector.
+     * @return PTransform {@link #read}
+     */
+    public Read<T> withConnectorConfiguration(final ConnectorConfiguration 
config) {
+      checkArgument(config != null, "config can not be null");
+      return toBuilder().setConnectorConfiguration(config).build();
+    }
+
+    /**
+     * Applies a {@link SourceRecordMapper} to the connector. It cannot be 
null.
+     *
+     * @param mapperFn the mapper function to be used on each {@link
+     *     org.apache.kafka.connect.source.SourceRecord}.
+     * @return PTransform {@link #read}
+     */
+    public Read<T> withFormatFunction(SourceRecordMapper<T> mapperFn) {
+      checkArgument(mapperFn != null, "mapperFn can not be null");
+      return toBuilder().setFormatFunction(mapperFn).build();
+    }
+
+    /**
+     * Applies a {@link Coder} to the connector. It cannot be null
+     *
+     * @param coder The Coder to be used over the data.
+     * @return PTransform {@link #read}
+     */
+    public Read<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return toBuilder().setCoder(coder).build();
+    }
+
+    /**
+     * Once the specified number of records has been reached, it will stop 
fetching them. The value
+     * can be null (default) which means it will not stop.
+     *
+     * @param maxNumberOfRecords The maximum number of records to be fetched 
before stop.
+     * @return PTransform {@link #read}
+     */
+    public Read<T> withMaxNumberOfRecords(Integer maxNumberOfRecords) {
+      return toBuilder().setMaxNumberOfRecords(maxNumberOfRecords).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      return input
+          .apply(
+              
Create.of(Lists.newArrayList(getConnectorConfiguration().getConfigurationMap()))
+                  .withCoder(MapCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of())))
+          .apply(
+              ParDo.of(
+                  new KafkaSourceConsumerFn<>(
+                      getConnectorConfiguration().getConnectorClass().get(),
+                      getFormatFunction(),
+                      getMaxNumberOfRecords())))
+          .setCoder(getCoder());
+    }
+  }
+
+  /** A POJO describing a Debezium configuration. */
+  @AutoValue
+  public abstract static class ConnectorConfiguration implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    abstract @Nullable ValueProvider<Class<?>> getConnectorClass();
+
+    abstract @Nullable ValueProvider<String> getHostName();
+
+    abstract @Nullable ValueProvider<String> getPort();
+
+    abstract @Nullable ValueProvider<String> getUsername();
+
+    abstract @Nullable ValueProvider<String> getPassword();
+
+    abstract @Nullable ValueProvider<SourceConnector> getSourceConnector();
+
+    abstract @Nullable ValueProvider<Map<String, String>> 
getConnectionProperties();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConnectorClass(ValueProvider<Class<?>> 
connectorClass);
+
+      abstract Builder setHostName(ValueProvider<String> hostname);
+
+      abstract Builder setPort(ValueProvider<String> port);
+
+      abstract Builder setUsername(ValueProvider<String> username);
+
+      abstract Builder setPassword(ValueProvider<String> password);
+
+      abstract Builder setConnectionProperties(
+          ValueProvider<Map<String, String>> connectionProperties);
+
+      abstract Builder setSourceConnector(ValueProvider<SourceConnector> 
sourceConnector);
+
+      abstract ConnectorConfiguration build();
+    }
+
+    /**
+     * Creates a ConnectorConfiguration.
+     *
+     * @return {@link ConnectorConfiguration}
+     */
+    public static ConnectorConfiguration create() {
+      return new AutoValue_DebeziumIO_ConnectorConfiguration.Builder()
+          .setConnectionProperties(ValueProvider.StaticValueProvider.of(new 
HashMap<>()))
+          .build();
+    }
+
+    /**
+     * Applies the connectorClass to be used to connect to your database.
+     *
+     * <p>Currently supported connectors are:
+     *
+     * <ul>
+     *   <li>{@link io.debezium.connector.mysql.MySqlConnector}
+     *   <li>{@link io.debezium.connector.postgresql.PostgresConnector}
+     *   <li>{@link io.debezium.connector.sqlserver.SqlServerConnector }
+     * </ul>
+     *
+     * @param connectorClass Any of the supported connectors.
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withConnectorClass(Class<?> connectorClass) {
+      checkArgument(connectorClass != null, "connectorClass can not be null");
+      return 
withConnectorClass(ValueProvider.StaticValueProvider.of(connectorClass));
+    }
+
+    /**
+     * Sets the connector class to be used to connect to your database. It cannot be null.
+     *
+     * <p>Currently supported connectors are:
+     *
+     * <ul>
+     *   <li>{@link io.debezium.connector.mysql.MySqlConnector}
+     *   <li>{@link io.debezium.connector.postgresql.PostgresConnector}
+     *   <li>{@link io.debezium.connector.sqlserver.SqlServerConnector}
+     * </ul>
+     *
+     * @param connectorClass Any of the supported connectors (as ValueProvider).
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withConnectorClass(ValueProvider<Class<?>> connectorClass) {
+      checkArgument(connectorClass != null, "connectorClass can not be null");
+      return builder().setConnectorClass(connectorClass).build();
+    }
+
+    /**
+     * Sets the host name of the database. It cannot be null.
+     *
+     * @param hostName The hostname of your database.
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withHostName(String hostName) {
+      checkArgument(hostName != null, "hostName can not be null");
+      return withHostName(ValueProvider.StaticValueProvider.of(hostName));
+    }
+
+    /**
+     * Sets the host name of the database. It cannot be null.
+     *
+     * @param hostName The hostname of your database (as ValueProvider).
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withHostName(ValueProvider<String> hostName) {
+      checkArgument(hostName != null, "hostName can not be null");
+      return builder().setHostName(hostName).build();
+    }
+
+    /**
+     * Sets the port on which your database is listening. It cannot be null.
+     *
+     * @param port The port to be used to connect to your database.
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withPort(String port) {
+      checkArgument(port != null, "port can not be null");
+      return withPort(ValueProvider.StaticValueProvider.of(port));
+    }
+
+    /**
+     * Sets the port on which your database is listening. It cannot be null.
+     *
+     * @param port The port to be used to connect to your database (as ValueProvider).
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withPort(ValueProvider<String> port) {
+      checkArgument(port != null, "port can not be null");
+      return builder().setPort(port).build();
+    }
+
+    /**
+     * Sets the username to connect to your database. It cannot be null.
+     *
+     * @param username Database username
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withUsername(String username) {
+      checkArgument(username != null, "username can not be null");
+      return withUsername(ValueProvider.StaticValueProvider.of(username));
+    }
+
+    /**
+     * Sets the username to connect to your database. It cannot be null.
+     *
+     * @param username Database username (as ValueProvider).
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withUsername(ValueProvider<String> username) {
+      checkArgument(username != null, "username can not be null");
+      return builder().setUsername(username).build();
+    }
+
+    /**
+     * Sets the password to connect to your database. It cannot be null.
+     *
+     * @param password Database password
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withPassword(String password) {
+      checkArgument(password != null, "password can not be null");
+      return withPassword(ValueProvider.StaticValueProvider.of(password));
+    }
+
+    /**
+     * Sets the password to connect to your database. It cannot be null.
+     *
+     * @param password Database password (as ValueProvider).
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withPassword(ValueProvider<String> password) {
+      checkArgument(password != null, "password can not be null");
+      return builder().setPassword(password).build();
+    }
+
+    /**
+     * Sets custom properties to be used within the connection to your database.
+     *
+     * <p>You may use this to set special configurations such as:
+     *
+     * <ul>
+     *   <li>slot.name
+     *   <li>database.dbname
+     *   <li>database.server.id
+     *   <li>...
+     * </ul>
+     *
+     * @param connectionProperties Properties (Key, Value) Map
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withConnectionProperties(
+        Map<String, String> connectionProperties) {
+      checkArgument(connectionProperties != null, "connectionProperties can not be null");
+      return withConnectionProperties(ValueProvider.StaticValueProvider.of(connectionProperties));
+    }
+
+    /**
+     * Sets custom properties to be used within the connection to your database.
+     *
+     * <p>You may use this to set special configurations such as:
+     *
+     * <ul>
+     *   <li>slot.name
+     *   <li>database.dbname
+     *   <li>database.server.id
+     *   <li>...
+     * </ul>
+     *
+     * @param connectionProperties Properties map (as ValueProvider).
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withConnectionProperties(
+        ValueProvider<Map<String, String>> connectionProperties) {
+      checkArgument(connectionProperties != null, "connectionProperties can not be null");
+      return builder().setConnectionProperties(connectionProperties).build();
+    }
+
+    /**
+     * Sets a custom property to be used within the connection to your database.
+     *
+     * <p>You may use this to set special configurations such as:
+     *
+     * <ul>
+     *   <li>slot.name
+     *   <li>database.dbname
+     *   <li>database.server.id
+     *   <li>...
+     * </ul>
+     *
+     * @param key Property name
+     * @param value Property value
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withConnectionProperty(String key, String value) {
+      checkArgument(key != null, "key can not be null");
+      checkArgument(value != null, "value can not be null");
+      checkArgument(
+          getConnectionProperties().get() != null, "connectionProperties can not be null");
+
+      ConnectorConfiguration config = builder().build();
+      config.getConnectionProperties().get().putIfAbsent(key, value);
+      return config;
+    }
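Note the `putIfAbsent` call above: a later `withConnectionProperty` never overwrites a key that is already present. A minimal pure-Java sketch of that behavior (the property name and values are illustrative, not taken from this change):

```java
import java.util.HashMap;
import java.util.Map;

public class PutIfAbsentDemo {
  // Mirrors withConnectionProperty(): the first value written for a key wins.
  static String firstWins() {
    Map<String, String> props = new HashMap<>();
    props.putIfAbsent("slot.name", "beam_slot");
    props.putIfAbsent("slot.name", "other_slot"); // ignored: key already present
    return props.get("slot.name");
  }

  public static void main(String[] args) {
    System.out.println(firstWins());
  }
}
```

In practice this means calling `withConnectionProperty` twice with the same key keeps the first value rather than the last.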
+
+    /**
+     * Sets the {@link SourceConnector} to be used. It cannot be null.
+     *
+     * @param sourceConnector Any supported connector
+     * @return {@link ConnectorConfiguration}
+     */
+    public ConnectorConfiguration withSourceConnector(SourceConnector sourceConnector) {
+      checkArgument(sourceConnector != null, "sourceConnector can not be null");
+      return withSourceConnector(ValueProvider.StaticValueProvider.of(sourceConnector));
+    }
+
+    public ConnectorConfiguration withSourceConnector(
+        ValueProvider<SourceConnector> sourceConnector) {
+      checkArgument(sourceConnector != null, "sourceConnector can not be null");
+      return builder().setSourceConnector(sourceConnector).build();
+    }
+
+    /**
+     * Builds the connector configuration as a property map.
+     *
+     * @return Configuration Map.
+     */
+    public Map<String, String> getConfigurationMap() {
+      HashMap<String, String> configuration = new HashMap<>();
+
+      configuration.computeIfAbsent(
+          "connector.class", k -> getConnectorClass().get().getCanonicalName());
+      configuration.computeIfAbsent("database.hostname", k -> getHostName().get());
+      configuration.computeIfAbsent("database.port", k -> getPort().get());
+      configuration.computeIfAbsent("database.user", k -> getUsername().get());
+      configuration.computeIfAbsent("database.password", k -> getPassword().get());
+
+      for (Map.Entry<String, String> entry : getConnectionProperties().get().entrySet()) {
+        configuration.computeIfAbsent(entry.getKey(), k -> entry.getValue());
+      }
+
+      // Set default Database History impl. if not provided
+      configuration.computeIfAbsent(
+          "database.history",
+          k -> KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName());
+
+      String stringProperties = Joiner.on('\n').withKeyValueSeparator(" -> ").join(configuration);
+      LOG.debug("---------------- Connector configuration: {}", stringProperties);
+
+      return configuration;
+    }
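The `computeIfAbsent` chain above gives the core keys precedence: they are written first, so user-supplied connection properties cannot override them, and the default `database.history` is only applied when the user did not provide one. A self-contained sketch of that precedence rule (hostname, port, and history values are placeholders, not the real getters):

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigMapDemo {
  // Sketch of getConfigurationMap(): core settings are written first, then
  // extra properties fill in only the keys that are still absent.
  static Map<String, String> build(Map<String, String> extra) {
    Map<String, String> config = new HashMap<>();
    config.computeIfAbsent("database.hostname", k -> "localhost"); // placeholder values
    config.computeIfAbsent("database.port", k -> "3306");
    for (Map.Entry<String, String> entry : extra.entrySet()) {
      config.computeIfAbsent(entry.getKey(), k -> entry.getValue());
    }
    // Default applied only if the user did not provide one.
    config.computeIfAbsent("database.history", k -> "DefaultHistory");
    return config;
  }

  public static void main(String[] args) {
    Map<String, String> extra = new HashMap<>();
    extra.put("database.port", "5432"); // ignored: core value was set first
    extra.put("database.server.id", "184054"); // added
    System.out.println(build(extra));
  }
}
```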
+  }
+}
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
new file mode 100644
index 0000000..c5a5b4f
--- /dev/null
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import io.debezium.document.Document;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.HistoryRecord;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ *
+ * <h3>Quick Overview</h3>
+ *
+ * SDF used to process records fetched from supported Debezium Connectors.
+ *
+ * <p>Currently it has a time limiter (see {@link OffsetTracker}) which, if set, stops the SDF
+ * automatically after the specified number of minutes has elapsed. Otherwise, it keeps running
+ * until the user explicitly interrupts it.
+ *
+ * <p>It might be initialized either as:
+ *
+ * <pre>KafkaSourceConsumerFn(connectorClass, SourceRecordMapper)</pre>
+ *
+ * Or with a time limiter:
+ *
+ * <pre>KafkaSourceConsumerFn(connectorClass, SourceRecordMapper, minutesToRun)</pre>
+ */
+@SuppressWarnings({"nullness"})
+public class KafkaSourceConsumerFn<T> extends DoFn<Map<String, String>, T> {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceConsumerFn.class);
+  public static final String BEAM_INSTANCE_PROPERTY = "beam.parent.instance";
+
+  private final Class<? extends SourceConnector> connectorClass;
+  private final SourceRecordMapper<T> fn;
+
+  private static long minutesToRun = -1;
+  private static Integer maxRecords;
+  private static DateTime startTime;
+  private static final Map<String, RestrictionTracker<OffsetHolder, Map<String, Object>>>
+      restrictionTrackers = new ConcurrentHashMap<>();
+
+  /**
+   * Initializes the SDF with a time limit.
+   *
+   * @param connectorClass Supported Debezium connector class
+   * @param fn a SourceRecordMapper
+   * @param minutesToRun Maximum time to run (in minutes)
+   */
+  public KafkaSourceConsumerFn(
+      Class<?> connectorClass, SourceRecordMapper<T> fn, long minutesToRun) {
+    this.connectorClass = (Class<? extends SourceConnector>) connectorClass;
+    this.fn = fn;
+    KafkaSourceConsumerFn.minutesToRun = minutesToRun;
+  }
+
+  /**
+   * Initializes the SDF with a maximum number of records to fetch.
+   *
+   * @param connectorClass Supported Debezium connector class
+   * @param fn a SourceRecordMapper
+   * @param maxRecords Maximum number of records to fetch before stopping
+   */
+  public KafkaSourceConsumerFn(
+      Class<?> connectorClass, SourceRecordMapper<T> fn, Integer maxRecords) {
+    this.connectorClass = (Class<? extends SourceConnector>) connectorClass;
+    this.fn = fn;
+    KafkaSourceConsumerFn.maxRecords = maxRecords;
+  }
+
+  @GetInitialRestriction
+  public OffsetHolder getInitialRestriction(@Element Map<String, String> unused)
+      throws IOException {
+    KafkaSourceConsumerFn.startTime = new DateTime();
+    return new OffsetHolder(null, null, null);
+  }
+
+  @NewTracker
+  public RestrictionTracker<OffsetHolder, Map<String, Object>> newTracker(
+      @Restriction OffsetHolder restriction) {
+    return new OffsetTracker(restriction);
+  }
+
+  @GetRestrictionCoder
+  public Coder<OffsetHolder> getRestrictionCoder() {
+    return SerializableCoder.of(OffsetHolder.class);
+  }
+
+  /**
+   * Polls the connector task for records, maps each one with the provided {@link
+   * SourceRecordMapper}, and outputs the result.
+   *
+   * @param element Connector configuration used to start the connector
+   * @param tracker Restriction Tracker
+   * @param receiver Output Receiver
+   * @return A {@link ProcessContinuation}: stop if no records were polled or no offset could be
+   *     claimed, resume otherwise.
+   * @throws Exception if the connector or its task cannot be instantiated
+   */
+  @DoFn.ProcessElement
+  public ProcessContinuation process(
+      @Element Map<String, String> element,
+      RestrictionTracker<OffsetHolder, Map<String, Object>> tracker,
+      OutputReceiver<T> receiver)
+      throws Exception {
+    Map<String, String> configuration = new HashMap<>(element);
+
+    // Adding the current restriction to the class object to be found by the database history
+    restrictionTrackers.put(this.getHashCode(), tracker);
+    configuration.put(BEAM_INSTANCE_PROPERTY, this.getHashCode());
+
+    SourceConnector connector = connectorClass.getDeclaredConstructor().newInstance();
+    connector.start(configuration);
+
+    SourceTask task = (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance();
+
+    try {
+      Map<String, ?> consumerOffset = tracker.currentRestriction().offset;
+      LOG.debug("--------- Consumer offset from Debezium Tracker: {}", consumerOffset);
+
+      task.initialize(new BeamSourceTaskContext(tracker.currentRestriction().offset));
+      task.start(connector.taskConfigs(1).get(0));
+
+      List<SourceRecord> records = task.poll();
+
+      if (records == null) {
+        LOG.debug("-------- Pulled records null");
+        return ProcessContinuation.stop();
+      }
+
+      LOG.debug("-------- {} records found", records.size());
+      if (!records.isEmpty()) {
+        for (SourceRecord record : records) {
+          LOG.debug("-------- Record found: {}", record);
+
+          Map<String, Object> offset = (Map<String, Object>) record.sourceOffset();
+
+          if (offset == null || !tracker.tryClaim(offset)) {
+            LOG.debug("-------- Offset null or could not be claimed");
+            return ProcessContinuation.stop();
+          }
+
+          T json = this.fn.mapSourceRecord(record);
+          LOG.debug("****************** RECEIVED SOURCE AS JSON: {}", json);
+
+          receiver.output(json);
+        }
+
+        task.commit();
+      }
+    } catch (Exception ex) {
+      // Pass the throwable itself so the full stack trace is logged.
+      LOG.error("-------- Error on consumer: {}", ex.getMessage(), ex);
+    } finally {
+      restrictionTrackers.remove(this.getHashCode());
+
+      LOG.debug("------- Stopping SourceTask");
+      task.stop();
+    }
+
+    return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1));
+  }
+
+  public String getHashCode() {
+    return Integer.toString(System.identityHashCode(this));
+  }
+
+  private static class BeamSourceTaskContext implements SourceTaskContext {
+    private final @Nullable Map<String, ?> initialOffset;
+
+    BeamSourceTaskContext(@Nullable Map<String, ?> initialOffset) {
+      this.initialOffset = initialOffset;
+    }
+
+    @Override
+    public Map<String, String> configs() {
+      // TODO(pabloem): Do we need to implement this?
+      throw new UnsupportedOperationException("unimplemented");
+    }
+
+    @Override
+    public OffsetStorageReader offsetStorageReader() {
+      LOG.debug("------------- Creating an offset storage reader");
+      return new DebeziumSourceOffsetStorageReader(initialOffset);
+    }
+  }
+
+  private static class DebeziumSourceOffsetStorageReader implements OffsetStorageReader {
+    private final Map<String, ?> offset;
+
+    DebeziumSourceOffsetStorageReader(Map<String, ?> initialOffset) {
+      this.offset = initialOffset;
+    }
+
+    @Override
+    public <V> Map<String, Object> offset(Map<String, V> partition) {
+      return offsets(Collections.singletonList(partition))
+          .getOrDefault(partition, ImmutableMap.of());
+    }
+
+    @Override
+    public <T> Map<Map<String, T>, Map<String, Object>> offsets(
+        Collection<Map<String, T>> partitions) {
+      LOG.debug("-------------- GETTING OFFSETS!");
+
+      Map<Map<String, T>, Map<String, Object>> map = new HashMap<>();
+      for (Map<String, T> partition : partitions) {
+        map.put(partition, (Map<String, Object>) offset);
+      }
+
+      LOG.debug("-------------- OFFSETS: {}", map);
+      return map;
+    }
+  }
+
+  static class OffsetHolder implements Serializable {
+    public final @Nullable Map<String, ?> offset;
+    public final @Nullable List<?> history;
+    public final @Nullable Integer fetchedRecords;
+
+    OffsetHolder(
+        @Nullable Map<String, ?> offset,
+        @Nullable List<?> history,
+        @Nullable Integer fetchedRecords) {
+      this.offset = offset;
+      this.history = history == null ? new ArrayList<>() : history;
+      this.fetchedRecords = fetchedRecords;
+    }
+  }
+
+  /** {@link RestrictionTracker} for Debezium connectors. */
+  static class OffsetTracker extends RestrictionTracker<OffsetHolder, Map<String, Object>> {
+    private OffsetHolder restriction;
+    private static final long MILLIS = 60 * 1000; // milliseconds per minute
+
+    OffsetTracker(OffsetHolder holder) {
+      this.restriction = holder;
+    }
+
+    /**
+     * Overriding {@link #tryClaim} in order to stop fetching records from the database.
+     *
+     * <p>This works in two different ways:
+     *
+     * <h3>Number of records</h3>
+     *
+     * <p>This is the default behavior. Once the specified number of records has been reached, it
+     * will stop fetching them.
+     *
+     * <h3>Time based</h3>
+     *
+     * Users may specify how long the connector should be kept alive. Please see {@link
+     * KafkaSourceConsumerFn} for more details on this.
+     *
+     * @param position Currently not used
+     * @return boolean
+     */
+    @Override
+    public boolean tryClaim(Map<String, Object> position) {
+      LOG.debug("-------------- Claiming {} used to have: {}", position, restriction.offset);
+      long elapsedTime = System.currentTimeMillis() - startTime.getMillis();
+      int fetchedRecords =
+          this.restriction.fetchedRecords == null ? 0 : this.restriction.fetchedRecords + 1;
+      LOG.debug("-------------- Time running: {} / {}", elapsedTime, (minutesToRun * MILLIS));
+      this.restriction = new OffsetHolder(position, this.restriction.history, fetchedRecords);
+      LOG.debug("-------------- History: {}", this.restriction.history);
+
+      if (maxRecords == null && minutesToRun == -1) {
+        return true;
+      }
+
+      if (maxRecords != null) {
+        return fetchedRecords < maxRecords;
+      }
+
+      return elapsedTime < minutesToRun * MILLIS;
+    }
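The stopping rule at the end of tryClaim can be isolated as a pure function: with no limits configured it always claims, a record limit takes precedence over the time limit, and otherwise elapsed time decides. A self-contained sketch of that decision rule (not the tracker class itself):

```java
public class ClaimLimitDemo {
  static final long MILLIS_PER_MINUTE = 60 * 1000;

  // Mirrors the decision at the end of OffsetTracker.tryClaim().
  static boolean shouldClaim(
      Integer maxRecords, long minutesToRun, int fetchedRecords, long elapsedMillis) {
    if (maxRecords == null && minutesToRun == -1) {
      return true; // no limits configured: run indefinitely
    }
    if (maxRecords != null) {
      return fetchedRecords < maxRecords; // record limit wins over time limit
    }
    return elapsedMillis < minutesToRun * MILLIS_PER_MINUTE;
  }

  public static void main(String[] args) {
    System.out.println(shouldClaim(null, -1, 1_000_000, Long.MAX_VALUE)); // no limits
    System.out.println(shouldClaim(10, -1, 10, 0)); // record limit reached
    System.out.println(shouldClaim(null, 2, 0, 90_000)); // 1.5 of 2 minutes elapsed
  }
}
```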
+
+    @Override
+    public OffsetHolder currentRestriction() {
+      return restriction;
+    }
+
+    @Override
+    public SplitResult<OffsetHolder> trySplit(double fractionOfRemainder) {
+      LOG.debug("-------------- Trying to split: fractionOfRemainder={}", fractionOfRemainder);
+      return SplitResult.of(new OffsetHolder(null, null, null), restriction);
+    }
+
+    @Override
+    public void checkDone() throws IllegalStateException {}
+
+    @Override
+    public IsBounded isBounded() {
+      return IsBounded.BOUNDED;
+    }
+  }
+
+  public static class DebeziumSDFDatabaseHistory extends AbstractDatabaseHistory {
+    private List<byte[]> history;
+
+    public DebeziumSDFDatabaseHistory() {
+      this.history = new ArrayList<>();
+    }
+
+    @Override
+    public void start() {
+      super.start();
+      LOG.debug(
+          "------------ STARTING THE DATABASE HISTORY! - trackers: {} - config: {}",
+          restrictionTrackers,
+          config.asMap());
+
+      // We fetch the first key to get the first restriction tracker.
+      // TODO(BEAM-11737): This may cause issues with multiple trackers in the future.
+      RestrictionTracker<OffsetHolder, ?> tracker =
+          restrictionTrackers.get(restrictionTrackers.keySet().iterator().next());
+      this.history = (List<byte[]>) tracker.currentRestriction().history;
+    }
+
+    @Override
+    protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
+      LOG.debug("------------- Adding history! {}", record);
+
+      history.add(DocumentWriter.defaultWriter().writeAsBytes(record.document()));
+    }
+
+    @Override
+    protected void recoverRecords(Consumer<HistoryRecord> consumer) {
+      LOG.debug("------------- Trying to recover!");
+
+      try {
+        for (byte[] record : history) {
+          Document doc = DocumentReader.defaultReader().read(record);
+          consumer.accept(new HistoryRecord(doc));
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public boolean exists() {
+      return history != null && !history.isEmpty();
+    }
+
+    @Override
+    public boolean storageExists() {
+      return history != null && !history.isEmpty();
+    }
+  }
+}
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordJson.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordJson.java
new file mode 100644
index 0000000..10bcb14
--- /dev/null
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordJson.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.Gson;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.GsonBuilder;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * This class can be used as a mapper for each {@link SourceRecord} retrieved.
+ *
+ * <h3>What it does</h3>
+ *
+ * <p>It maps any SourceRecord retrieved from any supported {@link io.debezium.connector} to JSON
+ *
+ * <h3>How it works</h3>
+ *
+ * <p>It will extract valuable fields from any given SourceRecord:
+ *
+ * <ul>
+ *   <li>before - {@link #loadBefore}
+ *   <li>after - {@link #loadAfter}
+ *   <li>metadata - {@link #loadMetadata}
+ *       <ul>
+ *         <li>schema - Database Schema
+ *         <li>connector - Connector used
+ *         <li>version - Connector version
+ *       </ul>
+ * </ul>
+ *
+ * <h3>Usage Example</h3>
+ *
+ * <p>Map each SourceRecord to JSON
+ *
+ * <pre>
+ *     DebeziumIO.read()
+ *         .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
+ * </pre>
+ */
+@SuppressWarnings({"nullness"})
+public class SourceRecordJson {
+  private final @Nullable SourceRecord sourceRecord;
+  private final @Nullable Struct value;
+  private final @Nullable Event event;
+
+  /**
+   * Initializer.
+   *
+   * @param sourceRecord retrieved SourceRecord using a supported SourceConnector
+   */
+  public SourceRecordJson(@Nullable SourceRecord sourceRecord) {
+    if (sourceRecord == null) {
+      throw new IllegalArgumentException("sourceRecord can not be null");
+    }
+
+    this.sourceRecord = sourceRecord;
+    this.value = (Struct) sourceRecord.value();
+
+    if (this.value == null) {
+      this.event = new Event(null, null, null);
+    } else {
+      Event.Metadata metadata = this.loadMetadata();
+      Event.Before before = this.loadBefore();
+      Event.After after = this.loadAfter();
+
+      this.event = new Event(metadata, before, after);
+    }
+  }
+
+  /**
+   * Extracts metadata from the SourceRecord.
+   *
+   * @return Metadata, or null if the source field is missing
+   */
+  private Event.Metadata loadMetadata() {
+    @Nullable Struct source;
+    try {
+      source = (Struct) this.value.get("source");
+    } catch (RuntimeException e) {
+      throw new IllegalArgumentException(e);
+    }
+    @Nullable String schema;
+
+    if (source == null) {
+      return null;
+    }
+
+    try {
+      // PostgreSQL and SQL server use Schema
+      schema = source.getString("schema");
+    } catch (DataException e) {
+      // MySQL uses file instead
+      schema = source.getString("file");
+    }
+
+    return new Event.Metadata(
+        source.getString("connector"),
+        source.getString("version"),
+        source.getString("name"),
+        source.getString("db"),
+        schema,
+        source.getString("table"));
+  }
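The schema/file fallback in loadMetadata above relies on catching a DataException when a connector does not expose the first field. The same pattern can be sketched in plain Java (the exception type, lookup helper, and field values here are stand-ins, not the Kafka Connect API):

```java
import java.util.Map;

public class SchemaFallbackDemo {
  // Stand-in for Struct.getString(): throws when the field is absent.
  static String getRequired(Map<String, String> source, String key) {
    String value = source.get(key);
    if (value == null) {
      throw new IllegalStateException("missing field: " + key);
    }
    return value;
  }

  // PostgreSQL and SQL Server expose "schema"; MySQL exposes "file" instead,
  // so fall back when the first lookup fails.
  static String schemaOrFile(Map<String, String> source) {
    try {
      return getRequired(source, "schema");
    } catch (IllegalStateException e) {
      return getRequired(source, "file");
    }
  }

  public static void main(String[] args) {
    System.out.println(schemaOrFile(Map.of("schema", "public")));
    System.out.println(schemaOrFile(Map.of("file", "mysql-bin.000003")));
  }
}
```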
+
+  /**
+   * Extracts the before field within SourceRecord.
+   *
+   * @return Before
+   */
+  private Event.Before loadBefore() {
+    @Nullable Struct before;
+    try {
+      before = (Struct) this.value.get("before");
+    } catch (DataException e) {
+      return null;
+    }
+    if (before == null) {
+      return null;
+    }
+
+    Map<String, Object> fields = new HashMap<>();
+    for (Field field : before.schema().fields()) {
+      fields.put(field.name(), before.get(field));
+    }
+
+    return new Event.Before(fields);
+  }
+
+  /**
+   * Extracts the after field within SourceRecord.
+   *
+   * @return After
+   */
+  private Event.After loadAfter() {
+    @Nullable Struct after;
+    try {
+      after = (Struct) this.value.get("after");
+    } catch (DataException e) {
+      return null;
+    }
+    if (after == null) {
+      return null;
+    }
+
+    Map<String, Object> fields = new HashMap<>();
+    for (Field field : after.schema().fields()) {
+      fields.put(field.name(), after.get(field));
+    }
+
+    return new Event.After(fields);
+  }
+
+  /**
+   * Transforms the extracted data to a JSON string.
+   *
+   * @return JSON String
+   */
+  public String toJson() {
+    return this.event.toJson();
+  }
+
+  /** {@link SourceRecordJson} implementation. */
+  public static class SourceRecordJsonMapper implements SourceRecordMapper<String> {
+    @Override
+    public String mapSourceRecord(SourceRecord sourceRecord) throws Exception {
+      return new SourceRecordJson(sourceRecord).toJson();
+    }
+  }
+
+  /** Depicts a SourceRecord as an Event in order for it to be mapped as JSON. */
+  static class Event implements Serializable {
+    private final SourceRecordJson.Event.Metadata metadata;
+    private final SourceRecordJson.Event.Before before;
+    private final SourceRecordJson.Event.After after;
+
+    /**
+     * Event Initializer.
+     *
+     * @param metadata Metadata retrieved from SourceRecord
+     * @param before Before data retrieved from SourceRecord
+     * @param after After data retrieved from SourceRecord
+     */
+    public Event(
+        SourceRecordJson.Event.Metadata metadata,
+        SourceRecordJson.Event.Before before,
+        SourceRecordJson.Event.After after) {
+      this.metadata = metadata;
+      this.before = before;
+      this.after = after;
+    }
+
+    /**
+     * Transforms the Event to a JSON string.
+     *
+     * @return JSON String
+     */
+    public String toJson() {
+      Gson gson = new GsonBuilder().serializeNulls().create();
+      return gson.toJson(this);
+    }
+
+    /** Depicts the metadata within a SourceRecord: connector, version, name, database, schema, and table. */
+    static class Metadata implements Serializable {
+      private final @Nullable String connector;
+      private final @Nullable String version;
+      private final @Nullable String name;
+      private final @Nullable String database;
+      private final @Nullable String schema;
+      private final @Nullable String table;
+
+      /**
+       * Metadata Initializer.
+       *
+       * @param connector Connector used
+       * @param version Connector version
+       * @param name Connector name
+       * @param database DB name
+       * @param schema Schema name
+       * @param table Table name
+       */
+      public Metadata(
+          @Nullable String connector,
+          @Nullable String version,
+          @Nullable String name,
+          @Nullable String database,
+          @Nullable String schema,
+          @Nullable String table) {
+        this.connector = connector;
+        this.version = version;
+        this.name = name;
+        this.database = database;
+        this.schema = schema;
+        this.table = table;
+      }
+    }
+
+    /** Depicts the before field within SourceRecord. */
+    static class Before implements Serializable {
+      private final @Nullable Map<String, Object> fields;
+
+      /**
+       * Before Initializer.
+       *
+       * @param fields Key - Value map with information within Before
+       */
+      public Before(@Nullable Map<String, Object> fields) {
+        this.fields = fields;
+      }
+    }
+
+    /** Depicts the after field within SourceRecord. */
+    static class After implements Serializable {
+      private final @Nullable Map<String, Object> fields;
+
+      /**
+       * After Initializer.
+       *
+       * @param fields Key - Value map with information within After
+       */
+      public After(@Nullable Map<String, Object> fields) {
+        this.fields = fields;
+      }
+    }
+  }
+}
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordMapper.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordMapper.java
new file mode 100644
index 0000000..65e42e9
--- /dev/null
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/SourceRecordMapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import java.io.Serializable;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Interface used to map a Kafka source record.
+ *
+ * @param <T> The type to which the Kafka source record is mapped
+ */
+@FunctionalInterface
+public interface SourceRecordMapper<T> extends Serializable {
+  T mapSourceRecord(SourceRecord sourceRecord) throws Exception;
+}
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/package-info.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/package-info.java
new file mode 100644
index 0000000..86ba1f5
--- /dev/null
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading from DebeziumIO.
+ *
+ * @see org.apache.beam.io.debezium.DebeziumIO
+ */
+@Experimental(Kind.SOURCE_SINK)
+package org.apache.beam.io.debezium;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
new file mode 100644
index 0000000..6056ca0
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.apache.beam.sdk.testing.SerializableMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import io.debezium.connector.mysql.MySqlConnector;
+import java.time.Duration;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+@RunWith(JUnit4.class)
+public class DebeziumIOMySqlConnectorIT {
+  /**
+   * Debezium - MySqlContainer
+   *
+   * <p>Creates a Docker container using the image from the Debezium tutorial.
+   */
+  @ClassRule
+  public static final MySQLContainer<?> MY_SQL_CONTAINER =
+      new MySQLContainer<>(
+              DockerImageName.parse("debezium/example-mysql:1.4")
+                  .asCompatibleSubstituteFor("mysql"))
+          .withPassword("debezium")
+          .withUsername("mysqluser")
+          .withExposedPorts(3306)
+          .waitingFor(
+              new HttpWaitStrategy()
+                  .forPort(3306)
+                  .forStatusCodeMatching(response -> response == 200)
+                  .withStartupTimeout(Duration.ofMinutes(2)));
+
+  /**
+   * Debezium - MySQL connector Test.
+   *
+   * <p>Tests that the connector can connect to the database.
+   */
+  @Test
+  public void testDebeziumIOMySql() {
+    MY_SQL_CONTAINER.start();
+
+    String host = MY_SQL_CONTAINER.getContainerIpAddress();
+    String port = MY_SQL_CONTAINER.getMappedPort(3306).toString();
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> results =
+        p.apply(
+            DebeziumIO.<String>read()
+                .withConnectorConfiguration(
+                    DebeziumIO.ConnectorConfiguration.create()
+                        .withUsername("debezium")
+                        .withPassword("dbz")
+                        .withConnectorClass(MySqlConnector.class)
+                        .withHostName(host)
+                        .withPort(port)
+                        .withConnectionProperty("database.server.id", "184054")
+                        .withConnectionProperty("database.server.name", "dbserver1")
+                        .withConnectionProperty("database.include.list", "inventory")
+                        .withConnectionProperty("include.schema.changes", "false"))
+                .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
+                .withMaxNumberOfRecords(30)
+                .withCoder(StringUtf8Coder.of()));
+    String expected =
+        "{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\","
+            + "\"database\":\"inventory\",\"schema\":\"mysql-bin.000003\",\"table\":\"addresses\"},\"before\":null,"
+            + "\"after\":{\"fields\":{\"zip\":\"76036\",\"city\":\"Euless\","
+            + "\"street\":\"3183 Moore Avenue\",\"id\":10,\"state\":\"Texas\",\"customer_id\":1001,"
+            + "\"type\":\"SHIPPING\"}}}";
+
+    PAssert.that(results)
+        .satisfies(
+            (Iterable<String> res) -> {
+              assertThat(res, hasItem(expected));
+              return null;
+            });
+
+    p.run().waitUntilFinish();
+    MY_SQL_CONTAINER.stop();
+  }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
new file mode 100644
index 0000000..ccf57b6
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnector;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.beam.io.debezium.DebeziumIO.ConnectorConfiguration;
+import org.apache.kafka.common.config.ConfigValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for DebeziumIO. */
+@RunWith(JUnit4.class)
+public class DebeziumIOTest implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(DebeziumIOTest.class);
+  private static final ConnectorConfiguration MYSQL_CONNECTOR_CONFIGURATION =
+      ConnectorConfiguration.create()
+          .withUsername("debezium")
+          .withPassword("dbz")
+          .withHostName("127.0.0.1")
+          .withPort("3306")
+          .withConnectorClass(MySqlConnector.class)
+          .withConnectionProperty("database.server.id", "184054")
+          .withConnectionProperty("database.server.name", "dbserver1")
+          .withConnectionProperty("database.include.list", "inventory")
+          .withConnectionProperty(
+              "database.history", KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName())
+          .withConnectionProperty("include.schema.changes", "false");
+
+  @Test
+  public void testSourceMySqlConnectorValidConfiguration() {
+    Map<String, String> configurationMap = MYSQL_CONNECTOR_CONFIGURATION.getConfigurationMap();
+
+    Configuration debeziumConf = Configuration.from(configurationMap);
+    Map<String, ConfigValue> validConfig = debeziumConf.validate(MySqlConnectorConfig.ALL_FIELDS);
+
+    for (ConfigValue configValue : validConfig.values()) {
+      assertTrue(configValue.errorMessages().isEmpty());
+    }
+  }
+
+  @Test
+  public void testSourceConnectorUsernamePassword() {
+    String username = "debezium";
+    String password = "dbz";
+    ConnectorConfiguration configuration =
+        MYSQL_CONNECTOR_CONFIGURATION.withUsername(username).withPassword(password);
+    Map<String, String> configurationMap = configuration.getConfigurationMap();
+
+    Configuration debeziumConf = Configuration.from(configurationMap);
+    Map<String, ConfigValue> validConfig = debeziumConf.validate(MySqlConnectorConfig.ALL_FIELDS);
+
+    for (ConfigValue configValue : validConfig.values()) {
+      assertTrue(configValue.errorMessages().isEmpty());
+    }
+  }
+
+  @Test
+  public void testSourceConnectorNullPassword() {
+    String username = "debezium";
+    String password = null;
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> MYSQL_CONNECTOR_CONFIGURATION.withUsername(username).withPassword(password));
+  }
+
+  @Test
+  public void testSourceConnectorNullUsernameAndPassword() {
+    String username = null;
+    String password = null;
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> MYSQL_CONNECTOR_CONFIGURATION.withUsername(username).withPassword(password));
+  }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java
new file mode 100644
index 0000000..c22f8a3
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class KafkaSourceConsumerFnTest implements Serializable {
+  @Test
+  public void testKafkaSourceConsumerFn() {
+    Map<String, String> config =
+        ImmutableMap.of(
+            "from", "1",
+            "to", "10",
+            "delay", "0.4",
+            "topic", "any");
+
+    Pipeline pipeline = Pipeline.create();
+
+    PCollection<Integer> counts =
+        pipeline
+            .apply(
+                Create.of(Lists.newArrayList(config))
+                    .withCoder(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+            .apply(
+                ParDo.of(
+                    new KafkaSourceConsumerFn<>(
+                        CounterSourceConnector.class,
+                        sourceRecord -> (Integer) sourceRecord.value(),
+                        10)))
+            .setCoder(VarIntCoder.of());
+
+    PAssert.that(counts).containsInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testStoppableKafkaSourceConsumerFn() {
+    Map<String, String> config =
+        ImmutableMap.of(
+            "from", "1",
+            "to", "3",
+            "delay", "0.2",
+            "topic", "any");
+
+    Pipeline pipeline = Pipeline.create();
+
+    PCollection<Integer> counts =
+        pipeline
+            .apply(
+                Create.of(Lists.newArrayList(config))
+                    .withCoder(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+            .apply(
+                ParDo.of(
+                    new KafkaSourceConsumerFn<>(
+                        CounterSourceConnector.class,
+                        sourceRecord -> (Integer) sourceRecord.value(),
+                        1)))
+            .setCoder(VarIntCoder.of());
+
+    pipeline.run().waitUntilFinish();
+    Assert.assertEquals(3, CounterTask.getCountTasks());
+  }
+}
+
+class CounterSourceConnector extends SourceConnector {
+  public static class CounterSourceConnectorConfig extends AbstractConfig {
+    final Map<String, String> props;
+
+    CounterSourceConnectorConfig(Map<String, String> props) {
+      super(configDef(), props);
+      this.props = props;
+    }
+
+    protected static ConfigDef configDef() {
+      return new ConfigDef()
+          .define("from", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "Number to start from")
+          .define("to", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "Number to go to")
+          .define(
+              "delay", ConfigDef.Type.DOUBLE, ConfigDef.Importance.HIGH, "Time between each event")
+          .define(
+              "topic",
+              ConfigDef.Type.STRING,
+              ConfigDef.Importance.HIGH,
+              "Name of Kafka topic to produce to");
+    }
+  }
+
+  @Nullable private CounterSourceConnectorConfig connectorConfig;
+
+  @Override
+  public void start(Map<String, String> props) {
+    this.connectorConfig = new CounterSourceConnectorConfig(props);
+  }
+
+  @Override
+  public Class<? extends Task> taskClass() {
+    return CounterTask.class;
+  }
+
+  @Override
+  public List<Map<String, String>> taskConfigs(int maxTasks) {
+    if (this.connectorConfig == null || this.connectorConfig.props == null) {
+      return Collections.emptyList();
+    }
+
+    return Collections.singletonList(
+        ImmutableMap.of(
+            "from", this.connectorConfig.props.get("from"),
+            "to", this.connectorConfig.props.get("to"),
+            "delay", this.connectorConfig.props.get("delay"),
+            "topic", this.connectorConfig.props.get("topic")));
+  }
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public ConfigDef config() {
+    return CounterSourceConnectorConfig.configDef();
+  }
+
+  @Override
+  public String version() {
+    return "ONE";
+  }
+}
+
+class CounterTask extends SourceTask {
+  private static int countStopTasks = 0;
+  private String topic = "";
+  private Integer from = 0;
+  private Integer to = 0;
+  private Double delay = 0.0;
+
+  private Long start = System.currentTimeMillis();
+  private Integer last = 0;
+  private Object lastOffset = null;
+
+  private static final String PARTITION_FIELD = "mod";
+  private static final Integer PARTITION_NAME = 1;
+
+  @Override
+  public String version() {
+    return "ONE";
+  }
+
+  @Override
+  public void initialize(SourceTaskContext context) {
+    super.initialize(context);
+
+    Map<String, Object> offset =
+        context
+            .offsetStorageReader()
+            .offset(Collections.singletonMap(PARTITION_FIELD, PARTITION_NAME));
+
+    if (offset == null) {
+      this.start = System.currentTimeMillis();
+      this.last = 0;
+    } else {
+      this.start = (Long) offset.get("start");
+      this.last = ((Long) offset.getOrDefault("last", 0)).intValue();
+    }
+    this.lastOffset = offset;
+  }
+
+  @Override
+  public void start(Map<String, String> props) {
+    this.topic = props.getOrDefault("topic", "");
+    this.from = Integer.parseInt(props.getOrDefault("from", "0"));
+    this.to = Integer.parseInt(props.getOrDefault("to", "0"));
+    this.delay = Double.parseDouble(props.getOrDefault("delay", "0"));
+
+    if (this.lastOffset != null) {
+      return;
+    }
+
+    this.start =
+        props.containsKey("start")
+            ? Long.parseLong(props.get("start"))
+            : System.currentTimeMillis();
+    this.last = this.from - 1;
+  }
+
+  @Override
+  public List<SourceRecord> poll() throws InterruptedException {
+    if (this.last.equals(to)) {
+      return null;
+    }
+
+    List<SourceRecord> records = new ArrayList<>();
+    Long callTime = System.currentTimeMillis();
+    Long secondsSinceStart = (callTime - this.start) / 1000;
+    Long recordsToOutput = Math.round(Math.floor(secondsSinceStart / this.delay));
+
+    while (this.last < this.to) {
+      this.last = this.last + 1;
+      Map<String, Integer> sourcePartition = Collections.singletonMap(PARTITION_FIELD, 1);
+      Map<String, Long> sourceOffset =
+          ImmutableMap.of("last", this.last.longValue(), "start", this.start);
+
+      records.add(
+          new SourceRecord(
+              sourcePartition, sourceOffset, this.topic, Schema.INT64_SCHEMA, this.last));
+
+      if (records.size() >= recordsToOutput) {
+        break;
+      }
+    }
+
+    return records;
+  }
+
+  @Override
+  public void stop() {
+    CounterTask.countStopTasks++;
+  }
+
+  public static int getCountTasks() {
+    return CounterTask.countStopTasks;
+  }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java
new file mode 100644
index 0000000..b8a6c9b
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.debezium.connector.mysql.MySqlConnector;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class OffsetTrackerTest implements Serializable {
+  @Test
+  public void testRestrictByNumberOfRecords() throws IOException {
+    Integer maxNumRecords = 10;
+    Map<String, Object> position = new HashMap<>();
+    KafkaSourceConsumerFn<String> kafkaSourceConsumerFn =
+        new KafkaSourceConsumerFn<String>(
+            MySqlConnector.class, new SourceRecordJson.SourceRecordJsonMapper(), maxNumRecords);
+    KafkaSourceConsumerFn.OffsetHolder restriction =
+        kafkaSourceConsumerFn.getInitialRestriction(new HashMap<>());
+    KafkaSourceConsumerFn.OffsetTracker tracker =
+        new KafkaSourceConsumerFn.OffsetTracker(restriction);
+
+    for (int records = 0; records < maxNumRecords; records++) {
+      assertTrue("OffsetTracker should continue", tracker.tryClaim(position));
+    }
+    assertFalse("OffsetTracker should stop", tracker.tryClaim(position));
+  }
+
+  @Test
+  public void testRestrictByAmountOfTime() throws IOException, InterruptedException {
+    long millis = 60 * 1000;
+    long minutesToRun = 1;
+    Map<String, Object> position = new HashMap<>();
+    KafkaSourceConsumerFn<String> kafkaSourceConsumerFn =
+        new KafkaSourceConsumerFn<String>(
+            MySqlConnector.class, new SourceRecordJson.SourceRecordJsonMapper(), minutesToRun);
+    KafkaSourceConsumerFn.OffsetHolder restriction =
+        kafkaSourceConsumerFn.getInitialRestriction(new HashMap<>());
+    KafkaSourceConsumerFn.OffsetTracker tracker =
+        new KafkaSourceConsumerFn.OffsetTracker(restriction);
+
+    assertTrue(tracker.tryClaim(position));
+
+    Thread.sleep(minutesToRun * millis + 100);
+
+    assertFalse(tracker.tryClaim(position));
+  }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java
new file mode 100644
index 0000000..badd01e
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.Serializable;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SourceRecordJsonTest implements Serializable {
+  @Test
+  public void testSourceRecordJson() {
+    SourceRecord record = this.buildSourceRecord();
+    SourceRecordJson json = new SourceRecordJson(record);
+
+    String jsonString = json.toJson();
+
+    String expectedJson =
+        "{\"metadata\":"
+            + "{\"connector\":\"test-connector\","
+            + "\"version\":\"version-connector\","
+            + "\"name\":\"test-connector-sql\","
+            + "\"database\":\"test-db\","
+            + "\"schema\":\"test-schema\","
+            + "\"table\":\"test-table\"},"
+            + "\"before\":{\"fields\":{\"column1\":\"before-name\"}},"
+            + "\"after\":{\"fields\":{\"column1\":\"after-name\"}}}";
+
+    assertEquals(expectedJson, jsonString);
+  }
+
+  @Test
+  public void testSourceRecordJsonWhenSourceRecordIsNull() {
+    assertThrows(IllegalArgumentException.class, () -> new SourceRecordJson(null));
+  }
+
+  private Schema buildSourceSchema() {
+    return SchemaBuilder.struct()
+        .field("connector", Schema.STRING_SCHEMA)
+        .field("version", Schema.STRING_SCHEMA)
+        .field("name", Schema.STRING_SCHEMA)
+        .field("db", Schema.STRING_SCHEMA)
+        .field("schema", Schema.STRING_SCHEMA)
+        .field("table", Schema.STRING_SCHEMA)
+        .build();
+  }
+
+  private Schema buildBeforeSchema() {
+    return SchemaBuilder.struct().field("column1", Schema.STRING_SCHEMA).build();
+  }
+
+  private Schema buildAfterSchema() {
+    return SchemaBuilder.struct().field("column1", Schema.STRING_SCHEMA).build();
+  }
+
+  private SourceRecord buildSourceRecord() {
+    final Schema sourceSchema = this.buildSourceSchema();
+    final Schema beforeSchema = this.buildBeforeSchema();
+    final Schema afterSchema = this.buildAfterSchema();
+
+    final Schema schema =
+        SchemaBuilder.struct()
+            .name("test")
+            .field("source", sourceSchema)
+            .field("before", beforeSchema)
+            .field("after", afterSchema)
+            .build();
+
+    final Struct source = new Struct(sourceSchema);
+    final Struct before = new Struct(beforeSchema);
+    final Struct after = new Struct(afterSchema);
+    final Struct value = new Struct(schema);
+
+    source.put("connector", "test-connector");
+    source.put("version", "version-connector");
+    source.put("name", "test-connector-sql");
+    source.put("db", "test-db");
+    source.put("schema", "test-schema");
+    source.put("table", "test-table");
+
+    before.put("column1", "before-name");
+    after.put("column1", "after-name");
+
+    value.put("source", source);
+    value.put("before", before);
+    value.put("after", after);
+
+    return new SourceRecord(null, null, null, schema, value);
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index e6f037e..c01df18 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -140,6 +140,7 @@ include(":sdks:java:io:cassandra")
 include(":sdks:java:io:clickhouse")
 include(":sdks:java:io:common")
 include(":sdks:java:io:contextualtextio")
+include(":sdks:java:io:debezium")
 include(":sdks:java:io:elasticsearch")
 include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-2")
 include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-5")

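For readers skimming the diff, the new transform is exercised end to end in DebeziumIOMySqlConnectorIT above. The same usage, condensed into a standalone sketch: the class name is hypothetical, the host, port, and credentials are placeholders, and the snippet assumes the Beam and Debezium dependencies added by this commit (it is not runnable without them and a reachable MySQL instance).

```java
import io.debezium.connector.mysql.MySqlConnector;
import org.apache.beam.io.debezium.DebeziumIO;
import org.apache.beam.io.debezium.SourceRecordJson;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical driver class mirroring the integration test above.
public class DebeziumReadExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<String> changes =
        p.apply(
            DebeziumIO.<String>read()
                .withConnectorConfiguration(
                    DebeziumIO.ConnectorConfiguration.create()
                        .withUsername("debezium")  // placeholder credentials
                        .withPassword("dbz")
                        .withConnectorClass(MySqlConnector.class)
                        .withHostName("localhost") // placeholder host
                        .withPort("3306")          // placeholder port
                        .withConnectionProperty("database.server.id", "184054")
                        .withConnectionProperty("database.server.name", "dbserver1")
                        .withConnectionProperty("database.include.list", "inventory")
                        .withConnectionProperty("include.schema.changes", "false"))
                // Map each SourceRecord to the JSON form shown in the IT's expected string.
                .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
                // Bound the otherwise unbounded CDC stream so the pipeline terminates.
                .withMaxNumberOfRecords(30)
                .withCoder(StringUtf8Coder.of()));
    p.run().waitUntilFinish();
  }
}
```

Without `withMaxNumberOfRecords` (or the time-based restriction also added in this change), the read is a continuous change stream, as the OffsetTracker tests above illustrate.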