This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b42d78de3f [Feature][CDC] Support read no primary key table (#6098)
b42d78de3f is described below

commit b42d78de3f9118bc3526c636df446b5c68285db6
Author: hailin0 <[email protected]>
AuthorDate: Thu Dec 28 11:19:28 2023 +0800

    [Feature][CDC] Support read no primary key table (#6098)
---
 .../splitter/AbstractJdbcSourceChunkSplitter.java  | 66 +++++++++++-------
 .../fetch/scan/MySqlSnapshotSplitReadTask.java     |  2 +-
 .../seatunnel/cdc/mysql/utils/MySqlUtils.java      |  3 +-
 .../fetch/scan/SqlServerSnapshotSplitReadTask.java |  2 +-
 .../cdc/sqlserver/source/utils/SqlServerUtils.java |  3 +-
 .../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 44 ++++++++++++
 .../src/test/resources/ddl/mysql_cdc.sql           | 79 +++++++++++++++++++++
 .../mysqlcdc_to_mysql_with_no_primary_key.conf     | 55 +++++++++++++++
 .../connector/cdc/sqlserver/SqlServerCDCIT.java    | 80 ++++++++++++++++++----
 .../src/test/resources/ddl/column_type_test.sql    | 48 +++++++++++++
 ...servercdc_to_sqlserver_with_no_primary_key.conf | 58 ++++++++++++++++
 11 files changed, 396 insertions(+), 44 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index dc1d977358..151e003ecd 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -62,27 +62,42 @@ public abstract class AbstractJdbcSourceChunkSplitter implements JdbcSourceChunk
             long start = System.currentTimeMillis();
 
             Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new RuntimeException("Failed to split chunks for table " 
+ tableId, e);
-            }
-
-            // convert chunks into splits
             List<SnapshotSplit> splits = new ArrayList<>();
-            SeaTunnelRowType splitType = getSplitType(splitColumn);
-            for (int i = 0; i < chunks.size(); i++) {
-                ChunkRange chunk = chunks.get(i);
-                SnapshotSplit split =
-                        createSnapshotSplit(
-                                jdbc,
-                                tableId,
-                                i,
-                                splitType,
-                                chunk.getChunkStart(),
-                                chunk.getChunkEnd());
-                splits.add(split);
+            if (splitColumn == null) {
+                if (sourceConfig.isExactlyOnce()) {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Exactly once is enabled, but not found 
primary key or unique key for table %s",
+                                    tableId));
+                }
+                SnapshotSplit singleSplit = createSnapshotSplit(jdbc, tableId, 
0, null, null, null);
+                splits.add(singleSplit);
+                log.warn(
+                        "No evenly split column found for table {}, use single 
split {}",
+                        tableId,
+                        singleSplit);
+            } else {
+                final List<ChunkRange> chunks;
+                try {
+                    chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
+                } catch (SQLException e) {
+                    throw new RuntimeException("Failed to split chunks for 
table " + tableId, e);
+                }
+
+                // convert chunks into splits
+                SeaTunnelRowType splitType = getSplitType(splitColumn);
+                for (int i = 0; i < chunks.size(); i++) {
+                    ChunkRange chunk = chunks.get(i);
+                    SnapshotSplit split =
+                            createSnapshotSplit(
+                                    jdbc,
+                                    tableId,
+                                    i,
+                                    splitType,
+                                    chunk.getChunkStart(),
+                                    chunk.getChunkEnd());
+                    splits.add(split);
+                }
             }
 
             long end = System.currentTimeMillis();
@@ -371,6 +386,8 @@ public abstract class AbstractJdbcSourceChunkSplitter implements JdbcSourceChunk
                     }
                 }
             }
+        } else {
+            log.warn("No primary key found for table {}", tableId);
         }
 
         List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
@@ -389,16 +406,15 @@ public abstract class AbstractJdbcSourceChunkSplitter implements JdbcSourceChunk
                     }
                 }
             }
+        } else {
+            log.warn("No unique key found for table {}", tableId);
         }
         if (splitColumn != null) {
             return splitColumn;
         }
 
