This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 04a73e0f83 [core] Support partition range predicate pushdown for
FilesTable (#7376)
04a73e0f83 is described below
commit 04a73e0f836b85f5a1bcac2c3a82a8c971aae99b
Author: Kerwin Zhang <[email protected]>
AuthorDate: Tue Mar 10 08:19:44 2026 +0800
[core] Support partition range predicate pushdown for FilesTable (#7376)
---
.../org/apache/paimon/table/system/FilesTable.java | 95 ++++++++++++++++++----
.../apache/paimon/table/system/FilesTableTest.java | 55 +++++++++++++
2 files changed, 133 insertions(+), 17 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 7e62516e85..fcfaafaea5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -33,9 +33,12 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.GreaterOrEqual;
+import org.apache.paimon.predicate.GreaterThan;
+import org.apache.paimon.predicate.In;
+import org.apache.paimon.predicate.LeafBinaryFunction;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
+import org.apache.paimon.predicate.LessOrEqual;
+import org.apache.paimon.predicate.LessThan;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -64,6 +67,7 @@ import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SerializationUtils;
+import org.apache.paimon.utils.TypeUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
@@ -193,25 +197,62 @@ public class FilesTable implements ReadonlyTable {
@Override
public Plan innerPlan() {
SnapshotReader snapshotReader = fileStoreTable.newSnapshotReader();
- if (partitionPredicate != null && partitionPredicate.function()
instanceof Equal) {
- String partitionStr =
partitionPredicate.literals().get(0).toString();
- if (partitionStr.startsWith("{")) {
- partitionStr = partitionStr.substring(1);
- }
- if (partitionStr.endsWith("}")) {
- partitionStr = partitionStr.substring(0,
partitionStr.length() - 1);
- }
- String[] partFields = partitionStr.split(", ");
- LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
+ // Push a predicate on the rendered `partition` column (e.g. '{1, 10}') down as a
+ // typed partition filter so that non-matching partitions are not scanned.
+ if (partitionPredicate != null) {
List<String> partitionKeys = fileStoreTable.partitionKeys();
- if (partitionKeys.size() != partFields.length) {
- return Collections::emptyList;
- }
- for (int i = 0; i < partitionKeys.size(); i++) {
- partSpec.put(partitionKeys.get(i), partFields[i]);
+ RowType partitionType = fileStoreTable.schema().logicalPartitionType();
+ if (partitionPredicate.function() instanceof Equal) {
+ Object literal = partitionPredicate.literals().get(0);
+ // `partition = NULL` matches nothing under SQL semantics; guard it here instead
+ // of failing with an NPE on toString().
+ LinkedHashMap<String, String> partSpec =
+ literal == null ? null : parsePartitionSpec(literal.toString(), partitionKeys);
+ if (partSpec == null) {
+ return Collections::emptyList;
+ }
+ snapshotReader.withPartitionFilter(partSpec);
+ } else if (partitionPredicate.function() instanceof In) {
+ List<Predicate> orPredicates = new ArrayList<>();
+ PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
+ for (Object literal : partitionPredicate.literals()) {
+ // Null IN elements never match; skip them rather than failing on toString().
+ LinkedHashMap<String, String> partSpec =
+ literal == null ? null : parsePartitionSpec(literal.toString(), partitionKeys);
+ if (partSpec == null) {
+ continue;
+ }
+ // One element becomes `k0 = v0 AND k1 = v1 AND ...`.
+ List<Predicate> andPredicates = new ArrayList<>();
+ for (int i = 0; i < partitionKeys.size(); i++) {
+ Object value =
+ TypeUtils.castFromString(
+ partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i));
+ andPredicates.add(partBuilder.equal(i, value));
+ }
+ orPredicates.add(PredicateBuilder.and(andPredicates));
+ }
+ // When no element could be parsed, no partition filter is set on the reader.
+ if (!orPredicates.isEmpty()) {
+ snapshotReader.withPartitionFilter(PredicateBuilder.or(orPredicates));
+ }
+ } else if (partitionPredicate.function() instanceof GreaterThan
+ || partitionPredicate.function() instanceof GreaterOrEqual
+ || partitionPredicate.function() instanceof LessThan
+ || partitionPredicate.function() instanceof LessOrEqual) {
+ // Only ordered comparisons are pushed down. Other binary functions (NotEqual,
+ // StartsWith, Like, ...) cannot be rewritten as per-key comparisons: a per-key
+ // `k0 != 1 AND k1 != 10` would wrongly drop {1, 20} for `partition != '{1, 10}'`.
+ Object bound = partitionPredicate.literals().get(0);
+ LinkedHashMap<String, String> partSpec =
+ bound == null ? null : parsePartitionSpec(bound.toString(), partitionKeys);
+ if (partSpec != null) {
+ // Compare the partition tuple lexicographically:
+ //   (k0 op' v0) OR (k0 = v0 AND k1 op' v1) OR ... OR (k0 = v0 AND ... AND kN op vN)
+ // where op' is the strict form of op. A per-key AND of `ki op vi` would wrongly
+ // drop e.g. {2, 5} for `partition > '{1, 10}'`.
+ // NOTE(review): values are compared as typed partition values, not as the
+ // rendered partition string; confirm this is the intended semantics.
+ LeafBinaryFunction lastKeyFunction =
+ (LeafBinaryFunction) partitionPredicate.function();
+ LeafBinaryFunction strictFunction =
+ (lastKeyFunction instanceof GreaterThan
+ || lastKeyFunction instanceof GreaterOrEqual)
+ ? GreaterThan.INSTANCE
+ : LessThan.INSTANCE;
+ PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
+ List<Predicate> equalPrefix = new ArrayList<>();
+ List<Predicate> orPredicates = new ArrayList<>();
+ for (int i = 0; i < partitionKeys.size(); i++) {
+ Object value =
+ TypeUtils.castFromString(
+ partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i));
+ // Earlier keys compare strictly; only the last key uses the original
+ // (possibly inclusive) comparison.
+ LeafBinaryFunction function =
+ i == partitionKeys.size() - 1 ? lastKeyFunction : strictFunction;
+ List<Predicate> branch = new ArrayList<>(equalPrefix);
+ branch.add(
+ new LeafPredicate(
+ function,
+ partitionType.getTypeAt(i),
+ i,
+ partitionKeys.get(i),
+ Collections.singletonList(value)));
+ orPredicates.add(PredicateBuilder.and(branch));
+ equalPrefix.add(partBuilder.equal(i, value));
+ }
+ snapshotReader.withPartitionFilter(PredicateBuilder.or(orPredicates));
+ }
}
- snapshotReader.withPartitionFilter(partSpec);
- // TODO support range?
}
return () ->
@@ -219,6 +260,26 @@ public class FilesTable implements ReadonlyTable {
.map(p -> new FilesSplit(p, bucketPredicate,
levelPredicate))
.collect(Collectors.toList());
}
+
+ /**
+ * Parses a rendered partition string such as {@code {1, 10}} into a partition spec.
+ *
+ * <p>Surrounding braces are optional. Values are separated by {@code ", "} and are
+ * assigned to the partition keys in order.
+ *
+ * @return the spec in partition-key order, or {@code null} when the number of values
+ *     differs from the number of partition keys
+ */
+ @Nullable
+ private LinkedHashMap<String, String> parsePartitionSpec(
+ String partitionStr, List<String> partitionKeys) {
+ int start = partitionStr.startsWith("{") ? 1 : 0;
+ int end = partitionStr.endsWith("}") ? partitionStr.length() - 1 : partitionStr.length();
+ String[] values = partitionStr.substring(start, end).split(", ");
+ if (values.length != partitionKeys.size()) {
+ return null;
+ }
+ LinkedHashMap<String, String> spec = new LinkedHashMap<>();
+ int position = 0;
+ for (String key : partitionKeys) {
+ spec.put(key, values[position++]);
+ }
+ return spec;
+ }
}
private static class FilesSplit extends SingletonSplit {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index b876480064..2a8818d498 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -30,6 +30,8 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.predicate.In;
+import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
@@ -40,6 +42,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.SnapshotManager;
@@ -134,6 +137,58 @@ public class FilesTableTest extends TableTestBase {
return rows;
}
+ @Test
+ public void testReadWithPartitionRange() throws Exception {
+ compact(table, row(2, 20), 0);
+ write(table, GenericRow.of(3, 1, 10, 1));
+
+ PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);
+
+ assertThat(readPartBucketLevel(builder.greaterThan(0,
BinaryString.fromString("{1, 10}"))))
+ .containsExactlyInAnyOrder("{2, 20}-0-5");
+
+ assertThat(
+ readPartBucketLevel(
+ builder.greaterOrEqual(0,
BinaryString.fromString("{2, 20}"))))
+ .containsExactlyInAnyOrder("{2, 20}-0-5");
+
+ assertThat(readPartBucketLevel(builder.lessThan(0,
BinaryString.fromString("{2, 20}"))))
+ .containsExactlyInAnyOrder("{1, 10}-0-0", "{1, 10}-0-0", "{1,
10}-1-0");
+
+ assertThat(readPartBucketLevel(builder.lessOrEqual(0,
BinaryString.fromString("{1, 10}"))))
+ .containsExactlyInAnyOrder("{1, 10}-0-0", "{1, 10}-0-0", "{1,
10}-1-0");
+ }
+
+ @Test
+ public void testReadWithPartitionIn() throws Exception {
+ compact(table, row(2, 20), 0);
+ write(table, GenericRow.of(3, 1, 10, 1));
+
+ assertThat(
+ readPartBucketLevel(
+ buildInPredicate(
+ BinaryString.fromString("{1, 10}"),
+ BinaryString.fromString("{2, 20}"))))
+ .containsExactlyInAnyOrder(
+ "{1, 10}-0-0", "{1, 10}-0-0", "{1, 10}-1-0", "{2,
20}-0-5");
+
+
assertThat(readPartBucketLevel(buildInPredicate(BinaryString.fromString("{2,
20}"))))
+ .containsExactlyInAnyOrder("{2, 20}-0-5");
+
+
assertThat(readPartBucketLevel(buildInPredicate(BinaryString.fromString("{3,
30}"))))
+ .isEmpty();
+ }
+
+ /** Builds an IN leaf on the files table's first column directly, without PredicateBuilder. */
+ private Predicate buildInPredicate(BinaryString... values) {
+ DataField partitionColumn = FilesTable.TABLE_TYPE.getFields().get(0);
+ List<Object> literals = Arrays.asList((Object[]) values);
+ return new LeafPredicate(
+ In.INSTANCE, partitionColumn.type(), 0, partitionColumn.name(), literals);
+ }
+
@Test
public void testReadFilesFromLatest() throws Exception {
List<InternalRow> expectedRow = getExpectedResult(2L);