This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3cb34c2b71 [Feature][Oracle-CDC] Support read no primary key table 
(#6209)
3cb34c2b71 is described below

commit 3cb34c2b71f18e08cbc89dfdab3493cb9434ff6c
Author: hailin0 <[email protected]>
AuthorDate: Mon Jan 15 21:27:27 2024 +0800

    [Feature][Oracle-CDC] Support read no primary key table (#6209)
---
 .../fetch/scan/OracleSnapshotSplitReadTask.java    |  2 +-
 .../seatunnel/cdc/oracle/utils/OracleUtils.java    |  3 +-
 .../seatunnel/cdc/oracle/OracleCDCIT.java          | 46 +++++++++++++++
 .../src/test/resources/ddl/column_type_test.sql    | 53 +++++++++++++++++
 .../oraclecdc_to_oracle_with_no_primary_key.conf   | 66 ++++++++++++++++++++++
 5 files changed, 168 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotSplitReadTask.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotSplitReadTask.java
index ef7dc5776a..1d48aa7f7e 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotSplitReadTask.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotSplitReadTask.java
@@ -218,7 +218,7 @@ public class OracleSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSour
                                 snapshotSplit.getSplitEnd() == null,
                                 snapshotSplit.getSplitStart(),
                                 snapshotSplit.getSplitEnd(),
-                                
snapshotSplit.getSplitKeyType().getTotalFields(),
+                                snapshotSplit.getSplitKeyType(),
                                 connectorConfig.getSnapshotFetchSize());
                 ResultSet rs = selectStatement.executeQuery()) {
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
index f04baa1199..d5f9d9a8fb 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
@@ -293,13 +293,14 @@ public class OracleUtils {
             boolean isLastSplit,
             Object[] splitStart,
             Object[] splitEnd,
-            int primaryKeyNum,
+            SeaTunnelRowType splitKeyType,
             int fetchSize) {
         try {
             final PreparedStatement statement = initStatement(jdbc, sql, 
fetchSize);
             if (isFirstSplit && isLastSplit) {
                 return statement;
             }
+            int primaryKeyNum = splitKeyType.getTotalFields();
             if (isFirstSplit) {
                 for (int i = 0; i < primaryKeyNum; i++) {
                     statement.setObject(i + 1, splitEnd[i]);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index 34e0a9d09d..d86114e1cb 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -99,6 +99,7 @@ public class OracleCDCIT extends TestSuiteBase implements 
TestResource {
     private static final String DATABASE = "DEBEZIUM";
     private static final String SOURCE_TABLE1 = "FULL_TYPES";
     private static final String SOURCE_TABLE2 = "FULL_TYPES2";
+    private static final String SOURCE_TABLE_NO_PRIMARY_KEY = 
"FULL_TYPES_NO_PRIMARY_KEY";
 
     private static final String SINK_TABLE1 = "SINK_FULL_TYPES";
     private static final String SINK_TABLE2 = "SINK_FULL_TYPES2";
@@ -175,6 +176,51 @@ public class OracleCDCIT extends TestSuiteBase implements 
TestResource {
                         });
     }
 
+    @TestTemplate
+    public void testOracleCdcCheckDataWithNoPrimaryKey(TestContainer 
container) throws Exception {
+
+        clearTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+        clearTable(DATABASE, SINK_TABLE1);
+
+        insertSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        
container.executeJob("/oraclecdc_to_oracle_with_no_primary_key.conf");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+
+        // snapshot stage
+        await().atMost(600000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    querySql(
+                                            getSourceQuerySQL(
+                                                    DATABASE, 
SOURCE_TABLE_NO_PRIMARY_KEY)),
+                                    querySql(getSourceQuerySQL(DATABASE, 
SINK_TABLE1)));
+                        });
+
+        // insert update delete
+        updateSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+
+        // stream stage
+        await().atMost(600000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    querySql(
+                                            getSourceQuerySQL(
+                                                    DATABASE, 
SOURCE_TABLE_NO_PRIMARY_KEY)),
+                                    querySql(getSourceQuerySQL(DATABASE, 
SINK_TABLE1)));
+                        });
+    }
+
     @TestTemplate
     @DisabledOnContainer(
             value = {},
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/ddl/column_type_test.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/ddl/column_type_test.sql
index 57610b7b15..e334cc0647 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/ddl/column_type_test.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/ddl/column_type_test.sql
@@ -102,6 +102,46 @@ create table DEBEZIUM.FULL_TYPES2 (
                                      primary key (ID)
 );
 
+create table DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY (
+                                      ID                           NUMBER(9) 
not null,
+                                      VAL_VARCHAR                  
VARCHAR2(1000),
+                                      VAL_VARCHAR2                 
VARCHAR2(1000),
+                                      VAL_NVARCHAR2                
NVARCHAR2(1000),
+                                      VAL_CHAR                     CHAR(3),
+                                      VAL_NCHAR                    NCHAR(3),
+                                      VAL_BF                       
BINARY_FLOAT,
+                                      VAL_BD                       
BINARY_DOUBLE,
+                                      VAL_F                        FLOAT,
+                                      VAL_F_10                     FLOAT(10),
+                                      VAL_NUM                      NUMBER(10, 
6),
+                                      VAL_DP                       FLOAT,
+                                      VAL_R                        FLOAT(63),
+                                      VAL_DECIMAL                  NUMBER(10, 
6),
+                                      VAL_NUMERIC                  NUMBER(10, 
6),
+                                      VAL_NUM_VS                   NUMBER,
+                                      VAL_INT                      NUMBER,
+                                      VAL_INTEGER                  NUMBER,
+                                      VAL_SMALLINT                 NUMBER,
+                                      VAL_NUMBER_38_NO_SCALE       NUMBER(38),
+                                      VAL_NUMBER_38_SCALE_0        NUMBER(38),
+                                      VAL_NUMBER_1                 NUMBER(1),
+                                      VAL_NUMBER_2                 NUMBER(2),
+                                      VAL_NUMBER_4                 NUMBER(4),
+                                      VAL_NUMBER_9                 NUMBER(9),
+                                      VAL_NUMBER_18                NUMBER(18),
+                                      VAL_NUMBER_2_NEGATIVE_SCALE  NUMBER(1, 
-1),
+                                      VAL_NUMBER_4_NEGATIVE_SCALE  NUMBER(2, 
-2),
+                                      VAL_NUMBER_9_NEGATIVE_SCALE  NUMBER(8, 
-1),
+                                      VAL_NUMBER_18_NEGATIVE_SCALE NUMBER(16, 
-2),
+                                      VAL_NUMBER_36_NEGATIVE_SCALE NUMBER(36, 
-2),
+                                      VAL_DATE                     DATE,
+                                      VAL_TS                       
TIMESTAMP(6),
+                                      VAL_TS_PRECISION2            
TIMESTAMP(2),
+                                      VAL_TS_PRECISION4            
TIMESTAMP(4),
+                                      VAL_TS_PRECISION9            
TIMESTAMP(6),
+                                      VAL_TSLTZ                    
TIMESTAMP(6) WITH LOCAL TIME ZONE
+);
+
 INSERT INTO DEBEZIUM.FULL_TYPES VALUES (
     1, 'vc2', 'vc2', 'nvc2', 'c', 'nc',
     1.1, 2.22, 3.33, 8.888, 4.4444, 5.555, 6.66, 1234.567891, 1234.567891, 
77.323,
@@ -128,6 +168,19 @@ INSERT INTO DEBEZIUM.FULL_TYPES2 VALUES (
                                            TO_TIMESTAMP_TZ('2022-10-30 
01:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5')
                                        );
 
+INSERT INTO DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY VALUES (
+                                            1, 'vc2', 'vc2', 'nvc2', 'c', 'nc',
+                                            1.1, 2.22, 3.33, 8.888, 4.4444, 
5.555, 6.66, 1234.567891, 1234.567891, 77.323,
+                                            1, 22, 333, 4444, 5555, 1, 99, 
9999, 999999999, 999999999999999999,
+                                            94, 9949, 999999994, 
999999999999999949, 99999999999999999999999999999999999949,
+                                            TO_DATE('2022-10-30', 
'yyyy-mm-dd'),
+                                            TO_TIMESTAMP('2022-10-30 
12:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5'),
+                                            TO_TIMESTAMP('2022-10-30 
12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),
+                                            TO_TIMESTAMP('2022-10-30 
12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),
+                                            TO_TIMESTAMP('2022-10-30 
12:34:56.125456789', 'yyyy-mm-dd HH24:MI:SS.FF9'),
+                                            TO_TIMESTAMP_TZ('2022-10-30 
01:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5')
+                                        );
+
 create table DEBEZIUM.SINK_FULL_TYPES (
                                           ID                           
NUMBER(9) not null,
                                           VAL_VARCHAR                  
VARCHAR2(1000),
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_no_primary_key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_no_primary_key.conf
new file mode 100644
index 0000000000..f6e46b637a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_no_primary_key.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates a SeaTunnel streaming job that reads an Oracle table without a primary key via Oracle-CDC and writes it to Oracle through the JDBC sink
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the feature source plugin**
+  Oracle-CDC {
+    result_table_name = "customers"
+    username = "system"
+    password = "oracle"
+    database-names = ["XE"]
+    schema-names = ["DEBEZIUM"]
+    table-names = ["XE.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"]
+    base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+    source.reader.close.timeout = 120000
+    connection.pool.size = 1
+    debezium {
+      #  log.mining.strategy = "online_catalog"
+      #  log.mining.continuous.mine = true
+        database.oracle.jdbc.timezoneAsRegion = "false"
+    }
+
+    exactly_once = false
+  }
+}
+
+transform {
+}
+
+sink {
+  Jdbc {
+    source_table_name = "customers"
+    driver = "oracle.jdbc.driver.OracleDriver"
+    url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+    user = "system"
+    password = "oracle"
+    generate_sink_sql = true
+    database = "XE"
+    table = "DEBEZIUM.SINK_FULL_TYPES"
+    batch_size = 1
+    primary_keys = ["ID"]
+  }
+}
\ No newline at end of file

Reply via email to