-        throw new UnsupportedOperationException(
-                String.format(
-                        "Incremental snapshot for tables requires primary 
key/unique key,"
-                                + " but table %s doesn't have primary key.",
-                        tableId));
+        log.warn("No evenly split column found for table {}", tableId);
+        return null;
     }
 
     protected String splitId(TableId tableId, int chunkId) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
index c0879193c6..61b0987cda 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
@@ -206,7 +206,7 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
                                 snapshotSplit.getSplitEnd() == null,
                                 snapshotSplit.getSplitStart(),
                                 snapshotSplit.getSplitEnd(),
-                                
snapshotSplit.getSplitKeyType().getTotalFields(),
+                                snapshotSplit.getSplitKeyType(),
                                 connectorConfig.getSnapshotFetchSize());
                 ResultSet rs = selectStatement.executeQuery()) {
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index 3dde38b422..706d7a8862 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -291,13 +291,14 @@ public class MySqlUtils {
             boolean isLastSplit,
             Object[] splitStart,
             Object[] splitEnd,
-            int primaryKeyNum,
+            SeaTunnelRowType splitKeyType,
             int fetchSize) {
         try {
             final PreparedStatement statement = initStatement(jdbc, sql, 
fetchSize);
             if (isFirstSplit && isLastSplit) {
                 return statement;
             }
+            int primaryKeyNum = splitKeyType.getTotalFields();
             if (isFirstSplit) {
                 for (int i = 0; i < primaryKeyNum; i++) {
                     statement.setObject(i + 1, splitEnd[i]);
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java
index 9857c18f06..30ffadd126 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java
@@ -196,7 +196,7 @@ public class SqlServerSnapshotSplitReadTask extends AbstractSnapshotChangeEventS
                                 snapshotSplit.getSplitEnd() == null,
                                 snapshotSplit.getSplitStart(),
                                 snapshotSplit.getSplitEnd(),
-                                
snapshotSplit.getSplitKeyType().getTotalFields(),
+                                snapshotSplit.getSplitKeyType(),
                                 connectorConfig.getSnapshotFetchSize());
                 ResultSet rs = selectStatement.executeQuery()) {
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
index ab0c061d2c..2c83e02e0e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
@@ -293,13 +293,14 @@ public class SqlServerUtils {
             boolean isLastSplit,
             Object[] splitStart,
             Object[] splitEnd,
-            int primaryKeyNum,
+            SeaTunnelRowType splitKeyType,
             int fetchSize) {
         try {
             final PreparedStatement statement = initStatement(jdbc, sql, 
fetchSize);
             if (isFirstSplit && isLastSplit) {
                 return statement;
             }
+            int primaryKeyNum = splitKeyType.getTotalFields();
             if (isFirstSplit) {
                 for (int i = 0; i < primaryKeyNum; i++) {
                     statement.setObject(i + 1, splitEnd[i]);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 647ec9c140..366b3146b1 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -94,6 +94,8 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
 
     private static final String SOURCE_TABLE_1 = "mysql_cdc_e2e_source_table";
     private static final String SOURCE_TABLE_2 = "mysql_cdc_e2e_source_table2";
+    private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
+            "mysql_cdc_e2e_source_table_no_primary_key";
     private static final String SINK_TABLE = "mysql_cdc_e2e_sink_table";
 
     private static MySqlContainer createMySqlContainer(MySqlVersion version) {
@@ -211,6 +213,48 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
                         });
     }
 
+    @TestTemplate
+    public void testMysqlCdcCheckDataWithNoPrimaryKey(TestContainer container) 
{
+        // Clear related content to ensure that multiple operations are not 
affected
+        clearTable(MYSQL_DATABASE, SINK_TABLE);
+
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        
container.executeJob("/mysqlcdc_to_mysql_with_no_primary_key.conf");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            log.info(query(getSinkQuerySQL(MYSQL_DATABASE, 
SINK_TABLE)).toString());
+                            Assertions.assertIterableEquals(
+                                    query(
+                                            getSourceQuerySQL(
+                                                    MYSQL_DATABASE, 
SOURCE_TABLE_NO_PRIMARY_KEY)),
+                                    query(getSinkQuerySQL(MYSQL_DATABASE, 
SINK_TABLE)));
+                        });
+
+        // insert update delete
+        executeSql("DELETE FROM " + MYSQL_DATABASE + "." + 
SOURCE_TABLE_NO_PRIMARY_KEY);
+        upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+
+        // stream stage
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    query(
+                                            getSourceQuerySQL(
+                                                    MYSQL_DATABASE, 
SOURCE_TABLE_NO_PRIMARY_KEY)),
+                                    query(getSinkQuerySQL(MYSQL_DATABASE, 
SINK_TABLE)));
+                        });
+    }
+
     @TestTemplate
     @DisabledOnContainer(
             value = {},
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
index da42f2b8f0..91ae73bd27 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
@@ -122,6 +122,54 @@ CREATE TABLE mysql_cdc_e2e_source_table2
   DEFAULT CHARSET = utf8mb4
   COLLATE = utf8mb4_0900_ai_ci;
 
+CREATE TABLE mysql_cdc_e2e_source_table_no_primary_key
+(
+    `id`                   int                            NOT NULL,
+    `f_binary`             binary(64)                     DEFAULT NULL,
+    `f_blob`               blob,
+    `f_long_varbinary`     mediumblob,
+    `f_longblob`           longblob,
+    `f_tinyblob`           tinyblob,
+    `f_varbinary`          varbinary(100)                 DEFAULT NULL,
+    `f_smallint`           smallint                       DEFAULT NULL,
+    `f_smallint_unsigned`  smallint unsigned              DEFAULT NULL,
+    `f_mediumint`          mediumint                      DEFAULT NULL,
+    `f_mediumint_unsigned` mediumint unsigned             DEFAULT NULL,
+    `f_int`                int                            DEFAULT NULL,
+    `f_int_unsigned`       int unsigned                   DEFAULT NULL,
+    `f_integer`            int                            DEFAULT NULL,
+    `f_integer_unsigned`   int unsigned                   DEFAULT NULL,
+    `f_bigint`             bigint                         DEFAULT NULL,
+    `f_bigint_unsigned`    bigint unsigned                DEFAULT NULL,
+    `f_numeric`            decimal(10, 0)                 DEFAULT NULL,
+    `f_decimal`            decimal(10, 0)                 DEFAULT NULL,
+    `f_float`              float                          DEFAULT NULL,
+    `f_double`             double                         DEFAULT NULL,
+    `f_double_precision`   double                         DEFAULT NULL,
+    `f_longtext`           longtext,
+    `f_mediumtext`         mediumtext,
+    `f_text`               text,
+    `f_tinytext`           tinytext,
+    `f_varchar`            varchar(100)                   DEFAULT NULL,
+    `f_date`               date                           DEFAULT NULL,
+    `f_datetime`           datetime                       DEFAULT NULL,
+    `f_timestamp`          timestamp NULL                 DEFAULT NULL,
+    `f_bit1`               bit(1)                         DEFAULT NULL,
+    `f_bit64`              bit(64)                        DEFAULT NULL,
+    `f_char`               char(1)                        DEFAULT NULL,
+    `f_enum`               enum ('enum1','enum2','enum3') DEFAULT NULL,
+    `f_mediumblob`         mediumblob,
+    `f_long_varchar`       mediumtext,
+    `f_real`               double                         DEFAULT NULL,
+    `f_time`               time                           DEFAULT NULL,
+    `f_tinyint`            tinyint                        DEFAULT NULL,
+    `f_tinyint_unsigned`   tinyint unsigned               DEFAULT NULL,
+    `f_json`               json                           DEFAULT NULL,
+    `f_year`               year                           DEFAULT NULL
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4
+  COLLATE = utf8mb4_0900_ai_ci;
+
 CREATE TABLE mysql_cdc_e2e_sink_table
 (
     `id`                   int       NOT NULL AUTO_INCREMENT,
@@ -174,6 +222,7 @@ CREATE TABLE mysql_cdc_e2e_sink_table
 
 truncate table mysql_cdc_e2e_source_table;
 truncate table mysql_cdc_e2e_source_table2;
+truncate table mysql_cdc_e2e_source_table_no_primary_key;
 truncate table mysql_cdc_e2e_sink_table;
 
 INSERT INTO mysql_cdc_e2e_source_table ( id, f_binary, f_blob, 
f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,
@@ -238,6 +287,36 @@ VALUES ( 1, 
0x616263740000000000000000000000000000000000000000000000000000000000
          
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field', 112.345,
          '14:30:00', -128, 22, '{ "key": "value" }', 2021 );
 
+INSERT INTO mysql_cdc_e2e_source_table_no_primary_key ( id, f_binary, f_blob, 
f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,
+                                          f_smallint_unsigned, f_mediumint, 
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,
+                                          f_integer_unsigned, f_bigint, 
f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,
+                                          f_double_precision, f_longtext, 
f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,
+                                          f_timestamp, f_bit1, f_bit64, 
f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,
+                                          f_tinyint, f_tinyint_unsigned, 
f_json, f_year )
+VALUES ( 1, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,
+         0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 
654321, 1234567, 7654321, 1234567, 7654321,
+         123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long 
text field', 'This is a medium text field',
+         'This is a text field', 'This is a tiny text field', 'This is a 
varchar field', '2022-04-27', '2022-04-27 14:30:00',
+         '2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',
+         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',
+         12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ),
+       ( 2, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 
0x74696E79626C6F62,
+         0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 
7654321, 1234567, 7654321, 123456789, 987654321,
+         123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is 
a medium text field', 'This is a text field',
+         'This is a tiny text field', 'This is a varchar field', '2022-04-27', 
'2022-04-27 14:30:00', '2023-04-27 11:08:40',
+         1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',
+         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',
+         112.345, '14:30:00', -128, 22, '{ "key": "value" }', 2013 ),
+       ( 3, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 
0x74696E79626C6F62,
+         0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 
7654321, 1234567, 7654321, 123456789, 987654321, 123,
+         789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a 
medium text field', 'This is a text field',
+         'This is a tiny text field', 'This is a varchar field', '2022-04-27', 
'2022-04-27 14:30:00', '2023-04-27 11:08:40',
+         1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',
+         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field', 112.345,
+         '14:30:00', -128, 22, '{ "key": "value" }', 2021 );
 
 CREATE DATABASE IF NOT EXISTS `mysql_cdc2`;
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
new file mode 100644
index 0000000000..46df806ae7
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  MySQL-CDC {
+    result_table_name = "customers_mysql_cdc"
+    server-id = 5652
+    username = "st_user"
+    password = "seatunnel"
+    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table_no_primary_key"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+
+    exactly_once = false
+  }
+}
+
+sink {
+  jdbc {
+    source_table_name = "customers_mysql_cdc"
+    url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "st_user"
+    password = "seatunnel"
+
+    generate_sink_sql = true
+    # You need to configure both database and table
+    database = mysql_cdc
+    table = mysql_cdc_e2e_sink_table
+    primary_keys = ["id"]
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 458d80bb35..597838096b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -66,7 +66,7 @@ import static org.awaitility.Awaitility.await;
 @DisabledOnContainer(
         value = {},
         type = {EngineType.SPARK},
-        disabledReason = "Currently SPARK and FLINK do not support cdc")
+        disabledReason = "Currently SPARK do not support cdc")
 public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
 
     private static final String HOST = "sqlserver-host";
@@ -80,8 +80,11 @@ public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
     private static final String DISABLE_DB_CDC =
             "IF EXISTS(select 1 from sys.databases where name='#' AND 
is_cdc_enabled=1)\n"
                     + "EXEC sys.sp_cdc_disable_db";
-
-    private static final String SOURCE_SQL =
+    private static final String SOURCE_TABLE = 
"column_type_test.dbo.full_types";
+    private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
+            "column_type_test.dbo.full_types_no_primary_key";
+    private static final String SINK_TABLE = 
"column_type_test.dbo.full_types_sink";
+    private static final String SELECT_SOURCE_SQL =
             "select\n"
                     + "  id,\n"
                     + "  val_char,\n"
@@ -109,8 +112,8 @@ public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
                     + "  val_xml,\n"
                     + "  val_datetimeoffset,\n"
                     + "  CONVERT(varchar(100), val_varbinary) as 
val_varbinary\n"
-                    + "from column_type_test.dbo.full_types";
-    private static final String SINK_SQL =
+                    + "from %s order by id asc";
+    private static final String SELECT_SINK_SQL =
             "select\n"
                     + "  id,\n"
                     + "  val_char,\n"
@@ -138,7 +141,7 @@ public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
                     + "  val_xml,\n"
                     + "  val_datetimeoffset,\n"
                     + "  CONVERT(varchar(100), val_varbinary) as 
val_varbinary\n"
-                    + "from column_type_test.dbo.full_types_sink";
+                    + "from %s order by id asc";
 
     public static final MSSQLServerContainer MSSQL_SERVER_CONTAINER =
             new 
MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest")
@@ -207,18 +210,58 @@ public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
                 .untilAsserted(
                         () -> {
                             Assertions.assertIterableEquals(
-                                    querySql(SOURCE_SQL), querySql(SINK_SQL));
+                                    querySql(SELECT_SOURCE_SQL, SOURCE_TABLE),
+                                    querySql(SELECT_SINK_SQL, SINK_TABLE));
+                        });
+
+        // insert update delete
+        updateSourceTable(SOURCE_TABLE);
+
+        // stream stage
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    querySql(SELECT_SOURCE_SQL, SOURCE_TABLE),
+                                    querySql(SELECT_SINK_SQL, SINK_TABLE));
+                        });
+    }
+
+    @TestTemplate
+    public void testCDCWithNoPrimaryKey(TestContainer container) {
+        initializeSqlServerTable("column_type_test");
+
+        CompletableFuture<Void> executeJobFuture =
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                container.executeJob(
+                                        
"/sqlservercdc_to_sqlserver_with_no_primary_key.conf");
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                            return null;
+                        });
+
+        // snapshot stage
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    querySql(SELECT_SOURCE_SQL, 
SOURCE_TABLE_NO_PRIMARY_KEY),
+                                    querySql(SELECT_SINK_SQL, SINK_TABLE));
                         });
 
         // insert update delete
