This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 04470ee9d [hotfix] Fix union read failure when reading from a timestamp with projection enabled (#2383)
04470ee9d is described below

commit 04470ee9db4b24c972316a2f45dd6fc05de932b1
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Jan 19 11:10:03 2026 +0800

    [hotfix] Fix union read failure when reading from a timestamp with projection enabled (#2383)
---
 .../apache/fluss/flink/lake/LakeSplitGenerator.java |  4 ++--
 .../apache/fluss/flink/source/FlinkTableSource.java |  8 +++-----
 .../source/enumerator/FlinkSourceEnumerator.java    | 21 +++++++++++----------
 .../flink/FlinkUnionReadFromTimestampITCase.java    |  8 ++++++--
 4 files changed, 22 insertions(+), 19 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
index cd0926294..42038cd74 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
@@ -119,7 +119,7 @@ public class LakeSplitGenerator {
                     lakeSplits, isLogTable, tableBucketsOffset, 
partitionNameById);
         } else {
             Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
-                    lakeSplits.values().iterator().next();
+                    lakeSplits.isEmpty() ? null : 
lakeSplits.values().iterator().next();
             // non-partitioned table
             return generateNoPartitionedTableSplit(
                     nonPartitionLakeSplits, isLogTable, tableBucketsOffset);
@@ -307,7 +307,7 @@ public class LakeSplitGenerator {
     }
 
     private List<SourceSplitBase> generateNoPartitionedTableSplit(
-            Map<Integer, List<LakeSplit>> lakeSplits,
+            @Nullable Map<Integer, List<LakeSplit>> lakeSplits,
             boolean isLogTable,
             Map<TableBucket, Long> tableBucketSnapshotLogOffset) {
         // iterate all bucket
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 6acdbf840..45ce7945a 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -331,8 +331,7 @@ public class FlinkTableSource
                     enableLakeSource = false;
                 } else {
                     if (enableLakeSource) {
-                        enableLakeSource =
-                                pushTimeStampFilterToLakeSource(lakeSource, 
flussRowType);
+                        enableLakeSource = 
pushTimeStampFilterToLakeSource(lakeSource);
                     }
                 }
                 break;
@@ -385,12 +384,11 @@ public class FlinkTableSource
         }
     }
 
-    private boolean pushTimeStampFilterToLakeSource(
-            LakeSource<?> lakeSource, RowType flussRowType) {
+    private boolean pushTimeStampFilterToLakeSource(LakeSource<?> lakeSource) {
         // will push timestamp to lake
         // we will have three additional system columns, __bucket, __offset, 
__timestamp
         // in lake, get the  __timestamp index in lake table
-        final int timestampFieldIndex = flussRowType.getFieldCount() + 2;
+        final int timestampFieldIndex = tableOutputType.getFieldCount() + 2;
         Predicate timestampFilter =
                 new LeafPredicate(
                         GreaterOrEqual.INSTANCE,
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index f8db0aff1..9df9502ff 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -770,16 +770,6 @@ public class FlinkSourceEnumerator
                             TableBucket tableBucket = split.getTableBucket();
                             assignedTableBuckets.add(tableBucket);
 
-                            if (pendingHybridLakeFlussSplits != null) {
-                                // removed from the 
pendingHybridLakeFlussSplits
-                                // since this split already be assigned
-                                pendingHybridLakeFlussSplits.removeIf(
-                                        hybridLakeFlussSplit ->
-                                                hybridLakeFlussSplit
-                                                        .splitId()
-                                                        
.equals(split.splitId()));
-                            }
-
                             if (isPartitioned) {
                                 long partitionId =
                                         checkNotNull(
@@ -792,6 +782,17 @@ public class FlinkSourceEnumerator
                                 assignedPartitions.put(partitionId, 
partitionName);
                             }
                         });
+
+                if (pendingHybridLakeFlussSplits != null) {
+                    Set<String> splitIdsToRemove =
+                            pendingAssignmentForReader.stream()
+                                    .map(SourceSplitBase::splitId)
+                                    .collect(Collectors.toSet());
+                    // removed from the pendingHybridLakeFlussSplits
+                    // since this split already be assigned
+                    pendingHybridLakeFlussSplits.removeIf(
+                            split -> 
splitIdsToRemove.contains(split.splitId()));
+                }
             }
         }
 
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
index efa1a84df..9cab9c7d4 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
@@ -140,12 +140,16 @@ class FlinkUnionReadFromTimestampITCase extends 
FlinkPaimonTieringTestBase {
             CloseableIterator<Row> actualRows =
                     streamTEnv
                             .executeSql(
-                                    "select * from "
+                                    "select b from "
                                             + tableName
                                             + " /*+ 
OPTIONS('scan.startup.mode' = 'timestamp',\n"
                                             + "'scan.startup.timestamp' = 
'2000') */")
                             .collect();
-            List<Row> expectedRows = rows.stream().skip(2 * 
3).collect(Collectors.toList());
+            List<Row> expectedRows =
+                    rows.stream()
+                            .skip(2 * 3)
+                            .map(row -> Row.of(row.getField(1)))
+                            .collect(Collectors.toList());
             assertRowResultsIgnoreOrder(actualRows, expectedRows, true);
 
             // verify scan from earliest

Reply via email to