This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c43428065 [core] Implement withFilter for TagsTable and FileKeyRangesTable (#7874)
3c43428065 is described below

commit 3c4342806542ead034712b27bee22011a6b14539
Author: Silas <[email protected]>
AuthorDate: Mon May 18 22:42:00 2026 +0800

    [core] Implement withFilter for TagsTable and FileKeyRangesTable (#7874)
    
    Implement `withFilter` for `TagsRead` / `FileKeyRangesRead`. Previously it
    was a `TODO`, so predicates were silently ignored.
    
    `FileKeyRangesRead` excludes `partition` / `bucket` / `level` from the
    post-filter: these fields are already pushed down by the scan, and
    re-evaluating them on the cast-to-string row would break range predicates.
    This follows the same pattern as `FilesTable` (#7791).
---
 .../paimon/table/system/FileKeyRangesTable.java    | 29 ++++++-
 .../org/apache/paimon/table/system/TagsTable.java  |  9 ++-
 .../table/system/FileKeyRangesTableTest.java       | 52 +++++++++++++
 .../apache/paimon/table/system/TagsTableTest.java  | 88 +++++++++++++++++++++-
 4 files changed, 175 insertions(+), 3 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
index 095dcda74a..39e88f6e9a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
@@ -30,6 +30,7 @@ import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.LeafPredicateExtractor;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -61,9 +62,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -184,12 +187,24 @@ public class FileKeyRangesTable implements ReadonlyTable {
 
     private static class FileKeyRangesRead implements InnerTableRead {
 
+        private static final Set<String> SCAN_PUSHDOWN_FIELDS = 
scanPushdownFields();
+
+        private static Set<String> scanPushdownFields() {
+            Set<String> fields = new HashSet<>();
+            fields.add("partition");
+            fields.add("bucket");
+            fields.add("level");
+            return Collections.unmodifiableSet(fields);
+        }
+
         private final SchemaManager schemaManager;
 
         private final FileStoreTable storeTable;
 
         private RowType readType;
 
+        @Nullable private Predicate postFilter;
+
         private FileKeyRangesRead(SchemaManager schemaManager, FileStoreTable 
fileStoreTable) {
             this.schemaManager = schemaManager;
             this.storeTable = fileStoreTable;
@@ -197,7 +212,14 @@ public class FileKeyRangesTable implements ReadonlyTable {
 
         @Override
         public InnerTableRead withFilter(Predicate predicate) {
-            // TODO
+            if (predicate == null) {
+                this.postFilter = null;
+                return this;
+            }
+            List<Predicate> remaining =
+                    PredicateBuilder.excludePredicateWithFields(
+                            PredicateBuilder.splitAnd(predicate), 
SCAN_PUSHDOWN_FIELDS);
+            this.postFilter = remaining.isEmpty() ? null : 
PredicateBuilder.and(remaining);
             return this;
         }
 
@@ -264,6 +286,11 @@ public class FileKeyRangesTable implements ReadonlyTable {
                                                 file)));
             }
             Iterator<InternalRow> rows = 
Iterators.concat(iteratorList.iterator());
+
+            if (postFilter != null) {
+                rows = Iterators.filter(rows, postFilter::test);
+            }
+
             if (readType != null) {
                 rows =
                         Iterators.transform(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 0dd22e79be..f5a1789988 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -201,13 +201,15 @@ public class TagsTable implements ReadonlyTable {
         private final FileIO fileIO;
         private RowType readType;
 
+        @Nullable private Predicate postFilter;
+
         public TagsRead(FileIO fileIO) {
             this.fileIO = fileIO;
         }
 
         @Override
         public InnerTableRead withFilter(Predicate predicate) {
-            // TODO
+            this.postFilter = predicate;
             return this;
         }
 
@@ -272,6 +274,11 @@ public class TagsTable implements ReadonlyTable {
 
             Iterator<InternalRow> rows =
                     Iterators.transform(nameToSnapshot.entrySet().iterator(), 
this::toRow);
+
+            if (postFilter != null) {
+                rows = Iterators.filter(rows, postFilter::test);
+            }
+
             if (readType != null) {
                 rows =
                         Iterators.transform(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
index ce675e2aae..de00e33b30 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
@@ -183,6 +183,58 @@ public class FileKeyRangesTableTest extends TableTestBase {
         assertThat(hasPt2).isTrue();
     }
 
+    @Test
+    public void testReadWithRecordCountPostFilter() throws Exception {
+        PredicateBuilder builder = new 
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+        assertThat(readPartBucketLevel(builder.greaterOrEqual(6, 
1L))).isNotEmpty();
+        assertThat(readPartBucketLevel(builder.greaterThan(6, 
100L))).isEmpty();
+    }
+
+    @Test
+    public void testReadWithSchemaIdPostFilter() throws Exception {
+        PredicateBuilder builder = new 
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+        List<String> baseline = readPartBucketLevel(null);
+        assertThat(baseline).isNotEmpty();
+        assertThat(readPartBucketLevel(builder.equal(4, 
0L))).isEqualTo(baseline);
+        assertThat(readPartBucketLevel(builder.equal(4, 9999L))).isEmpty();
+    }
+
+    @Test
+    public void testReadWithCombinedScanAndPostFilter() throws Exception {
+        // partition is scan-pushdown, record_count is post-filter only.
+        PredicateBuilder builder = new 
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+        Predicate combined =
+                PredicateBuilder.and(
+                        builder.equal(0, BinaryString.fromString("{1}")),
+                        builder.greaterOrEqual(6, 1L));
+        List<String> rows = readPartBucketLevel(combined);
+        assertThat(rows).isNotEmpty();
+        for (String row : rows) {
+            assertThat(row).startsWith("{1}-");
+        }
+        Predicate combinedEmpty =
+                PredicateBuilder.and(
+                        builder.equal(0, BinaryString.fromString("{1}")),
+                        builder.greaterThan(6, 100L));
+        assertThat(readPartBucketLevel(combinedEmpty)).isEmpty();
+    }
+
+    @Test
+    public void testReadWithPartitionRangeScanPushdown() throws Exception {
+        write(table, GenericRow.of(3, 11, 50));
+
+        PredicateBuilder builder = new 
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+        List<String> rows =
+                readPartBucketLevel(builder.lessThan(0, 
BinaryString.fromString("{11}")));
+
+        assertThat(rows).isNotEmpty();
+        for (String row : rows) {
+            assertThat(row).doesNotStartWith("{11}-");
+        }
+        assertThat(rows.stream().anyMatch(r -> r.startsWith("{1}-"))).isTrue();
+        assertThat(rows.stream().anyMatch(r -> r.startsWith("{2}-"))).isTrue();
+    }
+
     private List<String> readPartBucketLevel(Predicate predicate) throws 
IOException {
         ReadBuilder rb = fileKeyRangesTable.newReadBuilder();
         if (predicate != null) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
index 7ee8cd53d7..31bc9401ea 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
@@ -24,10 +24,14 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.tag.Tag;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.DateTimeUtils;
@@ -37,8 +41,10 @@ import org.apache.paimon.utils.TagManager;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -49,6 +55,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 class TagsTableTest extends TableTestBase {
 
     private static final String tableName = "MyTable";
+    private FileStoreTable table;
     private TagsTable tagsTable;
     private TagManager tagManager;
 
@@ -66,7 +73,7 @@ class TagsTableTest extends TableTestBase {
                         .option("tag.num-retained-max", "3")
                         .build();
         catalog.createTable(identifier, schema, true);
-        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        table = (FileStoreTable) catalog.getTable(identifier);
         TableCommitImpl commit = 
table.newCommit(commitUser).ignoreEmptyCommit(false);
         commit.commit(
                 new ManifestCommittable(
@@ -90,6 +97,85 @@ class TagsTableTest extends TableTestBase {
         assertThat(result).containsExactlyElementsOf(expectRow);
     }
 
+    @Test
+    void testReadWithTagNameEqualFilter() throws Exception {
+        table.createTag("tag-a");
+        table.createTag("tag-b");
+        table.createTag("tag-c");
+
+        PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE);
+        assertThat(readTagNames(builder.equal(0, 
BinaryString.fromString("tag-b"))))
+                .containsExactly("tag-b");
+
+        assertThat(readTagNames(builder.equal(0, 
BinaryString.fromString("missing")))).isEmpty();
+    }
+
+    @Test
+    void testReadWithTagNameInFilter() throws Exception {
+        table.createTag("tag-a");
+        table.createTag("tag-b");
+        table.createTag("tag-c");
+
+        PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE);
+        assertThat(
+                        readTagNames(
+                                builder.in(
+                                        0,
+                                        Arrays.asList(
+                                                (Object) 
BinaryString.fromString("tag-a"),
+                                                
BinaryString.fromString("tag-c")))))
+                .containsExactlyInAnyOrder("tag-a", "tag-c");
+    }
+
+    @Test
+    void testReadWithTagNameNotEqualFilter() throws Exception {
+        table.createTag("tag-a");
+        table.createTag("tag-b");
+        table.createTag("tag-c");
+
+        PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE);
+        List<String> rows = readTagNames(builder.notEqual(0, 
BinaryString.fromString("tag-b")));
+        assertThat(rows).contains("tag-a", "tag-c");
+        assertThat(rows).doesNotContain("tag-b");
+    }
+
+    @Test
+    void testReadWithNonTagNameFieldFilter() throws Exception {
+        table.createTag("tag-a");
+        table.createTag("tag-b");
+
+        PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE);
+        long maxSnapshotId =
+                tagManager.tagObjects().stream().mapToLong(p -> 
p.getKey().id()).max().orElse(0L);
+        assertThat(readTagNames(builder.greaterOrEqual(1, 
maxSnapshotId))).isNotEmpty();
+        assertThat(readTagNames(builder.greaterThan(1, 
maxSnapshotId))).isEmpty();
+    }
+
+    @Test
+    void testReadWithNullFilterReturnsAll() throws Exception {
+        table.createTag("tag-a");
+        table.createTag("tag-b");
+
+        List<String> all =
+                tagManager.tagObjects().stream()
+                        .map(Pair::getValue)
+                        .collect(java.util.stream.Collectors.toList());
+        
assertThat(readTagNames(null)).containsExactlyInAnyOrderElementsOf(all);
+    }
+
+    private List<String> readTagNames(Predicate predicate) throws IOException {
+        ReadBuilder readBuilder = tagsTable.newReadBuilder();
+        if (predicate != null) {
+            readBuilder = readBuilder.withFilter(predicate);
+        }
+        List<String> names = new ArrayList<>();
+        try (RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan())) {
+            reader.forEachRemaining(row -> 
names.add(row.getString(0).toString()));
+        }
+        return names;
+    }
+
     private List<InternalRow> getExpectedResult() {
         Map<String, InternalRow> tagToRows = new TreeMap<>();
         for (Pair<Tag, String> snapshot : tagManager.tagObjects()) {

Reply via email to