This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 6543da48dde5f7c4d0c0718eb10d23503a7c3995
Author: Hongshun Wang <[email protected]>
AuthorDate: Mon Feb 9 14:12:50 2026 +0800

    [flink] Fix skipping partition pushdown for primary key lakehouse table in batch mode. (#2585)
---
 .../fluss/flink/source/FlinkTableSource.java       | 22 +++---
 .../source/testutils/FlinkRowAssertionsUtils.java  |  9 +++
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 79 +++++++++++++++++++++-
 3 files changed, 98 insertions(+), 12 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 31377048c..0946db99d 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -490,17 +490,21 @@ public class FlinkTableSource
             lookupRow.setField(keyRowProjection[fieldEqual.fieldIndex], fieldEqual.equalValue);
             visitedPkFields.add(fieldEqual.fieldIndex);
         }
-        // if not all primary key fields are in condition, we skip to pushdown
-        if (!visitedPkFields.equals(primaryKeyTypes.keySet())) {
-            return Result.of(Collections.emptyList(), filters);
+        // if not all primary key fields are in the condition, we cannot push down a single-row
+        // filter; determine whether to push down a partition filter later
+
+        if (visitedPkFields.equals(primaryKeyTypes.keySet())) {
+            singleRowFilter = lookupRow;
+            // FLINK-38635 We cannot determine whether this source will ultimately be used as a
+            // scan
+            // source or a lookup source. Since fluss lookup sources cannot accept filters yet,
+            // to
+            // be safe, we return all filters to the Flink planner.
+            return Result.of(acceptedFilters, filters);
         }
-        singleRowFilter = lookupRow;
+    }
 
-        // FLINK-38635 We cannot determine whether this source will ultimately be used as a scan
-        // source or a lookup source. Since fluss lookup sources cannot accept filters yet, to
-        // be safe, we return all filters to the Flink planner.
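In effect, the hunk above inverts the early return: before the fix, a filter that did not bind every primary key field made the method return at once, so the partitioned branch below was never reached and batch queries lost partition pruning. A condensed sketch of the new control flow (identifiers follow the diff and Flink's SupportsFilterPushDown interface; the elided parts are paraphrased, not the exact Fluss code):

    // Sketch only, paraphrased from the diff above; not the full method.
    public Result applyFilters(List<ResolvedExpression> filters) {
        // ... collect primary-key equality conditions into lookupRow / visitedPkFields ...
        if (visitedPkFields.equals(primaryKeyTypes.keySet())) {
            // every PK field is bound: remember the single-row filter, but
            // (FLINK-38635) hand all filters back to the planner, because the
            // source may still be used as a lookup source, which accepts none
            singleRowFilter = lookupRow;
            return Result.of(acceptedFilters, filters);
        }
        // previously unreachable once the PK check returned early;
        // now partition predicates still get a chance to be pushed down
        if (isPartitioned()) {
            // ... convert partition predicates and add them to acceptedFilters ...
        }
        // ...
    }

Result.of(acceptedFilters, remainingFilters) tells the planner which predicates the source consumed and which it must keep evaluating; returning all filters as remaining is the conservative choice the FLINK-38635 comment describes.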
-        return Result.of(acceptedFilters, filters);
-    } else if (isPartitioned()) {
+    if (isPartitioned()) {
         // apply partition filter pushdown
         List<Predicate> converted = new ArrayList<>();

diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
index 00807497d..a489cc631 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
@@ -110,6 +110,15 @@ public class FlinkRowAssertionsUtils {
                 Duration.ofMinutes(1));
     }
 
+    public static List<String> collectBatchRows(CloseableIterator<Row> iterator) throws Exception {
+        List<String> actual = new ArrayList<>();
+        while (iterator.hasNext()) {
+            actual.add(iterator.next().toString());
+        }
+        iterator.close();
+        return actual;
+    }
+
     protected static List<String> collectRowsWithTimeout(
             CloseableIterator<Row> iterator,
             int expectedCount,
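The new collectBatchRows helper simply drains a bounded iterator into strings; unlike collectRowsWithTimeout it needs no expected row count, because a batch query terminates on its own. Typical usage, mirroring the test below (batchTEnv is the batch-mode table environment provided by the test base):

    CloseableIterator<Row> collected = batchTEnv.executeSql(query).collect();
    List<String> actual = collectBatchRows(collected);
    // batch output contains only +I rows, so order-insensitive comparison is enough
    assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);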
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 68dd9ba14..9837cbcd3 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -60,6 +60,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -67,6 +68,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectBatchRows;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
@@ -95,7 +97,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
         long tableId =
                 preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
 
-        // wait unit records have been synced
+        // wait until records have been synced
         waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
 
         // check the status of replica after synced
@@ -523,7 +525,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
         bucketLogEndOffset.put(new TableBucket(tableId, 1), 1L);
         bucketLogEndOffset.put(new TableBucket(tableId, 2), 1L);
 
-        // wait unit records have been synced
+        // wait until records have been synced
         waitUntilBucketsSynced(bucketLogEndOffset.keySet());
 
         // check the status of replica after synced
@@ -558,7 +560,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
         long tableId =
                 preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
 
-        // wait unit records have been synced
+        // wait until records have been synced
         waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
 
         // check the status of replica after synced
@@ -1117,6 +1119,77 @@ class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
         jobClient.cancel().get();
     }
 
+    @Test
+    void testPartitionFilterOnPartitionedTableInBatch() throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "stream_pk_table_full";
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId = preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, true, bucketLogEndOffset);
+
+        // wait until records have been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, true);
+
+        // check the status of replica after synced
+        assertReplicaStatus(bucketLogEndOffset);
+        // Stop tiering to ensure we read from Paimon snapshot
+        jobClient.cancel().get();
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+        Map<Long, String> partitionNameById =
+                waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
+        Iterator<String> partitionIterator =
+                partitionNameById.values().stream().sorted().iterator();
+        String partition1 = partitionIterator.next();
+        String partition2 = partitionIterator.next();
+        String query =
+                String.format(
+                        "SELECT c1, c2, c3, c19 FROM %s WHERE c19 between '%s' and '%s'",
+                        tableName, partition1, partition2);
+
+        assertThat(batchTEnv.explainSql(query))
+                .contains(
+                        String.format(
+                                "TableSourceScan(table=[[testcatalog, %s, %s, "
+                                        + "filter=[and(>=(c19, _UTF-16LE'%s'), <=(c19, _UTF-16LE'%s'))], "
+                                        + "project=[c1, c2, c3, c19]]], "
+                                        + "fields=[c1, c2, c3, c19])",
+                                DEFAULT_DB, tableName, partition1, partition2));
+
+        CloseableIterator<Row> collected = batchTEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        String.format("+I[false, 1, 2, %s]", partition1),
+                        String.format("+I[true, 10, 20, %s]", partition1),
+                        String.format("+I[false, 1, 2, %s]", partition2),
+                        String.format("+I[true, 10, 20, %s]", partition2));
+        List<String> actual = collectBatchRows(collected);
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+
+        query =
+                String.format(
+                        "SELECT c1, c2, c3, c19 FROM %s WHERE c19 = '%s'", tableName, partition2);
+
+        assertThat(batchTEnv.explainSql(query))
+                .contains(
+                        String.format(
+                                "TableSourceScan(table=[[testcatalog, %s, %s, "
+                                        + "filter=[=(c19, _UTF-16LE'%s':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
+                                        + "project=[c1, c2, c3, c19]]], "
+                                        + "fields=[c1, c2, c3, c19])",
+                                DEFAULT_DB, tableName, partition2));
+        collected = batchTEnv.executeSql(query).collect();
+        actual = collectBatchRows(collected);
+        expected =
+                Arrays.asList(
+                        String.format("+I[false, 1, 2, %s]", partition2),
+                        String.format("+I[true, 10, 20, %s]", partition2));
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
     private List<Row> sortedRows(List<Row> rows) {
         rows.sort(Comparator.comparing(Row::toString));
         return rows;
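The explainSql assertions above are the heart of this test: with the fix, the partition predicate appears as filter=[...] on the TableSourceScan node itself rather than staying in a downstream Calc node. A minimal standalone sketch of the same check (catalog and table registration are elided; the table name and the partition value '2026' are placeholders, not taken from the commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ExplainPartitionPushdown {
        public static void main(String[] args) {
            // Hypothetical setup: assumes a fluss catalog exposing a partitioned
            // primary-key table is already registered under these names.
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // ... register the fluss catalog and select the database here ...
            String plan = tEnv.explainSql(
                    "SELECT c1, c2, c3, c19 FROM stream_pk_table_full WHERE c19 = '2026'");
            // With the fix, the TableSourceScan line carries filter=[=(c19, ...)];
            // before it, the predicate was evaluated after the scan instead.
            System.out.println(plan);
        }
    }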
