This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 38c255501 [hotfix] Fix union read can't restore issue (#2321)
38c255501 is described below

commit 38c255501d63e25c4a2ad1d8dd621e1827ddb55b
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Jan 8 14:21:44 2026 +0800

    [hotfix] Fix union read can't restore issue (#2321)
---
 .../FlussSourceEnumeratorStateSerializer.java      | 171 ++++++++++++++++-----
 .../flink/source/state/SourceEnumeratorState.java  |   6 +-
 .../state/SourceEnumeratorStateSerializerTest.java |  62 ++++++++
 3 files changed, 195 insertions(+), 44 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
index c355109a2..b721032d0 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
@@ -17,11 +17,13 @@
 
 package org.apache.fluss.flink.source.state;
 
+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.flink.source.split.SourceSplitBase;
 import org.apache.fluss.flink.source.split.SourceSplitSerializer;
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.types.Tuple2;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -37,17 +39,40 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-/** A serializer for {@link SourceEnumeratorState}. */
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Serializer for {@link SourceEnumeratorState}.
+ *
+ * <p>This serializer manages the versioned persistence of the enumerator's 
state, including
+ * assigned buckets, partitions, and remaining hybrid lake/Fluss splits.
+ *
+ * <h3>Version Evolution:</h3>
+ *
+ * <ul>
+ *   <li><b>Version 0:</b> Initial version. Remaining hybrid lake splits are 
only (de)serialized if
+ *       the {@code lakeSource} is non-null.
+ *   <li><b>Version 1 (Current):</b> Decouples split serialization from the 
{@code lakeSource}
+ *       presence. It always attempts to (de)serialize the splits, using an 
internal boolean flag to
+ *       indicate presence. This ensures state consistency regardless of the 
current runtime
+ *       configuration.
+ * </ul>
+ *
+ * <p><b>Compatibility Note:</b> This serializer is designed for backward 
compatibility. It can
+ * deserialize states from Version 0, but always produces Version 1 during 
serialization.
+ */
 public class FlussSourceEnumeratorStateSerializer
         implements SimpleVersionedSerializer<SourceEnumeratorState> {
 
     @Nullable private final LakeSource<LakeSplit> lakeSource;
 
     private static final int VERSION_0 = 0;
+    private static final int VERSION_1 = 1;
+
     private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
             ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
 
-    private static final int CURRENT_VERSION = VERSION_0;
+    private static final int CURRENT_VERSION = VERSION_1;
 
     public FlussSourceEnumeratorStateSerializer(LakeSource<LakeSplit> 
lakeSource) {
         this.lakeSource = lakeSource;
@@ -61,11 +86,28 @@ public class FlussSourceEnumeratorStateSerializer
     @Override
     public byte[] serialize(SourceEnumeratorState state) throws IOException {
         final DataOutputSerializer out = SERIALIZER_CACHE.get();
+
+        // serialize assigned buckets and partitions
+        serializeAssignBucketAndPartitions(
+                out, state.getAssignedBuckets(), 
state.getAssignedPartitions());
+
+        // serialize remaining hybrid lake splits
+        serializeRemainingHybridLakeFlussSplits(out, state);
+
+        final byte[] result = out.getCopyOfBuffer();
+        out.clear();
+        return result;
+    }
+
+    private void serializeAssignBucketAndPartitions(
+            DataOutputSerializer out,
+            Set<TableBucket> assignedBuckets,
+            Map<Long, String> assignedPartitions)
+            throws IOException {
         // write assigned buckets
-        out.writeInt(state.getAssignedBuckets().size());
-        for (TableBucket tableBucket : state.getAssignedBuckets()) {
+        out.writeInt(assignedBuckets.size());
+        for (TableBucket tableBucket : assignedBuckets) {
             out.writeLong(tableBucket.getTableId());
-
             // write partition
             // if partition is not null
             if (tableBucket.getPartitionId() != null) {
@@ -78,16 +120,42 @@ public class FlussSourceEnumeratorStateSerializer
             out.writeInt(tableBucket.getBucket());
         }
         // write assigned partitions
-        out.writeInt(state.getAssignedPartitions().size());
-        for (Map.Entry<Long, String> entry : 
state.getAssignedPartitions().entrySet()) {
+        out.writeInt(assignedPartitions.size());
+        for (Map.Entry<Long, String> entry : assignedPartitions.entrySet()) {
             out.writeLong(entry.getKey());
             out.writeUTF(entry.getValue());
         }
+    }
 
+    private void serializeRemainingHybridLakeFlussSplits(
+            final DataOutputSerializer out, SourceEnumeratorState state) 
throws IOException {
+        List<SourceSplitBase> remainingHybridLakeFlussSplits =
+                state.getRemainingHybridLakeFlussSplits();
+        if (remainingHybridLakeFlussSplits != null) {
+            // write that hybrid lake fluss splits is not null
+            out.writeBoolean(true);
+            out.writeInt(remainingHybridLakeFlussSplits.size());
+            SourceSplitSerializer sourceSplitSerializer = new 
SourceSplitSerializer(lakeSource);
+            out.writeInt(sourceSplitSerializer.getVersion());
+            for (SourceSplitBase split : remainingHybridLakeFlussSplits) {
+                byte[] serializeBytes = sourceSplitSerializer.serialize(split);
+                out.writeInt(serializeBytes.length);
+                out.write(serializeBytes);
+            }
+        } else {
+            // write that hybrid lake fluss splits is null
+            out.writeBoolean(false);
+        }
+    }
+
+    @VisibleForTesting
+    protected byte[] serializeV0(SourceEnumeratorState state) throws 
IOException {
+        final DataOutputSerializer out = SERIALIZER_CACHE.get();
+        serializeAssignBucketAndPartitions(
+                out, state.getAssignedBuckets(), 
state.getAssignedPartitions());
         if (lakeSource != null) {
             serializeRemainingHybridLakeFlussSplits(out, state);
         }
-
         final byte[] result = out.getCopyOfBuffer();
         out.clear();
         return result;
@@ -95,10 +163,54 @@ public class FlussSourceEnumeratorStateSerializer
 
     @Override
     public SourceEnumeratorState deserialize(int version, byte[] serialized) 
throws IOException {
-        if (version != VERSION_0) {
-            throw new IOException("Unknown version or corrupt state: " + 
version);
+        switch (version) {
+            case VERSION_1:
+                return deserializeV1(serialized);
+            case VERSION_0:
+                return deserializeV0(serialized);
+            default:
+                throw new IOException(
+                        String.format(
+                                "The bytes are serialized with version %d, "
+                                        + "while this deserializer only 
supports version up to %d",
+                                version, CURRENT_VERSION));
+        }
+    }
+
+    private SourceEnumeratorState deserializeV0(byte[] serialized) throws 
IOException {
+        DataInputDeserializer in = new DataInputDeserializer(serialized);
+        Tuple2<Set<TableBucket>, Map<Long, String>> assignBucketAndPartitions =
+                deserializeAssignBucketAndPartitions(in);
+        List<SourceSplitBase> remainingHybridLakeFlussSplits = null;
+        // in version 0, deserialize remaining hybrid lake Fluss splits only 
when lakeSource is
+        // not null.
+        if (lakeSource != null) {
+            remainingHybridLakeFlussSplits = 
deserializeRemainingHybridLakeFlussSplits(in);
         }
-        final DataInputDeserializer in = new DataInputDeserializer(serialized);
+        return new SourceEnumeratorState(
+                assignBucketAndPartitions.f0,
+                assignBucketAndPartitions.f1,
+                remainingHybridLakeFlussSplits);
+    }
+
+    private SourceEnumeratorState deserializeV1(byte[] serialized) throws 
IOException {
+        DataInputDeserializer in = new DataInputDeserializer(serialized);
+        Tuple2<Set<TableBucket>, Map<Long, String>> assignBucketAndPartitions =
+                deserializeAssignBucketAndPartitions(in);
+        List<SourceSplitBase> remainingHybridLakeFlussSplits =
+                deserializeRemainingHybridLakeFlussSplits(in);
+        // in version 1, always attempt to deserialize remaining hybrid lake/Fluss
+        // splits. The serialized state encodes their presence via a boolean flag, so
+        // this logic no longer depends on whether lakeSource is null. This unconditional
+        // deserialization is the intended behavior change compared to VERSION_0.
+        return new SourceEnumeratorState(
+                assignBucketAndPartitions.f0,
+                assignBucketAndPartitions.f1,
+                remainingHybridLakeFlussSplits);
+    }
+
+    private Tuple2<Set<TableBucket>, Map<Long, String>> 
deserializeAssignBucketAndPartitions(
+            DataInputDeserializer in) throws IOException {
         // deserialize assigned buckets
         int assignedBucketsSize = in.readInt();
         Set<TableBucket> assignedBuckets = new HashSet<>(assignedBucketsSize);
@@ -122,36 +234,7 @@ public class FlussSourceEnumeratorStateSerializer
             String partition = in.readUTF();
             assignedPartitions.put(partitionId, partition);
         }
-
-        List<SourceSplitBase> remainingHybridLakeFlussSplits = null;
-        if (lakeSource != null) {
-            // todo: add a ut for serialize remaining hybrid lake fluss splits
-            remainingHybridLakeFlussSplits = 
deserializeRemainingHybridLakeFlussSplits(in);
-        }
-
-        return new SourceEnumeratorState(
-                assignedBuckets, assignedPartitions, 
remainingHybridLakeFlussSplits);
-    }
-
-    private void serializeRemainingHybridLakeFlussSplits(
-            final DataOutputSerializer out, SourceEnumeratorState state) 
throws IOException {
-        List<SourceSplitBase> remainingHybridLakeFlussSplits =
-                state.getRemainingHybridLakeFlussSplits();
-        if (remainingHybridLakeFlussSplits != null) {
-            // write that hybrid lake fluss splits is not null
-            out.writeBoolean(true);
-            out.writeInt(remainingHybridLakeFlussSplits.size());
-            SourceSplitSerializer sourceSplitSerializer = new 
SourceSplitSerializer(lakeSource);
-            out.writeInt(sourceSplitSerializer.getVersion());
-            for (SourceSplitBase split : remainingHybridLakeFlussSplits) {
-                byte[] serializeBytes = sourceSplitSerializer.serialize(split);
-                out.writeInt(serializeBytes.length);
-                out.write(serializeBytes);
-            }
-        } else {
-            // write that hybrid lake fluss splits is null
-            out.writeBoolean(false);
-        }
+        return Tuple2.of(assignedBuckets, assignedPartitions);
     }
 
     @Nullable
@@ -160,7 +243,11 @@ public class FlussSourceEnumeratorStateSerializer
         if (in.readBoolean()) {
             int numSplits = in.readInt();
             List<SourceSplitBase> splits = new ArrayList<>(numSplits);
-            SourceSplitSerializer sourceSplitSerializer = new 
SourceSplitSerializer(lakeSource);
+            SourceSplitSerializer sourceSplitSerializer =
+                    new SourceSplitSerializer(
+                            checkNotNull(
+                                    lakeSource,
+                                    "lake source must not be null when there 
are hybrid lake splits."));
             int version = in.readInt();
             for (int i = 0; i < numSplits; i++) {
                 int splitSizeInBytes = in.readInt();
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
index 7a677df20..6042e65f1 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
@@ -73,12 +73,14 @@ public class SourceEnumeratorState {
         }
         SourceEnumeratorState that = (SourceEnumeratorState) o;
         return Objects.equals(assignedBuckets, that.assignedBuckets)
-                && Objects.equals(assignedPartitions, that.assignedPartitions);
+                && Objects.equals(assignedPartitions, that.assignedPartitions)
+                && Objects.equals(
+                        remainingHybridLakeFlussSplits, 
that.remainingHybridLakeFlussSplits);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(assignedBuckets, assignedPartitions);
+        return Objects.hash(assignedBuckets, assignedPartitions, 
remainingHybridLakeFlussSplits);
     }
 
     @Override
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
index 26b4a024c..2c30bf086 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
@@ -92,4 +92,66 @@ class SourceEnumeratorStateSerializerTest {
         /* check deserialized is equal to the original */
         
assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
     }
+
+    @Test
+    void testV0Compatibility() throws Exception {
+        // serialize with v0,
+        int version = 0;
+        // test with lake source = null
+        FlussSourceEnumeratorStateSerializer serializer =
+                new FlussSourceEnumeratorStateSerializer(null);
+
+        Set<TableBucket> assignedBuckets =
+                new HashSet<>(Arrays.asList(new TableBucket(1, 0), new 
TableBucket(1, 4L, 1)));
+        Map<Long, String> assignedPartitions = new HashMap<>();
+        assignedPartitions.put(1L, "partition1");
+        assignedPartitions.put(2L, "partition2");
+        SourceEnumeratorState sourceEnumeratorState =
+                new SourceEnumeratorState(assignedBuckets, assignedPartitions, 
null);
+        byte[] serialized = serializer.serializeV0(sourceEnumeratorState);
+
+        // then deserialize
+        SourceEnumeratorState deserializedSourceEnumeratorState =
+                serializer.deserialize(version, serialized);
+        
assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
+
+        // test with lake source is not null
+        serializer = new FlussSourceEnumeratorStateSerializer(new 
TestingLakeSource());
+        List<SourceSplitBase> remainingHybridLakeFlussSplits = new 
ArrayList<>();
+        // Add a LogSplit
+        TableBucket logSplitBucket = new TableBucket(1, 0);
+        LogSplit logSplit = new LogSplit(logSplitBucket, null, 100L);
+        remainingHybridLakeFlussSplits.add(logSplit);
+        sourceEnumeratorState =
+                new SourceEnumeratorState(
+                        assignedBuckets, assignedPartitions, 
remainingHybridLakeFlussSplits);
+
+        serialized = serializer.serializeV0(sourceEnumeratorState);
+
+        // then deserialize
+        deserializedSourceEnumeratorState = serializer.deserialize(version, 
serialized);
+        
assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
+    }
+
+    @Test
+    void testInconsistentLakeSourceSerde() throws Exception {
+        // test serialize with null lake source
+        FlussSourceEnumeratorStateSerializer serializer =
+                new FlussSourceEnumeratorStateSerializer(null);
+
+        Set<TableBucket> assignedBuckets =
+                new HashSet<>(Arrays.asList(new TableBucket(1, 0), new 
TableBucket(1, 4L, 1)));
+        Map<Long, String> assignedPartitions = new HashMap<>();
+        assignedPartitions.put(1L, "partition1");
+        assignedPartitions.put(2L, "partition2");
+        SourceEnumeratorState sourceEnumeratorState =
+                new SourceEnumeratorState(assignedBuckets, assignedPartitions, 
null);
+        byte[] serialized = serializer.serialize(sourceEnumeratorState);
+
+        // test deserialize with nonnull lake source
+        serializer = new FlussSourceEnumeratorStateSerializer(new 
TestingLakeSource());
+        SourceEnumeratorState deserializedSourceEnumeratorState =
+                serializer.deserialize(serializer.getVersion(), serialized);
+        
assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
+    }
 }

Reply via email to