This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9017df468 [INLONG-7857][Sort] Fix duplicated chunk happens when open scan-newly-added-table in mysql-cdc (#7859)
9017df468 is described below

commit 9017df4683b7fe4436a793dfd802fdbe4a4419eb
Author: Schnapps <[email protected]>
AuthorDate: Fri Apr 14 17:41:56 2023 +0800

    [INLONG-7857][Sort] Fix duplicated chunk happens when open scan-newly-added-table in mysql-cdc (#7859)
---
 .../sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index fb85c8c30..244de1f2a 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -230,6 +230,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
                                 synchronized (lock) {
                                     remainingSplits.addAll(schemaLessSnapshotSplits);
                                     remainingTables.remove(nextTable);
+                                    addAlreadyProcessedTablesIfNotExists(nextTable);
                                     lock.notify();
                                 }
                             }
@@ -249,7 +250,6 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
                 MySqlSchemalessSnapshotSplit split = iterator.next();
                 remainingSplits.remove(split);
                 assignedSplits.put(split.splitId(), split);
-                addAlreadyProcessedTablesIfNotExists(split.getTableId());
                 return Optional.of(
                         split.toMySqlSnapshotSplit(tableSchemas.get(split.getTableId())));
             } else if (!remainingTables.isEmpty()) {

Reply via email to