This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 6543da48dde5f7c4d0c0718eb10d23503a7c3995
Author: Hongshun Wang <[email protected]>
AuthorDate: Mon Feb 9 14:12:50 2026 +0800

    [flink] Fix skipping partition pushdown for primary key lakehouse table in batch mode. (#2585)
---
 .../fluss/flink/source/FlinkTableSource.java       | 22 +++---
 .../source/testutils/FlinkRowAssertionsUtils.java  |  9 +++
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 79 +++++++++++++++++++++-
 3 files changed, 98 insertions(+), 12 deletions(-)
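
For context, the scenario this commit targets looks roughly like the sketch below: a batch scan over a partitioned primary-key lakehouse table whose WHERE clause hits the partition column but does not cover the full primary key. This is an illustration, not part of the patch; the table and column names are hypothetical, and it assumes a Fluss catalog is already registered.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PartitionPushdownSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // pk_table: hypothetical partitioned primary-key lakehouse table with
            // partition column c19. Before this fix, the partition predicate was not
            // pushed into the source; after it, the TableSourceScan in the plan
            // should carry a filter=[...] entry.
            System.out.println(
                    tEnv.explainSql("SELECT * FROM pk_table WHERE c19 = '2026-02-09'"));
        }
    }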

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 31377048c..0946db99d 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -490,17 +490,21 @@ public class FlinkTableSource
                 lookupRow.setField(keyRowProjection[fieldEqual.fieldIndex], fieldEqual.equalValue);
                 visitedPkFields.add(fieldEqual.fieldIndex);
             }
-            // if not all primary key fields are in condition, we skip to pushdown
-            if (!visitedPkFields.equals(primaryKeyTypes.keySet())) {
-                return Result.of(Collections.emptyList(), filters);
+            // if not all primary key fields are in the condition, we cannot push down a
+            // single-row filter; decide whether to push down a partition filter later
+
+            if (visitedPkFields.equals(primaryKeyTypes.keySet())) {
+                singleRowFilter = lookupRow;
+                // FLINK-38635 We cannot determine whether this source will ultimately be used
+                // as a scan source or a lookup source. Since Fluss lookup sources cannot accept
+                // filters yet, to be safe, we return all filters to the Flink planner.
+                return Result.of(acceptedFilters, filters);
             }
-            singleRowFilter = lookupRow;
+        }
 
-            // FLINK-38635 We cannot determine whether this source will ultimately be used as a scan
-            // source or a lookup source. Since fluss lookup sources cannot accept filters yet, to
-            // be safe, we return all filters to the Flink planner.
-            return Result.of(acceptedFilters, filters);
-        } else if (isPartitioned()) {
+        if (isPartitioned()) {
             // apply partition filter pushdown
             List<Predicate> converted = new ArrayList<>();
 
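
Read as a whole, the hunk above turns an early return into a fall-through: a filter that covers only part of the primary key no longer aborts pushdown but proceeds to the partition branch. The following is a simplified, self-contained restatement of the new control flow, not the real method: Result here is a stand-in for Flink's SupportsFilterPushDown.Result, and the filter lists are plain strings rather than resolved expressions.

    import java.util.Collections;
    import java.util.List;
    import java.util.Set;

    final class FilterPushdownFlow {
        // Stand-in for org.apache.flink.table.connector.source.abilities
        // .SupportsFilterPushDown.Result.
        static final class Result {
            final List<String> acceptedFilters;
            final List<String> remainingFilters;

            Result(List<String> acceptedFilters, List<String> remainingFilters) {
                this.acceptedFilters = acceptedFilters;
                this.remainingFilters = remainingFilters;
            }
        }

        static Result applyFilters(
                Set<Integer> visitedPkFields,
                Set<Integer> primaryKeyFields,
                boolean isPartitioned,
                List<String> acceptedFilters,
                List<String> allFilters) {
            if (visitedPkFields.equals(primaryKeyFields)) {
                // Full primary key in the condition: remember the single-row filter,
                // but hand every filter back to the planner (FLINK-38635).
                return new Result(acceptedFilters, allFilters);
            }
            // Partial primary key: the old code returned here, skipping partition
            // pushdown entirely. The fixed code falls through instead.
            if (isPartitioned) {
                // partition filter pushdown happens here in the real class (elided)
            }
            return new Result(Collections.emptyList(), allFilters);
        }
    }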
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
index 00807497d..a489cc631 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
@@ -110,6 +110,15 @@ public class FlinkRowAssertionsUtils {
                 Duration.ofMinutes(1));
     }
 
+    public static List<String> collectBatchRows(CloseableIterator<Row> iterator) throws Exception {
+        List<String> actual = new ArrayList<>();
+        while (iterator.hasNext()) {
+            actual.add(iterator.next().toString());
+        }
+        iterator.close();
+        return actual;
+    }
+
     protected static List<String> collectRowsWithTimeout(
             CloseableIterator<Row> iterator,
             int expectedCount,
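
The new collectBatchRows helper simply drains a bounded result into strings and closes the iterator. A usage sketch (assuming the imports already present in this utility class, and a batch TableEnvironment named batchTEnv, as in the test below):

    // Batch-only: a streaming result would never exhaust hasNext().
    CloseableIterator<Row> it = batchTEnv.executeSql("SELECT * FROM t").collect();
    List<String> rows = collectBatchRows(it); // also closes the iterator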
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 68dd9ba14..9837cbcd3 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -60,6 +60,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -67,6 +68,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectBatchRows;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
@@ -95,7 +97,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
         long tableId =
                 preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
 
-        // wait unit records have been synced
+        // wait until records have been synced
         waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
 
         // check the status of replica after synced
@@ -523,7 +525,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
         bucketLogEndOffset.put(new TableBucket(tableId, 1), 1L);
         bucketLogEndOffset.put(new TableBucket(tableId, 2), 1L);
 
-        // wait unit records have been synced
+        // wait until records have been synced
         waitUntilBucketsSynced(bucketLogEndOffset.keySet());
 
         // check the status of replica after synced
@@ -558,7 +560,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
         long tableId =
                 preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
 
-        // wait unit records have been synced
+        // wait until records have been synced
         waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
 
         // check the status of replica after synced
@@ -1117,6 +1119,77 @@ class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
         jobClient.cancel().get();
     }
 
+    @Test
+    void testPartitionFilterOnPartitionedTableInBatch() throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "stream_pk_table_full";
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId = preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, true, bucketLogEndOffset);
+
+        // wait until records have been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, true);
+
+        // check the status of replica after synced
+        assertReplicaStatus(bucketLogEndOffset);
+        // Stop tiering to ensure we read from Paimon snapshot
+        jobClient.cancel().get();
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+        Map<Long, String> partitionNameById =
+                waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
+        Iterator<String> partitionIterator =
+                partitionNameById.values().stream().sorted().iterator();
+        String partition1 = partitionIterator.next();
+        String partition2 = partitionIterator.next();
+        String query =
+                String.format(
+                        "SELECT c1, c2, c3, c19 FROM %s WHERE c19 between '%s' 
and '%s'",
+                        tableName, partition1, partition2);
+
+        assertThat(batchTEnv.explainSql(query))
+                .contains(
+                        String.format(
+                                "TableSourceScan(table=[[testcatalog, %s, %s, "
+                                        + "filter=[and(>=(c19, _UTF-16LE'%s'), <=(c19, _UTF-16LE'%s'))], "
+                                        + "project=[c1, c2, c3, c19]]], "
+                                        + "fields=[c1, c2, c3, c19])",
+                                DEFAULT_DB, tableName, partition1, partition2));
+
+        CloseableIterator<Row> collected = batchTEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        String.format("+I[false, 1, 2, %s]", partition1),
+                        String.format("+I[true, 10, 20, %s]", partition1),
+                        String.format("+I[false, 1, 2, %s]", partition2),
+                        String.format("+I[true, 10, 20, %s]", partition2));
+        List<String> actual = collectBatchRows(collected);
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+
+        query =
+                String.format(
+                        "SELECT c1, c2, c3, c19 FROM %s WHERE c19 = '%s'", 
tableName, partition2);
+
+        assertThat(batchTEnv.explainSql(query))
+                .contains(
+                        String.format(
+                                "TableSourceScan(table=[[testcatalog, %s, %s, "
+                                        + "filter=[=(c19, _UTF-16LE'%s':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
+                                        + "project=[c1, c2, c3, c19]]], "
+                                        + "fields=[c1, c2, c3, c19])",
+                                DEFAULT_DB, tableName, partition2));
+        collected = batchTEnv.executeSql(query).collect();
+        actual = collectBatchRows(collected);
+        expected =
+                Arrays.asList(
+                        String.format("+I[false, 1, 2, %s]", partition2),
+                        String.format("+I[true, 10, 20, %s]", partition2));
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
     private List<Row> sortedRows(List<Row> rows) {
         rows.sort(Comparator.comparing(Row::toString));
         return rows;
