This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 43a3b89dd8 [core] Implement withFilter for PartitionsTable and FilesTable (#7791)
43a3b89dd8 is described below
commit 43a3b89dd866281ffc75d7609b45a9781a6e5571
Author: Silas <[email protected]>
AuthorDate: Mon May 11 15:27:23 2026 +0800
[core] Implement withFilter for PartitionsTable and FilesTable (#7791)
---
.../org/apache/paimon/table/system/FilesTable.java | 29 +++++++++-
.../paimon/table/system/PartitionsTable.java | 10 +++-
.../apache/paimon/table/system/FilesTableTest.java | 37 ++++++++++++
.../paimon/table/system/PartitionsTableTest.java | 66 ++++++++++++++++++++++
4 files changed, 140 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 56d3e0836b..3840b4dc54 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -35,6 +35,7 @@ import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -73,11 +74,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
+import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -280,12 +283,24 @@ public class FilesTable implements ReadonlyTable {
private static class FilesRead implements InnerTableRead {
+ private static final Set<String> SCAN_PUSHDOWN_FIELDS =
scanPushdownFields();
+
+ private static Set<String> scanPushdownFields() { // field names whose conjuncts withFilter excludes from the row post-filter; presumably because the files scan already filters on them (verify against the scan)
+ Set<String> fields = new HashSet<>();
+ fields.add("partition");
+ fields.add("bucket");
+ fields.add("level");
+ return Collections.unmodifiableSet(fields);
+ }
+
private final SchemaManager schemaManager;
private final FileStoreTable storeTable;
private RowType readType;
+ @Nullable private Predicate predicate; // residual filter tested on each produced row before readType projection; null means no filtering
+
private FilesRead(SchemaManager schemaManager, FileStoreTable
fileStoreTable) {
this.schemaManager = schemaManager;
this.storeTable = fileStoreTable;
@@ -293,7 +308,14 @@ public class FilesTable implements ReadonlyTable {
@Override
public InnerTableRead withFilter(Predicate predicate) {
- // TODO
+ if (predicate == null) { // a null filter clears any previously set residual predicate
+ this.predicate = null;
+ return this;
+ }
+ List<Predicate> remaining = // top-level AND conjuncts left after removing those on partition/bucket/level
+ PredicateBuilder.excludePredicateWithFields( // NOTE(review): removed conjuncts are not re-checked here; confirm the scan enforces all of them (e.g. non-equality or repeated predicates on one field), otherwise those rows go unfiltered
+ PredicateBuilder.splitAnd(predicate),
SCAN_PUSHDOWN_FIELDS);
+ this.predicate = remaining.isEmpty() ? null :
PredicateBuilder.and(remaining);
return this;
}
@@ -366,6 +388,11 @@ public class FilesTable implements ReadonlyTable {
simpleStatsEvolutions)));
}
Iterator<InternalRow> rows =
Iterators.concat(iteratorList.iterator());
+
+ if (predicate != null) {
+ rows = Iterators.filter(rows, predicate::test);
+ }
+
if (readType != null) {
rows =
Iterators.transform(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index a165eb5185..1291308e5a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -61,6 +61,8 @@ import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
@@ -182,13 +184,15 @@ public class PartitionsTable implements ReadonlyTable {
private RowType readType;
+ @Nullable private Predicate predicate; // tested against full partition rows before readType projection; null means no filtering
+
public PartitionsRead(FileStoreTable table) {
this.fileStoreTable = table;
}
@Override
public InnerTableRead withFilter(Predicate predicate) {
- // TODO
+ this.predicate = predicate; // stored as-is (null allowed) and applied later as a post-filter over the generated rows
return this;
}
@@ -236,6 +240,10 @@ public class PartitionsTable implements ReadonlyTable {
.sorted(Comparator.comparing(row ->
row.getString(0)))
.iterator();
+ if (predicate != null) {
+ iterator = Iterators.filter(iterator, predicate::test);
+ }
+
if (readType != null) {
iterator =
Iterators.transform(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index 7baf4ae617..4c4e3c8859 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -209,6 +209,43 @@ public class FilesTableTest extends TableTestBase {
assertThat(readPartBucketLevel(builder.equal(0, "[2]"))).isEmpty();
}
+ @Test
+ public void testReadWithSchemaIdFilter() throws Exception { // field 4 (presumably schema_id) is not in SCAN_PUSHDOWN_FIELDS, so FilesRead post-filters it
+ PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);
+
+ assertThat(readPartBucketLevel(builder.equal(4, 0L)))
+ .containsExactlyInAnyOrder(
+ "{1, 10}-0-0", "{1, 10}-0-0", "{2, 20}-0-0", "{2,
20}-0-0");
+ assertThat(readPartBucketLevel(builder.equal(4, 999L))).isEmpty(); // no file matches this value, so every row is filtered out
+ }
+
+ @Test
+ public void testReadWithRecordCountFilter() throws Exception { // field 6 (presumably record_count) is not a scan-pushdown field, so FilesRead post-filters it
+ PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);
+
+ assertThat(readPartBucketLevel(builder.greaterThan(6, 0L)))
+ .containsExactlyInAnyOrder(
+ "{1, 10}-0-0", "{1, 10}-0-0", "{2, 20}-0-0", "{2,
20}-0-0");
+ assertThat(readPartBucketLevel(builder.greaterThan(6,
100L))).isEmpty();
+ }
+
+ @Test
+ public void testReadWithCombinedPushdownAndPostFilter() throws Exception { // mixes a conjunct excluded from the post-filter with one that FilesRead post-filters
+ PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);
+
+ Predicate combined = // field 0 (presumably partition) is left to the scan; field 4 is post-filtered
+ PredicateBuilder.and(
+ builder.equal(0, BinaryString.fromString("{1, 10}")),
builder.equal(4, 0L));
+ assertThat(readPartBucketLevel(combined))
+ .containsExactlyInAnyOrder("{1, 10}-0-0", "{1, 10}-0-0");
+
+ Predicate combinedMiss = // same partition, but the post-filtered conjunct matches no file
+ PredicateBuilder.and(
+ builder.equal(0, BinaryString.fromString("{1, 10}")),
+ builder.equal(4, 999L));
+ assertThat(readPartBucketLevel(combinedMiss)).isEmpty();
+ }
+
@Test
public void testReadFilesFromSpecifiedSnapshot() throws Exception {
List<InternalRow> expectedRow = getExpectedResult(1L);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
index d70fe0d666..f79c7811b5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
@@ -26,6 +26,9 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
@@ -33,12 +36,15 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -180,4 +186,64 @@ public class PartitionsTableTest extends TableTestBase {
List<InternalRow> result = read(testPartitionsTable, new int[] {0, 1});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}
+
+ @Test
+ public void testReadWithPartitionEqualFilter() throws Exception { // equality on field 0 (partition name) keeps only the matching partition row
+ PredicateBuilder builder = new
PredicateBuilder(PartitionsTable.TABLE_TYPE);
+
+ assertThat(readPartitionAndRecordCount(builder.equal(0,
BinaryString.fromString("pt=2"))))
+ .containsExactlyInAnyOrder("pt=2-1"); // partition "pt=2" with record count 1
+
+ assertThat(readPartitionAndRecordCount(builder.equal(0,
BinaryString.fromString("pt=99"))))
+ .isEmpty(); // a partition that does not exist yields no rows
+ }
+
+ @Test
+ public void testReadWithPartitionInFilter() throws Exception { // IN on field 0 keeps only the listed partitions
+ PredicateBuilder builder = new
PredicateBuilder(PartitionsTable.TABLE_TYPE);
+
+ assertThat(
+ readPartitionAndRecordCount(
+ builder.in(
+ 0,
+ Arrays.asList( // the (Object) cast makes asList infer List<Object>, presumably what builder.in expects
+ (Object)
BinaryString.fromString("pt=1"),
+
BinaryString.fromString("pt=3")))))
+ .containsExactlyInAnyOrder("pt=1-2", "pt=3-1");
+ }
+
+ @Test
+ public void testReadWithRecordCountFilter() throws Exception { // field 1 is the record count printed by the helper; only pt=1 has more than one record
+ PredicateBuilder builder = new
PredicateBuilder(PartitionsTable.TABLE_TYPE);
+
+ assertThat(readPartitionAndRecordCount(builder.greaterThan(1, 1L)))
+ .containsExactlyInAnyOrder("pt=1-2");
+ }
+
+ @Test
+ public void testReadWithFileCountFilter() throws Exception { // filters on field 3 (presumably file_count), a column the helper does not print
+ PredicateBuilder builder = new
PredicateBuilder(PartitionsTable.TABLE_TYPE);
+
+ assertThat(readPartitionAndRecordCount(builder.equal(3, 1L)))
+ .containsExactlyInAnyOrder("pt=2-1", "pt=3-1");
+ assertThat(readPartitionAndRecordCount(builder.greaterOrEqual(3, 2L)))
+ .containsExactlyInAnyOrder("pt=1-2");
+ }
+
+ @Test
+ public void testReadWithNullFilterReturnsAll() throws Exception { // passing null through ReadBuilder.withFilter must leave every partition row
+ assertThat(readPartitionAndRecordCount(null))
+ .containsExactlyInAnyOrder("pt=1-2", "pt=2-1", "pt=3-1");
+ }
+
+ private List<String> readPartitionAndRecordCount(Predicate predicate)
throws IOException {
+ ReadBuilder readBuilder =
partitionsTable.newReadBuilder().withFilter(predicate);
+ List<String> rows = new ArrayList<>(); // each entry is "<field 0>-<field 1>", i.e. partition name and record count
+ try (RecordReader<InternalRow> reader =
+
readBuilder.newRead().createReader(readBuilder.newScan().plan())) {
+ reader.forEachRemaining(
+ row -> rows.add(row.getString(0).toString() + "-" +
row.getLong(1)));
+ }
+ return rows; // in reader order; callers compare with containsExactlyInAnyOrder
+ }
}