This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cecfa6128ce [IO] Update Debezium in DebeziumIO to 3.1.1 (#34763)
cecfa6128ce is described below
commit cecfa6128ceaecf3b8d02a8a42788b3505595845
Author: Tobias Kaymak <[email protected]>
AuthorDate: Sat Jun 14 03:01:33 2025 +0200
[IO] Update Debezium in DebeziumIO to 3.1.1 (#34763)
* Updating Debezium IO to 3.1.1
Enforce JDK17 in build.gradle of Debezium IO
* adjust to review comments by @Abacn
* cleanup
* mention which PR needs to merge before unpinning
* Mention Beam version 2.66 in the README for Debezium
---------
Co-authored-by: Shunping Huang <[email protected]>
---
CHANGES.md | 1 +
sdks/java/io/debezium/build.gradle | 85 +++++++++++++------
.../io/debezium/expansion-service/build.gradle | 5 +-
sdks/java/io/debezium/src/README.md | 12 +--
.../org/apache/beam/io/debezium/DebeziumIO.java | 20 +++--
.../DebeziumReadSchemaTransformProvider.java | 2 +-
.../beam/io/debezium/KafkaSourceConsumerFn.java | 8 +-
.../io/debezium/DebeziumIOMySqlConnectorIT.java | 98 ++++++++++++++++++----
.../debezium/DebeziumIOPostgresSqlConnectorIT.java | 13 +--
.../apache/beam/io/debezium/DebeziumIOTest.java | 1 -
.../debezium/DebeziumReadSchemaTransformTest.java | 28 +++++--
11 files changed, 198 insertions(+), 75 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index d4af889b2d2..2e75be8a79c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
## I/Os
* Support for X source added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
+* Debezium IO upgraded to 3.1.1, which requires Java 17 (Java)
([#34747](https://github.com/apache/beam/issues/34747)).
## New Features / Improvements
diff --git a/sdks/java/io/debezium/build.gradle
b/sdks/java/io/debezium/build.gradle
index 2070bcfc873..dcdfd33ce0a 100644
--- a/sdks/java/io/debezium/build.gradle
+++ b/sdks/java/io/debezium/build.gradle
@@ -24,6 +24,7 @@ applyJavaNature(
[id: 'io.confluent', url:
'https://packages.confluent.io/maven/']
],
enableSpotbugs: false,
+ requireJavaVersion: JavaVersion.VERSION_17,
)
provideIntegrationTestingDependencies()
@@ -38,10 +39,16 @@ dependencies {
implementation library.java.joda_time
provided library.java.jackson_dataformat_csv
permitUnusedDeclared library.java.jackson_dataformat_csv
- testImplementation project(path: ":sdks:java:core", configuration:
"shadowTest")
- testImplementation project(path: ":sdks:java:io:common")
+
+ // Kafka connect dependencies
+ implementation "org.apache.kafka:connect-api:3.9.0"
+
+ // Debezium dependencies
+ implementation group: 'io.debezium', name: 'debezium-core', version:
'3.1.1.Final'
// Test dependencies
+ testImplementation project(path: ":sdks:java:core", configuration:
"shadowTest")
+ testImplementation project(path: ":sdks:java:io:common")
testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:jdbc")
testRuntimeOnly library.java.slf4j_jdk14
@@ -49,29 +56,66 @@ dependencies {
testImplementation project(":runners:google-cloud-dataflow-java")
testImplementation library.java.hamcrest
testImplementation library.java.testcontainers_base
- testImplementation library.java.testcontainers_mysql
- testImplementation library.java.testcontainers_postgresql
- // TODO(https://github.com/apache/beam/issues/31678) HikariCP 5.x requires
Java11+
- testImplementation 'com.zaxxer:HikariCP:4.0.3'
+ testImplementation "org.testcontainers:kafka"
+ testImplementation "org.testcontainers:mysql"
+ testImplementation "org.testcontainers:postgresql"
+ testImplementation
"io.debezium:debezium-testing-testcontainers:3.1.1.Final"
+ testImplementation 'com.zaxxer:HikariCP:5.1.0'
- // Kafka connect dependencies
- implementation "org.apache.kafka:connect-api:2.5.0"
- implementation "org.apache.kafka:connect-json:2.5.0"
- permitUnusedDeclared "org.apache.kafka:connect-json:2.5.0" // BEAM-11761
+ // Debezium connector implementations for testing
+ testImplementation group: 'io.debezium', name: 'debezium-connector-mysql',
version: '3.1.1.Final'
+ testImplementation group: 'io.debezium', name:
'debezium-connector-postgres', version: '3.1.1.Final'
+}
- // Debezium dependencies
- implementation group: 'io.debezium', name: 'debezium-core', version:
'1.3.1.Final'
- testImplementation group: 'io.debezium', name: 'debezium-connector-mysql',
version: '1.3.1.Final'
- testImplementation group: 'io.debezium', name:
'debezium-connector-postgres', version: '1.3.1.Final'
+// TODO: Remove pin after Beam has unpinned it (PR #35231)
+// Pin the Antlr version
+configurations.all {
+ resolutionStrategy {
+ force 'org.antlr:antlr4:4.10', 'org.antlr:antlr4-runtime:4.10'
+ }
+}
+
+// TODO: Remove pin after upgrading Beam's Jackson version
+// Force Jackson versions for the test runtime classpath
+configurations.named("testRuntimeClasspath") {
+ resolutionStrategy.force (
+ 'com.fasterxml.jackson.core:jackson-core:2.17.1',
+ 'com.fasterxml.jackson.core:jackson-annotations:2.17.1',
+ 'com.fasterxml.jackson.core:jackson-databind:2.17.1',
+ 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1',
+ 'com.fasterxml.jackson.module:jackson-module-afterburner:2.17.1'
+ )
+}
+
+def configureTestJvmArgs(Task task) {
+ List<String> currentJvmArgs = task.jvmArgs ? new ArrayList<>(task.jvmArgs)
: new ArrayList<>()
+
+ // Add standard opens required for Jackson, Afterburner, and other
reflective frameworks
+ // when dealing with Java Modules or complex classloader scenarios.
+ currentJvmArgs.addAll([
+ '--add-opens=java.base/java.lang=ALL-UNNAMED',
+ '--add-opens=java.base/java.util=ALL-UNNAMED',
+ '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED',
+ '--add-opens=java.base/java.lang.reflect=ALL-UNNAMED',
+ '--add-opens=java.base/java.io=ALL-UNNAMED',
+ '--add-opens=java.base/java.nio=ALL-UNNAMED',
+ '--add-opens=java.base/java.math=ALL-UNNAMED',
+ '--add-opens=java.base/java.time=ALL-UNNAMED', // For JSR310 types
+ ])
+
+  task.jvmArgs = currentJvmArgs.unique() // Assign the deduplicated list
back
+ project.logger.lifecycle("Task ${task.name} final jvmArgs:
${task.jvmArgs.join(' ')}")
}
test {
testLogging {
outputs.upToDateWhen {false}
showStandardStreams = true
+ exceptionFormat = 'full'
}
-}
+ configureTestJvmArgs(delegate)
+}
task integrationTest(type: Test, dependsOn: processTestResources) {
group = "Verification"
@@ -89,14 +133,3 @@ task integrationTest(type: Test, dependsOn:
processTestResources) {
useJUnit {
}
}
-
-configurations.all (Configuration it) -> {
- resolutionStrategy {
- // Force protobuf 3 because debezium is currently incompatible with
protobuf 4.
- // TODO - remove this and upgrade the version of debezium once a proto-4
compatible version is available
- // https://github.com/apache/beam/pull/33526 does some of this, but was
abandoned because it still doesn't
- // work with protobuf 4.
- force "com.google.protobuf:protobuf-java:3.25.5"
- force "com.google.protobuf:protobuf-java-util:3.25.5"
- }
-}
diff --git a/sdks/java/io/debezium/expansion-service/build.gradle
b/sdks/java/io/debezium/expansion-service/build.gradle
index 3cc6905893f..3c244d3bcef 100644
--- a/sdks/java/io/debezium/expansion-service/build.gradle
+++ b/sdks/java/io/debezium/expansion-service/build.gradle
@@ -25,6 +25,7 @@ applyJavaNature(
exportJavadoc: false,
validateShadowJar: false,
shadowClosure: {},
+ requireJavaVersion: JavaVersion.VERSION_17,
)
description = "Apache Beam :: SDKs :: Java :: IO :: Debezium :: Expansion
Service"
@@ -38,10 +39,10 @@ dependencies {
runtimeOnly library.java.slf4j_jdk14
// Debezium runtime dependencies
- def debezium_version = '1.3.1.Final'
+ def debezium_version = '3.1.1.Final'
runtimeOnly group: 'io.debezium', name: 'debezium-connector-mysql',
version: debezium_version
runtimeOnly group: 'io.debezium', name: 'debezium-connector-postgres',
version: debezium_version
runtimeOnly group: 'io.debezium', name: 'debezium-connector-sqlserver',
version: debezium_version
runtimeOnly group: 'io.debezium', name: 'debezium-connector-oracle',
version: debezium_version
runtimeOnly group: 'io.debezium', name: 'debezium-connector-db2', version:
debezium_version
-}
\ No newline at end of file
+}
diff --git a/sdks/java/io/debezium/src/README.md
b/sdks/java/io/debezium/src/README.md
index 4cf9be81c61..e56ac370b70 100644
--- a/sdks/java/io/debezium/src/README.md
+++ b/sdks/java/io/debezium/src/README.md
@@ -25,7 +25,7 @@ DebeziumIO is an Apache Beam connector that lets users
connect their Events-Driv
### Getting Started
-DebeziumIO uses [Debezium Connectors
v1.3](https://debezium.io/documentation/reference/1.3/connectors/) to connect
to Apache Beam. All you need to do is choose the Debezium Connector that suits
your Debezium setup and pick a [Serializable
Function](https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/SerializableFunction.html),
then you will be able to connect to Apache Beam and start building your own
Pipelines.
+DebeziumIO uses [Debezium Connectors
v3.1](https://debezium.io/documentation/reference/3.1/connectors/) to connect
to Apache Beam. All you need to do is choose the Debezium Connector that suits
your Debezium setup and pick a [Serializable
Function](https://beam.apache.org/releases/javadoc/2.65.0/org/apache/beam/sdk/transforms/SerializableFunction.html),
then you will be able to connect to Apache Beam and start building your own
Pipelines.
These connectors have been successfully tested and are known to work fine:
* MySQL Connector
@@ -65,7 +65,7 @@ You can also add more configuration, such as
Connector-specific Properties with
|Method|Params|Description|
|-|-|-|
|`.withConnectionProperty(propName, propValue)`|_String_, _String_|Adds a
custom property to the connector.|
-> **Note:** For more information on custom properties, see your [Debezium
Connector](https://debezium.io/documentation/reference/1.3/connectors/)
specific documentation.
+> **Note:** For more information on custom properties, see your [Debezium
Connector](https://debezium.io/documentation/reference/3.1/connectors/)
specific documentation.
Example of a MySQL Debezium Connector setup:
```
@@ -149,7 +149,7 @@ p.run().waitUntilFinish();
### KafkaSourceConsumerFn and Restrictions
-KafkaSourceConsumerFn (KSC onwards) is a
[DoFn](https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/transforms/DoFn.html)
in charge of the Database replication and CDC.
+KafkaSourceConsumerFn (KSC onwards) is a
[DoFn](https://beam.apache.org/releases/javadoc/2.65.0/org/apache/beam/sdk/transforms/DoFn.html)
in charge of the Database replication and CDC.
There are two ways of initializing KSC:
* Restricted by number of records
@@ -164,9 +164,9 @@ By default, DebeziumIO initializes it with the former,
though user may choose th
### Requirements and Supported versions
-- JDK v8
-- Debezium Connectors v1.3
-- Apache Beam 2.25
+- JDK v17
+- Debezium Connectors v3.1
+- Apache Beam 2.66
## Running Unit Tests
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
index 30ad8a5f9f7..be418aed5ca 100644
---
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
@@ -75,7 +75,6 @@ import org.slf4j.LoggerFactory;
* .withConnectorClass(MySqlConnector.class)
* .withConnectionProperty("database.server.id", "184054")
* .withConnectionProperty("database.server.name", "serverid")
- * .withConnectionProperty("database.include.list", "dbname")
* .withConnectionProperty("database.history",
DebeziumSDFDatabaseHistory.class.getName())
* .withConnectionProperty("include.schema.changes", "false");
*
@@ -513,9 +512,14 @@ public class DebeziumIO {
checkArgument(
getConnectionProperties().get() != null, "connectionProperties can
not be null");
- ConnectorConfiguration config = builder().build();
- config.getConnectionProperties().get().putIfAbsent(key, value);
- return config;
+ // Create a new map, copy existing properties if they exist, or start
fresh.
+ Map<String, String> newRawMap = new
HashMap<>(getConnectionProperties().get());
+ newRawMap.put(key, value);
+ // Create a new ValueProvider for the updated map.
+ ValueProvider<Map<String, String>> newConnectionPropertiesProvider =
+ ValueProvider.StaticValueProvider.of(newRawMap);
+      // Create a new ConnectorConfiguration instance, replacing only the
connectionProperties field.
+      return
builder().setConnectionProperties(newConnectionPropertiesProvider).build();
}
/**
@@ -554,10 +558,16 @@ public class DebeziumIO {
configuration.computeIfAbsent(entry.getKey(), k -> entry.getValue());
}
- // Set default Database History impl. if not provided
+      // Set default Database History implementation and Kafka topic prefix,
+      // if not provided
configuration.computeIfAbsent(
"database.history",
k ->
KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName());
+ configuration.computeIfAbsent("topic.prefix", k ->
"beam-debezium-connector");
+ configuration.computeIfAbsent(
+ "schema.history.internal.kafka.bootstrap.servers", k ->
"localhost:9092");
+ configuration.computeIfAbsent(
+ "schema.history.internal.kafka.topic", k ->
"schema-changes.inventory");
String stringProperties = Joiner.on('\n').withKeyValueSeparator(" ->
").join(configuration);
LOG.debug("---------------- Connector configuration: {}",
stringProperties);
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
index 9f227708e5e..a0838174759 100644
---
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
@@ -128,7 +128,7 @@ public class DebeziumReadSchemaTransformProvider
String[] parts = connectionProperty.split("=", -1);
String key = parts[0];
String value = parts[1];
- connectorConfiguration.withConnectionProperty(key, value);
+ connectorConfiguration =
connectorConfiguration.withConnectionProperty(key, value);
}
}
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
index 54330d62047..a1edfb21054 100644
---
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
@@ -22,9 +22,9 @@ import static
org.apache.beam.io.debezium.KafkaConnectUtils.debeziumRecordInstan
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
-import io.debezium.relational.history.AbstractDatabaseHistory;
-import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.SchemaHistoryException;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
@@ -474,7 +474,7 @@ public class KafkaSourceConsumerFn<T> extends
DoFn<Map<String, String>, T> {
}
}
- public static class DebeziumSDFDatabaseHistory extends
AbstractDatabaseHistory {
+ public static class DebeziumSDFDatabaseHistory extends AbstractSchemaHistory
{
private List<byte[]> history;
public DebeziumSDFDatabaseHistory() {
@@ -497,7 +497,7 @@ public class KafkaSourceConsumerFn<T> extends
DoFn<Map<String, String>, T> {
}
@Override
- protected void storeRecord(HistoryRecord record) throws
DatabaseHistoryException {
+ protected void storeRecord(HistoryRecord record) throws
SchemaHistoryException {
LOG.debug("------------- Adding history! {}", record);
history.add(DocumentWriter.defaultWriter().writeAsBytes(record.document()));
diff --git
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
index 12ba57bad45..e926a4b3062 100644
---
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
+++
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java
@@ -21,6 +21,7 @@ import static
org.apache.beam.io.debezium.DebeziumIOPostgresSqlConnectorIT.TABLE
import static org.apache.beam.sdk.testing.SerializableMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.fail;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
@@ -50,7 +51,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.DockerImageName;
@@ -63,13 +66,20 @@ public class DebeziumIOMySqlConnectorIT {
*
* <p>Creates a docker container using the image used by the debezium
tutorial.
*/
+ private static final DockerImageName KAFKA_IMAGE =
+ DockerImageName.parse("confluentinc/cp-kafka:7.6.0");
+
+ @ClassRule public static Network network = Network.newNetwork();
+
@ClassRule
public static final MySQLContainer<?> MY_SQL_CONTAINER =
new MySQLContainer<>(
- DockerImageName.parse("debezium/example-mysql:1.4")
+
DockerImageName.parse("quay.io/debezium/example-mysql:3.1.1.Final")
.asCompatibleSubstituteFor("mysql"))
.withPassword("debezium")
.withUsername("mysqluser")
+ .withNetwork(network)
+ .withNetworkAliases("mysql")
.withExposedPorts(3306)
.waitingFor(
new HttpWaitStrategy()
@@ -77,16 +87,31 @@ public class DebeziumIOMySqlConnectorIT {
.forStatusCodeMatching(response -> response == 200)
.withStartupTimeout(Duration.ofMinutes(2)));
+ @ClassRule
+ public static final KafkaContainer KAFKA_CONTAINER =
+ new KafkaContainer(KAFKA_IMAGE)
+ .withNetwork(network)
+ .withNetworkAliases("kafka")
+ .dependsOn(MY_SQL_CONTAINER);
+
public static DataSource getMysqlDatasource(Void unused) {
HikariConfig hikariConfig = new HikariConfig();
- hikariConfig.setJdbcUrl(MY_SQL_CONTAINER.getJdbcUrl());
+
+ String jdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%d/inventory?allowPublicKeyRetrieval=true",
+ MY_SQL_CONTAINER.getHost(), MY_SQL_CONTAINER.getMappedPort(3306));
+ LOG.info("Hikari DataSource JDBC URL for mysqluser: {}", jdbcUrl);
+
+ hikariConfig.setJdbcUrl(jdbcUrl);
hikariConfig.setUsername(MY_SQL_CONTAINER.getUsername());
hikariConfig.setPassword(MY_SQL_CONTAINER.getPassword());
+ hikariConfig.addDataSourceProperty("allowPublicKeyRetrieval", "true");
hikariConfig.setDriverClassName(MY_SQL_CONTAINER.getDriverClassName());
return new HikariDataSource(hikariConfig);
}
- private void monitorEssentialMetrics() {
+ private void monitorEssentialMetrics() throws SQLException {
DataSource ds = getMysqlDatasource(null);
try {
Connection conn = ds.getConnection();
@@ -98,11 +123,16 @@ public class DebeziumIOMySqlConnectorIT {
rs.close();
Thread.sleep(4000);
} else {
- throw new IllegalArgumentException("OIOI");
+ throw new IllegalArgumentException(
+ "Illegal Argument Exception in monitorEssentialMetrics.");
}
}
- } catch (InterruptedException | SQLException ex) {
- throw new IllegalArgumentException("Oi", ex);
+ } catch (SQLException ex) {
+ LOG.error("SQL error in monitoring thread. Shutting down.", ex);
+ throw (ex);
+ } catch (InterruptedException ex) {
+ LOG.info("Monitoring thread interrupted. Shutting down.");
+ Thread.currentThread().interrupt();
}
}
@@ -110,7 +140,6 @@ public class DebeziumIOMySqlConnectorIT {
public void testDebeziumSchemaTransformMysqlRead() throws
InterruptedException {
long writeSize = 500L;
long testTime = writeSize * 200L;
- MY_SQL_CONTAINER.start();
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline writePipeline = Pipeline.create(options);
@@ -158,6 +187,17 @@ public class DebeziumIOMySqlConnectorIT {
.setHost("localhost")
.setTable("inventory.customers")
.setPort(MY_SQL_CONTAINER.getMappedPort(3306))
+ .setDebeziumConnectionProperties(
+ Lists.newArrayList(
+ "database.server.id=1849055",
+
"schema.history.internal.kafka.bootstrap.servers="
+ +
KAFKA_CONTAINER.getBootstrapServers(),
+
"schema.history.internal.kafka.topic=schema-history-mysql-transform-"
+ + System.nanoTime(),
+
"schema.history.internal=io.debezium.storage.kafka.history.KafkaSchemaHistory",
+
"schema.history.internal.store.only.captured.tables.ddl=false",
+ "table.include.list=inventory.customers",
+ "snapshot.mode=initial_only"))
.build()))
.get("output");
@@ -169,13 +209,29 @@ public class DebeziumIOMySqlConnectorIT {
return null;
});
Thread writeThread = new Thread(() ->
writePipeline.run().waitUntilFinish());
- Thread monitorThread = new Thread(this::monitorEssentialMetrics);
+ Thread monitorThread =
+ new Thread(
+ () -> {
+ try {
+ monitorEssentialMetrics();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail("Failed because of SQLException in
monitorEssentialMetrics!");
+ }
+ });
monitorThread.start();
writeThread.start();
- readPipeline.run().waitUntilFinish();
+
writeThread.join();
+ LOG.info("Write thread for SchemaTransform test joined.");
+
+ LOG.info("Starting read pipeline for SchemaTransform test...");
+ readPipeline.run().waitUntilFinish();
+ LOG.info("Read pipeline for SchemaTransform test finished.");
+
monitorThread.interrupt();
monitorThread.join();
+ LOG.info("Monitor thread for SchemaTransform test joined.");
}
/**
@@ -185,9 +241,11 @@ public class DebeziumIOMySqlConnectorIT {
*/
@Test
public void testDebeziumIOMySql() {
- MY_SQL_CONTAINER.start();
- String host = MY_SQL_CONTAINER.getContainerIpAddress();
+ String kafkaBootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
+ String schemaHistoryTopic = "mysql-schema-history-io-" + System.nanoTime();
+
+ String host = MY_SQL_CONTAINER.getHost();
String port = MY_SQL_CONTAINER.getMappedPort(3306).toString();
PipelineOptions options = PipelineOptionsFactory.create();
@@ -203,15 +261,24 @@ public class DebeziumIOMySqlConnectorIT {
.withHostName(host)
.withPort(port)
.withConnectionProperty("database.server.id", "184054")
- .withConnectionProperty("database.server.name",
"dbserver1")
.withConnectionProperty("database.include.list",
"inventory")
- .withConnectionProperty("include.schema.changes",
"false"))
+ .withConnectionProperty("include.schema.changes",
"false")
+ .withConnectionProperty(
+ "schema.history.internal.kafka.bootstrap.servers",
+ kafkaBootstrapServers)
+ .withConnectionProperty(
+ "schema.history.internal.kafka.topic",
schemaHistoryTopic)
+ .withConnectionProperty(
+ "schema.history.internal",
+
"io.debezium.storage.kafka.history.KafkaSchemaHistory")
+ .withConnectionProperty(
+
"schema.history.internal.store.only.captured.tables.ddl", "true"))
.withFormatFunction(new
SourceRecordJson.SourceRecordJsonMapper())
.withMaxNumberOfRecords(30)
.withCoder(StringUtf8Coder.of()));
String expected =
-
"{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\","
- +
"\"database\":\"inventory\",\"schema\":\"mysql-bin.000003\",\"table\":\"addresses\"},\"before\":null,"
+
"{\"metadata\":{\"connector\":\"mysql\",\"version\":\"3.1.1.Final\",\"name\":\"beam-debezium-connector\","
+ +
"\"database\":\"inventory\",\"schema\":\"binlog.000002\",\"table\":\"addresses\"},\"before\":null,"
+ "\"after\":{\"fields\":{\"zip\":\"76036\",\"city\":\"Euless\","
+ "\"street\":\"3183 Moore
Avenue\",\"id\":10,\"state\":\"Texas\",\"customer_id\":1001,"
+ "\"type\":\"SHIPPING\"}}}";
@@ -224,6 +291,5 @@ public class DebeziumIOMySqlConnectorIT {
});
p.run().waitUntilFinish();
- MY_SQL_CONTAINER.stop();
}
}
diff --git
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java
index 970d9483850..c1f67e4e44d 100644
---
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java
+++
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java
@@ -56,7 +56,7 @@ public class DebeziumIOPostgresSqlConnectorIT {
@ClassRule
public static final PostgreSQLContainer<?> POSTGRES_SQL_CONTAINER =
new PostgreSQLContainer<>(
- DockerImageName.parse("quay.io/debezium/example-postgres:latest")
+
DockerImageName.parse("quay.io/debezium/example-postgres:3.1.1.Final")
.asCompatibleSubstituteFor("postgres"))
.withPassword("dbz")
.withUsername("debezium")
@@ -74,8 +74,10 @@ public class DebeziumIOPostgresSqlConnectorIT {
static DataSource getPostgresDatasource() {
PGSimpleDataSource dataSource = new PGSimpleDataSource();
dataSource.setDatabaseName("inventory");
- dataSource.setServerName(POSTGRES_SQL_CONTAINER.getContainerIpAddress());
- dataSource.setPortNumber(POSTGRES_SQL_CONTAINER.getMappedPort(5432));
+ String[] serverNames = new String[] {POSTGRES_SQL_CONTAINER.getHost()};
+ dataSource.setServerNames(serverNames);
+ int[] ports = new int[] {POSTGRES_SQL_CONTAINER.getMappedPort(5432)};
+ dataSource.setPortNumbers(ports);
dataSource.setUser("debezium");
dataSource.setPassword("dbz");
return dataSource;
@@ -156,7 +158,7 @@ public class DebeziumIOPostgresSqlConnectorIT {
public void testDebeziumIOPostgresSql() {
POSTGRES_SQL_CONTAINER.start();
- String host = POSTGRES_SQL_CONTAINER.getContainerIpAddress();
+ String host = POSTGRES_SQL_CONTAINER.getHost();
String port = POSTGRES_SQL_CONTAINER.getMappedPort(5432).toString();
PipelineOptions options = PipelineOptionsFactory.create();
@@ -173,13 +175,12 @@ public class DebeziumIOPostgresSqlConnectorIT {
.withPort(port)
.withConnectionProperty("database.dbname", "inventory")
.withConnectionProperty("database.server.name",
"dbserver1")
- .withConnectionProperty("database.include.list",
"inventory")
.withConnectionProperty("include.schema.changes",
"false"))
.withFormatFunction(new
SourceRecordJson.SourceRecordJsonMapper())
.withMaxNumberOfRecords(30)
.withCoder(StringUtf8Coder.of()));
String expected =
-
"{\"metadata\":{\"connector\":\"postgresql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\","
+
"{\"metadata\":{\"connector\":\"postgresql\",\"version\":\"3.1.1.Final\",\"name\":\"beam-debezium-connector\","
+
"\"database\":\"inventory\",\"schema\":\"inventory\",\"table\":\"customers\"},\"before\":null,"
+
"\"after\":{\"fields\":{\"last_name\":\"Thomas\",\"id\":1001,\"first_name\":\"Sally\","
+ "\"email\":\"[email protected]\"}}}";
diff --git
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
index 12d1d610ff9..88ecc4fdd90 100644
---
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
+++
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
@@ -44,7 +44,6 @@ public class DebeziumIOTest implements Serializable {
.withConnectorClass(MySqlConnector.class)
.withConnectionProperty("database.server.id", "184054")
.withConnectionProperty("database.server.name", "dbserver1")
- .withConnectionProperty("database.include.list", "inventory")
.withConnectionProperty(
"database.history",
KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName())
.withConnectionProperty("include.schema.changes", "false");
diff --git
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java
index c4b5d2d1f89..2fc8996ba55 100644
---
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java
+++
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.io.debezium;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
+import io.debezium.DebeziumException;
import java.time.Duration;
import java.util.Arrays;
import java.util.stream.Collectors;
@@ -28,10 +29,11 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
-import org.apache.kafka.connect.errors.ConnectException;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.testcontainers.containers.Container;
@@ -43,6 +45,8 @@ import org.testcontainers.utility.DockerImageName;
@RunWith(Parameterized.class)
public class DebeziumReadSchemaTransformTest {
+ @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
@ClassRule
public static final PostgreSQLContainer<?> POSTGRES_SQL_CONTAINER =
new PostgreSQLContainer<>(
@@ -104,6 +108,13 @@ public class DebeziumReadSchemaTransformTest {
// is "database.table".
.setTable("inventory.customers")
.setPort(port)
+ .setDebeziumConnectionProperties(
+ Lists.newArrayList(
+ "database.server.id=579676",
+
"schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory",
+ String.format(
+ "schema.history.internal.file.filename=%s",
+
tempFolder.getRoot().toPath().resolve("schema_history.dat"))))
.build());
}
@@ -124,15 +135,16 @@ public class DebeziumReadSchemaTransformTest {
result.getSchema().getFields().stream()
.map(field -> field.getName())
.collect(Collectors.toList()),
- Matchers.containsInAnyOrder("before", "after", "source", "op",
"ts_ms", "transaction"));
+ Matchers.containsInAnyOrder(
+ "before", "after", "source", "transaction", "op", "ts_ms",
"ts_us", "ts_ns"));
}
@Test
public void testWrongUser() {
Pipeline readPipeline = Pipeline.create();
- ConnectException ex =
+ DebeziumException ex =
assertThrows(
- ConnectException.class,
+ DebeziumException.class,
() -> {
PCollectionRowTuple.empty(readPipeline)
.apply(
@@ -151,9 +163,9 @@ public class DebeziumReadSchemaTransformTest {
@Test
public void testWrongPassword() {
Pipeline readPipeline = Pipeline.create();
- ConnectException ex =
+ DebeziumException ex =
assertThrows(
- ConnectException.class,
+ DebeziumException.class,
() -> {
PCollectionRowTuple.empty(readPipeline)
.apply(
@@ -172,9 +184,9 @@ public class DebeziumReadSchemaTransformTest {
@Test
public void testWrongPort() {
Pipeline readPipeline = Pipeline.create();
- ConnectException ex =
+ DebeziumException ex =
assertThrows(
- ConnectException.class,
+ DebeziumException.class,
() -> {
PCollectionRowTuple.empty(readPipeline)
.apply(makePtransform(userName, password, database, 12345,
"localhost"))