This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 441eec81a [FLINK-37332][cdc-base] Support any column as chunk key
column
441eec81a is described below
commit 441eec81a1629ee101edd3ed3ab38bcefd65db9a
Author: SeungMin <[email protected]>
AuthorDate: Fri Apr 18 16:42:30 2025 +0900
[FLINK-37332][cdc-base] Support any column as chunk key column
This closes #3928
Co-authored-by: Hang Ruan <[email protected]>
---
.../DebeziumJsonSerializationSchemaTest.java | 17 +++---
.../base/source/utils/JdbcChunkUtils.java | 18 +++---
.../connectors/mysql/source/utils/ChunkUtils.java | 2 +-
.../assigners/MySqlSnapshotSplitAssignerTest.java | 18 +++---
.../cdc/connectors/oracle/util/ChunkUtils.java | 14 ++++-
.../oracle/source/OracleSourceITCase.java | 29 ++++-----
.../postgres/source/utils/ChunkUtils.java | 18 +++---
.../postgres/source/PostgresSourceITCase.java | 68 +++++++++-------------
.../sqlserver/source/SqlServerSourceITCase.java | 29 ++++-----
9 files changed, 97 insertions(+), 116 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index 6108f24d8..d4ef2be7d 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -40,7 +40,6 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
@@ -49,6 +48,8 @@ import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Tests for {@link DebeziumJsonSerializationSchema}. */
class DebeziumJsonSerializationSchemaTest {
@@ -74,7 +75,7 @@ class DebeziumJsonSerializationSchemaTest {
.primaryKey("col1")
.build();
CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1,
schema);
-
Assertions.assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+ assertThat(serializationSchema.serialize(createTableEvent)).isNull();
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(),
DataTypes.STRING()));
// insert
@@ -90,7 +91,7 @@ class DebeziumJsonSerializationSchemaTest {
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
JsonNode actual =
mapper.readTree(serializationSchema.serialize(insertEvent1));
- Assertions.assertThat(actual).isEqualTo(expected);
+ assertThat(actual).isEqualTo(expected);
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
TABLE_1,
@@ -103,7 +104,7 @@ class DebeziumJsonSerializationSchemaTest {
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
- Assertions.assertThat(actual).isEqualTo(expected);
+ assertThat(actual).isEqualTo(expected);
DataChangeEvent deleteEvent =
DataChangeEvent.deleteEvent(
TABLE_1,
@@ -116,7 +117,7 @@ class DebeziumJsonSerializationSchemaTest {
mapper.readTree(
"{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
- Assertions.assertThat(actual).isEqualTo(expected);
+ assertThat(actual).isEqualTo(expected);
DataChangeEvent updateEvent =
DataChangeEvent.updateEvent(
TABLE_1,
@@ -134,7 +135,7 @@ class DebeziumJsonSerializationSchemaTest {
mapper.readTree(
"{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
actual = mapper.readTree(serializationSchema.serialize(updateEvent));
- Assertions.assertThat(actual).isEqualTo(expected);
+ assertThat(actual).isEqualTo(expected);
}
@Test
@@ -203,7 +204,7 @@ class DebeziumJsonSerializationSchemaTest {
DataTypes.STRING());
CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1,
schema);
-
Assertions.assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+ assertThat(serializationSchema.serialize(createTableEvent)).isNull();
BinaryRecordDataGenerator generator = new
BinaryRecordDataGenerator(rowType);
// insert
DataChangeEvent insertEvent1 =
@@ -242,6 +243,6 @@ class DebeziumJsonSerializationSchemaTest {
mapper.readTree(
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"boolean\",\"optional\":true,\"doc\":\"_boolean
comment\",\"field\":\"_boolean\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"3\"},\"field\":\"_binary\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"10\"},\"field\":\"_varbinary\"},{\"t
[...]
JsonNode actual =
mapper.readTree(serializationSchema.serialize(insertEvent1));
- Assertions.assertThat(actual).isEqualTo(expected);
+ assertThat(actual).isEqualTo(expected);
}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
index 46b30310c..b6de3d211 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
@@ -109,17 +109,15 @@ public class JdbcChunkUtils {
*/
public static Column getSplitColumn(Table table, @Nullable String
chunkKeyColumn) {
List<Column> primaryKeys = table.primaryKeyColumns();
- if (primaryKeys.isEmpty()) {
+ if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
throw new ValidationException(
- String.format(
- "Incremental snapshot for tables requires primary
key,"
- + " but table %s doesn't have primary
key.",
- table.id()));
+ "To use incremental snapshot,
'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't
have primary keys.");
}
+ List<Column> searchColumns = table.columns();
if (chunkKeyColumn != null) {
Optional<Column> targetPkColumn =
- primaryKeys.stream()
+ searchColumns.stream()
.filter(col -> chunkKeyColumn.equals(col.name()))
.findFirst();
if (targetPkColumn.isPresent()) {
@@ -127,13 +125,15 @@ public class JdbcChunkUtils {
}
throw new ValidationException(
String.format(
- "Chunk key column '%s' doesn't exist in the
primary key [%s] of the table %s.",
+ "Chunk key column '%s' doesn't exist in the
columns [%s] of the table %s.",
chunkKeyColumn,
-
primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
+ searchColumns.stream()
+ .map(Column::name)
+ .collect(Collectors.joining(",")),
table.id()));
}
- // use first field in primary key as the split key
+ // use first column of primary key columns as the chunk key column by
default
return primaryKeys.get(0);
}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
index 405fd1f96..794abd2b5 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
@@ -67,7 +67,7 @@ public class ChunkUtils {
String chunkKeyColumn = findChunkKeyColumn(table.id(),
chunkKeyColumns);
if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
throw new ValidationException(
- "'scan.incremental.snapshot.chunk.key-column' must be set
when the table doesn't have primary keys.");
+ "To use incremental snapshot,
'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't
have primary keys.");
}
List<Column> searchColumns = table.columns();
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index 3511326cd..d97555593 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -394,17 +394,17 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
@Test
void testTableWithoutPrimaryKey() {
String tableWithoutPrimaryKey = "customers_no_pk";
+
Assertions.assertThatThrownBy(
- () ->
- getTestAssignSnapshotSplits(
- 4,
-
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND
- .defaultValue(),
-
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
- .defaultValue(),
- new String[] {tableWithoutPrimaryKey}))
+ () -> {
+ getTestAssignSnapshotSplits(
+ 4,
+
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+ new String[] {tableWithoutPrimaryKey});
+ })
.hasStackTraceContaining(
- "'scan.incremental.snapshot.chunk.key-column' must be
set when the table doesn't have primary keys.");
+ "To use incremental snapshot,
'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't
have primary keys.");
}
@Test
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/util/ChunkUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/util/ChunkUtils.java
index 2ccafc7ed..e3865da03 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/util/ChunkUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/util/ChunkUtils.java
@@ -49,9 +49,15 @@ public class ChunkUtils {
public static Column getChunkKeyColumn(Table table, @Nullable String
chunkKeyColumn) {
List<Column> primaryKeys = table.primaryKeyColumns();
+ if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
+ throw new ValidationException(
+ "To use incremental snapshot,
'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't
have primary keys.");
+ }
+
+ List<Column> searchColumns = table.columns();
if (chunkKeyColumn != null) {
Optional<Column> targetPkColumn =
- primaryKeys.stream()
+ searchColumns.stream()
.filter(col -> chunkKeyColumn.equals(col.name()))
.findFirst();
if (targetPkColumn.isPresent()) {
@@ -59,9 +65,11 @@ public class ChunkUtils {
}
throw new ValidationException(
String.format(
- "Chunk key column '%s' doesn't exist in the
primary key [%s] of the table %s.",
+ "Chunk key column '%s' doesn't exist in the
columns [%s] of the table %s.",
chunkKeyColumn,
-
primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
+ searchColumns.stream()
+ .map(Column::name)
+ .collect(Collectors.joining(",")),
table.id()));
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
index 9e87808d9..e61c433d4 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
@@ -40,7 +40,6 @@ import org.apache.flink.util.CloseableIterator;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.jdbc.JdbcConfiguration;
import org.apache.commons.lang3.StringUtils;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
@@ -382,24 +381,18 @@ public class OracleSourceITCase extends
OracleSourceTestBase {
}
@Test
- void testTableWithChunkColumnOfNoPrimaryKey() {
+ public void testTableWithChunkColumnOfNoPrimaryKey() throws Exception {
String chunkColumn = "NAME";
- try {
- testOracleParallelSource(
- 1,
- FailoverType.NONE,
- FailoverPhase.NEVER,
- new String[] {"CUSTOMERS"},
- false,
- RestartStrategies.noRestart(),
- chunkColumn);
- } catch (Exception e) {
- Assertions.assertThat(e)
- .hasStackTraceContaining(
- String.format(
- "Chunk key column '%s' doesn't exist in
the primary key [%s] of the table %s.",
- chunkColumn, "id",
"customer.dbo.customers"));
- }
+ testOracleParallelSource(
+ 1,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ new String[] {"CUSTOMERS"},
+ false,
+ RestartStrategies.noRestart(),
+ chunkColumn);
+
+ // since `scan.incremental.snapshot.chunk.key-column` is set, an
exception should not occur.
}
private List<String> testBackfillWhenWritingEvents(
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/ChunkUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/ChunkUtils.java
index e912943c4..822b58334 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/ChunkUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/ChunkUtils.java
@@ -43,17 +43,15 @@ public class ChunkUtils {
public static Column getSplitColumn(Table table, @Nullable String
chunkKeyColumn) {
List<Column> primaryKeys = table.primaryKeyColumns();
- if (primaryKeys.isEmpty()) {
+ if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
throw new ValidationException(
- String.format(
- "Incremental snapshot for tables requires primary
key,"
- + " but table %s doesn't have primary
key.",
- table.id()));
+ "To use incremental snapshot,
'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't
have primary keys.");
}
+ List<Column> searchColumns = table.columns();
if (chunkKeyColumn != null) {
Optional<Column> targetPkColumn =
- primaryKeys.stream()
+ searchColumns.stream()
.filter(col -> chunkKeyColumn.equals(col.name()))
.findFirst();
if (targetPkColumn.isPresent()) {
@@ -61,13 +59,15 @@ public class ChunkUtils {
}
throw new ValidationException(
String.format(
- "Chunk key column '%s' doesn't exist in the
primary key [%s] of the table %s.",
+ "Chunk key column '%s' doesn't exist in the
columns [%s] of the table %s.",
chunkKeyColumn,
-
primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
+ searchColumns.stream()
+ .map(Column::name)
+ .collect(Collectors.joining(",")),
table.id()));
}
- // use first field in primary key as the split key
+ // use first column of primary key columns as the chunk key column by
default
return primaryKeys.get(0);
}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 30dbc77d2..0cee09744 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -42,7 +42,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.connector.postgresql.connection.PostgresConnection;
@@ -264,23 +263,18 @@ class PostgresSourceITCase extends PostgresTestBase {
@ValueSource(strings = {"initial", "latest-offset"})
void testConsumingTableWithoutPrimaryKey(String scanStartupMode) throws
Exception {
if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
- try {
- testPostgresParallelSource(
- 1,
- scanStartupMode,
- PostgresTestUtils.FailoverType.NONE,
- PostgresTestUtils.FailoverPhase.NEVER,
- new String[] {"customers_no_pk"},
- RestartStrategies.noRestart());
- } catch (Exception e) {
- Assertions.assertThat(
- ExceptionUtils.findThrowableWithMessage(
- e,
- String.format(
- "Incremental snapshot for
tables requires primary key, but table %s doesn't have primary key",
- SCHEMA_NAME +
".customers_no_pk")))
- .isPresent();
- }
+ Assertions.assertThatThrownBy(
+ () -> {
+ testPostgresParallelSource(
+ 1,
+ scanStartupMode,
+ PostgresTestUtils.FailoverType.NONE,
+ PostgresTestUtils.FailoverPhase.NEVER,
+ new String[] {"customers_no_pk"},
+ RestartStrategies.noRestart());
+ })
+ .hasStackTraceContaining(
+ "To use incremental snapshot,
'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't
have primary keys.");
} else {
testPostgresParallelSource(
1,
@@ -292,10 +286,8 @@ class PostgresSourceITCase extends PostgresTestBase {
}
}
- @ParameterizedTest
- @ValueSource(strings = {"initial", "latest-offset"})
- void testReadSingleTableWithSingleParallelismAndSkipBackfill(String
scanStartupMode)
- throws Exception {
+ @Test
+ void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws
Exception {
testPostgresParallelSource(
DEFAULT_PARALLELISM,
DEFAULT_SCAN_STARTUP_MODE,
@@ -686,26 +678,20 @@ class PostgresSourceITCase extends PostgresTestBase {
@ParameterizedTest
@ValueSource(strings = {"initial", "latest-offset"})
- void testTableWithChunkColumnOfNoPrimaryKey(String scanStartupMode) {
+ public void testTableWithChunkColumnOfNoPrimaryKey(String scanStartupMode)
throws Exception {
Assumptions.assumeThat(scanStartupMode).isEqualTo(DEFAULT_SCAN_STARTUP_MODE);
- String chunkColumn = "name";
- try {
- testPostgresParallelSource(
- 1,
- scanStartupMode,
- PostgresTestUtils.FailoverType.NONE,
- PostgresTestUtils.FailoverPhase.NEVER,
- new String[] {"Customers"},
- RestartStrategies.noRestart(),
- Collections.singletonMap(
- "scan.incremental.snapshot.chunk.key-column",
chunkColumn));
- } catch (Exception e) {
- Assertions.assertThat(e)
- .hasStackTraceContaining(
- String.format(
- "Chunk key column '%s' doesn't exist in
the primary key [%s] of the table %s.",
- chunkColumn, "Id", "customer.Customers"));
- }
+ String chunkColumn = "Name";
+ testPostgresParallelSource(
+ 1,
+ scanStartupMode,
+ PostgresTestUtils.FailoverType.NONE,
+ PostgresTestUtils.FailoverPhase.NEVER,
+ new String[] {"Customers"},
+ RestartStrategies.noRestart(),
+ Collections.singletonMap(
+ "scan.incremental.snapshot.chunk.key-column",
chunkColumn));
+
+ // since `scan.incremental.snapshot.chunk.key-column` is set, an
exception should not occur.
}
@ParameterizedTest
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
index e33e888d4..d92ce7cc9 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
@@ -37,7 +37,6 @@ import org.apache.flink.util.CloseableIterator;
import io.debezium.jdbc.JdbcConnection;
import org.apache.commons.lang3.StringUtils;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -280,24 +279,18 @@ class SqlServerSourceITCase extends
SqlServerSourceTestBase {
}
@Test
- void testTableWithChunkColumnOfNoPrimaryKey() {
+ public void testTableWithChunkColumnOfNoPrimaryKey() throws Exception {
String chunkColumn = "name";
- try {
- testSqlServerParallelSource(
- 1,
- FailoverType.NONE,
- FailoverPhase.NEVER,
- new String[] {"dbo.customers"},
- false,
- RestartStrategies.noRestart(),
- chunkColumn);
- } catch (Exception e) {
- Assertions.assertThat(e)
- .hasStackTraceContaining(
- String.format(
- "Chunk key column '%s' doesn't exist in
the primary key [%s] of the table %s.",
- chunkColumn, "id",
"customer.dbo.customers"));
- }
+ testSqlServerParallelSource(
+ 1,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ new String[] {"dbo.customers"},
+ false,
+ RestartStrategies.noRestart(),
+ chunkColumn);
+
+ // since `scan.incremental.snapshot.chunk.key-column` is set, an
exception should not occur.
}
private List<String> testBackfillWhenWritingEvents(