This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
new 7f5671d2c [Hotfix][CDC] Fix multiple-table data read (#4200)
7f5671d2c is described below
commit 7f5671d2ce3e854e7aefd695c38ad6d485d9c4fa
Author: hailin0 <[email protected]>
AuthorDate: Fri Feb 24 14:54:51 2023 +0800
[Hotfix][CDC] Fix multiple-table data read (#4200)
---
.../external/IncrementalSourceScanFetcher.java | 4 +--
.../seatunnel/cdc/mysql/source/MySqlDialect.java | 16 +---------
.../reader/fetch/MySqlSourceFetchTaskContext.java | 36 +++++++++++++++------
.../sqlserver/source/source/SqlServerDialect.java | 17 +---------
.../fetch/SqlServerSourceFetchTaskContext.java | 37 +++++++++++++++-------
5 files changed, 55 insertions(+), 55 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index 595d746eb..fb5342b4f 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -35,8 +35,8 @@ import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -118,7 +118,7 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
boolean reachChangeLogEnd = false;
SourceRecord lowWatermark = null;
SourceRecord highWatermark = null;
- Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
+ Map<Struct, SourceRecord> outputBuffer = new LinkedHashMap<>();
while (!reachChangeLogEnd) {
checkReadException();
List<DataChangeEvent> batch = queue.poll();
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
index 590a000ff..313a0d5ca 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -27,8 +27,6 @@ import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
@@ -46,7 +44,6 @@ import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.List;
/** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
@@ -111,19 +108,8 @@ public class MySqlDialect implements JdbcDataSourceDialect {
createMySqlConnection(taskSourceConfig.getDbzConfiguration());
final BinaryLogClient binaryLogClient =
createBinaryClient(taskSourceConfig.getDbzConfiguration());
- List<TableChanges.TableChange> tableChangeList = new ArrayList<>();
- // TODO: support save table schema
- if (sourceSplitBase instanceof SnapshotSplit) {
- SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
- tableChangeList.add(queryTableSchema(jdbcConnection,
snapshotSplit.getTableId()));
- } else {
- IncrementalSplit incrementalSplit = (IncrementalSplit)
sourceSplitBase;
- for (TableId tableId : incrementalSplit.getTableIds()) {
- tableChangeList.add(queryTableSchema(jdbcConnection, tableId));
- }
- }
return new MySqlSourceFetchTaskContext(
- taskSourceConfig, this, jdbcConnection, binaryLogClient,
tableChangeList);
+ taskSourceConfig, this, jdbcConnection, binaryLogClient);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index 41ea3e5b1..d9e7565b1 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
+import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
@@ -62,7 +64,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -86,31 +88,26 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private ChangeEventQueue<DataChangeEvent> queue;
private MySqlErrorHandler errorHandler;
- private Collection<TableChanges.TableChange> engineHistory;
public MySqlSourceFetchTaskContext(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dataSourceDialect,
MySqlConnection connection,
- BinaryLogClient binaryLogClient,
- Collection<TableChanges.TableChange> engineHistory) {
+ BinaryLogClient binaryLogClient) {
super(sourceConfig, dataSourceDialect);
this.connection = connection;
this.binaryLogClient = binaryLogClient;
this.metadataProvider = new MySqlEventMetadataProvider();
- this.engineHistory = engineHistory;
}
@Override
public void configure(SourceSplitBase sourceSplitBase) {
+ registerDatabaseHistory(sourceSplitBase);
+
// initial stateful objects
final MySqlConnectorConfig connectorConfig = getDbzConnectorConfig();
final boolean tableIdCaseInsensitive =
connection.isTableIdCaseSensitive();
this.topicSelector =
MySqlTopicSelector.defaultSelector(connectorConfig);
- EmbeddedDatabaseHistory.registerHistory(
- sourceConfig
- .getDbzConfiguration()
-
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
- engineHistory);
+
this.databaseSchema =
MySqlUtils.createMySqlDatabaseSchema(connectorConfig,
tableIdCaseInsensitive);
this.offsetContext =
@@ -283,6 +280,25 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
schema.recover(offset);
}
+ private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
+ List<TableChanges.TableChange> engineHistory = new ArrayList<>();
+ // TODO: support save table schema
+ if (sourceSplitBase instanceof SnapshotSplit) {
+ SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
+ engineHistory.add(dataSourceDialect.queryTableSchema(connection,
snapshotSplit.getTableId()));
+ } else {
+ IncrementalSplit incrementalSplit = (IncrementalSplit)
sourceSplitBase;
+ for (TableId tableId : incrementalSplit.getTableIds()) {
+
engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId));
+ }
+ }
+
+ EmbeddedDatabaseHistory.registerHistory(
+ sourceConfig.getDbzConfiguration()
+
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
+ engineHistory);
+ }
+
/**
* A subclass implementation of {@link MySqlTaskContext} which reuses one
BinaryLogClient.
*/
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
index f439c07bb..6a0395dd9 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
@@ -26,8 +26,6 @@ import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
@@ -44,7 +42,6 @@ import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.List;
/** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
@@ -113,20 +110,8 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
final SqlServerConnection metaDataConnection =
createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
- List<TableChanges.TableChange> tableChangeList = new ArrayList<>();
- // TODO: support save table schema
- if (sourceSplitBase instanceof SnapshotSplit) {
- SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
- tableChangeList.add(queryTableSchema(jdbcConnection,
snapshotSplit.getTableId()));
- } else {
- IncrementalSplit incrementalSplit = (IncrementalSplit)
sourceSplitBase;
- for (TableId tableId : incrementalSplit.getTableIds()) {
- tableChangeList.add(queryTableSchema(jdbcConnection, tableId));
- }
- }
-
return new SqlServerSourceFetchTaskContext(
- taskSourceConfig, this, jdbcConnection, metaDataConnection,
tableChangeList);
+ taskSourceConfig, this, jdbcConnection, metaDataConnection);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
index 80f0761e3..7b66cb0dd 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
+import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
@@ -57,7 +59,8 @@ import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.time.Instant;
-import java.util.Collection;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
/**
@@ -80,29 +83,20 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
- private Collection<TableChanges.TableChange> engineHistory;
-
public SqlServerSourceFetchTaskContext(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dataSourceDialect,
SqlServerConnection dataConnection,
- SqlServerConnection metadataConnection,
- Collection<TableChanges.TableChange> engineHistory) {
+ SqlServerConnection metadataConnection) {
super(sourceConfig, dataSourceDialect);
this.dataConnection = dataConnection;
this.metadataConnection = metadataConnection;
this.metadataProvider = new SqlServerEventMetadataProvider();
- this.engineHistory = engineHistory;
}
@Override
public void configure(SourceSplitBase sourceSplitBase) {
-
- EmbeddedDatabaseHistory.registerHistory(
- sourceConfig
- .getDbzConfiguration()
-
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
- engineHistory);
+ registerDatabaseHistory(sourceSplitBase);
// initial stateful objects
final SqlServerConnectorConfig connectorConfig =
getDbzConnectorConfig();
@@ -242,6 +236,25 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
return sqlServerOffsetContext;
}
+ private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
+ List<TableChanges.TableChange> engineHistory = new ArrayList<>();
+ // TODO: support save table schema
+ if (sourceSplitBase instanceof SnapshotSplit) {
+ SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
+
engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection,
snapshotSplit.getTableId()));
+ } else {
+ IncrementalSplit incrementalSplit = (IncrementalSplit)
sourceSplitBase;
+ for (TableId tableId : incrementalSplit.getTableIds()) {
+
engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, tableId));
+ }
+ }
+
+ EmbeddedDatabaseHistory.registerHistory(
+ sourceConfig.getDbzConfiguration()
+
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
+ engineHistory);
+ }
+
public static class SqlServerEventMetadataProvider implements
EventMetadataProvider {
@Override