This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b922bb90e6 [Improve][CDC] Extract duplicate code (#8906)
b922bb90e6 is described below
commit b922bb90e65614a034cf5c2e5d2c5d180b76b6c2
Author: hailin0 <[email protected]>
AuthorDate: Thu Mar 6 16:42:47 2025 +0800
[Improve][CDC] Extract duplicate code (#8906)
---
.../external/JdbcSourceFetchTaskContext.java | 53 ++++++++++++++++++++++
.../reader/fetch/MySqlSourceFetchTaskContext.java | 53 +---------------------
.../reader/fetch/OracleSourceFetchTaskContext.java | 53 +---------------------
.../reader/PostgresSourceFetchTaskContext.java | 28 +-----------
.../fetch/SqlServerSourceFetchTaskContext.java | 29 +-----------
5 files changed, 57 insertions(+), 159 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
index 48380d024f..e8d058f2a2 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
@@ -22,26 +22,35 @@ import
org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
+import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import
org.apache.seatunnel.connectors.cdc.debezium.ConnectTableChangeSerializer;
+import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
+import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.data.Envelope;
+import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
import io.debezium.util.SchemaNameAdjuster;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -155,6 +164,50 @@ public abstract class JdbcSourceFetchTaskContext
implements FetchTask.Context {
.collect(Collectors.toList());
}
+ protected void registerDatabaseHistory(
+ SourceSplitBase sourceSplitBase, JdbcConnection connection) {
+ List<TableChanges.TableChange> engineHistory = new ArrayList<>();
+ // TODO: support save table schema
+ if (sourceSplitBase instanceof SnapshotSplit) {
+ SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
+ engineHistory.add(
+ dataSourceDialect.queryTableSchema(connection, snapshotSplit.getTableId()));
+ } else {
+ IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase;
+ Map<TableId, byte[]> historyTableChanges = incrementalSplit.getHistoryTableChanges();
+ for (TableId tableId : incrementalSplit.getTableIds()) {
+ if (historyTableChanges != null && historyTableChanges.containsKey(tableId)) {
+ SchemaAndValue schemaAndValue =
+ jsonConverter.toConnectData("topic", historyTableChanges.get(tableId));
+ Struct deserializedStruct = (Struct) schemaAndValue.value();
+
+ TableChanges tableChanges =
+ tableChangeSerializer.deserialize(
+ Collections.singletonList(deserializedStruct), false);
+
+ Iterator<TableChanges.TableChange> iterator = tableChanges.iterator();
+ TableChanges.TableChange tableChange = null;
+ while (iterator.hasNext()) {
+ if (tableChange != null) {
+ throw new IllegalStateException(
+ "The table changes should only have one element");
+ }
+ tableChange = iterator.next();
+ }
+ engineHistory.add(tableChange);
+ continue;
+ }
+ engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId));
+ }
+ }
+
+ EmbeddedDatabaseHistory.registerHistory(
+ sourceConfig
+ .getDbzConfiguration()
+ .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
+ engineHistory);
+ }
+
public SourceConfig getSourceConfig() {
return sourceConfig;
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index 93e7cd1f2e..fcf25d50ea 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -24,16 +24,12 @@ import
org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
-import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -66,7 +62,6 @@ import
io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
-import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
@@ -75,9 +70,6 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -118,7 +110,7 @@ public class MySqlSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
@Override
public void configure(SourceSplitBase sourceSplitBase) {
- registerDatabaseHistory(sourceSplitBase);
+ super.registerDatabaseHistory(sourceSplitBase, connection);
// initial stateful objects
final MySqlConnectorConfig connectorConfig = getDbzConnectorConfig();
@@ -385,49 +377,6 @@ public class MySqlSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
schema.recover(Offsets.of(mySqlPartition, offset));
}
- private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
- List<TableChanges.TableChange> engineHistory = new ArrayList<>();
- // TODO: support save table schema
- if (sourceSplitBase instanceof SnapshotSplit) {
- SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
- engineHistory.add(
- dataSourceDialect.queryTableSchema(connection,
snapshotSplit.getTableId()));
- } else {
- IncrementalSplit incrementalSplit = (IncrementalSplit)
sourceSplitBase;
- Map<TableId, byte[]> historyTableChanges =
incrementalSplit.getHistoryTableChanges();
- for (TableId tableId : incrementalSplit.getTableIds()) {
- if (historyTableChanges != null &&
historyTableChanges.containsKey(tableId)) {
- SchemaAndValue schemaAndValue =
- jsonConverter.toConnectData("topic",
historyTableChanges.get(tableId));
- Struct deserializedStruct = (Struct)
schemaAndValue.value();
-
- TableChanges tableChanges =
- tableChangeSerializer.deserialize(
-
Collections.singletonList(deserializedStruct), false);
-
- Iterator<TableChanges.TableChange> iterator =
tableChanges.iterator();
- TableChanges.TableChange tableChange = null;
- while (iterator.hasNext()) {
- if (tableChange != null) {
- throw new IllegalStateException(
- "The table changes should only have one
element");
- }
- tableChange = iterator.next();
- }
- engineHistory.add(tableChange);
- continue;
- }
-
engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId));
- }
- }
-
- EmbeddedDatabaseHistory.registerHistory(
- sourceConfig
- .getDbzConfiguration()
-
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
- engineHistory);
- }
-
/** A subclass implementation of {@link MySqlTaskContext} which reuses one
BinaryLogClient. */
public class MySqlTaskContextImpl extends MySqlTaskContext {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
index 942532a7f6..b7a5dc9c17 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
@@ -23,15 +23,11 @@ import
org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleUtils;
-import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -66,11 +62,7 @@ import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
import java.time.Instant;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleConnectionUtils.createOracleConnection;
@@ -105,7 +97,7 @@ public class OracleSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
@Override
public void configure(SourceSplitBase sourceSplitBase) {
// Initializes the table schema
- registerDatabaseHistory(sourceSplitBase);
+ super.registerDatabaseHistory(sourceSplitBase, connection);
// initial stateful objects
final OracleConnectorConfig connectorConfig = getDbzConnectorConfig();
@@ -258,49 +250,6 @@ public class OracleSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
return oracleOffsetContext;
}
- private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
- List<TableChanges.TableChange> engineHistory = new ArrayList<>();
- // TODO: support save table schema
- if (sourceSplitBase instanceof SnapshotSplit) {
- SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
- engineHistory.add(
- dataSourceDialect.queryTableSchema(connection,
snapshotSplit.getTableId()));
- } else {
- IncrementalSplit incrementalSplit = (IncrementalSplit)
sourceSplitBase;
- Map<TableId, byte[]> historyTableChanges =
incrementalSplit.getHistoryTableChanges();
- for (TableId tableId : incrementalSplit.getTableIds()) {
- if (historyTableChanges != null &&
historyTableChanges.containsKey(tableId)) {
- SchemaAndValue schemaAndValue =
- jsonConverter.toConnectData("topic",
historyTableChanges.get(tableId));
- Struct deserializedStruct = (Struct)
schemaAndValue.value();
-
- TableChanges tableChanges =
- tableChangeSerializer.deserialize(
-
Collections.singletonList(deserializedStruct), false);
-
- Iterator<TableChanges.TableChange> iterator =
tableChanges.iterator();
- TableChanges.TableChange tableChange = null;
- while (iterator.hasNext()) {
- if (tableChange != null) {
- throw new IllegalStateException(
- "The table changes should only have one
element");
- }
- tableChange = iterator.next();
- }
- engineHistory.add(tableChange);
- continue;
- }
-
engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId));
- }
- }
-
- EmbeddedDatabaseHistory.registerHistory(
- sourceConfig
- .getDbzConfiguration()
-
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
- engineHistory);
- }
-
private void validateAndLoadDatabaseHistory(
OracleOffsetContext offset, OracleDatabaseSchema schema) {
schema.initializeStorage();
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
index 2f4cb8a2da..3af8dbc63f 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
@@ -24,10 +24,7 @@ import
org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.exception.PostgresConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
@@ -68,10 +65,8 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -129,7 +124,7 @@ public class PostgresSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
@Override
public void configure(SourceSplitBase sourceSplitBase) {
- registerDatabaseHistory(sourceSplitBase);
+ super.registerDatabaseHistory(sourceSplitBase, dataConnection);
// initial stateful objects
final PostgresConnectorConfig connectorConfig =
getDbzConnectorConfig();
@@ -276,27 +271,6 @@ public class PostgresSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
}
}
- private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
- List<TableChanges.TableChange> engineHistory = new ArrayList<>();
- // TODO: support save table schema
- if (sourceSplitBase instanceof SnapshotSplit) {
- SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
- engineHistory.add(
- dataSourceDialect.queryTableSchema(dataConnection,
snapshotSplit.getTableId()));
- } else {
- IncrementalSplit incrementalSplit = (IncrementalSplit)
sourceSplitBase;
- for (TableId tableId : incrementalSplit.getTableIds()) {
-
engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, tableId));
- }
- }
-
- EmbeddedDatabaseHistory.registerHistory(
- sourceConfig
- .getDbzConfiguration()
-
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
- engineHistory);
- }
-
@Override
public PostgresSourceConfig getSourceConfig() {
return (PostgresSourceConfig) sourceConfig;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java
index ba72f8533d..9362b0a314 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java
@@ -22,10 +22,7 @@ import
org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset;
import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerConnectionUtils;
@@ -55,7 +52,6 @@ import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
-import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
@@ -63,8 +59,6 @@ import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
/** The context for fetch task that fetching data of snapshot split from MySQL
data source. */
@@ -99,7 +93,7 @@ public class SqlServerSourceFetchTaskContext extends
JdbcSourceFetchTaskContext
@Override
public void configure(SourceSplitBase sourceSplitBase) {
- registerDatabaseHistory(sourceSplitBase);
+ super.registerDatabaseHistory(sourceSplitBase, dataConnection);
// initial stateful objects
final SqlServerConnectorConfig connectorConfig =
getDbzConnectorConfig();
@@ -282,27 +276,6 @@ public class SqlServerSourceFetchTaskContext extends
JdbcSourceFetchTaskContext
return sqlServerOffsetContext;
}
- private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
- List<TableChanges.TableChange> engineHistory = new ArrayList<>();
- // TODO: support save table schema
- if (sourceSplitBase instanceof SnapshotSplit) {
- SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
- engineHistory.add(
- dataSourceDialect.queryTableSchema(dataConnection,
snapshotSplit.getTableId()));
- } else {
- IncrementalSplit incrementalSplit = (IncrementalSplit)
sourceSplitBase;
- for (TableId tableId : incrementalSplit.getTableIds()) {
-
engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, tableId));
- }
- }
-
- EmbeddedDatabaseHistory.registerHistory(
- sourceConfig
- .getDbzConfiguration()
-
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
- engineHistory);
- }
-
public static class SqlServerEventMetadataProvider implements
EventMetadataProvider {
@Override