This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 641cc72f2f [Improve][CDC] Filter ddl for snapshot phase (#8911)
641cc72f2f is described below
commit 641cc72f2fdb7d35b818c08c0bc252e7f1f2af7f
Author: hailin0 <[email protected]>
AuthorDate: Mon Mar 10 14:10:23 2025 +0800
[Improve][CDC] Filter ddl for snapshot phase (#8911)
---
.../cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java | 2 ++
.../cdc/oracle/source/reader/fetch/scan/OracleSnapshotFetchTask.java | 2 ++
2 files changed, 4 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
index 6fd98e5358..a9f0339e12 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog.MySqlBinlogFetchTask;
@@ -152,6 +153,7 @@ public class MySqlSnapshotFetchTask implements
FetchTask<SourceSplitBase> {
context.getSourceConfig()
.getDbzConfiguration()
.edit()
+ .with(MySqlSourceConfigFactory.SCHEMA_CHANGE_KEY,
"false")
.with("table.include.list",
split.getTableId().toString())
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotFetchTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotFetchTask.java
index 8b6131eeb6..fa0e0dee5e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotFetchTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotFetchTask.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.OracleSourceFetchTaskContext;
import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.logminer.OracleRedoLogFetchTask;
@@ -159,6 +160,7 @@ public class OracleSnapshotFetchTask implements
FetchTask<SourceSplitBase> {
context.getSourceConfig()
.getDbzConfiguration()
.edit()
+ .with(OracleSourceConfigFactory.SCHEMA_CHANGE_KEY,
"false")
.with(
"table.include.list",
split.getTableId()