This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ae4240ca6b [Feature][Oracle-CDC] Support custom table primary key
(#6216)
ae4240ca6b is described below
commit ae4240ca6b2cf166d84226dcb00132963d300819
Author: hailin0 <[email protected]>
AuthorDate: Wed Jan 17 22:55:59 2024 +0800
[Feature][Oracle-CDC] Support custom table primary key (#6216)
---
docs/en/connector-v2/source/Oracle-CDC.md | 26 +++++++++
.../seatunnel/cdc/oracle/source/OracleDialect.java | 23 +++++++-
.../cdc/oracle/source/OracleIncrementalSource.java | 5 +-
.../source/OracleIncrementalSourceFactory.java | 14 ++++-
.../seatunnel/cdc/oracle/utils/OracleSchema.java | 11 +++-
.../seatunnel/cdc/oracle/OracleCDCIT.java | 46 +++++++++++++++
...raclecdc_to_oracle_with_custom_primary_key.conf | 68 ++++++++++++++++++++++
7 files changed, 186 insertions(+), 7 deletions(-)
diff --git a/docs/en/connector-v2/source/Oracle-CDC.md
b/docs/en/connector-v2/source/Oracle-CDC.md
index 02553b5a33..f34f6102f6 100644
--- a/docs/en/connector-v2/source/Oracle-CDC.md
+++ b/docs/en/connector-v2/source/Oracle-CDC.md
@@ -211,6 +211,7 @@ exit;
| database-names | List | No | -
| Database name of the database to monitor.
[...]
| schema-names | List | No | -
| Schema name of the database to monitor.
[...]
| table-names | List | Yes | -
| Table name of the database to monitor. The table name needs to include the
database name, for example: `database_name.table_name`
[...]
+| table-names-config | List | No | -
| Table config list, e.g. to specify custom primary keys for tables. For example: [{"table":
"db1.schema1.table1","primaryKeys":["key1"]}]
[...]
| startup.mode | Enum | No |
INITIAL | Optional startup mode for Oracle CDC consumer, valid enumerations are
`initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize
historical data at startup, and then synchronize incremental data.<br/>
`earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup
from the latest offset.<br/> `specific`: Startup from user-supplied specific
offsets. [...]
| startup.specific-offset.file | String | No | -
| Start from the specified binlog file name. **Note, This option is required
when the `startup.mode` option used `specific`.**
[...]
| startup.specific-offset.pos | Long | No | -
| Start from the specified binlog file position. **Note, This option is
required when the `startup.mode` option used `specific`.**
[...]
@@ -254,6 +255,31 @@ source {
}
```
+### Support custom primary keys for tables
+
+```
+
+source {
+ Oracle-CDC {
+ result_table_name = "customers"
+ base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+ source.reader.close.timeout = 120000
+ username = "system"
+ password = "oracle"
+ database-names = ["XE"]
+ schema-names = ["DEBEZIUM"]
+ table-names = ["XE.DEBEZIUM.FULL_TYPES"]
+ table-names-config = [
+ {
+ table = "XE.DEBEZIUM.FULL_TYPES"
+ primaryKeys = ["ID"]
+ }
+ ]
+ }
+}
+
+```
+
### Support debezium-compatible format send to kafka
> Must be used with kafka connector sink, see [compatible debezium
> format](../formats/cdc-compatible-debezium-json.md) for details
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
index a7823934f0..5ffef4cc3c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
@@ -24,6 +27,7 @@ import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnec
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.eumerator.OracleChunkSplitter;
@@ -40,6 +44,8 @@ import
io.debezium.relational.history.TableChanges.TableChange;
import java.sql.SQLException;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleConnectionUtils.createOracleConnection;
@@ -49,10 +55,13 @@ public class OracleDialect implements JdbcDataSourceDialect
{
private final OracleSourceConfigFactory configFactory;
private final OracleSourceConfig sourceConfig;
private transient OracleSchema oracleSchema;
+ private final Map<TableId, CatalogTable> tableMap;
- public OracleDialect(OracleSourceConfigFactory configFactory) {
+ public OracleDialect(
+ OracleSourceConfigFactory configFactory, List<CatalogTable>
catalogTables) {
this.configFactory = configFactory;
this.sourceConfig = configFactory.create(0);
+ this.tableMap = CatalogTableUtils.convertTables(catalogTables);
}
@Override
@@ -102,7 +111,7 @@ public class OracleDialect implements JdbcDataSourceDialect
{
@Override
public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
if (oracleSchema == null) {
- oracleSchema = new
OracleSchema(sourceConfig.getDbzConnectorConfig());
+ oracleSchema = new
OracleSchema(sourceConfig.getDbzConnectorConfig(), tableMap);
}
return oracleSchema.getTableSchema(jdbc, tableId);
}
@@ -121,4 +130,14 @@ public class OracleDialect implements
JdbcDataSourceDialect {
return new
OracleRedoLogFetchTask(sourceSplitBase.asIncrementalSplit());
}
}
+
+ @Override
+ public Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection,
TableId tableId) {
+ return
Optional.ofNullable(tableMap.get(tableId).getTableSchema().getPrimaryKey());
+ }
+
+ @Override
+ public List<ConstraintKey> getConstraintKeys(JdbcConnection
jdbcConnection, TableId tableId) {
+ return tableMap.get(tableId).getTableSchema().getConstraintKeys();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
index 933c8cdc37..9bcd8f8845 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
@@ -121,7 +121,7 @@ public class OracleIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceC
@Override
public DataSourceDialect<JdbcSourceConfig>
createDataSourceDialect(ReadonlyConfig config) {
- return new OracleDialect((OracleSourceConfigFactory) configFactory);
+ return new OracleDialect((OracleSourceConfigFactory) configFactory,
catalogTables);
}
@Override
@@ -132,7 +132,8 @@ public class OracleIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceC
private Map<TableId, Struct> tableChanges() {
JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
- OracleDialect dialect = new OracleDialect((OracleSourceConfigFactory)
configFactory);
+ OracleDialect dialect =
+ new OracleDialect((OracleSourceConfigFactory) configFactory,
catalogTables);
List<TableId> discoverTables =
dialect.discoverDataCollections(jdbcSourceConfig);
ConnectTableChangeSerializer connectTableChangeSerializer =
new ConnectTableChangeSerializer();
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
index f8f4fb3228..c80f0dc7ce 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
@@ -23,22 +23,26 @@ import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import com.google.auto.service.AutoService;
import java.io.Serializable;
import java.util.List;
+import java.util.Optional;
@AutoService(Factory.class)
public class OracleIncrementalSourceFactory implements TableSourceFactory {
@@ -65,7 +69,8 @@ public class OracleIncrementalSourceFactory implements
TableSourceFactory {
JdbcSourceOptions.CONNECTION_POOL_SIZE,
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
- JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD)
+ JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
+ JdbcSourceOptions.TABLE_NAMES_CONFIG)
.optional(OracleSourceOptions.STARTUP_MODE,
OracleSourceOptions.STOP_MODE)
.conditional(
OracleSourceOptions.STARTUP_MODE,
@@ -102,6 +107,13 @@ public class OracleIncrementalSourceFactory implements
TableSourceFactory {
List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
+ Optional<List<JdbcSourceTableConfig>> tableConfigs =
+
context.getOptions().getOptional(JdbcSourceOptions.TABLE_NAMES_CONFIG);
+ if (tableConfigs.isPresent()) {
+ catalogTables =
+ CatalogTableUtils.mergeCatalogTableConfig(
+ catalogTables, tableConfigs.get(), s ->
TablePath.of(s, true));
+ }
SeaTunnelDataType<SeaTunnelRow> dataType =
CatalogTableUtil.convertToMultipleRowType(catalogTables);
return new OracleIncrementalSource(context.getOptions(), dataType,
catalogTables);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
index 27cdb94663..6524192845 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
@@ -37,10 +39,13 @@ public class OracleSchema {
private final OracleConnectorConfig connectorConfig;
private final Map<TableId, TableChange> schemasByTableId;
+ private final Map<TableId, CatalogTable> tableMap;
- public OracleSchema(OracleConnectorConfig connectorConfig) {
+ public OracleSchema(
+ OracleConnectorConfig connectorConfig, Map<TableId, CatalogTable>
tableMap) {
this.connectorConfig = connectorConfig;
this.schemasByTableId = new HashMap<>();
+ this.tableMap = tableMap;
}
/**
@@ -71,7 +76,9 @@ public class OracleSchema {
null,
false);
- Table table = tables.forTable(tableId);
+ Table table =
+ CatalogTableUtils.mergeCatalogTableConfig(
+ tables.forTable(tableId), tableMap.get(tableId));
TableChange tableChange = new
TableChange(TableChanges.TableChangeType.CREATE, table);
tableChangeMap.put(tableId, tableChange);
} catch (SQLException e) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index d86114e1cb..a71b82852b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -221,6 +221,52 @@ public class OracleCDCIT extends TestSuiteBase implements
TestResource {
});
}
+ @TestTemplate
+ public void testOracleCdcCheckDataWithCustomPrimaryKey(TestContainer
container)
+ throws Exception {
+
+ clearTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+ clearTable(DATABASE, SINK_TABLE1);
+
+ insertSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+
container.executeJob("/oraclecdc_to_oracle_with_custom_primary_key.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // snapshot stage
+ await().atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ querySql(
+ getSourceQuerySQL(
+ DATABASE,
SOURCE_TABLE_NO_PRIMARY_KEY)),
+ querySql(getSourceQuerySQL(DATABASE,
SINK_TABLE1)));
+ });
+
+ // insert update delete
+ updateSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
+
+ // stream stage
+ await().atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ querySql(
+ getSourceQuerySQL(
+ DATABASE,
SOURCE_TABLE_NO_PRIMARY_KEY)),
+ querySql(getSourceQuerySQL(DATABASE,
SINK_TABLE1)));
+ });
+ }
+
@TestTemplate
@DisabledOnContainer(
value = {},
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_custom_primary_key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_custom_primary_key.conf
new file mode 100644
index 0000000000..2b6a189ba6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_custom_primary_key.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
SeaTunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ Oracle-CDC {
+ result_table_name = "customers"
+ username = "system"
+ password = "oracle"
+ database-names = ["XE"]
+ schema-names = ["DEBEZIUM"]
+ base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+ source.reader.close.timeout = 120000
+ connection.pool.size = 1
+ debezium {
+ # log.mining.strategy = "online_catalog"
+ # log.mining.continuous.mine = true
+ database.oracle.jdbc.timezoneAsRegion = "false"
+ }
+
+ table-names = ["XE.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"]
+ table-names-config = [
+ {
+ table = "XE.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"
+ primaryKeys = ["ID"]
+ }
+ ]
+ }
+}
+
+sink {
+ Jdbc {
+ source_table_name = "customers"
+ driver = "oracle.jdbc.driver.OracleDriver"
+ url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+ user = "system"
+ password = "oracle"
+ generate_sink_sql = true
+ database = "XE"
+ table = "DEBEZIUM.SINK_FULL_TYPES"
+ batch_size = 1
+ primary_keys = ["ID"]
+ }
+}
\ No newline at end of file