hailin0 commented on code in PR #4200:
URL:
https://github.com/apache/incubator-seatunnel/pull/4200#discussion_r1115741977
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java:
##########
@@ -111,19 +108,8 @@ public MySqlSourceFetchTaskContext createFetchTaskContext(
createMySqlConnection(taskSourceConfig.getDbzConfiguration());
final BinaryLogClient binaryLogClient =
createBinaryClient(taskSourceConfig.getDbzConfiguration());
- List<TableChanges.TableChange> tableChangeList = new ArrayList<>();
- // TODO: support save table schema
- if (sourceSplitBase instanceof SnapshotSplit) {
- SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
- tableChangeList.add(queryTableSchema(jdbcConnection,
snapshotSplit.getTableId()));
- } else {
- IncrementalSplit incrementalSplit = (IncrementalSplit)
sourceSplitBase;
- for (TableId tableId : incrementalSplit.getTableIds()) {
- tableChangeList.add(queryTableSchema(jdbcConnection, tableId));
- }
- }
return new MySqlSourceFetchTaskContext(
- taskSourceConfig, this, jdbcConnection, binaryLogClient,
tableChangeList);
Review Comment:
The table schema query that builds tableChangeList is moved to FetchTaskContext.
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java:
##########
@@ -113,20 +110,8 @@ public SqlServerSourceFetchTaskContext
createFetchTaskContext(
final SqlServerConnection metaDataConnection =
createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
- List<TableChanges.TableChange> tableChangeList = new ArrayList<>();
- // TODO: support save table schema
- if (sourceSplitBase instanceof SnapshotSplit) {
- SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
- tableChangeList.add(queryTableSchema(jdbcConnection,
snapshotSplit.getTableId()));
- } else {
- IncrementalSplit incrementalSplit = (IncrementalSplit)
sourceSplitBase;
- for (TableId tableId : incrementalSplit.getTableIds()) {
- tableChangeList.add(queryTableSchema(jdbcConnection, tableId));
- }
- }
-
return new SqlServerSourceFetchTaskContext(
- taskSourceConfig, this, jdbcConnection, metaDataConnection,
tableChangeList);
Review Comment:
The table schema query that builds tableChangeList is moved to FetchTaskContext.
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java:
##########
@@ -118,7 +118,7 @@ public Iterator<SourceRecords> pollSplitRecords() throws
InterruptedException {
boolean reachChangeLogEnd = false;
SourceRecord lowWatermark = null;
SourceRecord highWatermark = null;
- Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
+ Map<Struct, SourceRecord> outputBuffer = new LinkedHashMap<>();
Review Comment:
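Fixes out-of-order data: LinkedHashMap preserves insertion order, so entries in outputBuffer are iterated in the order they were added.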
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]