This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 04470ee9d [hotfix] Fix union read fail when from timestamp and
projection enabled (#2383)
04470ee9d is described below
commit 04470ee9db4b24c972316a2f45dd6fc05de932b1
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Jan 19 11:10:03 2026 +0800
[hotfix] Fix union read fail when from timestamp and projection enabled
(#2383)
---
.../apache/fluss/flink/lake/LakeSplitGenerator.java | 4 ++--
.../apache/fluss/flink/source/FlinkTableSource.java | 8 +++-----
.../source/enumerator/FlinkSourceEnumerator.java | 21 +++++++++++----------
.../flink/FlinkUnionReadFromTimestampITCase.java | 8 ++++++--
4 files changed, 22 insertions(+), 19 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
index cd0926294..42038cd74 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
@@ -119,7 +119,7 @@ public class LakeSplitGenerator {
lakeSplits, isLogTable, tableBucketsOffset,
partitionNameById);
} else {
Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
- lakeSplits.values().iterator().next();
+ lakeSplits.isEmpty() ? null :
lakeSplits.values().iterator().next();
// non-partitioned table
return generateNoPartitionedTableSplit(
nonPartitionLakeSplits, isLogTable, tableBucketsOffset);
@@ -307,7 +307,7 @@ public class LakeSplitGenerator {
}
private List<SourceSplitBase> generateNoPartitionedTableSplit(
- Map<Integer, List<LakeSplit>> lakeSplits,
+ @Nullable Map<Integer, List<LakeSplit>> lakeSplits,
boolean isLogTable,
Map<TableBucket, Long> tableBucketSnapshotLogOffset) {
// iterate all bucket
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 6acdbf840..45ce7945a 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -331,8 +331,7 @@ public class FlinkTableSource
enableLakeSource = false;
} else {
if (enableLakeSource) {
- enableLakeSource =
- pushTimeStampFilterToLakeSource(lakeSource,
flussRowType);
+ enableLakeSource =
pushTimeStampFilterToLakeSource(lakeSource);
}
}
break;
@@ -385,12 +384,11 @@ public class FlinkTableSource
}
}
- private boolean pushTimeStampFilterToLakeSource(
- LakeSource<?> lakeSource, RowType flussRowType) {
+ private boolean pushTimeStampFilterToLakeSource(LakeSource<?> lakeSource) {
// will push timestamp to lake
// we will have three additional system columns, __bucket, __offset,
__timestamp
// in lake, get the __timestamp index in lake table
- final int timestampFieldIndex = flussRowType.getFieldCount() + 2;
+ final int timestampFieldIndex = tableOutputType.getFieldCount() + 2;
Predicate timestampFilter =
new LeafPredicate(
GreaterOrEqual.INSTANCE,
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index f8db0aff1..9df9502ff 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -770,16 +770,6 @@ public class FlinkSourceEnumerator
TableBucket tableBucket = split.getTableBucket();
assignedTableBuckets.add(tableBucket);
- if (pendingHybridLakeFlussSplits != null) {
- // removed from the
pendingHybridLakeFlussSplits
- // since this split already be assigned
- pendingHybridLakeFlussSplits.removeIf(
- hybridLakeFlussSplit ->
- hybridLakeFlussSplit
- .splitId()
-
.equals(split.splitId()));
- }
-
if (isPartitioned) {
long partitionId =
checkNotNull(
@@ -792,6 +782,17 @@ public class FlinkSourceEnumerator
assignedPartitions.put(partitionId,
partitionName);
}
});
+
+ if (pendingHybridLakeFlussSplits != null) {
+ Set<String> splitIdsToRemove =
+ pendingAssignmentForReader.stream()
+ .map(SourceSplitBase::splitId)
+ .collect(Collectors.toSet());
+ // removed from the pendingHybridLakeFlussSplits
+ // since this split already be assigned
+ pendingHybridLakeFlussSplits.removeIf(
+ split ->
splitIdsToRemove.contains(split.splitId()));
+ }
}
}
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
index efa1a84df..9cab9c7d4 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
@@ -140,12 +140,16 @@ class FlinkUnionReadFromTimestampITCase extends
FlinkPaimonTieringTestBase {
CloseableIterator<Row> actualRows =
streamTEnv
.executeSql(
- "select * from "
+ "select b from "
+ tableName
+ " /*+
OPTIONS('scan.startup.mode' = 'timestamp',\n"
+ "'scan.startup.timestamp' =
'2000') */")
.collect();
- List<Row> expectedRows = rows.stream().skip(2 *
3).collect(Collectors.toList());
+ List<Row> expectedRows =
+ rows.stream()
+ .skip(2 * 3)
+ .map(row -> Row.of(row.getField(1)))
+ .collect(Collectors.toList());
assertRowResultsIgnoreOrder(actualRows, expectedRows, true);
// verify scan from earliest