This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 441eec81a [FLINK-37332][cdc-base] Support any column as chunk key column
441eec81a is described below

commit 441eec81a1629ee101edd3ed3ab38bcefd65db9a
Author: SeungMin <[email protected]>
AuthorDate: Fri Apr 18 16:42:30 2025 +0900

    [FLINK-37332][cdc-base] Support any column as chunk key column
    
    This closes   #3928
    
    Co-authored-by: Hang Ruan <[email protected]>
---
 .../DebeziumJsonSerializationSchemaTest.java       | 17 +++---
 .../base/source/utils/JdbcChunkUtils.java          | 18 +++---
 .../connectors/mysql/source/utils/ChunkUtils.java  |  2 +-
 .../assigners/MySqlSnapshotSplitAssignerTest.java  | 18 +++---
 .../cdc/connectors/oracle/util/ChunkUtils.java     | 14 ++++-
 .../oracle/source/OracleSourceITCase.java          | 29 ++++-----
 .../postgres/source/utils/ChunkUtils.java          | 18 +++---
 .../postgres/source/PostgresSourceITCase.java      | 68 +++++++++-------------
 .../sqlserver/source/SqlServerSourceITCase.java    | 29 ++++-----
 9 files changed, 97 insertions(+), 116 deletions(-)
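
For readers skimming the diff below: with this change, the chunk key column configured via
'scan.incremental.snapshot.chunk.key-column' is resolved against all columns of the table rather
than only its primary key columns, and tables without a primary key can be used as long as the
option is set. A minimal, self-contained Java sketch of that resolution logic follows; it is
illustrative only (the class, method, and exception names here are not the connector code, which
lives in the ChunkUtils/JdbcChunkUtils classes changed in this diff).

    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;

    public class ChunkKeyResolutionSketch {

        // Resolve the chunk key column: prefer the user-configured column, searched
        // over ALL table columns; otherwise fall back to the first primary key column.
        static String resolveChunkKeyColumn(
                List<String> allColumns, List<String> primaryKeyColumns, String chunkKeyColumn) {
            if (primaryKeyColumns.isEmpty() && chunkKeyColumn == null) {
                throw new IllegalArgumentException(
                        "To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column'"
                                + " must be set when the table doesn't have primary keys.");
            }
            if (chunkKeyColumn != null) {
                Optional<String> target =
                        allColumns.stream().filter(chunkKeyColumn::equals).findFirst();
                return target.orElseThrow(
                        () ->
                                new IllegalArgumentException(
                                        String.format(
                                                "Chunk key column '%s' doesn't exist in the columns %s.",
                                                chunkKeyColumn, allColumns)));
            }
            // Default: first primary key column.
            return primaryKeyColumns.get(0);
        }

        public static void main(String[] args) {
            List<String> columns = Arrays.asList("id", "name", "address");
            List<String> primaryKeys = Arrays.asList("id");
            // A non-primary-key column such as "name" is now accepted.
            System.out.println(resolveChunkKeyColumn(columns, primaryKeys, "name")); // name
            System.out.println(resolveChunkKeyColumn(columns, primaryKeys, null));   // id
        }
    }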

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index 6108f24d8..d4ef2be7d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
@@ -49,6 +48,8 @@ import java.time.ZoneId;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Tests for {@link DebeziumJsonSerializationSchema}. */
 class DebeziumJsonSerializationSchemaTest {
 
@@ -74,7 +75,7 @@ class DebeziumJsonSerializationSchemaTest {
                         .primaryKey("col1")
                         .build();
         CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
-        Assertions.assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+        assertThat(serializationSchema.serialize(createTableEvent)).isNull();
         BinaryRecordDataGenerator generator =
                 new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
         // insert
@@ -90,7 +91,7 @@ class DebeziumJsonSerializationSchemaTest {
                 mapper.readTree(
                         "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
         JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
-        Assertions.assertThat(actual).isEqualTo(expected);
+        assertThat(actual).isEqualTo(expected);
         DataChangeEvent insertEvent2 =
                 DataChangeEvent.insertEvent(
                         TABLE_1,
@@ -103,7 +104,7 @@ class DebeziumJsonSerializationSchemaTest {
                 mapper.readTree(
                         "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
         actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
-        Assertions.assertThat(actual).isEqualTo(expected);
+        assertThat(actual).isEqualTo(expected);
         DataChangeEvent deleteEvent =
                 DataChangeEvent.deleteEvent(
                         TABLE_1,
@@ -116,7 +117,7 @@ class DebeziumJsonSerializationSchemaTest {
                 mapper.readTree(
                         "{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
         actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
-        Assertions.assertThat(actual).isEqualTo(expected);
+        assertThat(actual).isEqualTo(expected);
         DataChangeEvent updateEvent =
                 DataChangeEvent.updateEvent(
                         TABLE_1,
@@ -134,7 +135,7 @@ class DebeziumJsonSerializationSchemaTest {
                 mapper.readTree(
                         "{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
         actual = mapper.readTree(serializationSchema.serialize(updateEvent));
-        Assertions.assertThat(actual).isEqualTo(expected);
+        assertThat(actual).isEqualTo(expected);
     }
 
     @Test
@@ -203,7 +204,7 @@ class DebeziumJsonSerializationSchemaTest {
                         DataTypes.STRING());
 
         CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
-        Assertions.assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+        assertThat(serializationSchema.serialize(createTableEvent)).isNull();
         BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
         // insert
         DataChangeEvent insertEvent1 =
@@ -242,6 +243,6 @@ class DebeziumJsonSerializationSchemaTest {
                 mapper.readTree(
                         "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"boolean\",\"optional\":true,\"doc\":\"_boolean comment\",\"field\":\"_boolean\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"3\"},\"field\":\"_binary\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"10\"},\"field\":\"_varbinary\"},{\"t [...]
         JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
-        Assertions.assertThat(actual).isEqualTo(expected);
+        assertThat(actual).isEqualTo(expected);
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
index 46b30310c..b6de3d211 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
@@ -109,17 +109,15 @@ public class JdbcChunkUtils {
      */
     public static Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
         List<Column> primaryKeys = table.primaryKeyColumns();
-        if (primaryKeys.isEmpty()) {
+        if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
             throw new ValidationException(
-                    String.format(
-                            "Incremental snapshot for tables requires primary key,"
-                                    + " but table %s doesn't have primary key.",
-                            table.id()));
+                    "To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
         }
 
+        List<Column> searchColumns = table.columns();
         if (chunkKeyColumn != null) {
             Optional<Column> targetPkColumn =
-                    primaryKeys.stream()
+                    searchColumns.stream()
                             .filter(col -> chunkKeyColumn.equals(col.name()))
                             .findFirst();
             if (targetPkColumn.isPresent()) {
@@ -127,13 +125,15 @@ public class JdbcChunkUtils {
             }
             throw new ValidationException(
                     String.format(
-                            "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
+                            "Chunk key column '%s' doesn't exist in the columns [%s] of the table %s.",
                             chunkKeyColumn,
-                            primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
+                            searchColumns.stream()
+                                    .map(Column::name)
+                                    .collect(Collectors.joining(",")),
                             table.id()));
         }
 
-        // use first field in primary key as the split key
+        // use first column of primary key columns as the chunk key column by default
         return primaryKeys.get(0);
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
index 405fd1f96..794abd2b5 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
@@ -67,7 +67,7 @@ public class ChunkUtils {
         String chunkKeyColumn = findChunkKeyColumn(table.id(), chunkKeyColumns);
         if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
             throw new ValidationException(
-                    "'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
+                    "To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
         }
 
         List<Column> searchColumns = table.columns();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index 3511326cd..d97555593 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -394,17 +394,17 @@ class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
     @Test
     void testTableWithoutPrimaryKey() {
         String tableWithoutPrimaryKey = "customers_no_pk";
+
         Assertions.assertThatThrownBy(
-                        () ->
-                                getTestAssignSnapshotSplits(
-                                        4,
-                                        CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND
-                                                .defaultValue(),
-                                        CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
-                                                .defaultValue(),
-                                        new String[] {tableWithoutPrimaryKey}))
+                        () -> {
+                            getTestAssignSnapshotSplits(
+                                    4,
+                                    CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                                    CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                                    new String[] {tableWithoutPrimaryKey});
+                        })
                 .hasStackTraceContaining(
-                        "'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
+                        "To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
     }
 
     @Test
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/util/ChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/util/ChunkUtils.java
index 2ccafc7ed..e3865da03 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/util/ChunkUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/util/ChunkUtils.java
@@ -49,9 +49,15 @@ public class ChunkUtils {
     public static Column getChunkKeyColumn(Table table, @Nullable String chunkKeyColumn) {
         List<Column> primaryKeys = table.primaryKeyColumns();
 
+        if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
+            throw new ValidationException(
+                    "To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
+        }
+
+        List<Column> searchColumns = table.columns();
         if (chunkKeyColumn != null) {
             Optional<Column> targetPkColumn =
-                    primaryKeys.stream()
+                    searchColumns.stream()
                             .filter(col -> chunkKeyColumn.equals(col.name()))
                             .findFirst();
             if (targetPkColumn.isPresent()) {
@@ -59,9 +65,11 @@ public class ChunkUtils {
             }
             throw new ValidationException(
                     String.format(
-                            "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
+                            "Chunk key column '%s' doesn't exist in the columns [%s] of the table %s.",
                             chunkKeyColumn,
-                            primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
+                            searchColumns.stream()
+                                    .map(Column::name)
+                                    .collect(Collectors.joining(",")),
                             table.id()));
         }
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
index 9e87808d9..e61c433d4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
@@ -40,7 +40,6 @@ import org.apache.flink.util.CloseableIterator;
 import io.debezium.connector.oracle.OracleConnection;
 import io.debezium.jdbc.JdbcConfiguration;
 import org.apache.commons.lang3.StringUtils;
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
@@ -382,24 +381,18 @@ public class OracleSourceITCase extends OracleSourceTestBase {
     }
 
     @Test
-    void testTableWithChunkColumnOfNoPrimaryKey() {
+    public void testTableWithChunkColumnOfNoPrimaryKey() throws Exception {
         String chunkColumn = "NAME";
-        try {
-            testOracleParallelSource(
-                    1,
-                    FailoverType.NONE,
-                    FailoverPhase.NEVER,
-                    new String[] {"CUSTOMERS"},
-                    false,
-                    RestartStrategies.noRestart(),
-                    chunkColumn);
-        } catch (Exception e) {
-            Assertions.assertThat(e)
-                    .hasStackTraceContaining(
-                            String.format(
-                                    "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
-                                    chunkColumn, "id", "customer.dbo.customers"));
-        }
+        testOracleParallelSource(
+                1,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                new String[] {"CUSTOMERS"},
+                false,
+                RestartStrategies.noRestart(),
+                chunkColumn);
+
+        // since `scan.incremental.snapshot.chunk.key-column` is set, an exception should not occur.
     }
 
     private List<String> testBackfillWhenWritingEvents(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/ChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/ChunkUtils.java
index e912943c4..822b58334 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/ChunkUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/ChunkUtils.java
@@ -43,17 +43,15 @@ public class ChunkUtils {
 
     public static Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
         List<Column> primaryKeys = table.primaryKeyColumns();
-        if (primaryKeys.isEmpty()) {
+        if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
             throw new ValidationException(
-                    String.format(
-                            "Incremental snapshot for tables requires primary key,"
-                                    + " but table %s doesn't have primary key.",
-                            table.id()));
+                    "To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
         }
 
+        List<Column> searchColumns = table.columns();
         if (chunkKeyColumn != null) {
             Optional<Column> targetPkColumn =
-                    primaryKeys.stream()
+                    searchColumns.stream()
                             .filter(col -> chunkKeyColumn.equals(col.name()))
                             .findFirst();
             if (targetPkColumn.isPresent()) {
@@ -61,13 +59,15 @@ public class ChunkUtils {
             }
             throw new ValidationException(
                     String.format(
-                            "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
+                            "Chunk key column '%s' doesn't exist in the columns [%s] of the table %s.",
                             chunkKeyColumn,
-                            primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
+                            searchColumns.stream()
+                                    .map(Column::name)
+                                    .collect(Collectors.joining(",")),
                             table.id()));
         }
 
-        // use first field in primary key as the split key
+        // use first column of primary key columns as the chunk key column by default
         return primaryKeys.get(0);
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 30dbc77d2..0cee09744 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -42,7 +42,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import io.debezium.connector.postgresql.connection.PostgresConnection;
@@ -264,23 +263,18 @@ class PostgresSourceITCase extends PostgresTestBase {
     @ValueSource(strings = {"initial", "latest-offset"})
     void testConsumingTableWithoutPrimaryKey(String scanStartupMode) throws Exception {
         if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
-            try {
-                testPostgresParallelSource(
-                        1,
-                        scanStartupMode,
-                        PostgresTestUtils.FailoverType.NONE,
-                        PostgresTestUtils.FailoverPhase.NEVER,
-                        new String[] {"customers_no_pk"},
-                        RestartStrategies.noRestart());
-            } catch (Exception e) {
-                Assertions.assertThat(
-                                ExceptionUtils.findThrowableWithMessage(
-                                        e,
-                                        String.format(
-                                                "Incremental snapshot for tables requires primary key, but table %s doesn't have primary key",
-                                                SCHEMA_NAME + ".customers_no_pk")))
-                        .isPresent();
-            }
+            Assertions.assertThatThrownBy(
+                            () -> {
+                                testPostgresParallelSource(
+                                        1,
+                                        scanStartupMode,
+                                        PostgresTestUtils.FailoverType.NONE,
+                                        PostgresTestUtils.FailoverPhase.NEVER,
+                                        new String[] {"customers_no_pk"},
+                                        RestartStrategies.noRestart());
+                            })
+                    .hasStackTraceContaining(
+                            "To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
         } else {
             testPostgresParallelSource(
                     1,
@@ -292,10 +286,8 @@ class PostgresSourceITCase extends PostgresTestBase {
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"initial", "latest-offset"})
-    void testReadSingleTableWithSingleParallelismAndSkipBackfill(String scanStartupMode)
-            throws Exception {
+    @Test
+    void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exception {
         testPostgresParallelSource(
                 DEFAULT_PARALLELISM,
                 DEFAULT_SCAN_STARTUP_MODE,
@@ -686,26 +678,20 @@ class PostgresSourceITCase extends PostgresTestBase {
 
     @ParameterizedTest
     @ValueSource(strings = {"initial", "latest-offset"})
-    void testTableWithChunkColumnOfNoPrimaryKey(String scanStartupMode) {
+    public void testTableWithChunkColumnOfNoPrimaryKey(String scanStartupMode) throws Exception {
         Assumptions.assumeThat(scanStartupMode).isEqualTo(DEFAULT_SCAN_STARTUP_MODE);
-        String chunkColumn = "name";
-        try {
-            testPostgresParallelSource(
-                    1,
-                    scanStartupMode,
-                    PostgresTestUtils.FailoverType.NONE,
-                    PostgresTestUtils.FailoverPhase.NEVER,
-                    new String[] {"Customers"},
-                    RestartStrategies.noRestart(),
-                    Collections.singletonMap(
-                            "scan.incremental.snapshot.chunk.key-column", chunkColumn));
-        } catch (Exception e) {
-            Assertions.assertThat(e)
-                    .hasStackTraceContaining(
-                            String.format(
-                                    "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
-                                    chunkColumn, "Id", "customer.Customers"));
-        }
+        String chunkColumn = "Name";
+        testPostgresParallelSource(
+                1,
+                scanStartupMode,
+                PostgresTestUtils.FailoverType.NONE,
+                PostgresTestUtils.FailoverPhase.NEVER,
+                new String[] {"Customers"},
+                RestartStrategies.noRestart(),
+                Collections.singletonMap(
+                        "scan.incremental.snapshot.chunk.key-column", chunkColumn));
+
+        // since `scan.incremental.snapshot.chunk.key-column` is set, an exception should not occur.
     }
 
     @ParameterizedTest
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
index e33e888d4..d92ce7cc9 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
@@ -37,7 +37,6 @@ import org.apache.flink.util.CloseableIterator;
 
 import io.debezium.jdbc.JdbcConnection;
 import org.apache.commons.lang3.StringUtils;
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -280,24 +279,18 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
     }
 
     @Test
-    void testTableWithChunkColumnOfNoPrimaryKey() {
+    public void testTableWithChunkColumnOfNoPrimaryKey() throws Exception {
         String chunkColumn = "name";
-        try {
-            testSqlServerParallelSource(
-                    1,
-                    FailoverType.NONE,
-                    FailoverPhase.NEVER,
-                    new String[] {"dbo.customers"},
-                    false,
-                    RestartStrategies.noRestart(),
-                    chunkColumn);
-        } catch (Exception e) {
-            Assertions.assertThat(e)
-                    .hasStackTraceContaining(
-                            String.format(
-                                    "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
-                                    chunkColumn, "id", "customer.dbo.customers"));
-        }
+        testSqlServerParallelSource(
+                1,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                new String[] {"dbo.customers"},
+                false,
+                RestartStrategies.noRestart(),
+                chunkColumn);
+
+        // since `scan.incremental.snapshot.chunk.key-column` is set, an exception should not occur.
     }
 
     private List<String> testBackfillWhenWritingEvents(
