This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
     new 7f5671d2c [Hotfix][CDC] Fix multiple-table data read (#4200)
7f5671d2c is described below

commit 7f5671d2ce3e854e7aefd695c38ad6d485d9c4fa
Author: hailin0 <[email protected]>
AuthorDate: Fri Feb 24 14:54:51 2023 +0800

    [Hotfix][CDC] Fix multiple-table data read (#4200)
---
 .../external/IncrementalSourceScanFetcher.java     |  4 +--
 .../seatunnel/cdc/mysql/source/MySqlDialect.java   | 16 +---------
 .../reader/fetch/MySqlSourceFetchTaskContext.java  | 36 +++++++++++++++------
 .../sqlserver/source/source/SqlServerDialect.java  | 17 +---------
 .../fetch/SqlServerSourceFetchTaskContext.java     | 37 +++++++++++++++-------
 5 files changed, 55 insertions(+), 55 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index 595d746eb..fb5342b4f 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -35,8 +35,8 @@ import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -118,7 +118,7 @@ public class IncrementalSourceScanFetcher implements 
Fetcher<SourceRecords, Sour
             boolean reachChangeLogEnd = false;
             SourceRecord lowWatermark = null;
             SourceRecord highWatermark = null;
-            Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
+            Map<Struct, SourceRecord> outputBuffer = new LinkedHashMap<>();
             while (!reachChangeLogEnd) {
                 checkReadException();
                 List<DataChangeEvent> batch = queue.poll();
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
index 590a000ff..313a0d5ca 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -27,8 +27,6 @@ import 
org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
@@ -46,7 +44,6 @@ import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
 
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.List;
 
 /** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
@@ -111,19 +108,8 @@ public class MySqlDialect implements JdbcDataSourceDialect 
{
                 createMySqlConnection(taskSourceConfig.getDbzConfiguration());
         final BinaryLogClient binaryLogClient =
                 createBinaryClient(taskSourceConfig.getDbzConfiguration());
-        List<TableChanges.TableChange> tableChangeList = new ArrayList<>();
-        // TODO: support save table schema
-        if (sourceSplitBase instanceof SnapshotSplit) {
-            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
-            tableChangeList.add(queryTableSchema(jdbcConnection, 
snapshotSplit.getTableId()));
-        } else {
-            IncrementalSplit incrementalSplit = (IncrementalSplit) 
sourceSplitBase;
-            for (TableId tableId : incrementalSplit.getTableIds()) {
-                tableChangeList.add(queryTableSchema(jdbcConnection, tableId));
-            }
-        }
         return new MySqlSourceFetchTaskContext(
-                taskSourceConfig, this, jdbcConnection, binaryLogClient, 
tableChangeList);
+                taskSourceConfig, this, jdbcConnection, binaryLogClient);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index 41ea3e5b1..d9e7565b1 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -25,6 +25,8 @@ import 
org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import 
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
+import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
@@ -62,7 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Instant;
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -86,31 +88,26 @@ public class MySqlSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
     private ChangeEventQueue<DataChangeEvent> queue;
     private MySqlErrorHandler errorHandler;
 
-    private Collection<TableChanges.TableChange> engineHistory;
     public MySqlSourceFetchTaskContext(
         JdbcSourceConfig sourceConfig,
         JdbcDataSourceDialect dataSourceDialect,
         MySqlConnection connection,
-        BinaryLogClient binaryLogClient,
-        Collection<TableChanges.TableChange> engineHistory) {
+        BinaryLogClient binaryLogClient) {
         super(sourceConfig, dataSourceDialect);
         this.connection = connection;
         this.binaryLogClient = binaryLogClient;
         this.metadataProvider = new MySqlEventMetadataProvider();
-        this.engineHistory = engineHistory;
     }
 
     @Override
     public void configure(SourceSplitBase sourceSplitBase) {
+        registerDatabaseHistory(sourceSplitBase);
+
         // initial stateful objects
         final MySqlConnectorConfig connectorConfig = getDbzConnectorConfig();
         final boolean tableIdCaseInsensitive = 
connection.isTableIdCaseSensitive();
         this.topicSelector = 
MySqlTopicSelector.defaultSelector(connectorConfig);
-        EmbeddedDatabaseHistory.registerHistory(
-            sourceConfig
-                .getDbzConfiguration()
-                
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
-            engineHistory);
+
         this.databaseSchema =
             MySqlUtils.createMySqlDatabaseSchema(connectorConfig, 
tableIdCaseInsensitive);
         this.offsetContext =
@@ -283,6 +280,25 @@ public class MySqlSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
         schema.recover(offset);
     }
 
+    private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
+        List<TableChanges.TableChange> engineHistory = new ArrayList<>();
+        // TODO: support save table schema
+        if (sourceSplitBase instanceof SnapshotSplit) {
+            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
+            engineHistory.add(dataSourceDialect.queryTableSchema(connection, 
snapshotSplit.getTableId()));
+        } else {
+            IncrementalSplit incrementalSplit = (IncrementalSplit) 
sourceSplitBase;
+            for (TableId tableId : incrementalSplit.getTableIds()) {
+                
engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId));
+            }
+        }
+
+        EmbeddedDatabaseHistory.registerHistory(
+            sourceConfig.getDbzConfiguration()
+                
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
+            engineHistory);
+    }
+
     /**
      * A subclass implementation of {@link MySqlTaskContext} which reuses one 
BinaryLogClient.
      */
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
index f439c07bb..6a0395dd9 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
@@ -26,8 +26,6 @@ import 
org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
@@ -44,7 +42,6 @@ import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
 
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.List;
 
 /** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
@@ -113,20 +110,8 @@ public class SqlServerDialect implements 
JdbcDataSourceDialect {
         final SqlServerConnection metaDataConnection =
             createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
 
-        List<TableChanges.TableChange> tableChangeList = new ArrayList<>();
-        // TODO: support save table schema
-        if (sourceSplitBase instanceof SnapshotSplit) {
-            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
-            tableChangeList.add(queryTableSchema(jdbcConnection, 
snapshotSplit.getTableId()));
-        } else {
-            IncrementalSplit incrementalSplit = (IncrementalSplit) 
sourceSplitBase;
-            for (TableId tableId : incrementalSplit.getTableIds()) {
-                tableChangeList.add(queryTableSchema(jdbcConnection, tableId));
-            }
-        }
-
         return new SqlServerSourceFetchTaskContext(
-            taskSourceConfig, this, jdbcConnection, metaDataConnection, 
tableChangeList);
+            taskSourceConfig, this, jdbcConnection, metaDataConnection);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
index 80f0761e3..7b66cb0dd 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
@@ -23,6 +23,8 @@ import 
org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import 
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
+import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
@@ -57,7 +59,8 @@ import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.time.Instant;
-import java.util.Collection;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -80,29 +83,20 @@ public class SqlServerSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext
 
     private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
 
-    private Collection<TableChanges.TableChange> engineHistory;
-
     public SqlServerSourceFetchTaskContext(
         JdbcSourceConfig sourceConfig,
         JdbcDataSourceDialect dataSourceDialect,
         SqlServerConnection dataConnection,
-        SqlServerConnection metadataConnection,
-        Collection<TableChanges.TableChange> engineHistory) {
+        SqlServerConnection metadataConnection) {
         super(sourceConfig, dataSourceDialect);
         this.dataConnection = dataConnection;
         this.metadataConnection = metadataConnection;
         this.metadataProvider = new SqlServerEventMetadataProvider();
-        this.engineHistory = engineHistory;
     }
 
     @Override
     public void configure(SourceSplitBase sourceSplitBase) {
-
-        EmbeddedDatabaseHistory.registerHistory(
-            sourceConfig
-                .getDbzConfiguration()
-                
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
-            engineHistory);
+        registerDatabaseHistory(sourceSplitBase);
 
         // initial stateful objects
         final SqlServerConnectorConfig connectorConfig = 
getDbzConnectorConfig();
@@ -242,6 +236,25 @@ public class SqlServerSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext
         return sqlServerOffsetContext;
     }
 
+    private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
+        List<TableChanges.TableChange> engineHistory = new ArrayList<>();
+        // TODO: support save table schema
+        if (sourceSplitBase instanceof SnapshotSplit) {
+            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
+            
engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, 
snapshotSplit.getTableId()));
+        } else {
+            IncrementalSplit incrementalSplit = (IncrementalSplit) 
sourceSplitBase;
+            for (TableId tableId : incrementalSplit.getTableIds()) {
+                
engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, tableId));
+            }
+        }
+
+        EmbeddedDatabaseHistory.registerHistory(
+            sourceConfig.getDbzConfiguration()
+                
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
+            engineHistory);
+    }
+
     public static class SqlServerEventMetadataProvider implements 
EventMetadataProvider {
 
         @Override

Reply via email to