lvyanquan commented on code in PR #4087:
URL: https://github.com/apache/flink-cdc/pull/4087#discussion_r2958034741


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java:
##########
@@ -208,9 +206,7 @@ public void close() {
 
     private MySqlBinlogSplit createBinlogSplit() {
         final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
-                snapshotSplitAssigner.getAssignedSplits().values().stream()
-                        .sorted(Comparator.comparing(MySqlSplit::splitId))
-                        .collect(Collectors.toList());
+                new 
ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values());

Review Comment:
   Thank you @morozov for the detailed explanation. However, I believe there's 
still a compatibility issue that wasn't fully addressed. Let me clarify my 
concern with a concrete **job upgrade scenario**.
   
   ## The Scenario
   
   Consider an **existing running job** that is being upgraded from the old 
version to the new version:
   
   ### Before Upgrade (Old Version)
   
   1. Splits are assigned in order: `split3 → split2 → split1`
   2. **Enumerator restores from checkpoint and sorts by split ID:**
      ```java
      // Old code sorts on restore
      .sorted(Entry.comparingByKey())
      ```
      Result: `assignedSplits = [split1, split2, split3]`
   
   3. **Reader has already received partial splits** based on the sorted order, 
e.g., `[split1, split2]`
   
   4. `finishedSnapshotSplitInfos` on Enumerator side is built from sorted 
order: `[split1, split2, split3]`
   
   5. If `chunk-meta.group.size = 2`, the partitions are:
      ```
      [
        0: [split1, split2]
        1: [split3]
      ]
      ```
   
   6. Reader has received group 0, next request will be for group 1 (`split3`)
   
   ### After Upgrade (New Version)
   
   1. **Job restarts from checkpoint and upgrades to new version**
   
   2. **Enumerator restores WITHOUT sorting:**
      ```java
      // New code does NOT sort on restore
      this.assignedSplits = assignedSplits;
      ```
      Result: `assignedSplits = [split3, split2, split1]` (preserves original 
insertion order)
   
   3. `finishedSnapshotSplitInfos` is now built from unsorted order: `[split3, 
split2, split1]`
   
   4. The partitions are now:
      ```
      [
        0: [split3, split2]
        1: [split1]
      ]
      ```
   
   5. **Reader still has state `[split1, split2]`** (unchanged from checkpoint)
   
   6. Reader requests group 1 (because `receivedMetaNum = 2`, and `2 / 2 = 1`)
   
   7. Enumerator returns `[split1]` instead of the expected `split3`
   
   8. Reader calculates `expectedNumberOfAlreadyRetrievedElements = 2 % 2 = 0`
   
   9. Reader discards 0 elements and appends `[split1]` to `[split1, split2]`
   
   10. **Result:** Reader state becomes `[split1, split2, split1]` — **contains 
duplicate `split1` and missing `split3`**
   
   ## The Root Cause
   
   The issue is that the **order semantics changed** between versions, but the 
**state serializer version was NOT incremented**:
   
   ```java
   private static final int VERSION = 5;  // Still 5, not incremented
   ```
   
   The old version and new version interpret the same serialized state 
differently:
   - Old version: sorts on restore → order is by split ID
   - New version: does not sort on restore → order is by insertion time
   
   This breaks the invariant that both Enumerator and Reader must use the same 
order.
   
   ## Suggested Fix
   
   1. **Increment the state serializer version:**
      ```java
      private static final int VERSION = 6;
      ```
   
   2. **Add migration logic for version 5 and below:**
      ```java
      private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
              int version, int splitVersion, DataInputDeserializer in) throws 
IOException {
          // ... existing deserialization logic ...
          
          // For old versions, sort the assigned splits to maintain 
compatibility
          if (version < 6) {
              LinkedHashMap<String, MySqlSchemalessSnapshotSplit> sortedSplits =
                  assignedSchemalessSnapshotSplits.entrySet().stream()
                      .sorted(Map.Entry.comparingByKey())
                      .collect(Collectors.toMap(
                          Map.Entry::getKey,
                          Map.Entry::getValue,
                          (o1, o2) -> o1,
                          LinkedHashMap::new));
              assignedSchemalessSnapshotSplits = sortedSplits;
          }
          // ...
      }
      ```
   
   This ensures backward compatibility for existing jobs upgrading to the new 
version.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to