lvyanquan commented on code in PR #4087:
URL: https://github.com/apache/flink-cdc/pull/4087#discussion_r2958034741
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java:
##########
@@ -208,9 +206,7 @@ public void close() {
private MySqlBinlogSplit createBinlogSplit() {
final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
- snapshotSplitAssigner.getAssignedSplits().values().stream()
- .sorted(Comparator.comparing(MySqlSplit::splitId))
- .collect(Collectors.toList());
+ new ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values());
Review Comment:
Thank you @morozov for the detailed explanation. However, I believe there's
still a compatibility issue that wasn't fully addressed. Let me clarify my
concern with a concrete **job upgrade scenario**.
## The Scenario
Consider an **existing running job** that is being upgraded from the old
version to the new version:
### Before Upgrade (Old Version)
1. Splits are assigned in order: `split3 → split2 → split1`
2. **Enumerator restores from checkpoint and sorts by split ID:**
```java
// Old code sorts on restore
.sorted(Entry.comparingByKey())
```
Result: `assignedSplits = [split1, split2, split3]`
3. **Reader has already received partial splits** based on the sorted order,
e.g., `[split1, split2]`
4. `finishedSnapshotSplitInfos` on the Enumerator side is built from the sorted order: `[split1, split2, split3]`
5. If `chunk-meta.group.size = 2`, the partitions are:
```
[
0: [split1, split2]
1: [split3]
]
```
6. Reader has received group 0, next request will be for group 1 (`split3`)
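The grouping in step 5 can be sketched in isolation (an illustrative standalone example, not the actual connector code; `partition` is a hypothetical helper mirroring how meta groups of size `chunk-meta.group.size` are formed):

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkMetaGrouping {
    // Illustrative sketch: split the finished snapshot split infos into
    // consecutive groups of `groupSize` elements.
    static <T> List<List<T>> partition(List<T> infos, int groupSize) {
        List<List<T>> groups = new ArrayList<>();
        for (int i = 0; i < infos.size(); i += groupSize) {
            groups.add(new ArrayList<>(infos.subList(i, Math.min(i + groupSize, infos.size()))));
        }
        return groups;
    }

    public static void main(String[] args) {
        // Sorted order from the old version with chunk-meta.group.size = 2:
        System.out.println(partition(List.of("split1", "split2", "split3"), 2));
        // prints [[split1, split2], [split3]]
    }
}
```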
### After Upgrade (New Version)
1. **Job restarts from checkpoint and upgrades to new version**
2. **Enumerator restores WITHOUT sorting:**
```java
// New code does NOT sort on restore
this.assignedSplits = assignedSplits;
```
Result: `assignedSplits = [split3, split2, split1]` (preserves original
insertion order)
3. `finishedSnapshotSplitInfos` is now built from the unsorted order: `[split3, split2, split1]`
4. The partitions are now:
```
[
0: [split3, split2]
1: [split1]
]
```
5. **Reader still has state `[split1, split2]`** (unchanged from checkpoint)
6. Reader requests group 1 (because `receivedMetaNum = 2`, and `2 / 2 = 1`)
7. Enumerator returns `[split1]` instead of the expected `split3`
8. Reader calculates `expectedNumberOfAlreadyRetrievedElements = 2 % 2 = 0`
9. Reader discards 0 elements and appends `[split1]` to `[split1, split2]`
10. **Result:** Reader state becomes `[split1, split2, split1]` — **contains
duplicate `split1` and missing `split3`**
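The mis-merge in steps 6–10 can be reproduced with a few lines (an illustrative sketch of the arithmetic described above; `merge` and its parameter names are hypothetical, not the actual reader fields):

```java
import java.util.ArrayList;
import java.util.List;

public class ReaderMergeSketch {
    // Illustrative sketch of the reader-side merge arithmetic: drop
    // `receivedMetaNum % groupSize` leading elements of the returned group,
    // then append the rest to the reader's existing state.
    static List<String> merge(List<String> received, List<String> returnedGroup, int groupSize) {
        int alreadyRetrieved = received.size() % groupSize; // here: 2 % 2 = 0
        List<String> merged = new ArrayList<>(received);
        merged.addAll(returnedGroup.subList(alreadyRetrieved, returnedGroup.size()));
        return merged;
    }

    public static void main(String[] args) {
        // After the upgrade, group 1 is [split1] instead of the expected [split3]:
        System.out.println(merge(List.of("split1", "split2"), List.of("split1"), 2));
        // prints [split1, split2, split1]
    }
}
```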
## The Root Cause
The issue is that the **order semantics changed** between versions, but the
**state serializer version was NOT incremented**:
```java
private static final int VERSION = 5; // Still 5, not incremented
```
The old version and new version interpret the same serialized state
differently:
- Old version: sorts on restore → order is by split ID
- New version: does not sort on restore → order is by insertion time
This breaks the invariant that both Enumerator and Reader must use the same
order.
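The divergence between the two restore paths is easy to demonstrate in isolation (a minimal sketch; the map values are placeholders and the method names are illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.TreeMap;

public class RestoreOrderSketch {
    // Old restore path: sort the checkpointed map by split ID.
    static List<String> oldRestoreOrder(LinkedHashMap<String, String> state) {
        return new ArrayList<>(new TreeMap<>(state).keySet());
    }

    // New restore path: keep the checkpointed insertion order as-is.
    static List<String> newRestoreOrder(LinkedHashMap<String, String> state) {
        return new ArrayList<>(state.keySet());
    }

    public static void main(String[] args) {
        // Simulated checkpoint: splits were assigned split3 -> split2 -> split1.
        LinkedHashMap<String, String> state = new LinkedHashMap<>();
        state.put("split3", "...");
        state.put("split2", "...");
        state.put("split1", "...");

        System.out.println(oldRestoreOrder(state)); // [split1, split2, split3]
        System.out.println(newRestoreOrder(state)); // [split3, split2, split1]
    }
}
```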
## Suggested Fix
1. **Increment the state serializer version:**
```java
private static final int VERSION = 6;
```
2. **Add migration logic for version 5 and below:**
```java
private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
        int version, int splitVersion, DataInputDeserializer in) throws IOException {
    // ... existing deserialization logic ...

    // For old versions, sort the assigned splits to maintain compatibility
    if (version < 6) {
        LinkedHashMap<String, MySqlSchemalessSnapshotSplit> sortedSplits =
                assignedSchemalessSnapshotSplits.entrySet().stream()
                        .sorted(Map.Entry.comparingByKey())
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                Map.Entry::getValue,
                                (o1, o2) -> o1,
                                LinkedHashMap::new));
        assignedSchemalessSnapshotSplits = sortedSplits;
    }
    // ...
}
```
This ensures backward compatibility for existing jobs upgrading to the new
version.
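As a sanity check, the sort in the migration step can be exercised standalone (a sketch with generic string keys; `sortByKey` is a hypothetical helper extracted from the fix above):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class MigrationSortSketch {
    // Re-order a LinkedHashMap by key while keeping a LinkedHashMap result,
    // as in the version < 6 branch of the suggested fix.
    static <V> LinkedHashMap<String, V> sortByKey(LinkedHashMap<String, V> in) {
        return in.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (o1, o2) -> o1,
                        LinkedHashMap::new));
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Integer> restored = new LinkedHashMap<>();
        restored.put("split3", 3);
        restored.put("split2", 2);
        restored.put("split1", 1);
        System.out.println(sortByKey(restored).keySet()); // [split1, split2, split3]
    }
}
```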
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]