This is an automated email from the ASF dual-hosted git repository.
hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new eb7a0f5e9 [flink] Fix skipping partition pushdown for primary key
lakehouse table in batch mode. (#2585)
eb7a0f5e9 is described below
commit eb7a0f5e91ec638fa6b80e1b85d943b2c3e5e16d
Author: Hongshun Wang <[email protected]>
AuthorDate: Mon Feb 9 14:12:50 2026 +0800
[flink] Fix skipping partition pushdown for primary key lakehouse table in
batch mode. (#2585)
---
.../fluss/flink/source/FlinkTableSource.java | 22 +++---
.../source/testutils/FlinkRowAssertionsUtils.java | 9 +++
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 79 +++++++++++++++++++++-
3 files changed, 98 insertions(+), 12 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 31377048c..0946db99d 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -490,17 +490,21 @@ public class FlinkTableSource
lookupRow.setField(keyRowProjection[fieldEqual.fieldIndex],
fieldEqual.equalValue);
visitedPkFields.add(fieldEqual.fieldIndex);
}
- // if not all primary key fields are in condition, we skip to
pushdown
- if (!visitedPkFields.equals(primaryKeyTypes.keySet())) {
- return Result.of(Collections.emptyList(), filters);
+ // if not all primary key fields are in condition, we cannot push
down the single-row
+ // filter; determine whether to push down the partition filter later
+
+ if (visitedPkFields.equals(primaryKeyTypes.keySet())) {
+ singleRowFilter = lookupRow;
+ // FLINK-38635 We cannot determine whether this source will
ultimately be used as a
+ // scan
+ // source or a lookup source. Since fluss lookup sources
cannot accept filters yet,
+ // to
+ // be safe, we return all filters to the Flink planner.
+ return Result.of(acceptedFilters, filters);
}
- singleRowFilter = lookupRow;
+ }
- // FLINK-38635 We cannot determine whether this source will
ultimately be used as a scan
- // source or a lookup source. Since fluss lookup sources cannot
accept filters yet, to
- // be safe, we return all filters to the Flink planner.
- return Result.of(acceptedFilters, filters);
- } else if (isPartitioned()) {
+ if (isPartitioned()) {
// apply partition filter pushdown
List<Predicate> converted = new ArrayList<>();
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
index 00807497d..a489cc631 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
@@ -110,6 +110,15 @@ public class FlinkRowAssertionsUtils {
Duration.ofMinutes(1));
}
+ public static List<String> collectBatchRows(CloseableIterator<Row>
iterator) throws Exception {
+ List<String> actual = new ArrayList<>();
+ while (iterator.hasNext()) {
+ actual.add(iterator.next().toString());
+ }
+ iterator.close();
+ return actual;
+ }
+
protected static List<String> collectRowsWithTimeout(
CloseableIterator<Row> iterator,
int expectedCount,
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 68dd9ba14..9837cbcd3 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -60,6 +60,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -67,6 +68,7 @@ import java.util.stream.Collectors;
import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectBatchRows;
import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
@@ -95,7 +97,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
long tableId =
preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned,
bucketLogEndOffset);
- // wait unit records have been synced
+ // wait until records have been synced
waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
// check the status of replica after synced
@@ -523,7 +525,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
bucketLogEndOffset.put(new TableBucket(tableId, 1), 1L);
bucketLogEndOffset.put(new TableBucket(tableId, 2), 1L);
- // wait unit records have been synced
+ // wait until records have been synced
waitUntilBucketsSynced(bucketLogEndOffset.keySet());
// check the status of replica after synced
@@ -558,7 +560,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
long tableId =
preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned,
bucketLogEndOffset);
- // wait unit records have been synced
+ // wait until records have been synced
waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
// check the status of replica after synced
@@ -1117,6 +1119,77 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
jobClient.cancel().get();
}
+ @Test
+ void testPartitionFilterOnPartitionedTableInBatch() throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName = "stream_pk_table_full";
+ TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+ Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ // create table & write initial data
+ long tableId = preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, true,
bucketLogEndOffset);
+
+ // wait until records have been synced
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, true);
+
+ // check the status of replica after synced
+ assertReplicaStatus(bucketLogEndOffset);
+ // Stop tiering to ensure we read from Paimon snapshot
+ jobClient.cancel().get();
+
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+ Map<Long, String> partitionNameById =
+
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
+ Iterator<String> partitionIterator =
+ partitionNameById.values().stream().sorted().iterator();
+ String partition1 = partitionIterator.next();
+ String partition2 = partitionIterator.next();
+ String query =
+ String.format(
+ "SELECT c1, c2, c3, c19 FROM %s WHERE c19 between '%s'
and '%s'",
+ tableName, partition1, partition2);
+
+ assertThat(batchTEnv.explainSql(query))
+ .contains(
+ String.format(
+ "TableSourceScan(table=[[testcatalog, %s, %s, "
+ + "filter=[and(>=(c19, _UTF-16LE'%s'),
<=(c19, _UTF-16LE'%s'))], "
+ + "project=[c1, c2, c3, c19]]], "
+ + "fields=[c1, c2, c3, c19])",
+ DEFAULT_DB, tableName, partition1,
partition2));
+
+ CloseableIterator<Row> collected =
batchTEnv.executeSql(query).collect();
+ List<String> expected =
+ Arrays.asList(
+ String.format("+I[false, 1, 2, %s]", partition1),
+ String.format("+I[true, 10, 20, %s]", partition1),
+ String.format("+I[false, 1, 2, %s]", partition2),
+ String.format("+I[true, 10, 20, %s]", partition2));
+ List<String> actual = collectBatchRows(collected);
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+
+ query =
+ String.format(
+ "SELECT c1, c2, c3, c19 FROM %s WHERE c19 = '%s'",
tableName, partition2);
+
+ assertThat(batchTEnv.explainSql(query))
+ .contains(
+ String.format(
+ "TableSourceScan(table=[[testcatalog, %s, %s, "
+ + "filter=[=(c19,
_UTF-16LE'%s':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
+ + "project=[c1, c2, c3, c19]]], "
+ + "fields=[c1, c2, c3, c19])",
+ DEFAULT_DB, tableName, partition2));
+ collected = batchTEnv.executeSql(query).collect();
+ actual = collectBatchRows(collected);
+ expected =
+ Arrays.asList(
+ String.format("+I[false, 1, 2, %s]", partition2),
+ String.format("+I[true, 10, 20, %s]", partition2));
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+ }
+
private List<Row> sortedRows(List<Row> rows) {
rows.sort(Comparator.comparing(Row::toString));
return rows;