This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 59d3995bd2 [core] Support schema_id predicate pushdown for
ManifestsTable (#7310)
59d3995bd2 is described below
commit 59d3995bd27ef0dbc95fb5e1ae22151b4cbb30b7
Author: Kerwin Zhang <[email protected]>
AuthorDate: Fri Feb 27 10:28:12 2026 +0800
[core] Support schema_id predicate pushdown for ManifestsTable (#7310)
---
.../apache/paimon/table/system/ManifestsTable.java | 117 ++++++++++++++++++++-
.../paimon/table/system/ManifestsTableTest.java | 68 ++++++++++++
2 files changed, 183 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index ea91154b7d..7bc8f23b42 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -30,6 +30,17 @@ import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.predicate.And;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.GreaterOrEqual;
+import org.apache.paimon.predicate.GreaterThan;
+import org.apache.paimon.predicate.InPredicateVisitor;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
+import org.apache.paimon.predicate.LessOrEqual;
+import org.apache.paimon.predicate.LessThan;
+import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
@@ -55,11 +66,15 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.OptionalLong;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -137,7 +152,7 @@ public class ManifestsTable implements ReadonlyTable {
@Override
public InnerTableScan withFilter(Predicate predicate) {
- // TODO
+ // nothing is pruned at scan time; the schema_id filter is handled in ManifestsRead#withFilter
return this;
}
@@ -169,20 +184,80 @@ public class ManifestsTable implements ReadonlyTable {
private static class ManifestsRead implements InnerTableRead {
+ private static final String LEAF_NAME = "schema_id";
+
private RowType readType;
private final FileStoreTable dataTable;
+ private @Nullable Long schemaIdMin = null;
+ private @Nullable Long schemaIdMax = null;
+ private final List<Long> schemaIds = new ArrayList<>();
+
public ManifestsRead(FileStoreTable dataTable) {
this.dataTable = dataTable;
}
@Override
public InnerTableRead withFilter(Predicate predicate) {
- // TODO
+            // Only schema_id predicates are pushed down: a single comparison, an AND of
+            // comparisons (recorded as a [min, max] range) or an IN list (recorded as explicit
+            // ids). Any other predicate shape results in no pushdown.
+            if (predicate == null) {
+                return this;
+            }
+
+            // Forget what an earlier call recorded, so that the ids and bounds of two different
+            // predicates are never mixed together.
+            schemaIdMin = null;
+            schemaIdMax = null;
+            schemaIds.clear();
+
+            if (predicate instanceof CompoundPredicate) {
+                CompoundPredicate compound = (CompoundPredicate) predicate;
+                if (compound.function() instanceof And) {
+                    for (Predicate child : compound.children()) {
+                        handleLeafPredicate(child, LEAF_NAME);
+                    }
+                } else if (compound.function() instanceof Or) {
+                    // optimize for IN filter
+                    InPredicateVisitor.extractInElements(predicate, LEAF_NAME)
+                            .ifPresent(this::addSchemaIds);
+                }
+            } else {
+                handleLeafPredicate(predicate, LEAF_NAME);
+            }
+
return this;
}
+
+        /**
+         * Records the elements of an IN list as the schema ids to keep.
+         *
+         * <p>The list is taken all-or-nothing: if any element is not an integral number, no id
+         * is recorded at all, because keeping only part of the list would drop manifests that the
+         * predicate accepts.
+         *
+         * @param elements literals extracted from the IN predicate
+         */
+        private void addSchemaIds(List<?> elements) {
+            List<Long> ids = new ArrayList<>(elements.size());
+            for (Object element : elements) {
+                boolean integral =
+                        element instanceof Long
+                                || element instanceof Integer
+                                || element instanceof Short
+                                || element instanceof Byte;
+                if (!integral) {
+                    // null or unexpected literal: skip the IN pushdown instead of failing
+                    return;
+                }
+                ids.add(((Number) element).longValue());
+            }
+            schemaIds.addAll(ids);
+        }
+        /**
+         * Narrows the pushed-down {@code schema_id} range with the leaf predicate that {@link
+         * LeafPredicateExtractor} finds for {@code leafName} inside {@code predicate}.
+         *
+         * <p>Each bound is intersected with the one already recorded, so {@code schema_id >= 5
+         * AND schema_id >= 3} keeps the lower bound 5 whatever the order of the children, and
+         * contradictory conditions end up as an empty range. Predicates on other fields, functions
+         * other than the five comparisons handled below, and literals that are not a {@code Long}
+         * leave the range untouched.
+         *
+         * @param predicate predicate to inspect
+         * @param leafName name of the field whose leaf predicate is used
+         */
+        private void handleLeafPredicate(Predicate predicate, String leafName) {
+            LeafPredicate schemaPred =
+                    predicate.visit(LeafPredicateExtractor.INSTANCE).get(leafName);
+            if (schemaPred == null) {
+                return;
+            }
+
+            List<?> literals = schemaPred.literals();
+            if (literals == null || literals.isEmpty() || !(literals.get(0) instanceof Long)) {
+                // no literal, a null literal or an unexpected type: skip the pushdown rather
+                // than fail with a ClassCastException / NullPointerException
+                return;
+            }
+            long value = (Long) literals.get(0);
+
+            Object function = schemaPred.function();
+            if (function instanceof Equal) {
+                raiseSchemaIdMin(value);
+                lowerSchemaIdMax(value);
+            } else if (function instanceof GreaterThan) {
+                if (value == Long.MAX_VALUE) {
+                    // value + 1 would overflow; nothing is greater than Long.MAX_VALUE
+                    markSchemaIdRangeEmpty();
+                } else {
+                    raiseSchemaIdMin(value + 1);
+                }
+            } else if (function instanceof GreaterOrEqual) {
+                raiseSchemaIdMin(value);
+            } else if (function instanceof LessThan) {
+                if (value == Long.MIN_VALUE) {
+                    // value - 1 would overflow; nothing is less than Long.MIN_VALUE
+                    markSchemaIdRangeEmpty();
+                } else {
+                    lowerSchemaIdMax(value - 1);
+                }
+            } else if (function instanceof LessOrEqual) {
+                lowerSchemaIdMax(value);
+            }
+        }
+
+        /** Moves the inclusive lower bound up to {@code candidate} unless it is already higher. */
+        private void raiseSchemaIdMin(long candidate) {
+            schemaIdMin = schemaIdMin == null ? candidate : Math.max(schemaIdMin, candidate);
+        }
+
+        /** Moves the inclusive upper bound down to {@code candidate} unless it is already lower. */
+        private void lowerSchemaIdMax(long candidate) {
+            schemaIdMax = schemaIdMax == null ? candidate : Math.min(schemaIdMax, candidate);
+        }
+
+        /** Records a range with min above max, which no schema id can satisfy. */
+        private void markSchemaIdRangeEmpty() {
+            schemaIdMin = Long.MAX_VALUE;
+            schemaIdMax = Long.MIN_VALUE;
+        }
+
+
@Override
public InnerTableRead withReadType(RowType readType) {
this.readType = readType;
@@ -201,6 +276,17 @@ public class ManifestsTable implements ReadonlyTable {
}
List<ManifestFileMeta> manifestFileMetas = allManifests(dataTable);
+ // Apply schema_id filter
+ if (!schemaIds.isEmpty()) {
+ manifestFileMetas = filterBySchemaIds(manifestFileMetas,
schemaIds);
+ } else if (schemaIdMin != null || schemaIdMax != null) {
+ manifestFileMetas =
+ filterBySchemaIdRange(
+ manifestFileMetas,
+ Optional.ofNullable(schemaIdMin),
+ Optional.ofNullable(schemaIdMax));
+ }
+
@SuppressWarnings("unchecked")
CastExecutor<InternalRow, BinaryString> partitionCastExecutor =
(CastExecutor<InternalRow, BinaryString>)
@@ -222,6 +308,33 @@ public class ManifestsTable implements ReadonlyTable {
return new IteratorRecordReader<>(rows);
}
+        /**
+         * Returns a new list holding, in their original order, the manifests whose schema id is
+         * contained in {@code schemaIds}.
+         */
+        private static List<ManifestFileMeta> filterBySchemaIds(
+                List<ManifestFileMeta> metas, List<Long> schemaIds) {
+            // work on a copy so the list passed in is never modified
+            List<ManifestFileMeta> matching = new ArrayList<>(metas);
+            matching.removeIf(meta -> !schemaIds.contains(meta.schemaId()));
+            return matching;
+        }
+
+        /**
+         * Returns a new list holding, in their original order, the manifests whose schema id lies
+         * in the inclusive range {@code [min, max]}; an absent bound leaves that side open.
+         */
+        private static List<ManifestFileMeta> filterBySchemaIdRange(
+                List<ManifestFileMeta> metas, Optional<Long> min, Optional<Long> max) {
+            // an absent bound becomes the extreme long value, which no schema id can violate
+            long lowest = min.orElse(Long.MIN_VALUE);
+            long highest = max.orElse(Long.MAX_VALUE);
+
+            List<ManifestFileMeta> inRange = new ArrayList<>();
+            for (ManifestFileMeta candidate : metas) {
+                long id = candidate.schemaId();
+                if (id >= lowest && id <= highest) {
+                    inRange.add(candidate);
+                }
+            }
+            return inRange;
+        }
+
private InternalRow toRow(
ManifestFileMeta manifestFileMeta,
CastExecutor<InternalRow, BinaryString> partitionCastExecutor)
{
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index 3a4507c5b6..c8bf00e813 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -33,10 +34,14 @@ import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.SnapshotManager;
@@ -44,6 +49,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -164,6 +170,53 @@ public class ManifestsTableTest extends TableTestBase {
"Specified parameter scan.snapshot-id = 3 is not exist, you
can set it in range from 1 to 2");
}
+    @Test
+    public void testFilterBySchemaIdEqual() throws Exception {
+        // schema_id = 0 must return exactly the rows of getExpectedResult(2L)
+        List<InternalRow> wanted = getExpectedResult(2L);
+        Predicate schemaZero = schemaIdEqual(0L);
+        List<InternalRow> actual = readWithFilter(manifestsTable, schemaZero);
+        assertThat(actual).containsExactlyElementsOf(wanted);
+    }
+
+    @Test
+    public void testFilterBySchemaIdEqualNoMatch() throws Exception {
+        // an equality on schema id 999 must not return any row
+        Predicate unknownSchema = schemaIdEqual(999L);
+        List<InternalRow> actual = readWithFilter(manifestsTable, unknownSchema);
+        assertThat(actual).isEmpty();
+    }
+
+    @Test
+    public void testFilterBySchemaIdRange() throws Exception {
+        PredicateBuilder predicates = new PredicateBuilder(ManifestsTable.TABLE_TYPE);
+        // field 4 >= 0 AND field 4 <= 0, i.e. a closed range containing only 0
+        Predicate lowerBound = predicates.greaterOrEqual(4, 0L);
+        Predicate upperBound = predicates.lessOrEqual(4, 0L);
+        Predicate closedRange = PredicateBuilder.and(lowerBound, upperBound);
+
+        List<InternalRow> wanted = getExpectedResult(2L);
+        List<InternalRow> actual = readWithFilter(manifestsTable, closedRange);
+        assertThat(actual).containsExactlyElementsOf(wanted);
+    }
+
+    @Test
+    public void testFilterBySchemaIdGreaterThan() throws Exception {
+        PredicateBuilder predicates = new PredicateBuilder(ManifestsTable.TABLE_TYPE);
+        // field 4 > 0 must not return any row
+        Predicate aboveZero = predicates.greaterThan(4, 0L);
+        List<InternalRow> actual = readWithFilter(manifestsTable, aboveZero);
+        assertThat(actual).isEmpty();
+    }
+
+    @Test
+    public void testFilterBySchemaIdIn() throws Exception {
+        PredicateBuilder predicates = new PredicateBuilder(ManifestsTable.TABLE_TYPE);
+        // field 4 IN (0, 1) must return exactly the rows of getExpectedResult(2L)
+        Predicate inZeroOrOne = predicates.in(4, Arrays.asList(0L, 1L));
+
+        List<InternalRow> wanted = getExpectedResult(2L);
+        List<InternalRow> actual = readWithFilter(manifestsTable, inZeroOrOne);
+        assertThat(actual).containsExactlyElementsOf(wanted);
+    }
+
+    @Test
+    public void testFilterBySchemaIdInNoMatch() throws Exception {
+        PredicateBuilder predicates = new PredicateBuilder(ManifestsTable.TABLE_TYPE);
+        // field 4 IN (998, 999) must not return any row
+        Predicate inUnknownIds = predicates.in(4, Arrays.asList(998L, 999L));
+        List<InternalRow> actual = readWithFilter(manifestsTable, inUnknownIds);
+        assertThat(actual).isEmpty();
+    }
+
@Test
void testManifestCreationTimeTimestamp() throws Exception {
Identifier identifier = identifier("T_CreationTime");
@@ -243,4 +296,19 @@ public class ManifestsTableTest extends TableTestBase {
}
return expectedRow;
}
+
+    /** Builds an equality predicate on field 4 of {@code ManifestsTable.TABLE_TYPE}. */
+    private Predicate schemaIdEqual(long schemaId) {
+        return new PredicateBuilder(ManifestsTable.TABLE_TYPE).equal(4, schemaId);
+    }
+
+    /**
+     * Reads {@code table} through a read builder carrying {@code predicate} and returns a copy
+     * of every row the reader produces.
+     */
+    private List<InternalRow> readWithFilter(Table table, Predicate predicate) throws Exception {
+        ReadBuilder filtered = table.newReadBuilder().withFilter(predicate);
+        RecordReader<InternalRow> rowReader =
+                filtered.newRead().createReader(filtered.newScan().plan());
+        InternalRowSerializer copier = new InternalRowSerializer(table.rowType());
+
+        List<InternalRow> collected = new ArrayList<>();
+        // each row is copied through the serializer before it is stored
+        rowReader.forEachRemaining(current -> collected.add(copier.copy(current)));
+        return collected;
+    }
}