This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cecfa6128ce [IO] Update Debezium in DebeziumIO to 3.1.1 (#34763)
cecfa6128ce is described below

commit cecfa6128ceaecf3b8d02a8a42788b3505595845
Author: Tobias Kaymak <[email protected]>
AuthorDate: Sat Jun 14 03:01:33 2025 +0200

    [IO] Update Debezium in DebeziumIO to 3.1.1 (#34763)
    
    * Updating Debezium IO to 3.1.1
    Enforce JDK17 in build.gradle of Debezium IO
    
    * adjust to review comments by @Abacn
    
    * cleanup
    
    * mention which PR needs to merge before unpinning
    
    * Mention Beam version 2.66 in the README for Debezium
    
    ---------
    
    Co-authored-by: Shunping Huang <[email protected]>
---
 CHANGES.md                                         |  1 +
 sdks/java/io/debezium/build.gradle                 | 85 +++++++++++++------
 .../io/debezium/expansion-service/build.gradle     |  5 +-
 sdks/java/io/debezium/src/README.md                | 12 +--
 .../org/apache/beam/io/debezium/DebeziumIO.java    | 20 +++--
 .../DebeziumReadSchemaTransformProvider.java       |  2 +-
 .../beam/io/debezium/KafkaSourceConsumerFn.java    |  8 +-
 .../io/debezium/DebeziumIOMySqlConnectorIT.java    | 98 ++++++++++++++++++----
 .../debezium/DebeziumIOPostgresSqlConnectorIT.java | 13 +--
 .../apache/beam/io/debezium/DebeziumIOTest.java    |  1 -
 .../debezium/DebeziumReadSchemaTransformTest.java  | 28 +++++--
 11 files changed, 198 insertions(+), 75 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d4af889b2d2..2e75be8a79c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
 ## I/Os
 
 * Support for X source added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
+* Debezium IO upgraded to 3.1.1 requires Java 17 (Java) 
([#34747](https://github.com/apache/beam/issues/34747)).
 
 ## New Features / Improvements
 
diff --git a/sdks/java/io/debezium/build.gradle 
b/sdks/java/io/debezium/build.gradle
index 2070bcfc873..dcdfd33ce0a 100644
--- a/sdks/java/io/debezium/build.gradle
+++ b/sdks/java/io/debezium/build.gradle
@@ -24,6 +24,7 @@ applyJavaNature(
                 [id: 'io.confluent', url: 
'https://packages.confluent.io/maven/']
         ],
         enableSpotbugs: false,
+        requireJavaVersion: JavaVersion.VERSION_17,
 )
 provideIntegrationTestingDependencies()
 
@@ -38,10 +39,16 @@ dependencies {
     implementation library.java.joda_time
     provided library.java.jackson_dataformat_csv
     permitUnusedDeclared library.java.jackson_dataformat_csv
-    testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
-    testImplementation project(path: ":sdks:java:io:common")
+
+    // Kafka connect dependencies
+    implementation "org.apache.kafka:connect-api:3.9.0"
+
+    // Debezium dependencies
+    implementation group: 'io.debezium', name: 'debezium-core', version: 
'3.1.1.Final'
 
     // Test dependencies
+    testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
+    testImplementation project(path: ":sdks:java:io:common")
     testImplementation library.java.junit
     testImplementation project(path: ":sdks:java:io:jdbc")
     testRuntimeOnly library.java.slf4j_jdk14
@@ -49,29 +56,66 @@ dependencies {
     testImplementation project(":runners:google-cloud-dataflow-java")
     testImplementation library.java.hamcrest
     testImplementation library.java.testcontainers_base
-    testImplementation library.java.testcontainers_mysql
-    testImplementation library.java.testcontainers_postgresql
-    // TODO(https://github.com/apache/beam/issues/31678) HikariCP 5.x requires 
Java11+
-    testImplementation 'com.zaxxer:HikariCP:4.0.3'
+    testImplementation "org.testcontainers:kafka"
+    testImplementation "org.testcontainers:mysql"
+    testImplementation "org.testcontainers:postgresql"
+    testImplementation 
"io.debezium:debezium-testing-testcontainers:3.1.1.Final"
+    testImplementation 'com.zaxxer:HikariCP:5.1.0'
 
-    // Kafka connect dependencies
-    implementation "org.apache.kafka:connect-api:2.5.0"
-    implementation "org.apache.kafka:connect-json:2.5.0"
-    permitUnusedDeclared "org.apache.kafka:connect-json:2.5.0" // BEAM-11761
+    // Debezium connector implementations for testing
+    testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', 
version: '3.1.1.Final'
+    testImplementation group: 'io.debezium', name: 
'debezium-connector-postgres', version: '3.1.1.Final'
+}
 
-    // Debezium dependencies
-    implementation group: 'io.debezium', name: 'debezium-core', version: 
'1.3.1.Final'
-    testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', 
version: '1.3.1.Final'
-    testImplementation group: 'io.debezium', name: 
'debezium-connector-postgres', version: '1.3.1.Final'
+// TODO: Remove pin after Beam has unpinned it (PR #35231)
+// Pin the Antlr version
+configurations.all {
+    resolutionStrategy {
+        force 'org.antlr:antlr4:4.10', 'org.antlr:antlr4-runtime:4.10'
+    }
+}
+
+// TODO: Remove pin after upgrading Beam's Jackson version
+// Force Jackson versions for the test runtime classpath
+configurations.named("testRuntimeClasspath") {
+    resolutionStrategy.force (
+        'com.fasterxml.jackson.core:jackson-core:2.17.1',
+        'com.fasterxml.jackson.core:jackson-annotations:2.17.1',
+        'com.fasterxml.jackson.core:jackson-databind:2.17.1',
+        'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1',
+        'com.fasterxml.jackson.module:jackson-module-afterburner:2.17.1'
+    )
+}
+
+def configureTestJvmArgs(Task task) {
+    List<String> currentJvmArgs = task.jvmArgs ? new ArrayList<>(task.jvmArgs) 
: new ArrayList<>()
+
+    // Add standard opens required for Jackson, Afterburner, and other 
reflective frameworks
+    // when dealing with Java Modules or complex classloader scenarios.
+    currentJvmArgs.addAll([
+        '--add-opens=java.base/java.lang=ALL-UNNAMED',
+        '--add-opens=java.base/java.util=ALL-UNNAMED',
+        '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED',
+        '--add-opens=java.base/java.lang.reflect=ALL-UNNAMED',
+        '--add-opens=java.base/java.io=ALL-UNNAMED',
+        '--add-opens=java.base/java.nio=ALL-UNNAMED',
+        '--add-opens=java.base/java.math=ALL-UNNAMED',
+        '--add-opens=java.base/java.time=ALL-UNNAMED', // For JSR310 types
+    ])
+
+    task.jvmArgs = currentJvmArgs.unique() // Assign the new, filtered list 
back, removing duplicates
+    project.logger.lifecycle("Task ${task.name} final jvmArgs: 
${task.jvmArgs.join(' ')}")
 }
 
 test {
     testLogging {
         outputs.upToDateWhen {false}
         showStandardStreams = true
+        exceptionFormat = 'full'
     }
-}
+    configureTestJvmArgs(delegate)
 
+}
 
 task integrationTest(type: Test, dependsOn: processTestResources) {
   group = "Verification"
@@ -89,14 +133,3 @@ task integrationTest(type: Test, dependsOn: 
processTestResources) {
   useJUnit {
   }
 }
-
-configurations.all (Configuration it) -> {
-  resolutionStrategy {
-    // Force protobuf 3 because debezium is currently incompatible with 
protobuf 4.
-    // TODO - remove this and upgrade the version of debezium once a proto-4 
compatible version is available
-    // https://github.com/apache/beam/pull/33526 does some of this, but was 
abandoned because it still doesn't
-    // work with protobuf 4.
-    force "com.google.protobuf:protobuf-java:3.25.5"
-    force "com.google.protobuf:protobuf-java-util:3.25.5"
-  }
-}
diff --git a/sdks/java/io/debezium/expansion-service/build.gradle 
b/sdks/java/io/debezium/expansion-service/build.gradle
index 3cc6905893f..3c244d3bcef 100644
--- a/sdks/java/io/debezium/expansion-service/build.gradle
+++ b/sdks/java/io/debezium/expansion-service/build.gradle
@@ -25,6 +25,7 @@ applyJavaNature(
         exportJavadoc: false,
         validateShadowJar: false,
         shadowClosure: {},
+        requireJavaVersion: JavaVersion.VERSION_17,
 )
 
 description = "Apache Beam :: SDKs :: Java :: IO :: Debezium :: Expansion 
Service"
@@ -38,10 +39,10 @@ dependencies {
     runtimeOnly library.java.slf4j_jdk14
 
     // Debezium runtime dependencies
-    def debezium_version = '1.3.1.Final'
+    def debezium_version = '3.1.1.Final'
     runtimeOnly group: 'io.debezium', name: 'debezium-connector-mysql', 
version: debezium_version
     runtimeOnly group: 'io.debezium', name: 'debezium-connector-postgres', 
version: debezium_version
     runtimeOnly group: 'io.debezium', name: 'debezium-connector-sqlserver', 
version: debezium_version
     runtimeOnly group: 'io.debezium', name: 'debezium-connector-oracle', 
version: debezium_version
     runtimeOnly group: 'io.debezium', name: 'debezium-connector-db2', version: 
debezium_version
-}
\ No newline at end of file
+}
diff --git a/sdks/java/io/debezium/src/README.md 
b/sdks/java/io/debezium/src/README.md
index 4cf9be81c61..e56ac370b70 100644
--- a/sdks/java/io/debezium/src/README.md
+++ b/sdks/java/io/debezium/src/README.md
@@ -25,7 +25,7 @@ DebeziumIO is an Apache Beam connector that lets users 
connect their Events-Driv
 
 ### Getting Started
 
-DebeziumIO uses [Debezium Connectors 
v1.3](https://debezium.io/documentation/reference/1.3/connectors/) to connect 
to Apache Beam. All you need to do is choose the Debezium Connector that suits 
your Debezium setup and pick a [Serializable 
Function](https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/SerializableFunction.html),
 then you will be able to connect to Apache Beam and start building your own 
Pipelines.
+DebeziumIO uses [Debezium Connectors 
v3.1](https://debezium.io/documentation/reference/3.1/connectors/) to connect 
to Apache Beam. All you need to do is choose the Debezium Connector that suits 
your Debezium setup and pick a [Serializable 
Function](https://beam.apache.org/releases/javadoc/2.65.0/org/apache/beam/sdk/transforms/SerializableFunction.html),
 then you will be able to connect to Apache Beam and start building your own 
Pipelines.
 
 These connectors have been successfully tested and are known to work fine:
 *  MySQL Connector
@@ -65,7 +65,7 @@ You can also add more configuration, such as 
Connector-specific Properties with
 |Method|Params|Description|
 |-|-|-|
 |`.withConnectionProperty(propName, propValue)`|_String_, _String_|Adds a 
custom property to the connector.|
-> **Note:** For more information on custom properties, see your [Debezium 
Connector](https://debezium.io/documentation/reference/1.3/connectors/) 
specific documentation.
+> **Note:** For more information on custom properties, see your [Debezium 
Connector](https://debezium.io/documentation/reference/3.1/connectors/) 
specific documentation.
 
 Example of a MySQL Debezium Connector setup:
 ```
@@ -149,7 +149,7 @@ p.run().waitUntilFinish();
 
 ### KafkaSourceConsumerFn and Restrictions
 
-KafkaSourceConsumerFn (KSC onwards) is a 
[DoFn](https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/transforms/DoFn.html)
 in charge of the Database replication and CDC.
+KafkaSourceConsumerFn (KSC onwards) is a 
[DoFn](https://beam.apache.org/releases/javadoc/2.65.0/org/apache/beam/sdk/transforms/DoFn.html)
 in charge of the Database replication and CDC.
 
 There are two ways of initializing KSC:
 *  Restricted by number of records
@@ -164,9 +164,9 @@ By default, DebeziumIO initializes it with the former, 
though user may choose th
 
 ### Requirements and Supported versions
 
--  JDK v8
--  Debezium Connectors v1.3
--  Apache Beam 2.25
+-  JDK v17
+-  Debezium Connectors v3.1
+-  Apache Beam 2.66
 
 ## Running Unit Tests
 
diff --git 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
index 30ad8a5f9f7..be418aed5ca 100644
--- 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
+++ 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
@@ -75,7 +75,6 @@ import org.slf4j.LoggerFactory;
  *             .withConnectorClass(MySqlConnector.class)
  *             .withConnectionProperty("database.server.id", "184054")
  *             .withConnectionProperty("database.server.name", "serverid")
- *             .withConnectionProperty("database.include.list", "dbname")
  *             .withConnectionProperty("database.history", 
DebeziumSDFDatabaseHistory.class.getName())
  *             .withConnectionProperty("include.schema.changes", "false");
  *
@@ -513,9 +512,14 @@ public class DebeziumIO {
       checkArgument(
           getConnectionProperties().get() != null, "connectionProperties can 
not be null");
 
-      ConnectorConfiguration config = builder().build();
-      config.getConnectionProperties().get().putIfAbsent(key, value);
-      return config;
+      // Create a new map, copy existing properties if they exist, or start 
fresh.
+      Map<String, String> newRawMap = new 
HashMap<>(getConnectionProperties().get());
+      newRawMap.put(key, value);
+      // Create a new ValueProvider for the updated map.
+      ValueProvider<Map<String, String>> newConnectionPropertiesProvider =
+          ValueProvider.StaticValueProvider.of(newRawMap);
+      // Create a new ConnectorConfiguration instance , replace only the 
connectionProperties field.
+      return 
builder().setConnectionProperties(newConnectionPropertiesProvider).build();
     }
 
     /**
@@ -554,10 +558,16 @@ public class DebeziumIO {
         configuration.computeIfAbsent(entry.getKey(), k -> entry.getValue());
       }
 
-      // Set default Database History impl. if not provided
+      // Set default Database History impl. if not provided implementation and 
Kafka topic prefix,
+      // if not provided
       configuration.computeIfAbsent(
           "database.history",
           k -> 
KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName());
+      configuration.computeIfAbsent("topic.prefix", k -> 
"beam-debezium-connector");
+      configuration.computeIfAbsent(
+          "schema.history.internal.kafka.bootstrap.servers", k -> 
"localhost:9092");
+      configuration.computeIfAbsent(
+          "schema.history.internal.kafka.topic", k -> 
"schema-changes.inventory");
 
       String stringProperties = Joiner.on('\n').withKeyValueSeparator(" -> 
").join(configuration);
       LOG.debug("---------------- Connector configuration: {}", 
stringProperties);
diff --git 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
index 9f227708e5e..a0838174759 100644
--- 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
@@ -128,7 +128,7 @@ public class DebeziumReadSchemaTransformProvider
             String[] parts = connectionProperty.split("=", -1);
             String key = parts[0];
             String value = parts[1];
-            connectorConfiguration.withConnectionProperty(key, value);
+            connectorConfiguration = 
connectorConfiguration.withConnectionProperty(key, value);
           }
         }
 
diff --git 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
index 54330d62047..a1edfb21054 100644
--- 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
+++ 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
@@ -22,9 +22,9 @@ import static 
org.apache.beam.io.debezium.KafkaConnectUtils.debeziumRecordInstan
 import io.debezium.document.Document;
 import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
-import io.debezium.relational.history.AbstractDatabaseHistory;
-import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.AbstractSchemaHistory;
 import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.SchemaHistoryException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
@@ -474,7 +474,7 @@ public class KafkaSourceConsumerFn<T> extends 
DoFn<Map<String, String>, T> {
     }
   }
 
-  public static class DebeziumSDFDatabaseHistory extends 
AbstractDatabaseHistory {
+  public static class DebeziumSDFDatabaseHistory extends AbstractSchemaHistory 
{
     private List<byte[]> history;
 
     public DebeziumSDFDatabaseHistory() {
@@ -497,7 +497,7 @@ public class KafkaSourceConsumerFn<T> extends 
DoFn<Map<String, String>, T> {
     }
 
     @Override
-    protected void storeRecord(HistoryRecord record) throws 
DatabaseHistoryException {
+    protected void storeRecord(HistoryRecord record) throws 
SchemaHistoryException {
       LOG.debug("------------- Adding history! {}", record);
 
       
history.add(DocumentWriter.defaultWriter().writeAsBytes(record.document()));
diff --git 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
index 12ba57bad45..e926a4b3062 100644
--- 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
+++ 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.io.debezium.DebeziumIOPostgresSqlConnectorIT.TABLE
 import static org.apache.beam.sdk.testing.SerializableMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
@@ -50,7 +51,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.Network;
 import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
 import org.testcontainers.utility.DockerImageName;
 
@@ -63,13 +66,20 @@ public class DebeziumIOMySqlConnectorIT {
    *
    * <p>Creates a docker container using the image used by the debezium 
tutorial.
    */
+  private static final DockerImageName KAFKA_IMAGE =
+      DockerImageName.parse("confluentinc/cp-kafka:7.6.0");
+
+  @ClassRule public static Network network = Network.newNetwork();
+
   @ClassRule
   public static final MySQLContainer<?> MY_SQL_CONTAINER =
       new MySQLContainer<>(
-              DockerImageName.parse("debezium/example-mysql:1.4")
+              
DockerImageName.parse("quay.io/debezium/example-mysql:3.1.1.Final")
                   .asCompatibleSubstituteFor("mysql"))
           .withPassword("debezium")
           .withUsername("mysqluser")
+          .withNetwork(network)
+          .withNetworkAliases("mysql")
           .withExposedPorts(3306)
           .waitingFor(
               new HttpWaitStrategy()
@@ -77,16 +87,31 @@ public class DebeziumIOMySqlConnectorIT {
                   .forStatusCodeMatching(response -> response == 200)
                   .withStartupTimeout(Duration.ofMinutes(2)));
 
+  @ClassRule
+  public static final KafkaContainer KAFKA_CONTAINER =
+      new KafkaContainer(KAFKA_IMAGE)
+          .withNetwork(network)
+          .withNetworkAliases("kafka")
+          .dependsOn(MY_SQL_CONTAINER);
+
   public static DataSource getMysqlDatasource(Void unused) {
     HikariConfig hikariConfig = new HikariConfig();
-    hikariConfig.setJdbcUrl(MY_SQL_CONTAINER.getJdbcUrl());
+
+    String jdbcUrl =
+        String.format(
+            "jdbc:mysql://%s:%d/inventory?allowPublicKeyRetrieval=true",
+            MY_SQL_CONTAINER.getHost(), MY_SQL_CONTAINER.getMappedPort(3306));
+    LOG.info("Hikari DataSource JDBC URL for mysqluser: {}", jdbcUrl);
+
+    hikariConfig.setJdbcUrl(jdbcUrl);
     hikariConfig.setUsername(MY_SQL_CONTAINER.getUsername());
     hikariConfig.setPassword(MY_SQL_CONTAINER.getPassword());
+    hikariConfig.addDataSourceProperty("allowPublicKeyRetrieval", "true");
     hikariConfig.setDriverClassName(MY_SQL_CONTAINER.getDriverClassName());
     return new HikariDataSource(hikariConfig);
   }
 
-  private void monitorEssentialMetrics() {
+  private void monitorEssentialMetrics() throws SQLException {
     DataSource ds = getMysqlDatasource(null);
     try {
       Connection conn = ds.getConnection();
@@ -98,11 +123,16 @@ public class DebeziumIOMySqlConnectorIT {
           rs.close();
           Thread.sleep(4000);
         } else {
-          throw new IllegalArgumentException("OIOI");
+          throw new IllegalArgumentException(
+              "Illegal Argument Exception in monitorEssentialMetrics.");
         }
       }
-    } catch (InterruptedException | SQLException ex) {
-      throw new IllegalArgumentException("Oi", ex);
+    } catch (SQLException ex) {
+      LOG.error("SQL error in monitoring thread. Shutting down.", ex);
+      throw (ex);
+    } catch (InterruptedException ex) {
+      LOG.info("Monitoring thread interrupted. Shutting down.");
+      Thread.currentThread().interrupt();
     }
   }
 
@@ -110,7 +140,6 @@ public class DebeziumIOMySqlConnectorIT {
   public void testDebeziumSchemaTransformMysqlRead() throws 
InterruptedException {
     long writeSize = 500L;
     long testTime = writeSize * 200L;
-    MY_SQL_CONTAINER.start();
 
     PipelineOptions options = PipelineOptionsFactory.create();
     Pipeline writePipeline = Pipeline.create(options);
@@ -158,6 +187,17 @@ public class DebeziumIOMySqlConnectorIT {
                             .setHost("localhost")
                             .setTable("inventory.customers")
                             .setPort(MY_SQL_CONTAINER.getMappedPort(3306))
+                            .setDebeziumConnectionProperties(
+                                Lists.newArrayList(
+                                    "database.server.id=1849055",
+                                    
"schema.history.internal.kafka.bootstrap.servers="
+                                        + 
KAFKA_CONTAINER.getBootstrapServers(),
+                                    
"schema.history.internal.kafka.topic=schema-history-mysql-transform-"
+                                        + System.nanoTime(),
+                                    
"schema.history.internal=io.debezium.storage.kafka.history.KafkaSchemaHistory",
+                                    
"schema.history.internal.store.only.captured.tables.ddl=false",
+                                    "table.include.list=inventory.customers",
+                                    "snapshot.mode=initial_only"))
                             .build()))
             .get("output");
 
@@ -169,13 +209,29 @@ public class DebeziumIOMySqlConnectorIT {
               return null;
             });
     Thread writeThread = new Thread(() -> 
writePipeline.run().waitUntilFinish());
-    Thread monitorThread = new Thread(this::monitorEssentialMetrics);
+    Thread monitorThread =
+        new Thread(
+            () -> {
+              try {
+                monitorEssentialMetrics();
+              } catch (SQLException e) {
+                e.printStackTrace();
+                fail("Failed because of SQLException in 
monitorEssentialMetrics!");
+              }
+            });
     monitorThread.start();
     writeThread.start();
-    readPipeline.run().waitUntilFinish();
+
     writeThread.join();
+    LOG.info("Write thread for SchemaTransform test joined.");
+
+    LOG.info("Starting read pipeline for SchemaTransform test...");
+    readPipeline.run().waitUntilFinish();
+    LOG.info("Read pipeline for SchemaTransform test finished.");
+
     monitorThread.interrupt();
     monitorThread.join();
+    LOG.info("Monitor thread for SchemaTransform test joined.");
   }
 
   /**
@@ -185,9 +241,11 @@ public class DebeziumIOMySqlConnectorIT {
    */
   @Test
   public void testDebeziumIOMySql() {
-    MY_SQL_CONTAINER.start();
 
-    String host = MY_SQL_CONTAINER.getContainerIpAddress();
+    String kafkaBootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
+    String schemaHistoryTopic = "mysql-schema-history-io-" + System.nanoTime();
+
+    String host = MY_SQL_CONTAINER.getHost();
     String port = MY_SQL_CONTAINER.getMappedPort(3306).toString();
 
     PipelineOptions options = PipelineOptionsFactory.create();
@@ -203,15 +261,24 @@ public class DebeziumIOMySqlConnectorIT {
                         .withHostName(host)
                         .withPort(port)
                         .withConnectionProperty("database.server.id", "184054")
-                        .withConnectionProperty("database.server.name", 
"dbserver1")
                         .withConnectionProperty("database.include.list", 
"inventory")
-                        .withConnectionProperty("include.schema.changes", 
"false"))
+                        .withConnectionProperty("include.schema.changes", 
"false")
+                        .withConnectionProperty(
+                            "schema.history.internal.kafka.bootstrap.servers",
+                            kafkaBootstrapServers)
+                        .withConnectionProperty(
+                            "schema.history.internal.kafka.topic", 
schemaHistoryTopic)
+                        .withConnectionProperty(
+                            "schema.history.internal",
+                            
"io.debezium.storage.kafka.history.KafkaSchemaHistory")
+                        .withConnectionProperty(
+                            
"schema.history.internal.store.only.captured.tables.ddl", "true"))
                 .withFormatFunction(new 
SourceRecordJson.SourceRecordJsonMapper())
                 .withMaxNumberOfRecords(30)
                 .withCoder(StringUtf8Coder.of()));
     String expected =
-        
"{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\","
-            + 
"\"database\":\"inventory\",\"schema\":\"mysql-bin.000003\",\"table\":\"addresses\"},\"before\":null,"
+        
"{\"metadata\":{\"connector\":\"mysql\",\"version\":\"3.1.1.Final\",\"name\":\"beam-debezium-connector\","
+            + 
"\"database\":\"inventory\",\"schema\":\"binlog.000002\",\"table\":\"addresses\"},\"before\":null,"
             + "\"after\":{\"fields\":{\"zip\":\"76036\",\"city\":\"Euless\","
             + "\"street\":\"3183 Moore 
Avenue\",\"id\":10,\"state\":\"Texas\",\"customer_id\":1001,"
             + "\"type\":\"SHIPPING\"}}}";
@@ -224,6 +291,5 @@ public class DebeziumIOMySqlConnectorIT {
             });
 
     p.run().waitUntilFinish();
-    MY_SQL_CONTAINER.stop();
   }
 }
diff --git 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java
 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java
index 970d9483850..c1f67e4e44d 100644
--- 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java
+++ 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java
@@ -56,7 +56,7 @@ public class DebeziumIOPostgresSqlConnectorIT {
   @ClassRule
   public static final PostgreSQLContainer<?> POSTGRES_SQL_CONTAINER =
       new PostgreSQLContainer<>(
-              DockerImageName.parse("quay.io/debezium/example-postgres:latest")
+              
DockerImageName.parse("quay.io/debezium/example-postgres:3.1.1.Final")
                   .asCompatibleSubstituteFor("postgres"))
           .withPassword("dbz")
           .withUsername("debezium")
@@ -74,8 +74,10 @@ public class DebeziumIOPostgresSqlConnectorIT {
   static DataSource getPostgresDatasource() {
     PGSimpleDataSource dataSource = new PGSimpleDataSource();
     dataSource.setDatabaseName("inventory");
-    dataSource.setServerName(POSTGRES_SQL_CONTAINER.getContainerIpAddress());
-    dataSource.setPortNumber(POSTGRES_SQL_CONTAINER.getMappedPort(5432));
+    String[] serverNames = new String[] {POSTGRES_SQL_CONTAINER.getHost()};
+    dataSource.setServerNames(serverNames);
+    int[] ports = new int[] {POSTGRES_SQL_CONTAINER.getMappedPort(5432)};
+    dataSource.setPortNumbers(ports);
     dataSource.setUser("debezium");
     dataSource.setPassword("dbz");
     return dataSource;
@@ -156,7 +158,7 @@ public class DebeziumIOPostgresSqlConnectorIT {
   public void testDebeziumIOPostgresSql() {
     POSTGRES_SQL_CONTAINER.start();
 
-    String host = POSTGRES_SQL_CONTAINER.getContainerIpAddress();
+    String host = POSTGRES_SQL_CONTAINER.getHost();
     String port = POSTGRES_SQL_CONTAINER.getMappedPort(5432).toString();
 
     PipelineOptions options = PipelineOptionsFactory.create();
@@ -173,13 +175,12 @@ public class DebeziumIOPostgresSqlConnectorIT {
                         .withPort(port)
                         .withConnectionProperty("database.dbname", "inventory")
                         .withConnectionProperty("database.server.name", 
"dbserver1")
-                        .withConnectionProperty("database.include.list", 
"inventory")
                         .withConnectionProperty("include.schema.changes", 
"false"))
                 .withFormatFunction(new 
SourceRecordJson.SourceRecordJsonMapper())
                 .withMaxNumberOfRecords(30)
                 .withCoder(StringUtf8Coder.of()));
     String expected =
-        
"{\"metadata\":{\"connector\":\"postgresql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\","
+        
"{\"metadata\":{\"connector\":\"postgresql\",\"version\":\"3.1.1.Final\",\"name\":\"beam-debezium-connector\","
             + 
"\"database\":\"inventory\",\"schema\":\"inventory\",\"table\":\"customers\"},\"before\":null,"
             + 
"\"after\":{\"fields\":{\"last_name\":\"Thomas\",\"id\":1001,\"first_name\":\"Sally\","
             + "\"email\":\"[email protected]\"}}}";
diff --git 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
index 12d1d610ff9..88ecc4fdd90 100644
--- 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
+++ 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
@@ -44,7 +44,6 @@ public class DebeziumIOTest implements Serializable {
           .withConnectorClass(MySqlConnector.class)
           .withConnectionProperty("database.server.id", "184054")
           .withConnectionProperty("database.server.name", "dbserver1")
-          .withConnectionProperty("database.include.list", "inventory")
           .withConnectionProperty(
               "database.history", 
KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName())
           .withConnectionProperty("include.schema.changes", "false");
diff --git 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java
 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java
index c4b5d2d1f89..2fc8996ba55 100644
--- 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java
+++ 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.io.debezium;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
 
+import io.debezium.DebeziumException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.stream.Collectors;
@@ -28,10 +29,11 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
-import org.apache.kafka.connect.errors.ConnectException;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.hamcrest.Matchers;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.testcontainers.containers.Container;
@@ -43,6 +45,8 @@ import org.testcontainers.utility.DockerImageName;
 @RunWith(Parameterized.class)
 public class DebeziumReadSchemaTransformTest {
 
+  @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
   @ClassRule
   public static final PostgreSQLContainer<?> POSTGRES_SQL_CONTAINER =
       new PostgreSQLContainer<>(
@@ -104,6 +108,13 @@ public class DebeziumReadSchemaTransformTest {
                 // is "database.table".
                 .setTable("inventory.customers")
                 .setPort(port)
+                .setDebeziumConnectionProperties(
+                    Lists.newArrayList(
+                        "database.server.id=579676",
+                        
"schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory",
+                        String.format(
+                            "schema.history.internal.file.filename=%s",
+                            
tempFolder.getRoot().toPath().resolve("schema_history.dat"))))
                 .build());
   }
 
@@ -124,15 +135,16 @@ public class DebeziumReadSchemaTransformTest {
         result.getSchema().getFields().stream()
             .map(field -> field.getName())
             .collect(Collectors.toList()),
-        Matchers.containsInAnyOrder("before", "after", "source", "op", 
"ts_ms", "transaction"));
+        Matchers.containsInAnyOrder(
+            "before", "after", "source", "transaction", "op", "ts_ms", 
"ts_us", "ts_ns"));
   }
 
   @Test
   public void testWrongUser() {
     Pipeline readPipeline = Pipeline.create();
-    ConnectException ex =
+    DebeziumException ex =
         assertThrows(
-            ConnectException.class,
+            DebeziumException.class,
             () -> {
               PCollectionRowTuple.empty(readPipeline)
                   .apply(
@@ -151,9 +163,9 @@ public class DebeziumReadSchemaTransformTest {
   @Test
   public void testWrongPassword() {
     Pipeline readPipeline = Pipeline.create();
-    ConnectException ex =
+    DebeziumException ex =
         assertThrows(
-            ConnectException.class,
+            DebeziumException.class,
             () -> {
               PCollectionRowTuple.empty(readPipeline)
                   .apply(
@@ -172,9 +184,9 @@ public class DebeziumReadSchemaTransformTest {
   @Test
   public void testWrongPort() {
     Pipeline readPipeline = Pipeline.create();
-    ConnectException ex =
+    DebeziumException ex =
         assertThrows(
-            ConnectException.class,
+            DebeziumException.class,
             () -> {
               PCollectionRowTuple.empty(readPipeline)
                   .apply(makePtransform(userName, password, database, 12345, 
"localhost"))


Reply via email to