This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 0964d866c [flink] Union read supports reading partition data that 
exists in the lake but is expired in Fluss (#2197)
0964d866c is described below

commit 0964d866c0330967a8cf2d65d298dcc8b9eda9c2
Author: Liebing <[email protected]>
AuthorDate: Mon Dec 22 10:32:49 2025 +0800

    [flink] Union read supports reading partition data that exists in the lake 
but is expired in Fluss (#2197)
---
 .../fluss/lake/source/TestingLakeSource.java       | 142 +++++++++++
 .../apache/fluss/lake/source/TestingLakeSplit.java |  37 ++-
 .../apache/fluss/lake/source/TestingPlanner.java   |  51 ++++
 .../source/enumerator/FlinkSourceEnumerator.java   |  52 +++-
 .../flink/source/event/PartitionsRemovedEvent.java |  15 ++
 .../source/reader/FlinkSourceSplitReader.java      |  18 +-
 .../fluss/flink/lake/LakeSplitSerializerTest.java  |  32 +--
 .../enumerator/FlinkSourceEnumeratorTest.java      | 279 +++++++++++++++++++++
 .../flink/source/reader/FlinkSourceReaderTest.java |  10 +-
 .../state/SourceEnumeratorStateSerializerTest.java |  41 ++-
 .../fluss/lake/paimon/source/PaimonSplit.java      |  10 +
 .../paimon/flink/FlinkUnionReadLogTableITCase.java |  91 +++++++
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 107 ++++++++
 13 files changed, 817 insertions(+), 68 deletions(-)

diff --git 
a/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSource.java
 
b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSource.java
new file mode 100644
index 000000000..d2fa4269b
--- /dev/null
+++ 
b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSource.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.source;
+
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.utils.CloseableIterator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** A testing implementation of {@link LakeSource}. */
+public class TestingLakeSource implements LakeSource<LakeSplit> {
+
+    // bucket num of source table
+    private final int bucketNum;
+
+    // partition infos of the partitions that contain lake splits
+    private final List<PartitionInfo> partitionInfos;
+
+    public TestingLakeSource() {
+        this.bucketNum = 0;
+        this.partitionInfos = null;
+    }
+
+    public TestingLakeSource(int bucketNum, List<PartitionInfo> 
partitionInfos) {
+        this.bucketNum = bucketNum;
+        this.partitionInfos = partitionInfos;
+    }
+
+    @Override
+    public void withProject(int[][] project) {}
+
+    @Override
+    public void withLimit(int limit) {}
+
+    @Override
+    public FilterPushDownResult withFilters(List<Predicate> predicates) {
+        return null;
+    }
+
+    @Override
+    public Planner<LakeSplit> createPlanner(PlannerContext context) throws 
IOException {
+        return new TestingPlanner(bucketNum, partitionInfos);
+    }
+
+    @Override
+    public RecordReader createRecordReader(ReaderContext<LakeSplit> context) 
throws IOException {
+        return CloseableIterator::emptyIterator;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<LakeSplit> getSplitSerializer() {
+        return new SimpleVersionedSerializer<LakeSplit>() {
+
+            @Override
+            public int getVersion() {
+                return 0;
+            }
+
+            @Override
+            public byte[] serialize(LakeSplit split) throws IOException {
+                if (split instanceof TestingLakeSplit) {
+                    TestingLakeSplit testingSplit = (TestingLakeSplit) split;
+                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    try (DataOutputStream dos = new DataOutputStream(baos)) {
+                        // Serialize bucket
+                        dos.writeInt(testingSplit.bucket());
+
+                        // Serialize partition list
+                        List<String> partition = testingSplit.partition();
+                        if (partition == null) {
+                            dos.writeInt(-1);
+                        } else {
+                            dos.writeInt(partition.size());
+                            for (String part : partition) {
+                                // Write a boolean flag to indicate if the 
string is non-null
+                                dos.writeBoolean(part != null);
+                                if (part != null) {
+                                    dos.writeUTF(part);
+                                }
+                            }
+                        }
+                    }
+                    return baos.toByteArray();
+                }
+                throw new IOException("Unsupported split type: " + 
split.getClass().getName());
+            }
+
+            @Override
+            public LakeSplit deserialize(int version, byte[] serialized) 
throws IOException {
+                if (version != 0) {
+                    throw new IOException("Unsupported version: " + version);
+                }
+
+                try (DataInputStream dis =
+                        new DataInputStream(new 
ByteArrayInputStream(serialized))) {
+                    // Deserialize bucket
+                    int bucket = dis.readInt();
+
+                    // Deserialize partition list
+                    int partitionSize = dis.readInt();
+                    List<String> partition;
+                    if (partitionSize < 0) {
+                        partition = null;
+                    } else {
+                        partition = new ArrayList<>(partitionSize);
+                        for (int i = 0; i < partitionSize; i++) {
+                            // Read boolean flag to determine if the string is 
non-null
+                            boolean isNotNull = dis.readBoolean();
+                            String part = isNotNull ? dis.readUTF() : null;
+                            partition.add(part);
+                        }
+                    }
+
+                    return new TestingLakeSplit(bucket, partition);
+                }
+            }
+        };
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java
 b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSplit.java
similarity index 52%
copy from 
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java
copy to 
fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSplit.java
index e21c4a155..dae4ee2e7 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSplit.java
@@ -15,34 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.flink.source.event;
+package org.apache.fluss.lake.source;
 
-import org.apache.flink.api.connector.source.SourceEvent;
+import java.util.List;
 
-import java.util.Map;
+/** A testing implementation of {@link LakeSplit}. */
+public class TestingLakeSplit implements LakeSplit {
 
-/**
- * A source event to represent partitions is removed to send from enumerator 
to reader.
- *
- * <p>It contains the partition bucket of the removed partitions that has been 
assigned to the
- * reader.
- */
-public class PartitionsRemovedEvent implements SourceEvent {
+    private final int bucket;
+    private final List<String> partition;
 
-    private static final long serialVersionUID = 1L;
-
-    private final Map<Long, String> removedPartitions;
+    public TestingLakeSplit(int bucket, List<String> partition) {
+        this.bucket = bucket;
+        this.partition = partition;
+    }
 
-    public PartitionsRemovedEvent(Map<Long, String> removedPartitions) {
-        this.removedPartitions = removedPartitions;
+    @Override
+    public String toString() {
+        return "TestingLakeSplit{" + "bucket=" + bucket + ", partition=" + 
partition + '}';
     }
 
-    public Map<Long, String> getRemovedPartitions() {
-        return removedPartitions;
+    @Override
+    public int bucket() {
+        return bucket;
     }
 
     @Override
-    public String toString() {
-        return "PartitionsRemovedEvent{" + "removedPartitions=" + 
removedPartitions + '}';
+    public List<String> partition() {
+        return partition;
     }
 }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingPlanner.java 
b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingPlanner.java
new file mode 100644
index 000000000..2ae27221e
--- /dev/null
+++ 
b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingPlanner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.source;
+
+import org.apache.fluss.metadata.PartitionInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** A testing implementation of {@link Planner}. */
+public class TestingPlanner implements Planner<LakeSplit> {
+
+    private final int bucketNum;
+    private final List<PartitionInfo> partitionInfos;
+
+    public TestingPlanner(int bucketNum, List<PartitionInfo> partitionInfos) {
+        this.bucketNum = bucketNum;
+        this.partitionInfos = partitionInfos;
+    }
+
+    @Override
+    public List<LakeSplit> plan() throws IOException {
+        List<LakeSplit> splits = new ArrayList<>();
+
+        for (PartitionInfo partitionInfo : partitionInfos) {
+            for (int i = 0; i < bucketNum; i++) {
+                splits.add(
+                        new TestingLakeSplit(
+                                i, 
partitionInfo.getResolvedPartitionSpec().getPartitionValues()));
+            }
+        }
+
+        return splits;
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 55a9c871c..f8db0aff1 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -24,6 +24,8 @@ import org.apache.fluss.client.metadata.KvSnapshots;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.lake.LakeSplitGenerator;
+import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
 import 
org.apache.fluss.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl;
 import 
org.apache.fluss.flink.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -83,7 +85,8 @@ import static org.apache.fluss.utils.Preconditions.checkState;
  * <p>The enumerator is responsible for:
  *
  * <ul>
- *   <li>Get the all splits(snapshot split + log split) for a table of Fluss 
to be read.
+ *   <li>Get the all splits(lake split + kv snapshot split + log split) for a 
table of Fluss to be
+ *       read.
  *   <li>Assign the splits to readers with the guarantee that the splits 
belong to the same bucket
  *       will be assigned to same reader.
  * </ul>
@@ -110,10 +113,15 @@ public class FlinkSourceEnumerator
      *
      * <p>It's mainly used to help enumerator to broadcast the partition 
removed event to the
      * readers when partitions is dropped.
+     *
+     * <p>If an assigned partition exists only in the lake and has already 
expired in Fluss, it will
+     * remain here indefinitely and will not be removed. However, considering 
that only a small
+     * number of such lake-only partitions might exist during the initial 
startup, and they consume
+     * minimal memory, this issue is being ignored for now.
      */
     private final Map<Long, String> assignedPartitions;
 
-    /** buckets that have been assigned to readers. */
+    /** Buckets that have been assigned to readers. */
     private final Set<TableBucket> assignedTableBuckets;
 
     @Nullable private List<SourceSplitBase> pendingHybridLakeFlussSplits;
@@ -222,12 +230,12 @@ public class FlinkSourceEnumerator
         this.context = checkNotNull(context);
         this.pendingSplitAssignment = new HashMap<>();
         this.assignedTableBuckets = new HashSet<>(assignedTableBuckets);
-        this.startingOffsetsInitializer = startingOffsetsInitializer;
         this.assignedPartitions = new HashMap<>(assignedPartitions);
         this.pendingHybridLakeFlussSplits =
                 pendingHybridLakeFlussSplits == null
                         ? null
                         : new LinkedList<>(pendingHybridLakeFlussSplits);
+        this.startingOffsetsInitializer = startingOffsetsInitializer;
         this.scanPartitionDiscoveryIntervalMs = 
scanPartitionDiscoveryIntervalMs;
         this.streaming = streaming;
         this.partitionFilters = partitionFilters;
@@ -258,6 +266,10 @@ public class FlinkSourceEnumerator
                     // we'll need to consider lake splits
                     List<SourceSplitBase> hybridLakeFlussSplits = 
generateHybridLakeFlussSplits();
                     if (hybridLakeFlussSplits != null) {
+                        LOG.info(
+                                "Generated {} hybrid lake splits for table 
{}.",
+                                hybridLakeFlussSplits.size(),
+                                tablePath);
                         // handle hybrid lake fluss splits firstly
                         handleSplitsAdd(hybridLakeFlussSplits, null);
                     }
@@ -554,7 +566,8 @@ public class FlinkSourceEnumerator
                 // hybrid snapshot log split;
                 OptionalLong logOffset = snapshots.getLogOffset(bucketId);
                 checkState(
-                        logOffset.isPresent(), "Log offset should be present 
if snapshot id is.");
+                        logOffset.isPresent(),
+                        "Log offset should be present if snapshot id is 
present.");
                 splits.add(
                         new HybridSnapshotLogSplit(
                                 tb, partitionName, snapshotId.getAsLong(), 
logOffset.getAsLong()));
@@ -616,6 +629,7 @@ public class FlinkSourceEnumerator
         // should be restored from checkpoint, shouldn't
         // list splits again
         if (pendingHybridLakeFlussSplits != null) {
+            LOG.info("Still have pending lake fluss splits, shouldn't list 
splits again.");
             return pendingHybridLakeFlussSplits;
         }
         try {
@@ -664,9 +678,28 @@ public class FlinkSourceEnumerator
         pendingSplitAssignment.forEach(
                 (reader, splits) ->
                         splits.removeIf(
-                                split ->
-                                        removedPartitionsMap.containsKey(
-                                                
split.getTableBucket().getPartitionId())));
+                                split -> {
+                                    // Never remove LakeSnapshotSplit, because 
during union reads,
+                                    // data from the lake must still be read 
even if the partition
+                                    // has already expired in Fluss.
+                                    if (split instanceof LakeSnapshotSplit) {
+                                        return false;
+                                    }
+
+                                    // Similar to LakeSnapshotSplit, if it 
contains any lake split,
+                                    // never remove it; otherwise, it can be 
removed when the Fluss
+                                    // partition expires.
+                                    if (split instanceof 
LakeSnapshotAndFlussLogSplit) {
+                                        LakeSnapshotAndFlussLogSplit 
hybridSplit =
+                                                (LakeSnapshotAndFlussLogSplit) 
split;
+                                        if 
(!hybridSplit.isLakeSplitFinished()) {
+                                            return false;
+                                        }
+                                    }
+
+                                    return removedPartitionsMap.containsKey(
+                                            
split.getTableBucket().getPartitionId());
+                                }));
 
         // send partition removed event to all readers
         PartitionsRemovedEvent event = new 
PartitionsRemovedEvent(removedPartitionsMap);
@@ -863,6 +896,11 @@ public class FlinkSourceEnumerator
         return assignedPartitions;
     }
 
+    @VisibleForTesting
+    Map<Integer, List<SourceSplitBase>> getPendingSplitAssignment() {
+        return pendingSplitAssignment;
+    }
+
     @Override
     public void addSplitsBack(List<SourceSplitBase> splits, int subtaskId) {
         LOG.debug("Flink Source Enumerator adds splits back: {}", splits);
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java
index e21c4a155..4c703e60c 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.source.event;
 import org.apache.flink.api.connector.source.SourceEvent;
 
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * A source event to represent partitions is removed to send from enumerator 
to reader.
@@ -41,6 +42,20 @@ public class PartitionsRemovedEvent implements SourceEvent {
         return removedPartitions;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PartitionsRemovedEvent that = (PartitionsRemovedEvent) o;
+        return Objects.equals(removedPartitions, that.removedPartitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(removedPartitions);
+    }
+
     @Override
     public String toString() {
         return "PartitionsRemovedEvent{" + "removedPartitions=" + 
removedPartitions + '}';
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index ce50fe406..9a2ab7aeb 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -102,7 +102,7 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
     private final Table table;
     private final FlinkMetricRegistry flinkMetricRegistry;
 
-    @Nullable private LakeSource<LakeSplit> lakeSource;
+    @Nullable private final LakeSource<LakeSplit> lakeSource;
 
     // table id, will be null when haven't received any split
     private Long tableId;
@@ -131,7 +131,6 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
         this.boundedSplits = new ArrayDeque<>();
         this.subscribedBuckets = new HashMap<>();
         this.projectedFields = projectedFields;
-        if (projectedFields == null) {}
 
         this.flinkSourceReaderMetrics = flinkSourceReaderMetrics;
         sanityCheck(table.getTableInfo().getRowType(), projectedFields);
@@ -313,9 +312,12 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
     }
 
     public Set<TableBucket> removePartitions(Map<Long, String> 
removedPartitions) {
-        // First, if the current active bounded split belongs to a removed 
partition,
-        // finish it so it will not be restored.
-        if (currentBoundedSplit != null) {
+        // First, if the current active bounded split belongs to a removed 
partition and is not
+        // a lake split, finish it so it will not be restored.
+        // Splits that contain lake splits cannot be terminated even if their 
corresponding partition has
+        // expired in Fluss; otherwise, union reads will fail to correctly 
read partitions that
+        // exist in the lake but have already expired in Fluss.
+        if (currentBoundedSplit != null && !currentBoundedSplit.isLakeSplit()) 
{
             TableBucket currentBucket = currentBoundedSplit.getTableBucket();
             if (removedPartitions.containsKey(currentBucket.getPartitionId())) 
{
                 try {
@@ -341,7 +343,11 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
         while (snapshotSplitIterator.hasNext()) {
             SourceSplitBase split = snapshotSplitIterator.next();
             TableBucket tableBucket = split.getTableBucket();
-            if (removedPartitions.containsKey(tableBucket.getPartitionId())) {
+            // Splits that contain lake splits cannot be terminated even if their 
corresponding partition
+            // has expired in Fluss; otherwise, union reads will fail to 
correctly read partitions
+            // that exist in the lake but have already expired in Fluss.
+            if (removedPartitions.containsKey(tableBucket.getPartitionId())
+                    && !split.isLakeSplit()) {
                 removedSplits.add(split.splitId());
                 snapshotSplitIterator.remove();
                 unsubscribedTableBuckets.add(tableBucket);
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
index cbad27ef3..ddc3e9f1f 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -23,6 +23,7 @@ import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
 import org.apache.fluss.flink.source.split.SourceSplitBase;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.lake.source.TestingLakeSplit;
 import org.apache.fluss.metadata.TableBucket;
 
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -31,7 +32,6 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
 
 import static 
org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
 import static 
org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit.LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
@@ -47,7 +47,7 @@ class LakeSplitSerializerTest {
     private static final int STOPPING_OFFSET = 1024;
 
     private static final LakeSplit LAKE_SPLIT =
-            new TestLakeSplit(0, Collections.singletonList("2025-08-18"));
+            new TestingLakeSplit(0, Collections.singletonList("2025-08-18"));
 
     private final SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer =
             new TestSimpleVersionedSerializer();
@@ -203,7 +203,7 @@ class LakeSplitSerializerTest {
             if (version < V2) {
                 return LAKE_SPLIT;
             }
-            return new TestLakeSplit(0, 
Collections.singletonList("2025-08-19"));
+            return new TestingLakeSplit(0, 
Collections.singletonList("2025-08-19"));
         }
 
         @Override
@@ -211,30 +211,4 @@ class LakeSplitSerializerTest {
             return V2;
         }
     }
-
-    private static class TestLakeSplit implements LakeSplit {
-
-        private final int bucket;
-        private final List<String> partition;
-
-        public TestLakeSplit(int bucket, List<String> partition) {
-            this.bucket = bucket;
-            this.partition = partition;
-        }
-
-        @Override
-        public String toString() {
-            return "TestLakeSplit";
-        }
-
-        @Override
-        public int bucket() {
-            return bucket;
-        }
-
-        @Override
-        public List<String> partition() {
-            return partition;
-        }
-    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index 20196bc81..6dfce8e4d 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -22,6 +22,8 @@ import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.client.write.HashBucketAssigner;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.FlinkConnectorOptions;
+import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
 import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
 import org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
@@ -30,6 +32,12 @@ import org.apache.fluss.flink.source.split.LogSplit;
 import org.apache.fluss.flink.source.split.SnapshotSplit;
 import org.apache.fluss.flink.source.split.SourceSplitBase;
 import org.apache.fluss.flink.utils.FlinkTestBase;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.lake.source.TestingLakeSource;
+import org.apache.fluss.lake.source.TestingLakeSplit;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
@@ -37,6 +45,9 @@ import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.encode.CompactedKeyEncoder;
 import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
+import org.apache.fluss.shaded.guava32.com.google.common.collect.ImmutableMap;
 import org.apache.fluss.types.DataTypes;
 
 import org.apache.flink.api.connector.source.ReaderInfo;
@@ -45,9 +56,11 @@ import 
org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -57,7 +70,9 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static 
org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
 import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -561,6 +576,189 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
         }
     }
 
+    /**
+     * Verifies that the enumerator keeps reading partitions that still exist in the lake after
+     * they have expired in Fluss.
+     *
+     * <p>A lake snapshot is mocked for two partitions registered under dummy ids (already expired
+     * in Fluss) and for one "hybrid" partition that exists in both systems. The hybrid partition
+     * is dropped after the enumerator has started. The test then checks that pending lake splits
+     * survive partition discovery, that every split is assigned once the readers register, and
+     * that each reader finally receives a {@link PartitionsRemovedEvent}.
+     *
+     * @param isPrimaryKeyTable whether the primary-key or the log table descriptor is used
+     * @param tempDir temporary directory passed to {@link LakeTableHelper}
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testPartitionsExpiredInFlussButExistInLake(
+            boolean isPrimaryKeyTable, @TempDir Path tempDir) throws Throwable {
+        int numSubtasks = 3;
+        TableDescriptor tableDescriptor =
+                isPrimaryKeyTable
+                        ? DEFAULT_AUTO_PARTITIONED_PK_TABLE_DESCRIPTOR
+                        : DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR;
+        // The two table descriptors are partitioned by different columns.
+        List<String> partitionKeys =
+                Collections.singletonList(isPrimaryKeyTable ? "date" : "name");
+        long tableId = createTable(DEFAULT_TABLE_PATH, tableDescriptor);
+
+        ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+        Map<Long, String> partitionNameByIds =
+                waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
+        assertThat(partitionNameByIds).hasSize(2);
+        // Assume some data of the partition with the smallest id has been tiered to the lake.
+        Long hybridPartitionId = Collections.min(partitionNameByIds.keySet());
+        String hybridPartitionName = partitionNameByIds.get(hybridPartitionId);
+
+        // Mock partitions which already expired in Fluss but still exist in the lake.
+        // Dummy partition ids are used since these partitions are expired in Fluss.
+        Map<Long, String> expiredPartitions = new HashMap<>();
+        expiredPartitions.put(-1L, "expiredPartition1");
+        expiredPartitions.put(-2L, "expiredPartition2");
+
+        // Mock a lake snapshot that covers every bucket of the expired partitions and of the
+        // hybrid partition that still exists in Fluss.
+        long lakeEndOffset = 50L;
+        ImmutableMap.Builder<TableBucket, Long> lakeOffsets = ImmutableMap.builder();
+        for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) {
+            for (Long expiredPartitionId : expiredPartitions.keySet()) {
+                lakeOffsets.put(new TableBucket(tableId, expiredPartitionId, bucket), 100L);
+            }
+            lakeOffsets.put(new TableBucket(tableId, hybridPartitionId, bucket), lakeEndOffset);
+        }
+        LakeTableSnapshot lakeTableSnapshot = new LakeTableSnapshot(0, lakeOffsets.build());
+        LakeTableHelper lakeTableHelper = new LakeTableHelper(zooKeeperClient, tempDir.toString());
+        lakeTableHelper.upsertLakeTable(tableId, DEFAULT_TABLE_PATH, lakeTableSnapshot);
+
+        // Create PartitionInfo for all lake partitions: the expired ones first, then the hybrid.
+        List<PartitionInfo> lakePartitionInfos = new ArrayList<>();
+        for (Map.Entry<Long, String> expired : expiredPartitions.entrySet()) {
+            lakePartitionInfos.add(
+                    new PartitionInfo(
+                            expired.getKey(),
+                            ResolvedPartitionSpec.fromPartitionName(
+                                    partitionKeys, expired.getValue())));
+        }
+        lakePartitionInfos.add(
+                new PartitionInfo(
+                        hybridPartitionId,
+                        ResolvedPartitionSpec.fromPartitionName(
+                                partitionKeys, hybridPartitionName)));
+
+        LakeSource<LakeSplit> lakeSource =
+                new TestingLakeSource(DEFAULT_BUCKET_NUM, lakePartitionInfos);
+        try (MockSplitEnumeratorContext<SourceSplitBase> context =
+                        new MockSplitEnumeratorContext<>(numSubtasks);
+                MockWorkExecutor workExecutor = new MockWorkExecutor(context);
+                FlinkSourceEnumerator enumerator =
+                        new FlinkSourceEnumerator(
+                                DEFAULT_TABLE_PATH,
+                                flussConf,
+                                isPrimaryKeyTable,
+                                true,
+                                context,
+                                Collections.emptySet(),
+                                Collections.emptyMap(),
+                                null,
+                                OffsetsInitializer.full(),
+                                DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
+                                streaming,
+                                null,
+                                lakeSource,
+                                workExecutor)) {
+            enumerator.start();
+
+            // Remove the hybrid partition to mock its expiration after the enumerator started.
+            dropPartitions(
+                    zooKeeperClient,
+                    DEFAULT_TABLE_PATH,
+                    Collections.singleton(hybridPartitionName));
+
+            // Run periodic partition discovery to trigger handlePartitionsRemoved once.
+            runPeriodicPartitionDiscovery(workExecutor);
+            Map<Integer, List<SourceSplitBase>> pendingSplitAssignment =
+                    enumerator.getPendingSplitAssignment();
+            List<SourceSplitBase> pendingSplits =
+                    pendingSplitAssignment.values().stream()
+                            .flatMap(List::stream)
+                            .collect(Collectors.toList());
+
+            // Verify that the pending splits belonging to expired partitions are not removed.
+            List<SourceSplitBase> expiredPendingSplits =
+                    pendingSplits.stream()
+                            .filter(
+                                    split ->
+                                            expiredPartitions.containsKey(
+                                                    split.getTableBucket().getPartitionId()))
+                            .collect(Collectors.toList());
+            assertThat(expiredPendingSplits)
+                    .hasSize(expiredPartitions.size() * DEFAULT_BUCKET_NUM);
+
+            // Verify that the pending LakeSnapshotSplit (for log) and
+            // LakeSnapshotAndFlussLogSplit (for kv) of the dropped hybrid partition are not
+            // removed either: one split per bucket is expected.
+            List<SourceSplitBase> hybridPendingSplits =
+                    pendingSplits.stream()
+                            .filter(
+                                    split ->
+                                            Objects.equals(
+                                                    split.getTableBucket().getPartitionId(),
+                                                    hybridPartitionId))
+                            .collect(Collectors.toList());
+            assertThat(hybridPendingSplits).hasSize(DEFAULT_BUCKET_NUM);
+            // There must not be any PartitionsRemovedEvent yet, since no reader is registered.
+            assertThat(context.getSentSourceEvent()).isEmpty();
+
+            // Register the readers.
+            for (int subtask = 0; subtask < numSubtasks; subtask++) {
+                registerReader(context, enumerator, subtask);
+            }
+
+            // All partitions, including the expired ones, should be assigned.
+            Map<Long, String> expectedAssignedPartitions = new HashMap<>(partitionNameByIds);
+            lakePartitionInfos.forEach(
+                    partitionInfo ->
+                            expectedAssignedPartitions.put(
+                                    partitionInfo.getPartitionId(),
+                                    partitionInfo.getPartitionName()));
+            assertThat(enumerator.getAssignedPartitions()).isEqualTo(expectedAssignedPartitions);
+
+            // Verify that splits for expired partitions are generated and assigned.
+            Map<Integer, List<SourceSplitBase>> actualAssignments = getReadersAssignments(context);
+            Map<TableBucket, Integer> lakeSnapshotSplitsSplitIndex =
+                    actualAssignments.values().stream()
+                            .flatMap(List::stream)
+                            .filter(split -> split instanceof LakeSnapshotSplit)
+                            .map(split -> (LakeSnapshotSplit) split)
+                            .collect(
+                                    Collectors.toMap(
+                                            SourceSplitBase::getTableBucket,
+                                            LakeSnapshotSplit::getSplitIndex));
+            Map<Integer, List<SourceSplitBase>> expectedAssignments =
+                    expectAssignments(
+                            enumerator,
+                            tableId,
+                            isPrimaryKeyTable,
+                            partitionNameByIds,
+                            lakePartitionInfos,
+                            lakeSnapshotSplitsSplitIndex,
+                            expiredPartitions);
+            checkAssignmentIgnoreOrder(actualAssignments, expectedAssignments);
+
+            // Run periodic partition discovery to trigger handlePartitionsRemoved again.
+            runPeriodicPartitionDiscovery(workExecutor);
+
+            // Every reader is expected to receive one event that names the dropped Fluss
+            // partition together with all lake partitions.
+            Map<Long, String> removedPartitions = new HashMap<>();
+            removedPartitions.put(hybridPartitionId, hybridPartitionName);
+            lakePartitionInfos.forEach(
+                    partitionInfo ->
+                            removedPartitions.put(
+                                    partitionInfo.getPartitionId(),
+                                    partitionInfo.getPartitionName()));
+            Map<Integer, List<SourceEvent>> expectedSentSourceEvent = new HashMap<>();
+            for (int subtask = 0; subtask < numSubtasks; subtask++) {
+                expectedSentSourceEvent.put(
+                        subtask,
+                        Collections.singletonList(new PartitionsRemovedEvent(removedPartitions)));
+            }
+            Map<Integer, List<SourceEvent>> actualSentSourceEvent = context.getSentSourceEvent();
+            assertThat(actualSentSourceEvent).isEqualTo(expectedSentSourceEvent);
+        }
+    }
+
     // ---------------------
     private void registerReader(
             MockSplitEnumeratorContext<SourceSplitBase> context,
@@ -586,6 +784,87 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
         return expectedAssignment;
     }
 
+    /**
+     * Builds the per-subtask split assignment that the enumerator is expected to produce when
+     * some partitions exist in the lake.
+     *
+     * <p>For every bucket of a partition in {@code partitionNameIds} one split is expected: a
+     * {@link LakeSnapshotAndFlussLogSplit} for primary-key tables, otherwise a {@link
+     * LakeSnapshotSplit} if the partition is listed in {@code lakePartitionInfos}, or a plain
+     * {@link LogSplit} if it is not. In addition, every bucket of a lake partition contained in
+     * {@code expiredPartitions} is expected to yield a {@link LakeSnapshotSplit}.
+     *
+     * @param enumerator enumerator used to resolve the owning subtask of each split
+     * @param tableId id of the table the buckets belong to
+     * @param hasPrimaryKey whether the table is a primary-key table
+     * @param partitionNameIds partition id to partition name of the partitions known at startup
+     * @param lakePartitionInfos partitions that exist in the lake
+     * @param lakeSnapshotSplitsSplitIndex split index per bucket, taken from the actual splits
+     * @param expiredPartitions partition id to name of the partitions that only exist in the lake
+     * @return expected splits keyed by the subtask that owns them
+     */
+    private Map<Integer, List<SourceSplitBase>> expectAssignments(
+            FlinkSourceEnumerator enumerator,
+            long tableId,
+            boolean hasPrimaryKey,
+            Map<Long, String> partitionNameIds,
+            List<PartitionInfo> lakePartitionInfos,
+            Map<TableBucket, Integer> lakeSnapshotSplitsSplitIndex,
+            Map<Long, String> expiredPartitions) {
+        Map<Integer, List<SourceSplitBase>> expectedAssignment = new HashMap<>();
+        // Ids of all lake partitions, resolved once instead of once per bucket.
+        Set<Long> lakePartitionIds =
+                lakePartitionInfos.stream()
+                        .map(PartitionInfo::getPartitionId)
+                        .collect(Collectors.toSet());
+
+        // for partitions that exist in Fluss at startup
+        for (Map.Entry<Long, String> partition : partitionNameIds.entrySet()) {
+            Long partitionId = partition.getKey();
+            String partitionName = partition.getValue();
+            boolean existsInLake = lakePartitionIds.contains(partitionId);
+            for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) {
+                TableBucket tableBucket = new TableBucket(tableId, partitionId, bucket);
+                SourceSplitBase split;
+                if (hasPrimaryKey) {
+                    split =
+                            new LakeSnapshotAndFlussLogSplit(
+                                    tableBucket,
+                                    partitionName,
+                                    null,
+                                    EARLIEST_OFFSET,
+                                    Long.MIN_VALUE);
+                } else if (existsInLake) {
+                    split =
+                            new LakeSnapshotSplit(
+                                    tableBucket,
+                                    partitionName,
+                                    new TestingLakeSplit(
+                                            bucket, Collections.singletonList(partitionName)),
+                                    lakeSnapshotSplitsSplitIndex.get(tableBucket));
+                } else {
+                    split = new LogSplit(tableBucket, partitionName, EARLIEST_OFFSET);
+                }
+                int task = enumerator.getSplitOwner(split);
+                expectedAssignment.computeIfAbsent(task, k -> new ArrayList<>()).add(split);
+            }
+        }
+
+        // for partitions that expired in Fluss but still exist in the lake
+        for (PartitionInfo lakePartitionInfo : lakePartitionInfos) {
+            Long partitionId = lakePartitionInfo.getPartitionId();
+            if (!expiredPartitions.containsKey(partitionId)) {
+                continue;
+            }
+
+            String lakePartitionName = lakePartitionInfo.getPartitionName();
+            List<String> partitionValues =
+                    lakePartitionInfo.getResolvedPartitionSpec().getPartitionValues();
+            for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) {
+                TableBucket tableBucket = new TableBucket(tableId, partitionId, bucket);
+                SourceSplitBase split =
+                        new LakeSnapshotSplit(
+                                tableBucket,
+                                lakePartitionName,
+                                new TestingLakeSplit(bucket, partitionValues),
+                                lakeSnapshotSplitsSplitIndex.get(tableBucket));
+                int task = enumerator.getSplitOwner(split);
+                expectedAssignment.computeIfAbsent(task, k -> new ArrayList<>()).add(split);
+            }
+        }
+
+        return expectedAssignment;
+    }
+
     private void checkAssignmentIgnoreOrder(
             Map<Integer, List<SourceSplitBase>> actualAssignment,
             Map<Integer, List<SourceSplitBase>> expectedAssignment) {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
index 6634236aa..bf9792251 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
@@ -26,6 +26,8 @@ import 
org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
 import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics;
 import org.apache.fluss.flink.source.split.LogSplit;
 import org.apache.fluss.flink.utils.FlinkTestBase;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
@@ -85,7 +87,8 @@ class FlinkSourceReaderTest extends FlinkTestBase {
                         clientConf,
                         tablePath,
                         tableDescriptor.getSchema().getRowType(),
-                        readerContext)) {
+                        readerContext,
+                        null)) {
 
             // first of all, add all splits of all partitions to the reader
             Map<Long, Set<TableBucket>> assignedBuckets = new HashMap<>();
@@ -159,7 +162,8 @@ class FlinkSourceReaderTest extends FlinkTestBase {
             Configuration flussConf,
             TablePath tablePath,
             RowType sourceOutputType,
-            SourceReaderContext context)
+            SourceReaderContext context,
+            LakeSource<LakeSplit> lakeSource)
             throws Exception {
         FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordAndPos>> 
elementsQueue =
                 new FutureCompletingBlockingQueue<>();
@@ -181,6 +185,6 @@ class FlinkSourceReaderTest extends FlinkTestBase {
                 null,
                 new FlinkSourceReaderMetrics(context.metricGroup()),
                 recordEmitter,
-                null);
+                lakeSource);
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
index 6a413f80d..26b4a024c 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
@@ -17,14 +17,23 @@
 
 package org.apache.fluss.flink.source.state;
 
+import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit;
+import org.apache.fluss.flink.source.split.LogSplit;
+import org.apache.fluss.flink.source.split.SourceSplitBase;
+import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.lake.source.TestingLakeSource;
+import org.apache.fluss.lake.source.TestingLakeSplit;
 import org.apache.fluss.metadata.TableBucket;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -38,7 +47,7 @@ class SourceEnumeratorStateSerializerTest {
     @Test
     void testPendingSplitsCheckpointSerde() throws Exception {
         FlussSourceEnumeratorStateSerializer serializer =
-                new FlussSourceEnumeratorStateSerializer(null);
+                new FlussSourceEnumeratorStateSerializer(new 
TestingLakeSource());
 
         Set<TableBucket> assignedBuckets =
                 new HashSet<>(Arrays.asList(new TableBucket(1, 0), new 
TableBucket(1, 4L, 1)));
@@ -46,13 +55,37 @@ class SourceEnumeratorStateSerializerTest {
         assignedPartitions.put(1L, "partition1");
         assignedPartitions.put(2L, "partition2");
 
+        // Create remaining hybrid lake fluss splits with different types
+        List<SourceSplitBase> remainingHybridLakeFlussSplits = new 
ArrayList<>();
+
+        // Add a LogSplit
+        TableBucket logSplitBucket = new TableBucket(1, 0);
+        LogSplit logSplit = new LogSplit(logSplitBucket, null, 100L);
+        remainingHybridLakeFlussSplits.add(logSplit);
+
+        // Add a HybridSnapshotLogSplit
+        TableBucket hybridSplitBucket = new TableBucket(1, 1);
+        HybridSnapshotLogSplit hybridSplit =
+                new HybridSnapshotLogSplit(hybridSplitBucket, null, 200L, 50L);
+        remainingHybridLakeFlussSplits.add(hybridSplit);
+
+        // Add a LakeSnapshotAndFlussLogSplit
+        TableBucket lakeHybridSplitBucket = new TableBucket(1, 100L, 2);
+        List<LakeSplit> lakeSplits =
+                Collections.singletonList(
+                        new TestingLakeSplit(2, 
Collections.singletonList("2024-01-01")));
+        LakeSnapshotAndFlussLogSplit lakeHybridSplit =
+                new LakeSnapshotAndFlussLogSplit(
+                        lakeHybridSplitBucket, "2024-01-01", lakeSplits, 300L, 
Long.MIN_VALUE);
+        remainingHybridLakeFlussSplits.add(lakeHybridSplit);
+
         SourceEnumeratorState sourceEnumeratorState =
                 new SourceEnumeratorState(
-                        assignedBuckets, assignedPartitions, 
Collections.emptyList());
+                        assignedBuckets, assignedPartitions, 
remainingHybridLakeFlussSplits);
 
-        // serialize assigned buckets
+        // serialize state with remaining hybrid lake fluss splits
         byte[] serialized = serializer.serialize(sourceEnumeratorState);
-        // deserialize assigned buckets
+        // deserialize state
         SourceEnumeratorState deserializedSourceEnumeratorState =
                 serializer.deserialize(serializer.getVersion(), serialized);
 
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java
index aa9da5986..7096c0abe 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java
@@ -72,4 +72,14 @@ public class PaimonSplit implements LakeSplit {
     public boolean isBucketUnAware() {
         return isBucketUnAware;
     }
+
+    /** Renders the wrapped data split and the bucket-unaware flag, e.g. for log output. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("PaimonSplit{");
+        text.append("dataSplit=").append(dataSplit);
+        text.append(", isBucketUnAware=").append(isBucketUnAware);
+        text.append('}');
+        return text.toString();
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index cdb88dcbf..6ebffa9c5 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -17,6 +17,8 @@
 
 package org.apache.fluss.lake.paimon.flink;
 
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.InternalRow;
@@ -31,6 +33,7 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -38,10 +41,12 @@ import org.junit.jupiter.params.provider.ValueSource;
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -50,7 +55,9 @@ import java.util.stream.Collectors;
 import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
 import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
 import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The IT case for Flink union data in lake and fluss for log table. */
@@ -242,6 +249,90 @@ class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
         jobClient.cancel().get();
     }
 
+    /**
+     * Verifies that union read still returns the rows of a partition that was dropped in Fluss
+     * after its data had been tiered to Paimon, both for a full scan and for a query that filters
+     * on the dropped partition.
+     */
+    @Test
+    void testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "expired_partition_logTable";
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+        List<Row> writtenRows = new ArrayList<>();
+        long tableId = prepareLogTable(tablePath, DEFAULT_BUCKET_NUM, true, writtenRows);
+        // wait until the records have been synced
+        waitUntilBucketSynced(tablePath, tableId, DEFAULT_BUCKET_NUM, true);
+
+        // Get all partitions
+        Map<Long, String> partitionNameByIds = waitUntilPartitions(tablePath);
+        assertThat(partitionNameByIds).isNotEmpty();
+
+        // Select one partition to drop (expire in Fluss)
+        String partitionToDropName = partitionNameByIds.entrySet().iterator().next().getValue();
+
+        // Filter rows that belong to the partition to be dropped.
+        // NOTE(review): field 15 is presumably the partition column `p` written by
+        // prepareLogTable - confirm if the table schema changes.
+        List<Row> rowsInExpiredPartition =
+                writtenRows.stream()
+                        .filter(row -> partitionToDropName.equals(row.getField(15)))
+                        .collect(Collectors.toList());
+        assertThat(rowsInExpiredPartition).isNotEmpty();
+
+        // Now drop the partition in Fluss (make it expired)
+        // The partition data still exists in Paimon
+        admin.dropPartition(
+                        tablePath,
+                        new PartitionSpec(Collections.singletonMap("p", partitionToDropName)),
+                        false)
+                .get();
+
+        // Retry until the partition is dropped, i.e. exactly one partition fewer is listed
+        // than before the drop.
+        int expectedRemainingPartitions = partitionNameByIds.size() - 1;
+        retry(
+                Duration.ofSeconds(60),
+                () -> {
+                    List<PartitionInfo> remainingPartitions =
+                            admin.listPartitionInfos(tablePath).get();
+                    assertThat(remainingPartitions).hasSize(expectedRemainingPartitions);
+                });
+
+        // Both queries use a short partition discovery interval.
+        String discoveryHint = " /*+ OPTIONS('scan.partition.discovery.interval'='100ms') */";
+
+        // Now query the table - it should read data from both:
+        // 1. Remaining partitions from Fluss
+        // 2. Expired partition from Paimon (union read)
+        CloseableIterator<Row> allRowsIterator =
+                streamTEnv.executeSql("select * from " + tableName + discoveryHint).collect();
+        List<String> actual = collectRowsWithTimeout(allRowsIterator, writtenRows.size(), true);
+        assertThat(actual)
+                .containsExactlyInAnyOrderElementsOf(
+                        writtenRows.stream().map(Row::toString).collect(Collectors.toList()));
+
+        // Test partition filter - query only the expired partition
+        String sqlWithPartitionFilter =
+                "select"
+                        + discoveryHint
+                        + " * FROM "
+                        + tableName
+                        + " WHERE p = '"
+                        + partitionToDropName
+                        + "'";
+        CloseableIterator<Row> filteredIterator =
+                streamTEnv.executeSql(sqlWithPartitionFilter).collect();
+        List<String> filteredActual =
+                collectRowsWithTimeout(filteredIterator, rowsInExpiredPartition.size(), true);
+
+        // Should still be able to read data from expired partition via Paimon
+        assertThat(filteredActual)
+                .as("Should read expired partition data from Paimon when filtering by partition")
+                .containsExactlyInAnyOrderElementsOf(
+                        rowsInExpiredPartition.stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()));
+
+        // cancel the tiering job
+        jobClient.cancel().get();
+    }
+
     private long prepareLogTable(
             TablePath tablePath, int bucketNum, boolean isPartitioned, 
List<Row> flinkRows)
             throws Exception {
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 5baa4868f..7c8c2d698 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -19,6 +19,8 @@ package org.apache.fluss.lake.paimon.flink;
 
 import org.apache.fluss.config.AutoPartitionTimeUnit;
 import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
@@ -62,7 +64,9 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
 import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The IT case for Flink union data in lake and fluss for primary key table. 
*/
@@ -763,6 +767,109 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
         jobClient.cancel().get();
     }
 
+    /**
+     * Verifies that a union read still returns the rows of a partition that has been dropped in
+     * Fluss after its data was tiered to Paimon, both for a full scan and for a query filtered
+     * on the dropped partition.
+     */
+    @Test
+    void testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "expired_partition_pk_table";
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+        // create table and write data
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        Function<String, List<InternalRow>> rowGenerator =
+                (partition) ->
+                        Arrays.asList(
+                                row(3, "string", partition), row(30, "another_string", partition));
+        long tableId =
+                prepareSimplePKTable(
+                        tablePath, DEFAULT_BUCKET_NUM, true, rowGenerator, bucketLogEndOffset);
+
+        // wait until records have been synced
+        waitUntilBucketSynced(tablePath, tableId, DEFAULT_BUCKET_NUM, true);
+
+        // Get all partitions
+        Map<Long, String> partitionNameByIds = waitUntilPartitions(tablePath);
+        assertThat(partitionNameByIds.size()).isGreaterThan(0);
+
+        // Build expected rows for all partitions
+        // Simple PK table has 3 columns: c1 (INT), c2 (STRING), c3 (STRING) where c3 is partition
+        List<Row> expectedAllRows = new ArrayList<>();
+        for (String partition : partitionNameByIds.values()) {
+            expectedAllRows.add(Row.of(3, "string", partition));
+            expectedAllRows.add(Row.of(30, "another_string", partition));
+        }
+
+        // Select one partition to drop (expire in Fluss)
+        Long partitionToDropId = partitionNameByIds.keySet().iterator().next();
+        String partitionToDropName = partitionNameByIds.get(partitionToDropId);
+
+        // Filter rows that belong to the partition to be dropped
+        // c3 is the partition column (index 2)
+        List<Row> rowsInExpiredPartition =
+                expectedAllRows.stream()
+                        .filter(row -> partitionToDropName.equals(row.getField(2)))
+                        .collect(Collectors.toList());
+        assertThat(rowsInExpiredPartition).isNotEmpty();
+
+        // Now drop the partition in Fluss (make it expired)
+        // The partition data still exists in Paimon
+        // c3 is the partition column for simple PK table
+        admin.dropPartition(
+                        tablePath,
+                        new PartitionSpec(Collections.singletonMap("c3", partitionToDropName)),
+                        false)
+                .get();
+
+        // Retry until partition dropped. The expected count is derived from the partitions
+        // observed above instead of being hard-coded, so the check holds for any partition count.
+        retry(
+                Duration.ofSeconds(60),
+                () -> {
+                    List<PartitionInfo> remainingPartitions =
+                            admin.listPartitionInfos(tablePath).get();
+                    assertThat(remainingPartitions).hasSize(partitionNameByIds.size() - 1);
+                });
+
+        // Now query the table - it should read data from both:
+        // 1. Remaining partitions from Fluss
+        // 2. Expired partition from Paimon (union read)
+        CloseableIterator<Row> iterator =
+                streamTEnv
+                        .executeSql(
+                                "select * from "
+                                        + tableName
+                                        + " /*+ OPTIONS('scan.partition.discovery.interval'='100ms') */")
+                        .collect();
+        List<String> actual = collectRowsWithTimeout(iterator, expectedAllRows.size(), true);
+        assertThat(actual)
+                .containsExactlyInAnyOrderElementsOf(
+                        expectedAllRows.stream().map(Row::toString).collect(Collectors.toList()));
+
+        // Test partition filter - query only the expired partition
+        // c3 is the partition column for simple PK table
+        String sqlWithPartitionFilter =
+                "select"
+                        + " /*+ OPTIONS('scan.partition.discovery.interval'='100ms') */"
+                        + " * FROM "
+                        + tableName
+                        + " WHERE c3 = '"
+                        + partitionToDropName
+                        + "'";
+        iterator = streamTEnv.executeSql(sqlWithPartitionFilter).collect();
+        List<String> filteredActual =
+                collectRowsWithTimeout(iterator, rowsInExpiredPartition.size(), true);
+
+        // Should still be able to read data from expired partition via Paimon
+        assertThat(filteredActual)
+                .as("Should read expired partition data from Paimon when filtering by partition")
+                .containsExactlyInAnyOrderElementsOf(
+                        rowsInExpiredPartition.stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()));
+
+        // cancel the tiering job
+        jobClient.cancel().get();
+    }
+
     private List<Row> sortedRows(List<Row> rows) {
         rows.sort(Comparator.comparing(Row::toString));
         return rows;

Reply via email to