This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 43a3b89dd8 [core] Implement withFilter for PartitionsTable and FilesTable (#7791)
43a3b89dd8 is described below

commit 43a3b89dd866281ffc75d7609b45a9781a6e5571
Author: Silas <[email protected]>
AuthorDate: Mon May 11 15:27:23 2026 +0800

    [core] Implement withFilter for PartitionsTable and FilesTable (#7791)
---
 .../org/apache/paimon/table/system/FilesTable.java | 29 +++++++++-
 .../paimon/table/system/PartitionsTable.java       | 10 +++-
 .../apache/paimon/table/system/FilesTableTest.java | 37 ++++++++++++
 .../paimon/table/system/PartitionsTableTest.java   | 66 ++++++++++++++++++++++
 4 files changed, 140 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 56d3e0836b..3840b4dc54 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -35,6 +35,7 @@ import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.LeafPredicateExtractor;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -73,11 +74,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalLong;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -280,12 +283,24 @@ public class FilesTable implements ReadonlyTable {
 
     private static class FilesRead implements InnerTableRead {
 
+        private static final Set<String> SCAN_PUSHDOWN_FIELDS = 
scanPushdownFields();
+
+        private static Set<String> scanPushdownFields() {
+            Set<String> fields = new HashSet<>();
+            fields.add("partition");
+            fields.add("bucket");
+            fields.add("level");
+            return Collections.unmodifiableSet(fields);
+        }
+
         private final SchemaManager schemaManager;
 
         private final FileStoreTable storeTable;
 
         private RowType readType;
 
+        @Nullable private Predicate predicate;
+
         private FilesRead(SchemaManager schemaManager, FileStoreTable 
fileStoreTable) {
             this.schemaManager = schemaManager;
             this.storeTable = fileStoreTable;
@@ -293,7 +308,14 @@ public class FilesTable implements ReadonlyTable {
 
         @Override
         public InnerTableRead withFilter(Predicate predicate) {
-            // TODO
+            if (predicate == null) {
+                this.predicate = null;
+                return this;
+            }
+            List<Predicate> remaining =
+                    PredicateBuilder.excludePredicateWithFields(
+                            PredicateBuilder.splitAnd(predicate), 
SCAN_PUSHDOWN_FIELDS);
+            this.predicate = remaining.isEmpty() ? null : 
PredicateBuilder.and(remaining);
             return this;
         }
 
@@ -366,6 +388,11 @@ public class FilesTable implements ReadonlyTable {
                                                 simpleStatsEvolutions)));
             }
             Iterator<InternalRow> rows = 
Iterators.concat(iteratorList.iterator());
+
+            if (predicate != null) {
+                rows = Iterators.filter(rows, predicate::test);
+            }
+
             if (readType != null) {
                 rows =
                         Iterators.transform(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index a165eb5185..1291308e5a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -61,6 +61,8 @@ import org.apache.paimon.utils.SerializationUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -182,13 +184,15 @@ public class PartitionsTable implements ReadonlyTable {
 
         private RowType readType;
 
+        @Nullable private Predicate predicate;
+
         public PartitionsRead(FileStoreTable table) {
             this.fileStoreTable = table;
         }
 
         @Override
         public InnerTableRead withFilter(Predicate predicate) {
-            // TODO
+            this.predicate = predicate;
             return this;
         }
 
@@ -236,6 +240,10 @@ public class PartitionsTable implements ReadonlyTable {
                             .sorted(Comparator.comparing(row -> 
row.getString(0)))
                             .iterator();
 
+            if (predicate != null) {
+                iterator = Iterators.filter(iterator, predicate::test);
+            }
+
             if (readType != null) {
                 iterator =
                         Iterators.transform(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index 7baf4ae617..4c4e3c8859 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -209,6 +209,43 @@ public class FilesTableTest extends TableTestBase {
         assertThat(readPartBucketLevel(builder.equal(0, "[2]"))).isEmpty();
     }
 
+    @Test
+    public void testReadWithSchemaIdFilter() throws Exception {
+        PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);
+
+        assertThat(readPartBucketLevel(builder.equal(4, 0L)))
+                .containsExactlyInAnyOrder(
+                        "{1, 10}-0-0", "{1, 10}-0-0", "{2, 20}-0-0", "{2, 
20}-0-0");
+        assertThat(readPartBucketLevel(builder.equal(4, 999L))).isEmpty();
+    }
+
+    @Test
+    public void testReadWithRecordCountFilter() throws Exception {
+        PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);
+
+        assertThat(readPartBucketLevel(builder.greaterThan(6, 0L)))
+                .containsExactlyInAnyOrder(
+                        "{1, 10}-0-0", "{1, 10}-0-0", "{2, 20}-0-0", "{2, 
20}-0-0");
+        assertThat(readPartBucketLevel(builder.greaterThan(6, 
100L))).isEmpty();
+    }
+
+    @Test
+    public void testReadWithCombinedPushdownAndPostFilter() throws Exception {
+        PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);
+
+        Predicate combined =
+                PredicateBuilder.and(
+                        builder.equal(0, BinaryString.fromString("{1, 10}")), 
builder.equal(4, 0L));
+        assertThat(readPartBucketLevel(combined))
+                .containsExactlyInAnyOrder("{1, 10}-0-0", "{1, 10}-0-0");
+
+        Predicate combinedMiss =
+                PredicateBuilder.and(
+                        builder.equal(0, BinaryString.fromString("{1, 10}")),
+                        builder.equal(4, 999L));
+        assertThat(readPartBucketLevel(combinedMiss)).isEmpty();
+    }
+
     @Test
     public void testReadFilesFromSpecifiedSnapshot() throws Exception {
         List<InternalRow> expectedRow = getExpectedResult(1L);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
index d70fe0d666..f79c7811b5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
@@ -26,6 +26,9 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
@@ -33,12 +36,15 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataTypes;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -180,4 +186,64 @@ public class PartitionsTableTest extends TableTestBase {
         List<InternalRow> result = read(testPartitionsTable, new int[] {0, 1});
         assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
     }
+
+    @Test
+    public void testReadWithPartitionEqualFilter() throws Exception {
+        PredicateBuilder builder = new 
PredicateBuilder(PartitionsTable.TABLE_TYPE);
+
+        assertThat(readPartitionAndRecordCount(builder.equal(0, 
BinaryString.fromString("pt=2"))))
+                .containsExactlyInAnyOrder("pt=2-1");
+
+        assertThat(readPartitionAndRecordCount(builder.equal(0, 
BinaryString.fromString("pt=99"))))
+                .isEmpty();
+    }
+
+    @Test
+    public void testReadWithPartitionInFilter() throws Exception {
+        PredicateBuilder builder = new 
PredicateBuilder(PartitionsTable.TABLE_TYPE);
+
+        assertThat(
+                        readPartitionAndRecordCount(
+                                builder.in(
+                                        0,
+                                        Arrays.asList(
+                                                (Object) 
BinaryString.fromString("pt=1"),
+                                                
BinaryString.fromString("pt=3")))))
+                .containsExactlyInAnyOrder("pt=1-2", "pt=3-1");
+    }
+
+    @Test
+    public void testReadWithRecordCountFilter() throws Exception {
+        PredicateBuilder builder = new 
PredicateBuilder(PartitionsTable.TABLE_TYPE);
+
+        assertThat(readPartitionAndRecordCount(builder.greaterThan(1, 1L)))
+                .containsExactlyInAnyOrder("pt=1-2");
+    }
+
+    @Test
+    public void testReadWithFileCountFilter() throws Exception {
+        PredicateBuilder builder = new 
PredicateBuilder(PartitionsTable.TABLE_TYPE);
+
+        assertThat(readPartitionAndRecordCount(builder.equal(3, 1L)))
+                .containsExactlyInAnyOrder("pt=2-1", "pt=3-1");
+        assertThat(readPartitionAndRecordCount(builder.greaterOrEqual(3, 2L)))
+                .containsExactlyInAnyOrder("pt=1-2");
+    }
+
+    @Test
+    public void testReadWithNullFilterReturnsAll() throws Exception {
+        assertThat(readPartitionAndRecordCount(null))
+                .containsExactlyInAnyOrder("pt=1-2", "pt=2-1", "pt=3-1");
+    }
+
+    private List<String> readPartitionAndRecordCount(Predicate predicate) 
throws IOException {
+        ReadBuilder readBuilder = 
partitionsTable.newReadBuilder().withFilter(predicate);
+        List<String> rows = new ArrayList<>();
+        try (RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan())) {
+            reader.forEachRemaining(
+                    row -> rows.add(row.getString(0).toString() + "-" + 
row.getLong(1)));
+        }
+        return rows;
+    }
 }

Reply via email to