Copilot commented on code in PR #2321:
URL: https://github.com/apache/fluss/pull/2321#discussion_r2667027906


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java:
##########
@@ -124,8 +151,14 @@ public SourceEnumeratorState deserialize(int version, byte[] serialized) throws
         }
 
         List<SourceSplitBase> remainingHybridLakeFlussSplits = null;
-        if (lakeSource != null) {
-            // todo: add a ut for serialize remaining hybrid lake fluss splits
+
+        if (version == VERSION_0) {
+            // if it's version0, which rely on lakeSource flag to do deserialize,
+            if (lakeSource != null) {
+                remainingHybridLakeFlussSplits = deserializeRemainingHybridLakeFlussSplits(in);
+            }
+        } else {
+            // anyway, deserialize remaining splits

Review Comment:
   The comment "anyway, deserialize remaining splits" is unclear and too 
casual. It should explain why in VERSION_1 the remaining splits are always 
deserialized regardless of the lakeSource flag, which is the key fix in this PR.
   ```suggestion
               // For VERSION_0, deserialization of remaining hybrid lake/Fluss splits relies on the
               // lakeSource flag: only attempt to deserialize when lakeSource is not null.
               if (lakeSource != null) {
                   remainingHybridLakeFlussSplits = deserializeRemainingHybridLakeFlussSplits(in);
               }
           } else {
               // For VERSION_1 and later, always attempt to deserialize remaining hybrid lake/Fluss
               // splits. The serialized state encodes their presence via a boolean flag, so this
               // logic no longer depends on the lakeSource flag. This unconditional deserialization
               // is the intended behavior change compared to VERSION_0.
   ```
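The distinction drawn in the suggested comment can be illustrated with a minimal, self-contained sketch. Everything in it is an assumption made for illustration: `PresenceFlagSketch`, the `String` entries standing in for splits, and the exact byte layout are hypothetical and are not taken from the Fluss serializer. It only shows the general idea that a presence flag written into the bytes lets the reader decide what to read without consulting its own configuration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for illustration; this is not the Fluss serializer. */
class PresenceFlagSketch {
    static final int VERSION_0 = 0;
    static final int VERSION_1 = 1;

    /** Assumed V1 layout: a boolean presence flag, then a count and the entries if present. */
    static byte[] serializeV1(List<String> splits) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(splits != null);
            if (splits != null) {
                out.writeInt(splits.size());
                for (String split : splits) {
                    out.writeUTF(split);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Dispatches on the version, mirroring the branch structure discussed in the review. */
    static List<String> deserialize(int version, byte[] data, boolean readerHasLakeSource) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            if (version == VERSION_0) {
                // V0: the reader's own configuration decides whether a list is expected.
                return readerHasLakeSource ? readList(in) : null;
            }
            // V1 and later: the bytes themselves say whether a list follows.
            return in.readBoolean() ? readList(in) : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static List<String> readList(DataInputStream in) throws IOException {
        int count = in.readInt();
        List<String> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            result.add(in.readUTF());
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] withoutSplits = serializeV1(null);
        byte[] withSplits = serializeV1(Arrays.asList("split-a", "split-b"));
        // In both calls the reader's lake-source setting differs from what the writer had.
        System.out.println(deserialize(VERSION_1, withoutSplits, true));
        System.out.println(deserialize(VERSION_1, withSplits, false));
    }
}
```

In this sketch the V1 branch never looks at `readerHasLakeSource`, so a writer and a reader configured differently still agree on what the bytes contain. The write side of the V0 format is left out because its layout is not shown in the diff.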



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java:
##########
@@ -92,4 +92,66 @@ void testPendingSplitsCheckpointSerde() throws Exception {
         /* check deserialized is equal to the original */
         assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
     }
+
+    @Test
+    void testV0Compatibility() throws Exception {
+        // serialize with v0,
+        int version = 0;
+        // test with lake source = null
+        FlussSourceEnumeratorStateSerializer serializer =
+                new FlussSourceEnumeratorStateSerializer(null);
+
+        Set<TableBucket> assignedBuckets =
+                new HashSet<>(Arrays.asList(new TableBucket(1, 0), new TableBucket(1, 4L, 1)));
+        Map<Long, String> assignedPartitions = new HashMap<>();
+        assignedPartitions.put(1L, "partition1");
+        assignedPartitions.put(2L, "partition2");
+        SourceEnumeratorState sourceEnumeratorState =
+                new SourceEnumeratorState(assignedBuckets, assignedPartitions, null);
+        byte[] serialized = serializer.serializeV0(sourceEnumeratorState);
+
+        // then deserialize
+        SourceEnumeratorState deserializedSourceEnumeratorState =
+                serializer.deserialize(version, serialized);
+        assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
+
+        // test with lake source is not null
+        serializer = new FlussSourceEnumeratorStateSerializer(new TestingLakeSource());
+        List<SourceSplitBase> remainingHybridLakeFlussSplits = new ArrayList<>();
+        // Add a LogSplit
+        TableBucket logSplitBucket = new TableBucket(1, 0);
+        LogSplit logSplit = new LogSplit(logSplitBucket, null, 100L);
+        remainingHybridLakeFlussSplits.add(logSplit);
+        sourceEnumeratorState =
+                new SourceEnumeratorState(
+                        assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits);
+
+        serialized = serializer.serializeV0(sourceEnumeratorState);
+
+        // then deserialize
+        deserializedSourceEnumeratorState = serializer.deserialize(version, serialized);
+        assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
+    }
+
+    @Test
+    void testInconsistentLakeSourceSerde() throws Exception {
+        // test serialize with null lake source
+        FlussSourceEnumeratorStateSerializer serializer =
+                new FlussSourceEnumeratorStateSerializer(null);
+
+        Set<TableBucket> assignedBuckets =
+                new HashSet<>(Arrays.asList(new TableBucket(1, 0), new TableBucket(1, 4L, 1)));
+        Map<Long, String> assignedPartitions = new HashMap<>();
+        assignedPartitions.put(1L, "partition1");
+        assignedPartitions.put(2L, "partition2");
+        SourceEnumeratorState sourceEnumeratorState =
+                new SourceEnumeratorState(assignedBuckets, assignedPartitions, null);
+        byte[] serialized = serializer.serialize(sourceEnumeratorState);
+
+        // test deserialize with nonnull lake source
+        serializer = new FlussSourceEnumeratorStateSerializer(new TestingLakeSource());
+        SourceEnumeratorState deserializedSourceEnumeratorState =
+                serializer.deserialize(serializer.getVersion(), serialized);
+        assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);

Review Comment:
   The test assertions use equals() to verify serialization/deserialization 
correctness, but SourceEnumeratorState.equals() doesn't compare the 
remainingHybridLakeFlussSplits field. This means the test won't catch bugs 
where remainingHybridLakeFlussSplits is incorrectly serialized or deserialized. 
Consider adding explicit assertions that verify remainingHybridLakeFlussSplits 
separately.
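The blind spot described in this comment can be reproduced with a small, self-contained sketch. The `State` class below is a hypothetical stand-in, not `SourceEnumeratorState`; it simply assumes, as the comment states, an `equals()` that skips one field, and shows why an equality-only check passes even when that field was lost in a round trip.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for illustration; not the Fluss SourceEnumeratorState. */
class EqualsBlindSpotSketch {
    static final class State {
        final List<String> assignedBuckets;
        final List<String> remainingSplits; // deliberately left out of equals()/hashCode()

        State(List<String> assignedBuckets, List<String> remainingSplits) {
            this.assignedBuckets = assignedBuckets;
            this.remainingSplits = remainingSplits;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof State)) {
                return false;
            }
            // Only assignedBuckets is compared, which is the situation the review describes.
            return Objects.equals(assignedBuckets, ((State) o).assignedBuckets);
        }

        @Override
        public int hashCode() {
            return Objects.hash(assignedBuckets);
        }
    }

    /** The weak check: relies on equals() alone. */
    static boolean equalsOnly(State expected, State actual) {
        return expected.equals(actual);
    }

    /** The stronger check: equals() plus an explicit comparison of the skipped field. */
    static boolean equalsPlusField(State expected, State actual) {
        return expected.equals(actual)
                && Objects.equals(expected.remainingSplits, actual.remainingSplits);
    }

    public static void main(String[] args) {
        State original = new State(Arrays.asList("bucket-0"), Arrays.asList("log-split"));
        // Simulates a faulty round trip that dropped the remaining splits.
        State roundTripped = new State(Arrays.asList("bucket-0"), null);
        System.out.println(equalsOnly(original, roundTripped));
        System.out.println(equalsPlusField(original, roundTripped));
    }
}
```

In the actual test, the equivalent of `equalsPlusField` would be a second assertion on whatever accessor exposes the remaining splits, placed next to the existing `isEqualTo` call.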



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java:
##########
@@ -92,4 +92,66 @@ void testPendingSplitsCheckpointSerde() throws Exception {
         /* check deserialized is equal to the original */
         assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
     }
+
+    @Test
+    void testV0Compatibility() throws Exception {
+        // serialize with v0,
+        int version = 0;
+        // test with lake source = null
+        FlussSourceEnumeratorStateSerializer serializer =
+                new FlussSourceEnumeratorStateSerializer(null);
+
+        Set<TableBucket> assignedBuckets =
+                new HashSet<>(Arrays.asList(new TableBucket(1, 0), new TableBucket(1, 4L, 1)));
+        Map<Long, String> assignedPartitions = new HashMap<>();
+        assignedPartitions.put(1L, "partition1");
+        assignedPartitions.put(2L, "partition2");
+        SourceEnumeratorState sourceEnumeratorState =
+                new SourceEnumeratorState(assignedBuckets, assignedPartitions, null);
+        byte[] serialized = serializer.serializeV0(sourceEnumeratorState);
+
+        // then deserialize
+        SourceEnumeratorState deserializedSourceEnumeratorState =
+                serializer.deserialize(version, serialized);
+        assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
+
+        // test with lake source is not null
+        serializer = new FlussSourceEnumeratorStateSerializer(new TestingLakeSource());
+        List<SourceSplitBase> remainingHybridLakeFlussSplits = new ArrayList<>();
+        // Add a LogSplit
+        TableBucket logSplitBucket = new TableBucket(1, 0);
+        LogSplit logSplit = new LogSplit(logSplitBucket, null, 100L);
+        remainingHybridLakeFlussSplits.add(logSplit);
+        sourceEnumeratorState =
+                new SourceEnumeratorState(
+                        assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits);
+
+        serialized = serializer.serializeV0(sourceEnumeratorState);
+
+        // then deserialize
+        deserializedSourceEnumeratorState = serializer.deserialize(version, serialized);
+        assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);

Review Comment:
   The test assertions use equals() to verify serialization/deserialization 
correctness, but SourceEnumeratorState.equals() doesn't compare the 
remainingHybridLakeFlussSplits field. This means the test won't catch bugs 
where remainingHybridLakeFlussSplits is incorrectly serialized or deserialized. 
Consider adding explicit assertions that verify remainingHybridLakeFlussSplits 
separately.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java:
##########
@@ -124,8 +151,14 @@ public SourceEnumeratorState deserialize(int version, byte[] serialized) throws
         }
 
         List<SourceSplitBase> remainingHybridLakeFlussSplits = null;
-        if (lakeSource != null) {
-            // todo: add a ut for serialize remaining hybrid lake fluss splits
+
+        if (version == VERSION_0) {
+            // if it's version0, which rely on lakeSource flag to do deserialize,

Review Comment:
   The comment is incomplete and grammatically incorrect. It should form a 
complete sentence explaining what VERSION_0 does.
   ```suggestion
               // For VERSION_0, deserialize remaining hybrid lake Fluss splits only when lakeSource is not null.
   ```



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java:
##########
@@ -92,4 +92,66 @@ void testPendingSplitsCheckpointSerde() throws Exception {
         /* check deserialized is equal to the original */
         assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
     }
+
+    @Test
+    void testV0Compatibility() throws Exception {
+        // serialize with v0,
+        int version = 0;
+        // test with lake source = null
+        FlussSourceEnumeratorStateSerializer serializer =
+                new FlussSourceEnumeratorStateSerializer(null);
+
+        Set<TableBucket> assignedBuckets =
+                new HashSet<>(Arrays.asList(new TableBucket(1, 0), new TableBucket(1, 4L, 1)));
+        Map<Long, String> assignedPartitions = new HashMap<>();
+        assignedPartitions.put(1L, "partition1");
+        assignedPartitions.put(2L, "partition2");
+        SourceEnumeratorState sourceEnumeratorState =
+                new SourceEnumeratorState(assignedBuckets, assignedPartitions, null);
+        byte[] serialized = serializer.serializeV0(sourceEnumeratorState);
+
+        // then deserialize
+        SourceEnumeratorState deserializedSourceEnumeratorState =
+                serializer.deserialize(version, serialized);
+        assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
+
+        // test with lake source is not null
+        serializer = new FlussSourceEnumeratorStateSerializer(new TestingLakeSource());
+        List<SourceSplitBase> remainingHybridLakeFlussSplits = new ArrayList<>();
+        // Add a LogSplit
+        TableBucket logSplitBucket = new TableBucket(1, 0);
+        LogSplit logSplit = new LogSplit(logSplitBucket, null, 100L);
+        remainingHybridLakeFlussSplits.add(logSplit);
+        sourceEnumeratorState =
+                new SourceEnumeratorState(
+                        assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits);
+
+        serialized = serializer.serializeV0(sourceEnumeratorState);
+
+        // then deserialize
+        deserializedSourceEnumeratorState = serializer.deserialize(version, serialized);
+        assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
+    }
+
+    @Test
+    void testInconsistentLakeSourceSerde() throws Exception {

Review Comment:
   The method name "testInconsistentLakeSourceSerde" is misleading. This test 
actually validates that the VERSION_1 serialization format correctly handles the 
case where serialization happens with a null lakeSource but deserialization 
happens with a non-null lakeSource. Consider renaming it to something like 
"testV1DeserializeWithDifferentLakeSource" to better reflect what is being 
tested.
   ```suggestion
       void testV1DeserializeWithDifferentLakeSource() throws Exception {
   ```


