This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3cb34c2b71 [Feature][Oracle-CDC] Support read no primary key table
(#6209)
3cb34c2b71 is described below
commit 3cb34c2b71f18e08cbc89dfdab3493cb9434ff6c
Author: hailin0 <[email protected]>
AuthorDate: Mon Jan 15 21:27:27 2024 +0800
[Feature][Oracle-CDC] Support read no primary key table (#6209)
---
.../fetch/scan/OracleSnapshotSplitReadTask.java | 2 +-
.../seatunnel/cdc/oracle/utils/OracleUtils.java | 3 +-
.../seatunnel/cdc/oracle/OracleCDCIT.java | 46 +++++++++++++++
.../src/test/resources/ddl/column_type_test.sql | 53 +++++++++++++++++
.../oraclecdc_to_oracle_with_no_primary_key.conf | 66 ++++++++++++++++++++++
5 files changed, 168 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotSplitReadTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotSplitReadTask.java
index ef7dc5776a..1d48aa7f7e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotSplitReadTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotSplitReadTask.java
@@ -218,7 +218,7 @@ public class OracleSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSour
snapshotSplit.getSplitEnd() == null,
snapshotSplit.getSplitStart(),
snapshotSplit.getSplitEnd(),
-
snapshotSplit.getSplitKeyType().getTotalFields(),
+ snapshotSplit.getSplitKeyType(),
connectorConfig.getSnapshotFetchSize());
ResultSet rs = selectStatement.executeQuery()) {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
index f04baa1199..d5f9d9a8fb 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
@@ -293,13 +293,14 @@ public class OracleUtils {
boolean isLastSplit,
Object[] splitStart,
Object[] splitEnd,
- int primaryKeyNum,
+ SeaTunnelRowType splitKeyType,
int fetchSize) {
try {
final PreparedStatement statement = initStatement(jdbc, sql,
fetchSize);
if (isFirstSplit && isLastSplit) {
return statement;
}
+ int primaryKeyNum = splitKeyType.getTotalFields();
if (isFirstSplit) {
for (int i = 0; i < primaryKeyNum; i++) {
statement.setObject(i + 1, splitEnd[i]);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index 34e0a9d09d..d86114e1cb 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -99,6 +99,7 @@ public class OracleCDCIT extends TestSuiteBase implements
TestResource {
private static final String DATABASE = "DEBEZIUM";
private static final String SOURCE_TABLE1 = "FULL_TYPES";
private static final String SOURCE_TABLE2 = "FULL_TYPES2";
+ private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
"FULL_TYPES_NO_PRIMARY_KEY";
private static final String SINK_TABLE1 = "SINK_FULL_TYPES";
private static final String SINK_TABLE2 = "SINK_FULL_TYPES2";
@@ -175,6 +176,51 @@ public class OracleCDCIT extends TestSuiteBase implements
TestResource {
});
}
+ @TestTemplate
+ public void testOracleCdcCheckDataWithNoPrimaryKey(TestContainer
container) throws Exception {
+
+ clearTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+ clearTable(DATABASE, SINK_TABLE1);
+
+ insertSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+
container.executeJob("/oraclecdc_to_oracle_with_no_primary_key.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // snapshot stage
+ await().atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ querySql(
+ getSourceQuerySQL(
+ DATABASE,
SOURCE_TABLE_NO_PRIMARY_KEY)),
+ querySql(getSourceQuerySQL(DATABASE,
SINK_TABLE1)));
+ });
+
+ // insert update delete
+ updateSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+
+ // stream stage
+ await().atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ querySql(
+ getSourceQuerySQL(
+ DATABASE,
SOURCE_TABLE_NO_PRIMARY_KEY)),
+ querySql(getSourceQuerySQL(DATABASE,
SINK_TABLE1)));
+ });
+ }
+
@TestTemplate
@DisabledOnContainer(
value = {},
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/ddl/column_type_test.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/ddl/column_type_test.sql
index 57610b7b15..e334cc0647 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/ddl/column_type_test.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/ddl/column_type_test.sql
@@ -102,6 +102,46 @@ create table DEBEZIUM.FULL_TYPES2 (
primary key (ID)
);
+create table DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY (
+ ID NUMBER(9)
not null,
+ VAL_VARCHAR
VARCHAR2(1000),
+ VAL_VARCHAR2
VARCHAR2(1000),
+ VAL_NVARCHAR2
NVARCHAR2(1000),
+ VAL_CHAR CHAR(3),
+ VAL_NCHAR NCHAR(3),
+ VAL_BF
BINARY_FLOAT,
+ VAL_BD
BINARY_DOUBLE,
+ VAL_F FLOAT,
+ VAL_F_10 FLOAT(10),
+ VAL_NUM NUMBER(10,
6),
+ VAL_DP FLOAT,
+ VAL_R FLOAT(63),
+ VAL_DECIMAL NUMBER(10,
6),
+ VAL_NUMERIC NUMBER(10,
6),
+ VAL_NUM_VS NUMBER,
+ VAL_INT NUMBER,
+ VAL_INTEGER NUMBER,
+ VAL_SMALLINT NUMBER,
+ VAL_NUMBER_38_NO_SCALE NUMBER(38),
+ VAL_NUMBER_38_SCALE_0 NUMBER(38),
+ VAL_NUMBER_1 NUMBER(1),
+ VAL_NUMBER_2 NUMBER(2),
+ VAL_NUMBER_4 NUMBER(4),
+ VAL_NUMBER_9 NUMBER(9),
+ VAL_NUMBER_18 NUMBER(18),
+ VAL_NUMBER_2_NEGATIVE_SCALE NUMBER(1,
-1),
+ VAL_NUMBER_4_NEGATIVE_SCALE NUMBER(2,
-2),
+ VAL_NUMBER_9_NEGATIVE_SCALE NUMBER(8,
-1),
+ VAL_NUMBER_18_NEGATIVE_SCALE NUMBER(16,
-2),
+ VAL_NUMBER_36_NEGATIVE_SCALE NUMBER(36,
-2),
+ VAL_DATE DATE,
+ VAL_TS
TIMESTAMP(6),
+ VAL_TS_PRECISION2
TIMESTAMP(2),
+ VAL_TS_PRECISION4
TIMESTAMP(4),
+ VAL_TS_PRECISION9
TIMESTAMP(6),
+ VAL_TSLTZ
TIMESTAMP(6) WITH LOCAL TIME ZONE
+);
+
INSERT INTO DEBEZIUM.FULL_TYPES VALUES (
1, 'vc2', 'vc2', 'nvc2', 'c', 'nc',
1.1, 2.22, 3.33, 8.888, 4.4444, 5.555, 6.66, 1234.567891, 1234.567891,
77.323,
@@ -128,6 +168,19 @@ INSERT INTO DEBEZIUM.FULL_TYPES2 VALUES (
TO_TIMESTAMP_TZ('2022-10-30
01:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5')
);
+INSERT INTO DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY VALUES (
+ 1, 'vc2', 'vc2', 'nvc2', 'c', 'nc',
+ 1.1, 2.22, 3.33, 8.888, 4.4444,
5.555, 6.66, 1234.567891, 1234.567891, 77.323,
+ 1, 22, 333, 4444, 5555, 1, 99,
9999, 999999999, 999999999999999999,
+ 94, 9949, 999999994,
999999999999999949, 99999999999999999999999999999999999949,
+ TO_DATE('2022-10-30',
'yyyy-mm-dd'),
+ TO_TIMESTAMP('2022-10-30
12:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5'),
+ TO_TIMESTAMP('2022-10-30
12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),
+ TO_TIMESTAMP('2022-10-30
12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),
+ TO_TIMESTAMP('2022-10-30
12:34:56.125456789', 'yyyy-mm-dd HH24:MI:SS.FF9'),
+ TO_TIMESTAMP_TZ('2022-10-30
01:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5')
+ );
+
create table DEBEZIUM.SINK_FULL_TYPES (
ID
NUMBER(9) not null,
VAL_VARCHAR
VARCHAR2(1000),
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_no_primary_key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_no_primary_key.conf
new file mode 100644
index 0000000000..f6e46b637a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_no_primary_key.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates streaming processing with SeaTunnel
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ # This is an example source plugin, **only for testing and demonstrating the
source plugin feature**
+ Oracle-CDC {
+ result_table_name = "customers"
+ username = "system"
+ password = "oracle"
+ database-names = ["XE"]
+ schema-names = ["DEBEZIUM"]
+ table-names = ["XE.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"]
+ base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+ source.reader.close.timeout = 120000
+ connection.pool.size = 1
+ debezium {
+ # log.mining.strategy = "online_catalog"
+ # log.mining.continuous.mine = true
+ database.oracle.jdbc.timezoneAsRegion = "false"
+ }
+
+ exactly_once = false
+ }
+}
+
+transform {
+}
+
+sink {
+ Jdbc {
+ source_table_name = "customers"
+ driver = "oracle.jdbc.driver.OracleDriver"
+ url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+ user = "system"
+ password = "oracle"
+ generate_sink_sql = true
+ database = "XE"
+ table = "DEBEZIUM.SINK_FULL_TYPES"
+ batch_size = 1
+ primary_keys = ["ID"]
+ }
+}
\ No newline at end of file