This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ae4240ca6b [Feature][Oracle-CDC] Support custom table primary key 
(#6216)
ae4240ca6b is described below

commit ae4240ca6b2cf166d84226dcb00132963d300819
Author: hailin0 <[email protected]>
AuthorDate: Wed Jan 17 22:55:59 2024 +0800

    [Feature][Oracle-CDC] Support custom table primary key (#6216)
---
 docs/en/connector-v2/source/Oracle-CDC.md          | 26 +++++++++
 .../seatunnel/cdc/oracle/source/OracleDialect.java | 23 +++++++-
 .../cdc/oracle/source/OracleIncrementalSource.java |  5 +-
 .../source/OracleIncrementalSourceFactory.java     | 14 ++++-
 .../seatunnel/cdc/oracle/utils/OracleSchema.java   | 11 +++-
 .../seatunnel/cdc/oracle/OracleCDCIT.java          | 46 +++++++++++++++
 ...raclecdc_to_oracle_with_custom_primary_key.conf | 68 ++++++++++++++++++++++
 7 files changed, 186 insertions(+), 7 deletions(-)

diff --git a/docs/en/connector-v2/source/Oracle-CDC.md 
b/docs/en/connector-v2/source/Oracle-CDC.md
index 02553b5a33..f34f6102f6 100644
--- a/docs/en/connector-v2/source/Oracle-CDC.md
+++ b/docs/en/connector-v2/source/Oracle-CDC.md
@@ -211,6 +211,7 @@ exit;
 | database-names                                 | List     | No       | -     
  | Database name of the database to monitor.                                   
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | schema-names                                   | List     | No       | -     
  | Schema name of the database to monitor.                                     
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | table-names                                    | List     | Yes      | -     
  | Table name of the database to monitor. The table name needs to include the 
database and schema name, for example: `database_name.schema_name.table_name`
                                                                                
                                                                                
                                                                                
               [...]
+| table-names-config                             | List     | No       | -     
  | Per-table configuration list, e.g. to declare custom primary keys. For example: [{"table": 
"db1.schema1.table1","primaryKeys":["key1"]}]                                   
                                                                                
                                                                                
                                                                                
                                               [...]
 | startup.mode                                   | Enum     | No       | 
INITIAL | Optional startup mode for Oracle CDC consumer, valid enumerations are 
`initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize 
historical data at startup, and then synchronize incremental data.<br/> 
`earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup 
from the latest offset.<br/> `specific`: Startup from user-supplied specific 
offsets.                            [...]
 | startup.specific-offset.file                   | String   | No       | -     
  | Start from the specified binlog file name. **Note, This option is required 
when the `startup.mode` option used `specific`.**                               
                                                                                
                                                                                
                                                                                
               [...]
 | startup.specific-offset.pos                    | Long     | No       | -     
  | Start from the specified binlog file position. **Note, This option is 
required when the `startup.mode` option used `specific`.**                      
                                                                                
                                                                                
                                                                                
                    [...]
@@ -254,6 +255,31 @@ source {
 }
 ```
 
+### Support custom primary key for table
+
+```
+
+source {
+  Oracle-CDC {
+    result_table_name = "customers"
+    base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+    source.reader.close.timeout = 120000
+    username = "system"
+    password = "oracle"
+    database-names = ["XE"]
+    schema-names = ["DEBEZIUM"]
+    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
+    table-names-config = [
+      {
+        table = "XE.DEBEZIUM.FULL_TYPES"
+        primaryKeys = ["ID"]
+      }
+    ]
+  }
+}
+
+```
+
 ### Support debezium-compatible format send to kafka
 
 > Must be used with kafka connector sink, see [compatible debezium 
 > format](../formats/cdc-compatible-debezium-json.md) for details
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
index a7823934f0..5ffef4cc3c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
@@ -24,6 +27,7 @@ import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnec
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.eumerator.OracleChunkSplitter;
@@ -40,6 +44,8 @@ import 
io.debezium.relational.history.TableChanges.TableChange;
 
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleConnectionUtils.createOracleConnection;
 
@@ -49,10 +55,13 @@ public class OracleDialect implements JdbcDataSourceDialect 
{
     private final OracleSourceConfigFactory configFactory;
     private final OracleSourceConfig sourceConfig;
     private transient OracleSchema oracleSchema;
+    private final Map<TableId, CatalogTable> tableMap;
 
-    public OracleDialect(OracleSourceConfigFactory configFactory) {
+    public OracleDialect(
+            OracleSourceConfigFactory configFactory, List<CatalogTable> 
catalogTables) {
         this.configFactory = configFactory;
         this.sourceConfig = configFactory.create(0);
+        this.tableMap = CatalogTableUtils.convertTables(catalogTables);
     }
 
     @Override
@@ -102,7 +111,7 @@ public class OracleDialect implements JdbcDataSourceDialect 
{
     @Override
     public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
         if (oracleSchema == null) {
-            oracleSchema = new 
OracleSchema(sourceConfig.getDbzConnectorConfig());
+            oracleSchema = new 
OracleSchema(sourceConfig.getDbzConnectorConfig(), tableMap);
         }
         return oracleSchema.getTableSchema(jdbc, tableId);
     }
@@ -121,4 +130,14 @@ public class OracleDialect implements 
JdbcDataSourceDialect {
             return new 
OracleRedoLogFetchTask(sourceSplitBase.asIncrementalSplit());
         }
     }
+
+    /**
+     * Returns the primary key of {@code tableId} as recorded in the configured {@link
+     * CatalogTable}. These catalog tables are the ones handed to this dialect, with any
+     * {@code table-names-config} primary keys already merged in. The {@code jdbcConnection}
+     * argument is not used.
+     */
+    @Override
+    public Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection, TableId tableId) {
+        return Optional.ofNullable(getCatalogTable(tableId).getTableSchema().getPrimaryKey());
+    }
+
+    /**
+     * Returns the constraint keys of {@code tableId} as recorded in the configured {@link
+     * CatalogTable}. The {@code jdbcConnection} argument is not used.
+     */
+    @Override
+    public List<ConstraintKey> getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId) {
+        return getCatalogTable(tableId).getTableSchema().getConstraintKeys();
+    }
+
+    /**
+     * Looks up the configured catalog table for {@code tableId}.
+     *
+     * @throws IllegalStateException if no catalog table was configured for {@code tableId}. This
+     *     replaces an unexplained NullPointerException and names the table and the known keys.
+     */
+    private CatalogTable getCatalogTable(TableId tableId) {
+        CatalogTable catalogTable = tableMap.get(tableId);
+        if (catalogTable == null) {
+            throw new IllegalStateException(
+                    "No catalog table configured for table "
+                            + tableId
+                            + ", known tables: "
+                            + tableMap.keySet());
+        }
+        return catalogTable;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
index 933c8cdc37..9bcd8f8845 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
@@ -121,7 +121,7 @@ public class OracleIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceC
 
     @Override
     public DataSourceDialect<JdbcSourceConfig> 
createDataSourceDialect(ReadonlyConfig config) {
-        return new OracleDialect((OracleSourceConfigFactory) configFactory);
+        return new OracleDialect((OracleSourceConfigFactory) configFactory, 
catalogTables);
     }
 
     @Override
@@ -132,7 +132,8 @@ public class OracleIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceC
 
     private Map<TableId, Struct> tableChanges() {
         JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
-        OracleDialect dialect = new OracleDialect((OracleSourceConfigFactory) 
configFactory);
+        OracleDialect dialect =
+                new OracleDialect((OracleSourceConfigFactory) configFactory, 
catalogTables);
         List<TableId> discoverTables = 
dialect.discoverDataCollections(jdbcSourceConfig);
         ConnectTableChangeSerializer connectTableChangeSerializer =
                 new ConnectTableChangeSerializer();
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
index f8f4fb3228..c80f0dc7ce 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
@@ -23,22 +23,26 @@ import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
 import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
 
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Optional;
 
 @AutoService(Factory.class)
 public class OracleIncrementalSourceFactory implements TableSourceFactory {
@@ -65,7 +69,8 @@ public class OracleIncrementalSourceFactory implements 
TableSourceFactory {
                         JdbcSourceOptions.CONNECTION_POOL_SIZE,
                         
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
                         
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
-                        JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD)
+                        JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
+                        JdbcSourceOptions.TABLE_NAMES_CONFIG)
                 .optional(OracleSourceOptions.STARTUP_MODE, 
OracleSourceOptions.STOP_MODE)
                 .conditional(
                         OracleSourceOptions.STARTUP_MODE,
@@ -102,6 +107,13 @@ public class OracleIncrementalSourceFactory implements 
TableSourceFactory {
             List<CatalogTable> catalogTables =
                     CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
+            Optional<List<JdbcSourceTableConfig>> tableConfigs =
+                    
context.getOptions().getOptional(JdbcSourceOptions.TABLE_NAMES_CONFIG);
+            if (tableConfigs.isPresent()) {
+                catalogTables =
+                        CatalogTableUtils.mergeCatalogTableConfig(
+                                catalogTables, tableConfigs.get(), s -> 
TablePath.of(s, true));
+            }
             SeaTunnelDataType<SeaTunnelRow> dataType =
                     CatalogTableUtil.convertToMultipleRowType(catalogTables);
             return new OracleIncrementalSource(context.getOptions(), dataType, 
catalogTables);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
index 27cdb94663..6524192845 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
 
 import io.debezium.connector.oracle.OracleConnection;
 import io.debezium.connector.oracle.OracleConnectorConfig;
@@ -37,10 +39,13 @@ public class OracleSchema {
 
     private final OracleConnectorConfig connectorConfig;
     private final Map<TableId, TableChange> schemasByTableId;
+    private final Map<TableId, CatalogTable> tableMap;
 
-    public OracleSchema(OracleConnectorConfig connectorConfig) {
+    public OracleSchema(
+            OracleConnectorConfig connectorConfig, Map<TableId, CatalogTable> 
tableMap) {
         this.connectorConfig = connectorConfig;
         this.schemasByTableId = new HashMap<>();
+        this.tableMap = tableMap;
     }
 
     /**
@@ -71,7 +76,9 @@ public class OracleSchema {
                     null,
                     false);
 
-            Table table = tables.forTable(tableId);
+            Table table =
+                    CatalogTableUtils.mergeCatalogTableConfig(
+                            tables.forTable(tableId), tableMap.get(tableId));
             TableChange tableChange = new 
TableChange(TableChanges.TableChangeType.CREATE, table);
             tableChangeMap.put(tableId, tableChange);
         } catch (SQLException e) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index d86114e1cb..a71b82852b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -221,6 +221,52 @@ public class OracleCDCIT extends TestSuiteBase implements 
TestResource {
                         });
     }
 
+    /**
+     * Syncs a source table that has no primary key ({@code FULL_TYPES_NO_PRIMARY_KEY}) to the
+     * sink. The primary key ({@code ID}) is supplied through {@code table-names-config} in
+     * {@code oraclecdc_to_oracle_with_custom_primary_key.conf}. The test checks that source and
+     * sink contents match after the snapshot stage and again after the stream stage.
+     */
+    @TestTemplate
+    public void testOracleCdcCheckDataWithCustomPrimaryKey(TestContainer container)
+            throws Exception {
+
+        clearTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+        clearTable(DATABASE, SINK_TABLE1);
+
+        insertSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+
+        // The job config uses job.mode = "STREAMING", so the job is started asynchronously and
+        // the future is never joined.
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        container.executeJob("/oraclecdc_to_oracle_with_custom_primary_key.conf");
+                    } catch (Exception e) {
+                        // Nothing observes this future, so this log entry is the only record of a
+                        // failed submission. Keep the full stack trace, not just the message.
+                        log.error("Commit task exception :" + e.getMessage(), e);
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+
+        // snapshot stage
+        await().atMost(600000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    querySql(
+                                            getSourceQuerySQL(
+                                                    DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY)),
+                                    querySql(getSourceQuerySQL(DATABASE, SINK_TABLE1)));
+                        });
+
+        // insert update delete
+        updateSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+
+        // stream stage
+        await().atMost(600000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    querySql(
+                                            getSourceQuerySQL(
+                                                    DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY)),
+                                    querySql(getSourceQuerySQL(DATABASE, SINK_TABLE1)));
+                        });
+    }
+
     @TestTemplate
     @DisabledOnContainer(
             value = {},
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_custom_primary_key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_custom_primary_key.conf
new file mode 100644
index 0000000000..2b6a189ba6
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_custom_primary_key.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
feature source plugin**
+  Oracle-CDC {
+    result_table_name = "customers"
+    username = "system"
+    password = "oracle"
+    database-names = ["XE"]
+    schema-names = ["DEBEZIUM"]
+    base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+    source.reader.close.timeout = 120000
+    connection.pool.size = 1
+    debezium {
+      #  log.mining.strategy = "online_catalog"
+      #  log.mining.continuous.mine = true
+        database.oracle.jdbc.timezoneAsRegion = "false"
+    }
+
+    table-names = ["XE.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"]
+    table-names-config = [
+      {
+        table = "XE.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"
+        primaryKeys = ["ID"]
+      }
+    ]
+  }
+}
+
+sink {
+  Jdbc {
+    source_table_name = "customers"
+    driver = "oracle.jdbc.driver.OracleDriver"
+    url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+    user = "system"
+    password = "oracle"
+    generate_sink_sql = true
+    database = "XE"
+    table = "DEBEZIUM.SINK_FULL_TYPES"
+    batch_size = 1
+    primary_keys = ["ID"]
+  }
+}
\ No newline at end of file

Reply via email to