-        updateSourceTable();
+        updateSourceTable(SOURCE_TABLE_NO_PRIMARY_KEY);
 
         // stream stage
         await().atMost(60000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () -> {
                             Assertions.assertIterableEquals(
-                                    querySql(SOURCE_SQL), querySql(SINK_SQL));
+                                    querySql(SELECT_SOURCE_SQL, 
SOURCE_TABLE_NO_PRIMARY_KEY),
+                                    querySql(SELECT_SINK_SQL, SINK_TABLE));
                         });
     }
 
@@ -255,26 +298,29 @@ public class SqlServerCDCIT extends TestSuiteBase 
implements TestResource {
         }
     }
 
-    private void updateSourceTable() {
+    private void updateSourceTable(String table) {
         executeSql(
-                "INSERT INTO column_type_test.dbo.full_types VALUES (3,\n"
+                "INSERT INTO "
+                        + table
+                        + " VALUES (3,\n"
                         + "                               'cč3', 'vcč', 'tč', 
N'cč', N'vcč', N'tč',\n"
                         + "                               1.123, 2, 3.323, 
4.323, 5.323, 6.323,\n"
                         + "                               1, 22, 333, 4444, 
55555,\n"
                         + "                               '2018-07-13', 
'10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 
14:23:45',\n"
                         + "                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));");
         executeSql(
-                "INSERT INTO column_type_test.dbo.full_types VALUES (4,\n"
+                "INSERT INTO "
+                        + table
+                        + " VALUES (4,\n"
                         + "                               'cč4', 'vcč', 'tč', 
N'cč', N'vcč', N'tč',\n"
                         + "                               1.123, 2, 3.323, 
4.323, 5.323, 6.323,\n"
                         + "                               1, 22, 333, 4444, 
55555,\n"
                         + "                               '2018-07-13', 
'10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 
14:23:45',\n"
                         + "                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));");
 
-        executeSql("DELETE FROM column_type_test.dbo.full_types where id = 2");
+        executeSql("DELETE FROM " + table + " where id = 2");
 
-        executeSql(
-                "UPDATE column_type_test.dbo.full_types SET val_varchar = 
'newvcč' where id = 1");
+        executeSql("UPDATE " + table + " SET val_varchar = 'newvcč' where id = 
1");
     }
 
     private Connection getJdbcConnection() throws SQLException {
@@ -284,6 +330,10 @@ public class SqlServerCDCIT extends TestSuiteBase 
implements TestResource {
                 MSSQL_SERVER_CONTAINER.getPassword());
     }
 
+    private List<List<Object>> querySql(String sql, String table) {
+        return querySql(String.format(sql, table));
+    }
+
     private List<List<Object>> querySql(String sql) {
         try (Connection connection = getJdbcConnection()) {
             ResultSet resultSet = 
connection.createStatement().executeQuery(sql);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
index 6bb95d3c95..d227c34615 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
@@ -72,6 +72,54 @@ INSERT INTO full_types VALUES (2,
                                
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
 EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 
'full_types', @role_name = NULL, @supports_net_changes = 0;
 
+CREATE TABLE full_types_no_primary_key (
+                            id int NOT NULL,
+                            val_char char(3),
+                            val_varchar varchar(1000),
+                            val_text text,
+                            val_nchar nchar(3),
+                            val_nvarchar nvarchar(1000),
+                            val_ntext ntext,
+                            val_decimal decimal(6,3),
+                            val_numeric numeric,
+                            val_float float,
+                            val_real real,
+                            val_smallmoney smallmoney,
+                            val_money money,
+                            val_bit bit,
+                            val_tinyint tinyint,
+                            val_smallint smallint,
+                            val_int int,
+                            val_bigint bigint,
+                            val_date date,
+                            val_time time,
+                            val_datetime2 datetime2,
+                            val_datetime datetime,
+                            val_smalldatetime smalldatetime,
+                            val_xml xml,
+                            val_datetimeoffset DATETIMEOFFSET(4),
+                            val_varbinary  varbinary(100)
+);
+INSERT INTO full_types_no_primary_key VALUES (0,
+                               'cč0', 'vcč', 'tč', N'cč', N'vcč', N'tč',
+                               1.123, 2, 3.323, 4.323, 5.323, 6.323,
+                               1, 22, 333, 4444, 55555,
+                               '2018-07-13', '10:23:45', '2018-07-13 
11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
+                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+INSERT INTO full_types_no_primary_key VALUES (1,
+                               'cč1', 'vcč', 'tč', N'cč', N'vcč', N'tč',
+                               1.123, 2, 3.323, 4.323, 5.323, 6.323,
+                               1, 22, 333, 4444, 55555,
+                               '2018-07-13', '10:23:45', '2018-07-13 
11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
+                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+INSERT INTO full_types_no_primary_key VALUES (2,
+                               'cč2', 'vcč', 'tč', N'cč', N'vcč', N'tč',
+                               1.123, 2, 3.323, 4.323, 5.323, 6.323,
+                               1, 22, 333, 4444, 55555,
+                               '2018-07-13', '10:23:45', '2018-07-13 
11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
+                               
'<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 
'full_types_no_primary_key', @role_name = NULL, @supports_net_changes = 0;
+
 CREATE TABLE full_types_sink (
                             id int NOT NULL,
                             val_char char(3),
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_no_primary_key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_no_primary_key.conf
new file mode 100644
index 0000000000..753a6f3a7a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_no_primary_key.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
feature source plugin**
+  SqlServer-CDC {
+    result_table_name = "customers"
+    username = "sa"
+    password = "Password!"
+    database-names = ["column_type_test"]
+    table-names = ["column_type_test.dbo.full_types_no_primary_key"]
+    base-url = 
"jdbc:sqlserver://sqlserver-host:1433;databaseName=column_type_test"
+
+    exactly_once = false
+  }
+}
+
+transform {
+}
+
+sink {
+  Jdbc {
+    source_table_name = "customers"
+    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+    url = "jdbc:sqlserver://sqlserver-host:1433;encrypt=false"
+    user = "sa"
+    password = "Password!"
+    generate_sink_sql = true
+    database = "column_type_test"
+    table = "dbo.full_types_sink"
+    batch_size = 1
+    primary_keys = ["id"]
+  }
+}
\ No newline at end of file


Reply via email to