This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f1c893f4 [FLINK-35093][cdc-source/postgres] Support COMMITTED_OFFSET startup mode to resume from existing replication slot
3f1c893f4 is described below

commit 3f1c893f41ba135cc8c41046423709f0dcd2d973
Author: Vinh Pham <[email protected]>
AuthorDate: Fri Apr 25 10:09:50 2025 +0100

    [FLINK-35093][cdc-source/postgres] Support COMMITTED_OFFSET startup mode to resume from existing replication slot
    
    This closes  #3950.
---
 .../docs/connectors/flink-sources/postgres-cdc.md  | 13 ++-
 .../docs/connectors/flink-sources/postgres-cdc.md  | 13 ++-
 .../base/config/JdbcSourceConfigFactory.java       |  1 +
 .../connectors/base/dialect/DataSourceDialect.java |  5 ++
 .../cdc/connectors/base/options/StartupMode.java   |  2 +-
 .../connectors/base/options/StartupOptions.java    | 11 +++
 .../base/source/assigner/StreamSplitAssigner.java  |  3 +
 .../connection/PostgresConnectionUtils.java        | 75 +++++++++++++++++
 .../postgres/source/PostgresDialect.java           | 12 +++
 .../postgres/table/PostgreSQLTableFactory.java     |  4 +
 .../postgres/table/PostgreSQLConnectorITCase.java  | 93 ++++++++++++++++++++++
 11 files changed, 227 insertions(+), 5 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
index d3082146b..25bfacede 100644
--- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
@@ -299,8 +299,7 @@ The following options is available only when `scan.incremental.snapshot.enabled=
       <td>optional</td>
       <td style="word-wrap: break-word;">initial</td>
       <td>String</td>
-      <td>Optional startup mode for Postgres CDC consumer, valid enumerations are "initial"
-           and "latest-offset".
+      <td>Optional startup mode for Postgres CDC consumer, valid enumerations are "initial", "latest-offset", "committed-offset" and "snapshot".
            Please see <a href="#startup-reading-position">Startup Reading Position</a> section for more detailed information.</td>
     </tr>
     <tr>
@@ -462,6 +461,16 @@ and then PostgreSQL CDC Source assigns the chunks to multiple readers to read th
 
 The Postgres CDC connector is a Flink Source connector which will read database snapshot first and then continues to read binlogs with **exactly-once processing** even failures happen. Please read [How the connector works](https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#how-the-postgresql-connector-works).
 
+### Startup Reading Position
+
+The config option `scan.startup.mode` specifies the startup mode for PostgreSQL CDC consumer. The valid enumerations are:
+
+- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the replication slot.
+- `latest-offset`: Never to perform snapshot on the monitored database tables upon first startup, just read from
+  the end of the replication which means only have the changes since the connector was started.
+- `committed-offset`: Skip snapshot phase and start reading events from a `confirmed_flush_lsn` offset of replication slot.
+- `snapshot`: Only the snapshot phase is performed and exits after the snapshot phase reading is completed.
+
 ### DataStream Source
 
 The Postgres CDC connector can also be a DataStream source. There are two modes for the DataStream source:
diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
index a1ced51a6..b8e476801 100644
--- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -296,8 +296,7 @@ The following options is available only when `scan.incremental.snapshot.enabled=
       <td>optional</td>
       <td style="word-wrap: break-word;">initial</td>
       <td>String</td>
-      <td>Optional startup mode for Postgres CDC consumer, valid enumerations are "initial"
-           and "latest-offset".
+      <td>Optional startup mode for Postgres CDC consumer, valid enumerations are "initial", "latest-offset", "committed-offset" and "snapshot".
            Please see <a href="#startup-reading-position">Startup Reading Position</a> section for more detailed information.</td>
     </tr>
     <tr>
@@ -463,6 +462,16 @@ and then PostgreSQL CDC Source assigns the chunks to multiple readers to read th
 
 The Postgres CDC connector is a Flink Source connector which will read database snapshot first and then continues to read binlogs with **exactly-once processing** even failures happen. Please read [How the connector works](https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#how-the-postgresql-connector-works).
 
+### Startup Reading Position
+
+The config option `scan.startup.mode` specifies the startup mode for PostgreSQL CDC consumer. The valid enumerations are:
+
+- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the replication slot.
+- `latest-offset`: Never to perform snapshot on the monitored database tables upon first startup, just read from
+  the end of the replication which means only have the changes since the connector was started.
+- `committed-offset`: Skip snapshot phase and start reading events from a `confirmed_flush_lsn` offset of replication slot.
+- `snapshot`: Only the snapshot phase is performed and exits after the snapshot phase reading is completed.
+
 ### DataStream Source
 
 The Postgres CDC connector can also be a DataStream source. There are two modes for the DataStream source:
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
index 2ab9bb889..82034f488 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
@@ -209,6 +209,7 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
             case INITIAL:
             case SNAPSHOT:
             case LATEST_OFFSET:
+            case COMMITTED_OFFSETS:
                 break;
             default:
                 throw new UnsupportedOperationException(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
index e1809a0ad..63fa7e930 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
@@ -61,6 +61,11 @@ public interface DataSourceDialect<C extends SourceConfig> extends Serializable,
      */
     Offset displayCurrentOffset(C sourceConfig);
 
+    /** Displays committed offset from the database e.g. query Postgresql confirmed_lsn */
+    default Offset displayCommittedOffset(C sourceConfig) {
+        throw new UnsupportedOperationException();
+    }
+
     /** Check if the CollectionId is case-sensitive or not. */
     boolean isDataCollectionIdCaseSensitive(C sourceConfig);
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupMode.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupMode.java
index 60b6a072c..78075304c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupMode.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupMode.java
@@ -30,7 +30,7 @@ public enum StartupMode {
     LATEST_OFFSET,
 
     SPECIFIC_OFFSETS,
-
+    COMMITTED_OFFSETS,
     TIMESTAMP,
     SNAPSHOT
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
index 80f8d2e73..c397a8188 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
@@ -64,6 +64,15 @@ public final class StartupOptions implements Serializable {
         return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null);
     }
 
+    /**
+     * Never to perform snapshot on the monitored database tables upon first startup, just read from
+     * the previous committed posistion of the change log which means only have the changes since
+     * the connector last stopped.
+     */
+    public static StartupOptions committed() {
+        return new StartupOptions(StartupMode.COMMITTED_OFFSETS, null, null, null);
+    }
+
     /**
      * Never to perform snapshot on the monitored database tables upon first startup, and directly
      * read change log from the specified offset.
@@ -101,6 +110,7 @@ public final class StartupOptions implements Serializable {
             case SNAPSHOT:
             case EARLIEST_OFFSET:
             case LATEST_OFFSET:
+            case COMMITTED_OFFSETS:
                 break;
             case SPECIFIC_OFFSETS:
                 checkNotNull(specificOffsetFile, "specificOffsetFile shouldn't be null");
@@ -118,6 +128,7 @@ public final class StartupOptions implements Serializable {
         return startupMode == StartupMode.EARLIEST_OFFSET
                 || startupMode == StartupMode.LATEST_OFFSET
                 || startupMode == StartupMode.SPECIFIC_OFFSETS
+                || startupMode == StartupMode.COMMITTED_OFFSETS
                 || startupMode == StartupMode.TIMESTAMP;
     }
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
index cbb749932..758538811 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
@@ -187,6 +187,9 @@ public class StreamSplitAssigner implements SplitAssigner {
                                 startupOptions.specificOffsetFile,
                                 startupOptions.specificOffsetPos.longValue());
                 break;
+            case COMMITTED_OFFSETS:
+                startingOffset = dialect.displayCommittedOffset(sourceConfig);
+                break;
             default:
                 throw new IllegalStateException(
                         "Unsupported startup mode " + startupOptions.startupMode);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnectionUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnectionUtils.java
new file mode 100644
index 000000000..bd45a9d65
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnectionUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.Utils;
+import io.debezium.time.Conversions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A utility class for accessing various Debezium PostgresConnection private-package classes/methods
+ */
+public class PostgresConnectionUtils {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
+
+    public static PostgresOffset committedOffset(
+            PostgresConnection jdbcConnection, String slotName, String pluginName) {
+        Long lsn;
+        Long txId;
+        try {
+            ServerInfo.ReplicationSlot slot =
+                    jdbcConnection.readReplicationSlotInfo(slotName, pluginName);
+
+            if (slot == ServerInfo.ReplicationSlot.INVALID) {
+                return Utils.currentOffset(jdbcConnection);
+            }
+            lsn = slot.latestFlushedLsn().asLong();
+            txId = slot.catalogXmin();
+            LOGGER.trace("Read xlogStart at '{}' from transaction '{}'", Lsn.valueOf(lsn), txId);
+        } catch (SQLException | InterruptedException e) {
+            throw new FlinkRuntimeException("Error getting current Lsn/txId " + e.getMessage(), e);
+        }
+
+        try {
+            jdbcConnection.commit();
+        } catch (SQLException e) {
+            throw new FlinkRuntimeException(
+                    "JDBC connection fails to commit: " + e.getMessage(), e);
+        }
+
+        Map<String, String> offsetMap = new HashMap<>();
+        offsetMap.put(SourceInfo.LSN_KEY, lsn.toString());
+        if (txId != null) {
+            offsetMap.put(SourceInfo.TXID_KEY, txId.toString());
+        }
+        offsetMap.put(
+                SourceInfo.TIMESTAMP_USEC_KEY,
+                String.valueOf(Conversions.toEpochMicros(Instant.MIN)));
+        return PostgresOffset.of(offsetMap);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
index 66be79448..000cfa9c4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
@@ -41,6 +41,7 @@ import io.debezium.connector.postgresql.PostgresSchema;
 import io.debezium.connector.postgresql.PostgresTaskContext;
 import io.debezium.connector.postgresql.PostgresTopicSelector;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.connector.postgresql.connection.PostgresConnectionUtils;
 import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
@@ -140,6 +141,17 @@ public class PostgresDialect implements JdbcDataSourceDialect {
         }
     }
 
+    public Offset displayCommittedOffset(JdbcSourceConfig sourceConfig) {
+
+        try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
+            return PostgresConnectionUtils.committedOffset(
+                    (PostgresConnection) jdbc, getSlotName(), getPluginName());
+
+        } catch (SQLException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
     @Override
     public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
         // from Postgres docs:
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
index b9caca44c..bd069391d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
@@ -216,6 +216,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
 
     private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot";
     private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+    private static final String SCAN_STARTUP_MODE_VALUE_COMMITTED = "committed-offset";
 
     private static StartupOptions getStartupOptions(ReadableConfig config) {
         String modeString = config.get(SCAN_STARTUP_MODE);
@@ -227,6 +228,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
                 return StartupOptions.snapshot();
             case SCAN_STARTUP_MODE_VALUE_LATEST:
                 return StartupOptions.latest();
+            case SCAN_STARTUP_MODE_VALUE_COMMITTED:
+                return StartupOptions.committed();
 
             default:
                 throw new ValidationException(
@@ -236,6 +239,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
                                 SCAN_STARTUP_MODE_VALUE_INITIAL,
                                 SCAN_STARTUP_MODE_VALUE_SNAPSHOT,
                                 SCAN_STARTUP_MODE_VALUE_LATEST,
+                                SCAN_STARTUP_MODE_VALUE_COMMITTED,
                                 modeString));
         }
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
index a163df921..ade1b215f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
@@ -47,6 +47,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Stream;
 
@@ -305,6 +306,98 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
         result.getJobClient().get().cancel().get();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true})
+    public void testStartupFromCommittedOffset(boolean parallelismSnapshot) throws Exception {
+        setup(parallelismSnapshot);
+        initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'first','first description',0.1);");
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'second','second description',0.2);");
+        }
+
+        // newly create slot's confirmed lsn is latest. We will test whether committed mode starts
+        // from here.
+        String slotName = getSlotName();
+        String publicName = "dbz_publication_" + new Random().nextInt(1000);
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            // TODO: Remove it after adding publication to an existing replication slot.
+            statement.execute(
+                    String.format(
+                            "CREATE PUBLICATION %s FOR TABLE inventory.products", publicName));
+            statement.execute(
+                    String.format(
+                            "select pg_create_logical_replication_slot('%s','pgoutput');",
+                            slotName));
+        }
+
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'thirth','thirth description',0.1);");
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'forth','forth description',0.2);");
+        }
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE debezium_source ("
+                                + " id INT NOT NULL,"
+                                + " name STRING,"
+                                + " description STRING,"
+                                + " weight DECIMAL(10,3),"
+                                + " PRIMARY KEY (id) NOT ENFORCED"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
+                                + " 'decoding.plugin.name' = 'pgoutput',"
+                                + " 'slot.name' = '%s',"
+                                + " 'debezium.publication.name'  = '%s',"
+                                + " 'scan.lsn-commit.checkpoints-num-delay' = '0',"
+                                + " 'scan.startup.mode' = 'committed-offset'"
+                                + ")",
+                        POSTGRES_CONTAINER.getHost(),
+                        POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+                        POSTGRES_CONTAINER.getUsername(),
+                        POSTGRES_CONTAINER.getPassword(),
+                        POSTGRES_CONTAINER.getDatabaseName(),
+                        "inventory",
+                        "products",
+                        slotName,
+                        publicName);
+        String sinkDDL =
+                "CREATE TABLE sink "
+                        + " WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false'"
+                        + ") LIKE debezium_source (EXCLUDING OPTIONS)";
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+
+        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
+        waitForSinkSize("sink", 2);
+
+        String[] expected =
+                new String[] {
+                    "112,thirth,thirth description,0.100", "113,forth,forth description,0.200"
+                };
+
+        List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
+        Assertions.assertThat(actual).containsExactlyInAnyOrder(expected);
+        result.getJobClient().get().cancel().get();
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testExceptionForReplicaIdentity(boolean parallelismSnapshot) throws Exception {
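
The `committed-offset` mode in this commit starts from the replication slot's `confirmed_flush_lsn`, which PostgreSQL renders as two hexadecimal halves separated by a slash. As a minimal sketch of how that textual form maps to the 64-bit value stored under the LSN key of the offset map, the class below is purely illustrative: `LsnSketch`, `parseLsn`, and `formatLsn` are hypothetical names that do not appear in the commit, which relies on Debezium's `Lsn` type instead.

```java
/** Illustrative only: hypothetical helper, not part of the Flink CDC or Debezium API. */
public class LsnSketch {

    /** Parses a textual PostgreSQL LSN such as "0/16B3748" into a 64-bit value. */
    public static long parseLsn(String text) {
        String[] parts = text.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected <hex>/<hex> but got: " + text);
        }
        // The part before the slash is the high 32 bits, the part after is the low 32 bits.
        long high = Long.parseLong(parts[0], 16);
        long low = Long.parseLong(parts[1], 16);
        return (high << 32) | low;
    }

    /** Formats a 64-bit LSN back into the <hex>/<hex> textual form. */
    public static String formatLsn(long lsn) {
        return String.format("%X/%X", lsn >>> 32, lsn & 0xFFFFFFFFL);
    }

    public static void main(String[] args) {
        long lsn = parseLsn("16/B374D848");
        System.out.println(lsn + " -> " + formatLsn(lsn));
    }
}
```

Comparing two offsets then reduces to comparing two `long` values, which is why resuming from a slot only needs the numeric LSN and, when present, the transaction id.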
