This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b42d78de3f [Feature][CDC] Support read no primary key table (#6098)
b42d78de3f is described below
commit b42d78de3f9118bc3526c636df446b5c68285db6
Author: hailin0 <[email protected]>
AuthorDate: Thu Dec 28 11:19:28 2023 +0800
[Feature][CDC] Support read no primary key table (#6098)
---
.../splitter/AbstractJdbcSourceChunkSplitter.java | 66 +++++++++++-------
.../fetch/scan/MySqlSnapshotSplitReadTask.java | 2 +-
.../seatunnel/cdc/mysql/utils/MySqlUtils.java | 3 +-
.../fetch/scan/SqlServerSnapshotSplitReadTask.java | 2 +-
.../cdc/sqlserver/source/utils/SqlServerUtils.java | 3 +-
.../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 44 ++++++++++++
.../src/test/resources/ddl/mysql_cdc.sql | 79 +++++++++++++++++++++
.../mysqlcdc_to_mysql_with_no_primary_key.conf | 55 +++++++++++++++
.../connector/cdc/sqlserver/SqlServerCDCIT.java | 80 ++++++++++++++++++----
.../src/test/resources/ddl/column_type_test.sql | 48 +++++++++++++
...servercdc_to_sqlserver_with_no_primary_key.conf | 58 ++++++++++++++++
11 files changed, 396 insertions(+), 44 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index dc1d977358..151e003ecd 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -62,27 +62,42 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
long start = System.currentTimeMillis();
Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
- final List<ChunkRange> chunks;
- try {
- chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
- } catch (SQLException e) {
- throw new RuntimeException("Failed to split chunks for table "
+ tableId, e);
- }
-
- // convert chunks into splits
List<SnapshotSplit> splits = new ArrayList<>();
- SeaTunnelRowType splitType = getSplitType(splitColumn);
- for (int i = 0; i < chunks.size(); i++) {
- ChunkRange chunk = chunks.get(i);
- SnapshotSplit split =
- createSnapshotSplit(
- jdbc,
- tableId,
- i,
- splitType,
- chunk.getChunkStart(),
- chunk.getChunkEnd());
- splits.add(split);
+ if (splitColumn == null) {
+ if (sourceConfig.isExactlyOnce()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Exactly once is enabled, but not found
primary key or unique key for table %s",
+ tableId));
+ }
+ SnapshotSplit singleSplit = createSnapshotSplit(jdbc, tableId,
0, null, null, null);
+ splits.add(singleSplit);
+ log.warn(
+ "No evenly split column found for table {}, use single
split {}",
+ tableId,
+ singleSplit);
+ } else {
+ final List<ChunkRange> chunks;
+ try {
+ chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to split chunks for
table " + tableId, e);
+ }
+
+ // convert chunks into splits
+ SeaTunnelRowType splitType = getSplitType(splitColumn);
+ for (int i = 0; i < chunks.size(); i++) {
+ ChunkRange chunk = chunks.get(i);
+ SnapshotSplit split =
+ createSnapshotSplit(
+ jdbc,
+ tableId,
+ i,
+ splitType,
+ chunk.getChunkStart(),
+ chunk.getChunkEnd());
+ splits.add(split);
+ }
}
long end = System.currentTimeMillis();
@@ -371,6 +386,8 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
}
}
}
+ } else {
+ log.warn("No primary key found for table {}", tableId);
}
List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
@@ -389,16 +406,15 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
}
}
}
+ } else {
+ log.warn("No unique key found for table {}", tableId);
}
if (splitColumn != null) {
return splitColumn;
}
- throw new UnsupportedOperationException(
- String.format(
- "Incremental snapshot for tables requires primary
key/unique key,"
- + " but table %s doesn't have primary key.",
- tableId));
+ log.warn("No evenly split column found for table {}", tableId);
+ return null;
}
protected String splitId(TableId tableId, int chunkId) {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
index c0879193c6..61b0987cda 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
@@ -206,7 +206,7 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
snapshotSplit.getSplitEnd() == null,
snapshotSplit.getSplitStart(),
snapshotSplit.getSplitEnd(),
-
snapshotSplit.getSplitKeyType().getTotalFields(),
+ snapshotSplit.getSplitKeyType(),
connectorConfig.getSnapshotFetchSize());
ResultSet rs = selectStatement.executeQuery()) {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index 3dde38b422..706d7a8862 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -291,13 +291,14 @@ public class MySqlUtils {
boolean isLastSplit,
Object[] splitStart,
Object[] splitEnd,
- int primaryKeyNum,
+ SeaTunnelRowType splitKeyType,
int fetchSize) {
try {
final PreparedStatement statement = initStatement(jdbc, sql,
fetchSize);
if (isFirstSplit && isLastSplit) {
return statement;
}
+ int primaryKeyNum = splitKeyType.getTotalFields();
if (isFirstSplit) {
for (int i = 0; i < primaryKeyNum; i++) {
statement.setObject(i + 1, splitEnd[i]);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java
index 9857c18f06..30ffadd126 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java
@@ -196,7 +196,7 @@ public class SqlServerSnapshotSplitReadTask extends
AbstractSnapshotChangeEventS
snapshotSplit.getSplitEnd() == null,
snapshotSplit.getSplitStart(),
snapshotSplit.getSplitEnd(),
-
snapshotSplit.getSplitKeyType().getTotalFields(),
+ snapshotSplit.getSplitKeyType(),
connectorConfig.getSnapshotFetchSize());
ResultSet rs = selectStatement.executeQuery()) {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
index ab0c061d2c..2c83e02e0e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
@@ -293,13 +293,14 @@ public class SqlServerUtils {
boolean isLastSplit,
Object[] splitStart,
Object[] splitEnd,
- int primaryKeyNum,
+ SeaTunnelRowType splitKeyType,
int fetchSize) {
try {
final PreparedStatement statement = initStatement(jdbc, sql,
fetchSize);
if (isFirstSplit && isLastSplit) {
return statement;
}
+ int primaryKeyNum = splitKeyType.getTotalFields();
if (isFirstSplit) {
for (int i = 0; i < primaryKeyNum; i++) {
statement.setObject(i + 1, splitEnd[i]);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 647ec9c140..366b3146b1 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -94,6 +94,8 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
private static final String SOURCE_TABLE_1 = "mysql_cdc_e2e_source_table";
private static final String SOURCE_TABLE_2 = "mysql_cdc_e2e_source_table2";
+ private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
+ "mysql_cdc_e2e_source_table_no_primary_key";
private static final String SINK_TABLE = "mysql_cdc_e2e_sink_table";
private static MySqlContainer createMySqlContainer(MySqlVersion version) {
@@ -211,6 +213,48 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
});
}
+ @TestTemplate
+ public void testMysqlCdcCheckDataWithNoPrimaryKey(TestContainer container)
{
+ // Clear related content to ensure that multiple operations are not
affected
+ clearTable(MYSQL_DATABASE, SINK_TABLE);
+
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+
container.executeJob("/mysqlcdc_to_mysql_with_no_primary_key.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ log.info(query(getSinkQuerySQL(MYSQL_DATABASE,
SINK_TABLE)).toString());
+ Assertions.assertIterableEquals(
+ query(
+ getSourceQuerySQL(
+ MYSQL_DATABASE,
SOURCE_TABLE_NO_PRIMARY_KEY)),
+ query(getSinkQuerySQL(MYSQL_DATABASE,
SINK_TABLE)));
+ });
+
+ // insert update delete
+ executeSql("DELETE FROM " + MYSQL_DATABASE + "." +
SOURCE_TABLE_NO_PRIMARY_KEY);
+ upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(
+ getSourceQuerySQL(
+ MYSQL_DATABASE,
SOURCE_TABLE_NO_PRIMARY_KEY)),
+ query(getSinkQuerySQL(MYSQL_DATABASE,
SINK_TABLE)));
+ });
+ }
+
@TestTemplate
@DisabledOnContainer(
value = {},
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
index da42f2b8f0..91ae73bd27 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
@@ -122,6 +122,54 @@ CREATE TABLE mysql_cdc_e2e_source_table2
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_0900_ai_ci;
+CREATE TABLE mysql_cdc_e2e_source_table_no_primary_key
+(
+ `id` int NOT NULL,
+ `f_binary` binary(64) DEFAULT NULL,
+ `f_blob` blob,
+ `f_long_varbinary` mediumblob,
+ `f_longblob` longblob,
+ `f_tinyblob` tinyblob,
+ `f_varbinary` varbinary(100) DEFAULT NULL,
+ `f_smallint` smallint DEFAULT NULL,
+ `f_smallint_unsigned` smallint unsigned DEFAULT NULL,
+ `f_mediumint` mediumint DEFAULT NULL,
+ `f_mediumint_unsigned` mediumint unsigned DEFAULT NULL,
+ `f_int` int DEFAULT NULL,
+ `f_int_unsigned` int unsigned DEFAULT NULL,
+ `f_integer` int DEFAULT NULL,
+ `f_integer_unsigned` int unsigned DEFAULT NULL,
+ `f_bigint` bigint DEFAULT NULL,
+ `f_bigint_unsigned` bigint unsigned DEFAULT NULL,
+ `f_numeric` decimal(10, 0) DEFAULT NULL,
+ `f_decimal` decimal(10, 0) DEFAULT NULL,
+ `f_float` float DEFAULT NULL,
+ `f_double` double DEFAULT NULL,
+ `f_double_precision` double DEFAULT NULL,
+ `f_longtext` longtext,
+ `f_mediumtext` mediumtext,
+ `f_text` text,
+ `f_tinytext` tinytext,
+ `f_varchar` varchar(100) DEFAULT NULL,
+ `f_date` date DEFAULT NULL,
+ `f_datetime` datetime DEFAULT NULL,
+ `f_timestamp` timestamp NULL DEFAULT NULL,
+ `f_bit1` bit(1) DEFAULT NULL,
+ `f_bit64` bit(64) DEFAULT NULL,
+ `f_char` char(1) DEFAULT NULL,
+ `f_enum` enum ('enum1','enum2','enum3') DEFAULT NULL,
+ `f_mediumblob` mediumblob,
+ `f_long_varchar` mediumtext,
+ `f_real` double DEFAULT NULL,
+ `f_time` time DEFAULT NULL,
+ `f_tinyint` tinyint DEFAULT NULL,
+ `f_tinyint_unsigned` tinyint unsigned DEFAULT NULL,
+ `f_json` json DEFAULT NULL,
+ `f_year` year DEFAULT NULL
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4
+ COLLATE = utf8mb4_0900_ai_ci;
+
CREATE TABLE mysql_cdc_e2e_sink_table
(
`id` int NOT NULL AUTO_INCREMENT,
@@ -174,6 +222,7 @@ CREATE TABLE mysql_cdc_e2e_sink_table
truncate table mysql_cdc_e2e_source_table;
truncate table mysql_cdc_e2e_source_table2;
+truncate table mysql_cdc_e2e_source_table_no_primary_key;
truncate table mysql_cdc_e2e_sink_table;
INSERT INTO mysql_cdc_e2e_source_table ( id, f_binary, f_blob,
f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,
@@ -238,6 +287,36 @@ VALUES ( 1,
0x616263740000000000000000000000000000000000000000000000000000000000
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A,
'This is a long varchar field', 112.345,
'14:30:00', -128, 22, '{ "key": "value" }', 2021 );
+INSERT INTO mysql_cdc_e2e_source_table_no_primary_key ( id, f_binary, f_blob,
f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,
+ f_smallint_unsigned, f_mediumint,
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,
+ f_integer_unsigned, f_bigint,
f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,
+ f_double_precision, f_longtext,
f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,
+ f_timestamp, f_bit1, f_bit64,
f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,
+ f_tinyint, f_tinyint_unsigned,
f_json, f_year )
+VALUES ( 1,
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+ 0x68656C6C6F,
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,
+ 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456,
654321, 1234567, 7654321, 1234567, 7654321,
+ 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long
text field', 'This is a medium text field',
+ 'This is a text field', 'This is a tiny text field', 'This is a
varchar field', '2022-04-27', '2022-04-27 14:30:00',
+ '2023-04-27 11:08:40', 1,
b'0101010101010101010101010101010101010101010101010101010101010101', 'C',
'enum2',
+
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A,
'This is a long varchar field',
+ 12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ),
+ ( 2,
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+ 0x68656C6C6F,
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,
0x74696E79626C6F62,
+ 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567,
7654321, 1234567, 7654321, 123456789, 987654321,
+ 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is
a medium text field', 'This is a text field',
+ 'This is a tiny text field', 'This is a varchar field', '2022-04-27',
'2022-04-27 14:30:00', '2023-04-27 11:08:40',
+ 1,
b'0101010101010101010101010101010101010101010101010101010101010101', 'C',
'enum2',
+
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A,
'This is a long varchar field',
+ 112.345, '14:30:00', -128, 22, '{ "key": "value" }', 2013 ),
+ ( 3,
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+ 0x68656C6C6F,
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,
0x74696E79626C6F62,
+ 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567,
7654321, 1234567, 7654321, 123456789, 987654321, 123,
+ 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a
medium text field', 'This is a text field',
+ 'This is a tiny text field', 'This is a varchar field', '2022-04-27',
'2022-04-27 14:30:00', '2023-04-27 11:08:40',
+ 1,
b'0101010101010101010101010101010101010101010101010101010101010101', 'C',
'enum2',
+
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A,
'This is a long varchar field', 112.345,
+ '14:30:00', -128, 22, '{ "key": "value" }', 2021 );
CREATE DATABASE IF NOT EXISTS `mysql_cdc2`;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
new file mode 100644
index 0000000000..46df806ae7
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ MySQL-CDC {
+ result_table_name = "customers_mysql_cdc"
+ server-id = 5652
+ username = "st_user"
+ password = "seatunnel"
+ table-names = ["mysql_cdc.mysql_cdc_e2e_source_table_no_primary_key"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+
+ exactly_once = false
+ }
+}
+
+sink {
+ jdbc {
+ source_table_name = "customers_mysql_cdc"
+ url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "st_user"
+ password = "seatunnel"
+
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = mysql_cdc
+ table = mysql_cdc_e2e_sink_table
+ primary_keys = ["id"]
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 458d80bb35..597838096b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -66,7 +66,7 @@ import static org.awaitility.Awaitility.await;
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
- disabledReason = "Currently SPARK and FLINK do not support cdc")
+ disabledReason = "Currently SPARK do not support cdc")
public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
private static final String HOST = "sqlserver-host";
@@ -80,8 +80,11 @@ public class SqlServerCDCIT extends TestSuiteBase implements
TestResource {
private static final String DISABLE_DB_CDC =
"IF EXISTS(select 1 from sys.databases where name='#' AND
is_cdc_enabled=1)\n"
+ "EXEC sys.sp_cdc_disable_db";
-
- private static final String SOURCE_SQL =
+ private static final String SOURCE_TABLE =
"column_type_test.dbo.full_types";
+ private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
+ "column_type_test.dbo.full_types_no_primary_key";
+ private static final String SINK_TABLE =
"column_type_test.dbo.full_types_sink";
+ private static final String SELECT_SOURCE_SQL =
"select\n"
+ " id,\n"
+ " val_char,\n"
@@ -109,8 +112,8 @@ public class SqlServerCDCIT extends TestSuiteBase
implements TestResource {
+ " val_xml,\n"
+ " val_datetimeoffset,\n"
+ " CONVERT(varchar(100), val_varbinary) as
val_varbinary\n"
- + "from column_type_test.dbo.full_types";
- private static final String SINK_SQL =
+ + "from %s order by id asc";
+ private static final String SELECT_SINK_SQL =
"select\n"
+ " id,\n"
+ " val_char,\n"
@@ -138,7 +141,7 @@ public class SqlServerCDCIT extends TestSuiteBase
implements TestResource {
+ " val_xml,\n"
+ " val_datetimeoffset,\n"
+ " CONVERT(varchar(100), val_varbinary) as
val_varbinary\n"
- + "from column_type_test.dbo.full_types_sink";
+ + "from %s order by id asc";
public static final MSSQLServerContainer MSSQL_SERVER_CONTAINER =
new
MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest")
@@ -207,18 +210,58 @@ public class SqlServerCDCIT extends TestSuiteBase
implements TestResource {
.untilAsserted(
() -> {
Assertions.assertIterableEquals(
- querySql(SOURCE_SQL), querySql(SINK_SQL));
+ querySql(SELECT_SOURCE_SQL, SOURCE_TABLE),
+ querySql(SELECT_SINK_SQL, SINK_TABLE));
+ });
+
+ // insert update delete
+ updateSourceTable(SOURCE_TABLE);
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ querySql(SELECT_SOURCE_SQL, SOURCE_TABLE),
+ querySql(SELECT_SINK_SQL, SINK_TABLE));
+ });
+ }
+
+ @TestTemplate
+ public void testCDCWithNoPrimaryKey(TestContainer container) {
+ initializeSqlServerTable("column_type_test");
+
+ CompletableFuture<Void> executeJobFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob(
+
"/sqlservercdc_to_sqlserver_with_no_primary_key.conf");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // snapshot stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ querySql(SELECT_SOURCE_SQL,
SOURCE_TABLE_NO_PRIMARY_KEY),
+ querySql(SELECT_SINK_SQL, SINK_TABLE));
});
// insert update delete
- updateSourceTable();
+ updateSourceTable(SOURCE_TABLE_NO_PRIMARY_KEY);
// stream stage
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertIterableEquals(
- querySql(SOURCE_SQL), querySql(SINK_SQL));
+ querySql(SELECT_SOURCE_SQL,
SOURCE_TABLE_NO_PRIMARY_KEY),
+ querySql(SELECT_SINK_SQL, SINK_TABLE));
});
}
@@ -255,26 +298,29 @@ public class SqlServerCDCIT extends TestSuiteBase
implements TestResource {
}
}
- private void updateSourceTable() {
+ private void updateSourceTable(String table) {
executeSql(
- "INSERT INTO column_type_test.dbo.full_types VALUES (3,\n"
+ "INSERT INTO "
+ + table
+ + " VALUES (3,\n"
+ " 'cč3', 'vcč', 'tč',
N'cč', N'vcč', N'tč',\n"
+ " 1.123, 2, 3.323,
4.323, 5.323, 6.323,\n"
+ " 1, 22, 333, 4444,
55555,\n"
+ " '2018-07-13',
'10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13
14:23:45',\n"
+ "
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));");
executeSql(
- "INSERT INTO column_type_test.dbo.full_types VALUES (4,\n"
+ "INSERT INTO "
+ + table
+ + " VALUES (4,\n"
+ " 'cč4', 'vcč', 'tč',
N'cč', N'vcč', N'tč',\n"
+ " 1.123, 2, 3.323,
4.323, 5.323, 6.323,\n"
+ " 1, 22, 333, 4444,
55555,\n"
+ " '2018-07-13',
'10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13
14:23:45',\n"
+ "
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));");
- executeSql("DELETE FROM column_type_test.dbo.full_types where id = 2");
+ executeSql("DELETE FROM " + table + " where id = 2");
- executeSql(
- "UPDATE column_type_test.dbo.full_types SET val_varchar =
'newvcč' where id = 1");
+ executeSql("UPDATE " + table + " SET val_varchar = 'newvcč' where id =
1");
}
private Connection getJdbcConnection() throws SQLException {
@@ -284,6 +330,10 @@ public class SqlServerCDCIT extends TestSuiteBase
implements TestResource {
MSSQL_SERVER_CONTAINER.getPassword());
}
+ private List<List<Object>> querySql(String sql, String table) {
+ return querySql(String.format(sql, table));
+ }
+
private List<List<Object>> querySql(String sql) {
try (Connection connection = getJdbcConnection()) {
ResultSet resultSet =
connection.createStatement().executeQuery(sql);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
index 6bb95d3c95..d227c34615 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
@@ -72,6 +72,54 @@ INSERT INTO full_types VALUES (2,
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name =
'full_types', @role_name = NULL, @supports_net_changes = 0;
+CREATE TABLE full_types_no_primary_key (
+ id int NOT NULL,
+ val_char char(3),
+ val_varchar varchar(1000),
+ val_text text,
+ val_nchar nchar(3),
+ val_nvarchar nvarchar(1000),
+ val_ntext ntext,
+ val_decimal decimal(6,3),
+ val_numeric numeric,
+ val_float float,
+ val_real real,
+ val_smallmoney smallmoney,
+ val_money money,
+ val_bit bit,
+ val_tinyint tinyint,
+ val_smallint smallint,
+ val_int int,
+ val_bigint bigint,
+ val_date date,
+ val_time time,
+ val_datetime2 datetime2,
+ val_datetime datetime,
+ val_smalldatetime smalldatetime,
+ val_xml xml,
+ val_datetimeoffset DATETIMEOFFSET(4),
+ val_varbinary varbinary(100)
+);
+INSERT INTO full_types_no_primary_key VALUES (0,
+ 'cč0', 'vcč', 'tč', N'cč', N'vcč', N'tč',
+ 1.123, 2, 3.323, 4.323, 5.323, 6.323,
+ 1, 22, 333, 4444, 55555,
+ '2018-07-13', '10:23:45', '2018-07-13
11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
+
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+INSERT INTO full_types_no_primary_key VALUES (1,
+ 'cč1', 'vcč', 'tč', N'cč', N'vcč', N'tč',
+ 1.123, 2, 3.323, 4.323, 5.323, 6.323,
+ 1, 22, 333, 4444, 55555,
+ '2018-07-13', '10:23:45', '2018-07-13
11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
+
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+INSERT INTO full_types_no_primary_key VALUES (2,
+ 'cč2', 'vcč', 'tč', N'cč', N'vcč', N'tč',
+ 1.123, 2, 3.323, 4.323, 5.323, 6.323,
+ 1, 22, 333, 4444, 55555,
+ '2018-07-13', '10:23:45', '2018-07-13
11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
+
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name =
'full_types_no_primary_key', @role_name = NULL, @supports_net_changes = 0;
+
CREATE TABLE full_types_sink (
id int NOT NULL,
val_char char(3),
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_no_primary_key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_no_primary_key.conf
new file mode 100644
index 0000000000..753a6f3a7a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_no_primary_key.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ SqlServer-CDC {
+ result_table_name = "customers"
+ username = "sa"
+ password = "Password!"
+ database-names = ["column_type_test"]
+ table-names = ["column_type_test.dbo.full_types_no_primary_key"]
+ base-url =
"jdbc:sqlserver://sqlserver-host:1433;databaseName=column_type_test"
+
+ exactly_once = false
+ }
+}
+
+transform {
+}
+
+sink {
+ Jdbc {
+ source_table_name = "customers"
+ driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+ url = "jdbc:sqlserver://sqlserver-host:1433;encrypt=false"
+ user = "sa"
+ password = "Password!"
+ generate_sink_sql = true
+ database = "column_type_test"
+ table = "dbo.full_types_sink"
+ batch_size = 1
+ primary_keys = ["id"]
+ }
+}
\ No newline at end of file