This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 3f1c893f4 [FLINK-35093][cdc-source/postgres] Support COMMITTED_OFFSET startup mode to resume from existing replication slot
3f1c893f4 is described below
commit 3f1c893f41ba135cc8c41046423709f0dcd2d973
Author: Vinh Pham <[email protected]>
AuthorDate: Fri Apr 25 10:09:50 2025 +0100
[FLINK-35093][cdc-source/postgres] Support COMMITTED_OFFSET startup mode to resume from existing replication slot
This closes #3950.
---
.../docs/connectors/flink-sources/postgres-cdc.md | 13 ++-
.../docs/connectors/flink-sources/postgres-cdc.md | 13 ++-
.../base/config/JdbcSourceConfigFactory.java | 1 +
.../connectors/base/dialect/DataSourceDialect.java | 5 ++
.../cdc/connectors/base/options/StartupMode.java | 2 +-
.../connectors/base/options/StartupOptions.java | 11 +++
.../base/source/assigner/StreamSplitAssigner.java | 3 +
.../connection/PostgresConnectionUtils.java | 75 +++++++++++++++++
.../postgres/source/PostgresDialect.java | 12 +++
.../postgres/table/PostgreSQLTableFactory.java | 4 +
.../postgres/table/PostgreSQLConnectorITCase.java | 93 ++++++++++++++++++++++
11 files changed, 227 insertions(+), 5 deletions(-)
diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
index d3082146b..25bfacede 100644
--- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
@@ -299,8 +299,7 @@ The following options is available only when `scan.incremental.snapshot.enabled=
<td>optional</td>
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
- <td>Optional startup mode for Postgres CDC consumer, valid enumerations are "initial"
- and "latest-offset".
+ <td>Optional startup mode for Postgres CDC consumer, valid enumerations are "initial", "latest-offset", "committed-offset" and "snapshot".
Please see <a href="#startup-reading-position">Startup Reading Position</a> section for more detailed information.</td>
</tr>
<tr>
@@ -462,6 +461,16 @@ and then PostgreSQL CDC Source assigns the chunks to multiple readers to read th
The Postgres CDC connector is a Flink Source connector which will read database snapshot first and then continues to read binlogs with **exactly-once processing** even failures happen. Please read [How the connector works](https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#how-the-postgresql-connector-works).
+### Startup Reading Position
+
+The config option `scan.startup.mode` specifies the startup mode for the PostgreSQL CDC consumer. The valid enumerations are:
+
+- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and then continues to read from the replication slot.
+- `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup; reads from
+ the end of the replication slot, which means it only captures changes made after the connector started.
+- `committed-offset`: Skips the snapshot phase and starts reading events from the `confirmed_flush_lsn` offset of the replication slot.
+- `snapshot`: Performs only the snapshot phase and exits once the snapshot reading is completed.
+
### DataStream Source
The Postgres CDC connector can also be a DataStream source. There are two modes for the DataStream source:
diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
index a1ced51a6..b8e476801 100644
--- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -296,8 +296,7 @@ The following options is available only when `scan.incremental.snapshot.enabled=
<td>optional</td>
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
- <td>Optional startup mode for Postgres CDC consumer, valid enumerations are "initial"
- and "latest-offset".
+ <td>Optional startup mode for Postgres CDC consumer, valid enumerations are "initial", "latest-offset", "committed-offset" and "snapshot".
Please see <a href="#startup-reading-position">Startup Reading Position</a> section for more detailed information.</td>
</tr>
<tr>
@@ -463,6 +462,16 @@ and then PostgreSQL CDC Source assigns the chunks to multiple readers to read th
The Postgres CDC connector is a Flink Source connector which will read database snapshot first and then continues to read binlogs with **exactly-once processing** even failures happen. Please read [How the connector works](https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#how-the-postgresql-connector-works).
+### Startup Reading Position
+
+The config option `scan.startup.mode` specifies the startup mode for the PostgreSQL CDC consumer. The valid enumerations are:
+
+- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and then continues to read from the replication slot.
+- `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup; reads from
+ the end of the replication slot, which means it only captures changes made after the connector started.
+- `committed-offset`: Skips the snapshot phase and starts reading events from the `confirmed_flush_lsn` offset of the replication slot.
+- `snapshot`: Performs only the snapshot phase and exits once the snapshot reading is completed.
+
### DataStream Source
The Postgres CDC connector can also be a DataStream source. There are two modes for the DataStream source:
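For DataStream jobs the same mode is selected through the new StartupOptions.committed() factory method. A minimal sketch, assuming the incremental-source builder API (PostgresSourceBuilder.PostgresIncrementalSource) and method names of the existing connector; host, credentials, slot and table names are placeholders:

import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class CommittedOffsetExample {
    public static void main(String[] args) {
        // Sketch only: builder methods are assumed from the existing incremental
        // source; StartupOptions.committed() is the piece this commit adds.
        PostgresIncrementalSource<String> source =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        .hostname("localhost")          // placeholder
                        .port(5432)
                        .database("postgres")
                        .schemaList("inventory")
                        .tableList("inventory.products")
                        .username("postgres")           // placeholder
                        .password("postgres")           // placeholder
                        .slotName("flink")              // slot must already exist
                        .decodingPluginName("pgoutput")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        // skip the snapshot phase; resume from confirmed_flush_lsn
                        .startupOptions(StartupOptions.committed())
                        .build();
    }
}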
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
index 2ab9bb889..82034f488 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
@@ -209,6 +209,7 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
case INITIAL:
case SNAPSHOT:
case LATEST_OFFSET:
+ case COMMITTED_OFFSETS:
break;
default:
throw new UnsupportedOperationException(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
index e1809a0ad..63fa7e930 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
@@ -61,6 +61,11 @@ public interface DataSourceDialect<C extends SourceConfig> extends Serializable,
*/
Offset displayCurrentOffset(C sourceConfig);
+ /** Displays the committed offset from the database, e.g. by querying PostgreSQL's confirmed_flush_lsn. */
+ default Offset displayCommittedOffset(C sourceConfig) {
+ throw new UnsupportedOperationException();
+ }
+
/** Check if the CollectionId is case-sensitive or not. */
boolean isDataCollectionIdCaseSensitive(C sourceConfig);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupMode.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupMode.java
index 60b6a072c..78075304c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupMode.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupMode.java
@@ -30,7 +30,7 @@ public enum StartupMode {
LATEST_OFFSET,
SPECIFIC_OFFSETS,
-
+ COMMITTED_OFFSETS,
TIMESTAMP,
SNAPSHOT
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
index 80f8d2e73..c397a8188 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
@@ -64,6 +64,15 @@ public final class StartupOptions implements Serializable {
return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null);
}
+ /**
+ * Never performs a snapshot on the monitored database tables upon first startup; reads from
+ * the previously committed position of the change log, which means it only captures changes
+ * made since the connector last stopped.
+ */
+ public static StartupOptions committed() {
+ return new StartupOptions(StartupMode.COMMITTED_OFFSETS, null, null, null);
+ }
+
/**
* Never to perform snapshot on the monitored database tables upon first startup, and directly
* read change log from the specified offset.
@@ -101,6 +110,7 @@ public final class StartupOptions implements Serializable {
case SNAPSHOT:
case EARLIEST_OFFSET:
case LATEST_OFFSET:
+ case COMMITTED_OFFSETS:
break;
case SPECIFIC_OFFSETS:
checkNotNull(specificOffsetFile, "specificOffsetFile shouldn't be null");
@@ -118,6 +128,7 @@ public final class StartupOptions implements Serializable {
return startupMode == StartupMode.EARLIEST_OFFSET
|| startupMode == StartupMode.LATEST_OFFSET
|| startupMode == StartupMode.SPECIFIC_OFFSETS
+ || startupMode == StartupMode.COMMITTED_OFFSETS
|| startupMode == StartupMode.TIMESTAMP;
}
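A small hedged check of the semantics added above; it assumes the public startupMode field and that the accessor shown in the last hunk is isStreamOnly(), as in the current code base:

import org.apache.flink.cdc.connectors.base.options.StartupMode;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;

public class CommittedOptionsCheck {
    public static void main(String[] args) {
        StartupOptions options = StartupOptions.committed();
        // committed() carries no offset file/position or timestamp, so validation passes.
        System.out.println(options.startupMode == StartupMode.COMMITTED_OFFSETS); // true
        // The mode skips the snapshot phase entirely, like latest-offset and
        // specific-offsets, so the source is stream-only.
        System.out.println(options.isStreamOnly()); // true
    }
}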
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
index cbb749932..758538811 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
@@ -187,6 +187,9 @@ public class StreamSplitAssigner implements SplitAssigner {
startupOptions.specificOffsetFile,
startupOptions.specificOffsetPos.longValue());
break;
+ case COMMITTED_OFFSETS:
+ startingOffset = dialect.displayCommittedOffset(sourceConfig);
+ break;
default:
throw new IllegalStateException(
"Unsupported startup mode " +
startupOptions.startupMode);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnectionUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnectionUtils.java
new file mode 100644
index 000000000..bd45a9d65
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnectionUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.Utils;
+import io.debezium.time.Conversions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A utility class for accessing various package-private Debezium PostgresConnection
+ * classes/methods.
+ */
+public class PostgresConnectionUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConnectionUtils.class);
+
+ public static PostgresOffset committedOffset(
+ PostgresConnection jdbcConnection, String slotName, String pluginName) {
+ Long lsn;
+ Long txId;
+ try {
+ ServerInfo.ReplicationSlot slot =
+ jdbcConnection.readReplicationSlotInfo(slotName, pluginName);
+
+ if (slot == ServerInfo.ReplicationSlot.INVALID) {
+ return Utils.currentOffset(jdbcConnection);
+ }
+ lsn = slot.latestFlushedLsn().asLong();
+ txId = slot.catalogXmin();
+ LOGGER.trace("Read xlogStart at '{}' from transaction '{}'", Lsn.valueOf(lsn), txId);
+ } catch (SQLException | InterruptedException e) {
+ throw new FlinkRuntimeException("Error getting current Lsn/txId "
+ e.getMessage(), e);
+ }
+
+ try {
+ jdbcConnection.commit();
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException("JDBC connection fails to commit: " + e.getMessage(), e);
+ }
+
+ Map<String, String> offsetMap = new HashMap<>();
+ offsetMap.put(SourceInfo.LSN_KEY, lsn.toString());
+ if (txId != null) {
+ offsetMap.put(SourceInfo.TXID_KEY, txId.toString());
+ }
+ offsetMap.put(SourceInfo.TIMESTAMP_USEC_KEY, String.valueOf(Conversions.toEpochMicros(Instant.MIN)));
+ return PostgresOffset.of(offsetMap);
+ }
+}
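The two slot fields this helper reads via readReplicationSlotInfo (latestFlushedLsn() and catalogXmin()) come from PostgreSQL's pg_replication_slots catalog. A hedged JDBC sketch for inspecting them directly; connection details and the slot name are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class SlotOffsetProbe {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; adjust for your environment.
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
                PreparedStatement ps = conn.prepareStatement(
                        "SELECT confirmed_flush_lsn, catalog_xmin"
                                + " FROM pg_replication_slots WHERE slot_name = ?")) {
            ps.setString(1, "flink"); // placeholder slot name
            try (ResultSet rs = ps.executeQuery()) {
                if (rs.next()) {
                    // confirmed_flush_lsn: where committed-offset mode resumes (LSN_KEY);
                    // catalog_xmin: recorded as the transaction id (TXID_KEY).
                    System.out.println("confirmed_flush_lsn = " + rs.getString(1));
                    System.out.println("catalog_xmin        = " + rs.getString(2));
                }
            }
        }
    }
}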
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
index 66be79448..000cfa9c4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
@@ -41,6 +41,7 @@ import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.PostgresTopicSelector;
import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.connector.postgresql.connection.PostgresConnectionUtils;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
@@ -140,6 +141,17 @@ public class PostgresDialect implements JdbcDataSourceDialect {
}
}
+ @Override
+ public Offset displayCommittedOffset(JdbcSourceConfig sourceConfig) {
+ try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
+ return PostgresConnectionUtils.committedOffset(
+ (PostgresConnection) jdbc, getSlotName(), getPluginName());
+
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
@Override
public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
// from Postgres docs:
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
index b9caca44c..bd069391d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
@@ -216,6 +216,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+ private static final String SCAN_STARTUP_MODE_VALUE_COMMITTED = "committed-offset";
private static StartupOptions getStartupOptions(ReadableConfig config) {
String modeString = config.get(SCAN_STARTUP_MODE);
@@ -227,6 +228,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
return StartupOptions.snapshot();
case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupOptions.latest();
+ case SCAN_STARTUP_MODE_VALUE_COMMITTED:
+ return StartupOptions.committed();
default:
throw new ValidationException(
@@ -236,6 +239,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
SCAN_STARTUP_MODE_VALUE_INITIAL,
SCAN_STARTUP_MODE_VALUE_SNAPSHOT,
SCAN_STARTUP_MODE_VALUE_LATEST,
+ SCAN_STARTUP_MODE_VALUE_COMMITTED,
modeString));
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
index a163df921..ade1b215f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
@@ -47,6 +47,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
@@ -305,6 +306,98 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
result.getJobClient().get().cancel().get();
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true})
+ public void testStartupFromCommittedOffset(boolean parallelismSnapshot) throws Exception {
+ setup(parallelismSnapshot);
+ initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "INSERT INTO inventory.products VALUES (default,'first','first description',0.1);");
+ statement.execute(
+ "INSERT INTO inventory.products VALUES (default,'second','second description',0.2);");
+ }
+
+ // A newly created slot's confirmed LSN is the latest one. We will test whether
+ // committed-offset mode starts from here.
+ String slotName = getSlotName();
+ String publicName = "dbz_publication_" + new Random().nextInt(1000);
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+ Statement statement = connection.createStatement()) {
+ // TODO: Remove it after adding publication to an existing replication slot.
+ statement.execute(
+ String.format(
+ "CREATE PUBLICATION %s FOR TABLE
inventory.products", publicName));
+ statement.execute(
+ String.format(
+ "select
pg_create_logical_replication_slot('%s','pgoutput');",
+ slotName));
+ }
+
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "INSERT INTO inventory.products VALUES (default,'third','third description',0.1);");
+ statement.execute(
+ "INSERT INTO inventory.products VALUES (default,'fourth','fourth description',0.2);");
+ }
+
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE debezium_source ("
+ + " id INT NOT NULL,"
+ + " name STRING,"
+ + " description STRING,"
+ + " weight DECIMAL(10,3),"
+ + " PRIMARY KEY (id) NOT ENFORCED"
+ + ") WITH ("
+ + " 'connector' = 'postgres-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'schema-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.incremental.snapshot.enabled' =
'true',"
+ + " 'decoding.plugin.name' = 'pgoutput',"
+ + " 'slot.name' = '%s',"
+ + " 'debezium.publication.name' = '%s',"
+ + " 'scan.lsn-commit.checkpoints-num-delay' =
'0',"
+ + " 'scan.startup.mode' = 'committed-offset'"
+ + ")",
+ POSTGRES_CONTAINER.getHost(),
+ POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+ POSTGRES_CONTAINER.getUsername(),
+ POSTGRES_CONTAINER.getPassword(),
+ POSTGRES_CONTAINER.getDatabaseName(),
+ "inventory",
+ "products",
+ slotName,
+ publicName);
+ String sinkDDL =
+ "CREATE TABLE sink "
+ + " WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false'"
+ + ") LIKE debezium_source (EXCLUDING OPTIONS)";
+ tEnv.executeSql(sourceDDL);
+ tEnv.executeSql(sinkDDL);
+
+ TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
+ waitForSinkSize("sink", 2);
+
+ String[] expected =
+ new String[] {
+ "112,thirth,thirth description,0.100", "113,forth,forth
description,0.200"
+ };
+
+ List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
+ Assertions.assertThat(actual).containsExactlyInAnyOrder(expected);
+ result.getJobClient().get().cancel().get();
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testExceptionForReplicaIdentity(boolean parallelismSnapshot) throws Exception {
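The ordering in the test above is what makes the mode observable: rows inserted before the slot exists are never replayed, because a logical slot's confirmed_flush_lsn starts at its creation point. A standalone hedged sketch of that setup step (pgoutput requires a publication; all names and credentials are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class SlotSetup {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; adjust for your environment.
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
                Statement st = conn.createStatement()) {
            // pgoutput requires a publication covering the captured tables.
            st.execute("CREATE PUBLICATION dbz_publication FOR TABLE inventory.products");
            // The slot's confirmed_flush_lsn starts here: only changes made after
            // this call are visible to a committed-offset startup.
            st.execute("SELECT pg_create_logical_replication_slot('flink', 'pgoutput')");
        }
    }
}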