This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3c43428065 [core] Implement withFilter for TagsTable and
FileKeyRangesTable (#7874)
3c43428065 is described below
commit 3c4342806542ead034712b27bee22011a6b14539
Author: Silas <[email protected]>
AuthorDate: Mon May 18 22:42:00 2026 +0800
[core] Implement withFilter for TagsTable and FileKeyRangesTable (#7874)
Implement `withFilter` for `TagsRead` and `FileKeyRangesRead`. Both were
previously `TODO` stubs, so any predicates passed to them were silently ignored.
`FileKeyRangesRead` excludes `partition`, `bucket`, and `level` from the
post-filter. These fields are already pushed down to the scan, and
re-evaluating them against the cast-to-string row breaks range predicates.
This follows the same pattern as `FilesTable` (#7791).
---
.../paimon/table/system/FileKeyRangesTable.java | 29 ++++++-
.../org/apache/paimon/table/system/TagsTable.java | 9 ++-
.../table/system/FileKeyRangesTableTest.java | 52 +++++++++++++
.../apache/paimon/table/system/TagsTableTest.java | 88 +++++++++++++++++++++-
4 files changed, 175 insertions(+), 3 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
index 095dcda74a..39e88f6e9a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
@@ -30,6 +30,7 @@ import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -61,9 +62,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -184,12 +187,24 @@ public class FileKeyRangesTable implements ReadonlyTable {
private static class FileKeyRangesRead implements InnerTableRead {
+ private static final Set<String> SCAN_PUSHDOWN_FIELDS =
scanPushdownFields();
+
+ private static Set<String> scanPushdownFields() {
+ Set<String> fields = new HashSet<>();
+ fields.add("partition");
+ fields.add("bucket");
+ fields.add("level");
+ return Collections.unmodifiableSet(fields);
+ }
+
private final SchemaManager schemaManager;
private final FileStoreTable storeTable;
private RowType readType;
+ @Nullable private Predicate postFilter;
+
private FileKeyRangesRead(SchemaManager schemaManager, FileStoreTable
fileStoreTable) {
this.schemaManager = schemaManager;
this.storeTable = fileStoreTable;
@@ -197,7 +212,14 @@ public class FileKeyRangesTable implements ReadonlyTable {
@Override
public InnerTableRead withFilter(Predicate predicate) {
- // TODO
+ if (predicate == null) {
+ this.postFilter = null;
+ return this;
+ }
+ List<Predicate> remaining =
+ PredicateBuilder.excludePredicateWithFields(
+ PredicateBuilder.splitAnd(predicate),
SCAN_PUSHDOWN_FIELDS);
+ this.postFilter = remaining.isEmpty() ? null :
PredicateBuilder.and(remaining);
return this;
}
@@ -264,6 +286,11 @@ public class FileKeyRangesTable implements ReadonlyTable {
file)));
}
Iterator<InternalRow> rows =
Iterators.concat(iteratorList.iterator());
+
+ if (postFilter != null) {
+ rows = Iterators.filter(rows, postFilter::test);
+ }
+
if (readType != null) {
rows =
Iterators.transform(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 0dd22e79be..f5a1789988 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -201,13 +201,15 @@ public class TagsTable implements ReadonlyTable {
private final FileIO fileIO;
private RowType readType;
+ @Nullable private Predicate postFilter;
+
public TagsRead(FileIO fileIO) {
this.fileIO = fileIO;
}
@Override
public InnerTableRead withFilter(Predicate predicate) {
- // TODO
+ this.postFilter = predicate;
return this;
}
@@ -272,6 +274,11 @@ public class TagsTable implements ReadonlyTable {
Iterator<InternalRow> rows =
Iterators.transform(nameToSnapshot.entrySet().iterator(),
this::toRow);
+
+ if (postFilter != null) {
+ rows = Iterators.filter(rows, postFilter::test);
+ }
+
if (readType != null) {
rows =
Iterators.transform(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
index ce675e2aae..de00e33b30 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
@@ -183,6 +183,58 @@ public class FileKeyRangesTableTest extends TableTestBase {
assertThat(hasPt2).isTrue();
}
+ @Test
+ public void testReadWithRecordCountPostFilter() throws Exception {
+ PredicateBuilder builder = new
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+ assertThat(readPartBucketLevel(builder.greaterOrEqual(6,
1L))).isNotEmpty();
+ assertThat(readPartBucketLevel(builder.greaterThan(6,
100L))).isEmpty();
+ }
+
+ @Test
+ public void testReadWithSchemaIdPostFilter() throws Exception {
+ PredicateBuilder builder = new
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+ List<String> baseline = readPartBucketLevel(null);
+ assertThat(baseline).isNotEmpty();
+ assertThat(readPartBucketLevel(builder.equal(4,
0L))).isEqualTo(baseline);
+ assertThat(readPartBucketLevel(builder.equal(4, 9999L))).isEmpty();
+ }
+
+ @Test
+ public void testReadWithCombinedScanAndPostFilter() throws Exception {
+ // partition is scan-pushdown, record_count is post-filter only.
+ PredicateBuilder builder = new
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+ Predicate combined =
+ PredicateBuilder.and(
+ builder.equal(0, BinaryString.fromString("{1}")),
+ builder.greaterOrEqual(6, 1L));
+ List<String> rows = readPartBucketLevel(combined);
+ assertThat(rows).isNotEmpty();
+ for (String row : rows) {
+ assertThat(row).startsWith("{1}-");
+ }
+ Predicate combinedEmpty =
+ PredicateBuilder.and(
+ builder.equal(0, BinaryString.fromString("{1}")),
+ builder.greaterThan(6, 100L));
+ assertThat(readPartBucketLevel(combinedEmpty)).isEmpty();
+ }
+
+ @Test
+ public void testReadWithPartitionRangeScanPushdown() throws Exception {
+ write(table, GenericRow.of(3, 11, 50));
+
+ PredicateBuilder builder = new
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+ List<String> rows =
+ readPartBucketLevel(builder.lessThan(0,
BinaryString.fromString("{11}")));
+
+ assertThat(rows).isNotEmpty();
+ for (String row : rows) {
+ assertThat(row).doesNotStartWith("{11}-");
+ }
+ assertThat(rows.stream().anyMatch(r -> r.startsWith("{1}-"))).isTrue();
+ assertThat(rows.stream().anyMatch(r -> r.startsWith("{2}-"))).isTrue();
+ }
+
private List<String> readPartBucketLevel(Predicate predicate) throws
IOException {
ReadBuilder rb = fileKeyRangesTable.newReadBuilder();
if (predicate != null) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
index 7ee8cd53d7..31bc9401ea 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
@@ -24,10 +24,14 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.DateTimeUtils;
@@ -37,8 +41,10 @@ import org.apache.paimon.utils.TagManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -49,6 +55,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class TagsTableTest extends TableTestBase {
private static final String tableName = "MyTable";
+ private FileStoreTable table;
private TagsTable tagsTable;
private TagManager tagManager;
@@ -66,7 +73,7 @@ class TagsTableTest extends TableTestBase {
.option("tag.num-retained-max", "3")
.build();
catalog.createTable(identifier, schema, true);
- FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ table = (FileStoreTable) catalog.getTable(identifier);
TableCommitImpl commit =
table.newCommit(commitUser).ignoreEmptyCommit(false);
commit.commit(
new ManifestCommittable(
@@ -90,6 +97,85 @@ class TagsTableTest extends TableTestBase {
assertThat(result).containsExactlyElementsOf(expectRow);
}
+ @Test
+ void testReadWithTagNameEqualFilter() throws Exception {
+ table.createTag("tag-a");
+ table.createTag("tag-b");
+ table.createTag("tag-c");
+
+ PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE);
+ assertThat(readTagNames(builder.equal(0,
BinaryString.fromString("tag-b"))))
+ .containsExactly("tag-b");
+
+ assertThat(readTagNames(builder.equal(0,
BinaryString.fromString("missing")))).isEmpty();
+ }
+
+ @Test
+ void testReadWithTagNameInFilter() throws Exception {
+ table.createTag("tag-a");
+ table.createTag("tag-b");
+ table.createTag("tag-c");
+
+ PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE);
+ assertThat(
+ readTagNames(
+ builder.in(
+ 0,
+ Arrays.asList(
+ (Object)
BinaryString.fromString("tag-a"),
+
BinaryString.fromString("tag-c")))))
+ .containsExactlyInAnyOrder("tag-a", "tag-c");
+ }
+
+ @Test
+ void testReadWithTagNameNotEqualFilter() throws Exception {
+ table.createTag("tag-a");
+ table.createTag("tag-b");
+ table.createTag("tag-c");
+
+ PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE);
+ List<String> rows = readTagNames(builder.notEqual(0,
BinaryString.fromString("tag-b")));
+ assertThat(rows).contains("tag-a", "tag-c");
+ assertThat(rows).doesNotContain("tag-b");
+ }
+
+ @Test
+ void testReadWithNonTagNameFieldFilter() throws Exception {
+ table.createTag("tag-a");
+ table.createTag("tag-b");
+
+ PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE);
+ long maxSnapshotId =
+ tagManager.tagObjects().stream().mapToLong(p ->
p.getKey().id()).max().orElse(0L);
+ assertThat(readTagNames(builder.greaterOrEqual(1,
maxSnapshotId))).isNotEmpty();
+ assertThat(readTagNames(builder.greaterThan(1,
maxSnapshotId))).isEmpty();
+ }
+
+ @Test
+ void testReadWithNullFilterReturnsAll() throws Exception {
+ table.createTag("tag-a");
+ table.createTag("tag-b");
+
+ List<String> all =
+ tagManager.tagObjects().stream()
+ .map(Pair::getValue)
+ .collect(java.util.stream.Collectors.toList());
+
assertThat(readTagNames(null)).containsExactlyInAnyOrderElementsOf(all);
+ }
+
+ private List<String> readTagNames(Predicate predicate) throws IOException {
+ ReadBuilder readBuilder = tagsTable.newReadBuilder();
+ if (predicate != null) {
+ readBuilder = readBuilder.withFilter(predicate);
+ }
+ List<String> names = new ArrayList<>();
+ try (RecordReader<InternalRow> reader =
+
readBuilder.newRead().createReader(readBuilder.newScan().plan())) {
+ reader.forEachRemaining(row ->
names.add(row.getString(0).toString()));
+ }
+ return names;
+ }
+
private List<InternalRow> getExpectedResult() {
Map<String, InternalRow> tagToRows = new TreeMap<>();
for (Pair<Tag, String> snapshot : tagManager.tagObjects()) {