This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 59d3995bd2 [core] Support schema_id predicate pushdown for ManifestsTable (#7310)
59d3995bd2 is described below

commit 59d3995bd27ef0dbc95fb5e1ae22151b4cbb30b7
Author: Kerwin Zhang <[email protected]>
AuthorDate: Fri Feb 27 10:28:12 2026 +0800

    [core] Support schema_id predicate pushdown for ManifestsTable (#7310)
---
 .../apache/paimon/table/system/ManifestsTable.java | 117 ++++++++++++++++++++-
 .../paimon/table/system/ManifestsTableTest.java    |  68 ++++++++++++
 2 files changed, 183 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index ea91154b7d..7bc8f23b42 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -30,6 +30,17 @@ import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.predicate.And;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.GreaterOrEqual;
+import org.apache.paimon.predicate.GreaterThan;
+import org.apache.paimon.predicate.InPredicateVisitor;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
+import org.apache.paimon.predicate.LessOrEqual;
+import org.apache.paimon.predicate.LessThan;
+import org.apache.paimon.predicate.Or;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
@@ -55,11 +66,15 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.OptionalLong;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -137,7 +152,7 @@ public class ManifestsTable implements ReadonlyTable {
 
         @Override
         public InnerTableScan withFilter(Predicate predicate) {
-            // TODO
+            // The scan deliberately ignores the predicate and returns itself unchanged;
+            // schema_id filtering is implemented in ManifestsRead#withFilter.
             return this;
         }
 
@@ -169,20 +184,80 @@ public class ManifestsTable implements ReadonlyTable {
 
     private static class ManifestsRead implements InnerTableRead {
 
+        private static final String LEAF_NAME = "schema_id";
+
         private RowType readType;
 
         private final FileStoreTable dataTable;
 
+        private @Nullable Long schemaIdMin = null;
+        private @Nullable Long schemaIdMax = null;
+        private final List<Long> schemaIds = new ArrayList<>();
+
         public ManifestsRead(FileStoreTable dataTable) {
             this.dataTable = dataTable;
         }
 
         @Override
         public InnerTableRead withFilter(Predicate predicate) {
-            // TODO
+            // Only conditions on schema_id are recorded here; they are applied later, when the
+            // manifest list is filtered before rows are produced.
+            if (predicate == null) {
+                return this;
+            }
+
+            if (!(predicate instanceof CompoundPredicate)) {
+                // a single, non-compound condition
+                handleLeafPredicate(predicate, LEAF_NAME);
+                return this;
+            }
+
+            CompoundPredicate compound = (CompoundPredicate) predicate;
+            if (compound.function() instanceof And) {
+                // every conjunct is inspected on its own for a schema_id bound
+                compound.children().forEach(child -> handleLeafPredicate(child, LEAF_NAME));
+            }
+
+            if (compound.function() instanceof Or) {
+                // IN filter: collect the schema_id values that InPredicateVisitor extracts
+                InPredicateVisitor.extractInElements(predicate, LEAF_NAME)
+                        .ifPresent(
+                                elements -> {
+                                    for (Object element : elements) {
+                                        schemaIds.add(Long.parseLong(element.toString()));
+                                    }
+                                });
+            }
+
             return this;
         }
 
+        /**
+         * Narrows the recorded schema_id range with the condition that {@code predicate} places
+         * on {@code leafName}, if there is one.
+         *
+         * <p>New bounds are intersected with the bounds recorded so far instead of replacing
+         * them, so the conjuncts of an AND produce the same range regardless of their order.
+         * Only {@code =}, {@code >}, {@code >=}, {@code <} and {@code <=} are understood; any
+         * other function, or a null literal, leaves the range untouched, which only makes the
+         * pruning less selective.
+         *
+         * @param predicate predicate to inspect
+         * @param leafName name of the field whose leaf predicate is looked up
+         */
+        private void handleLeafPredicate(Predicate predicate, String leafName) {
+            LeafPredicate schemaPred =
+                    predicate.visit(LeafPredicateExtractor.INSTANCE).get(leafName);
+            if (schemaPred == null) {
+                return;
+            }
+
+            boolean equal = schemaPred.function() instanceof Equal;
+            boolean greaterThan = schemaPred.function() instanceof GreaterThan;
+            boolean greaterOrEqual = schemaPred.function() instanceof GreaterOrEqual;
+            boolean lessThan = schemaPred.function() instanceof LessThan;
+            boolean lessOrEqual = schemaPred.function() instanceof LessOrEqual;
+            if (!(equal || greaterThan || greaterOrEqual || lessThan || lessOrEqual)) {
+                return;
+            }
+
+            // Only read the literal for the comparison functions handled above.
+            Long literal = (Long) schemaPred.literals().get(0);
+            if (literal == null) {
+                // Unboxing a null literal would throw; skipping it keeps the range as it was.
+                return;
+            }
+            long value = literal;
+
+            if (equal || greaterOrEqual) {
+                raiseSchemaIdMin(value);
+            }
+            if (greaterThan) {
+                raiseSchemaIdMin(value + 1);
+            }
+            if (equal || lessOrEqual) {
+                lowerSchemaIdMax(value);
+            }
+            if (lessThan) {
+                lowerSchemaIdMax(value - 1);
+            }
+        }
+
+        /** Raises the lower schema_id bound to {@code candidate} unless it is already higher. */
+        private void raiseSchemaIdMin(long candidate) {
+            schemaIdMin = schemaIdMin == null ? candidate : Math.max(schemaIdMin, candidate);
+        }
+
+        /** Lowers the upper schema_id bound to {@code candidate} unless it is already lower. */
+        private void lowerSchemaIdMax(long candidate) {
+            schemaIdMax = schemaIdMax == null ? candidate : Math.min(schemaIdMax, candidate);
+        }
+
         @Override
         public InnerTableRead withReadType(RowType readType) {
             this.readType = readType;
@@ -201,6 +276,17 @@ public class ManifestsTable implements ReadonlyTable {
             }
             List<ManifestFileMeta> manifestFileMetas = allManifests(dataTable);
 
+            // Apply schema_id filter
+            if (!schemaIds.isEmpty()) {
+                manifestFileMetas = filterBySchemaIds(manifestFileMetas, 
schemaIds);
+            } else if (schemaIdMin != null || schemaIdMax != null) {
+                manifestFileMetas =
+                        filterBySchemaIdRange(
+                                manifestFileMetas,
+                                Optional.ofNullable(schemaIdMin),
+                                Optional.ofNullable(schemaIdMax));
+            }
+
             @SuppressWarnings("unchecked")
             CastExecutor<InternalRow, BinaryString> partitionCastExecutor =
                     (CastExecutor<InternalRow, BinaryString>)
@@ -222,6 +308,33 @@ public class ManifestsTable implements ReadonlyTable {
             return new IteratorRecordReader<>(rows);
         }
 
+        /** Returns the manifests, in input order, whose schema id appears in {@code schemaIds}. */
+        private static List<ManifestFileMeta> filterBySchemaIds(
+                List<ManifestFileMeta> metas, List<Long> schemaIds) {
+            // work on a copy so the caller's list is left untouched
+            List<ManifestFileMeta> matching = new ArrayList<>(metas);
+            matching.removeIf(manifest -> !schemaIds.contains(manifest.schemaId()));
+            return matching;
+        }
+
+        /**
+         * Returns the manifests, in input order, whose schema id lies within the inclusive range
+         * {@code [min, max]}; an absent bound leaves that side of the range open.
+         */
+        private static List<ManifestFileMeta> filterBySchemaIdRange(
+                List<ManifestFileMeta> metas, Optional<Long> min, Optional<Long> max) {
+            List<ManifestFileMeta> inRange = new ArrayList<>();
+            for (ManifestFileMeta manifest : metas) {
+                long id = manifest.schemaId();
+                boolean atOrAboveMin = !min.isPresent() || id >= min.get();
+                boolean atOrBelowMax = !max.isPresent() || id <= max.get();
+                if (atOrAboveMin && atOrBelowMax) {
+                    inRange.add(manifest);
+                }
+            }
+            return inRange;
+        }
+
         private InternalRow toRow(
                 ManifestFileMeta manifestFileMeta,
                 CastExecutor<InternalRow, BinaryString> partitionCastExecutor) 
{
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index 3a4507c5b6..c8bf00e813 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -33,10 +34,14 @@ import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -44,6 +49,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -164,6 +170,53 @@ public class ManifestsTableTest extends TableTestBase {
                 "Specified parameter scan.snapshot-id = 3 is not exist, you 
can set it in range from 1 to 2");
     }
 
+    @Test
+    public void testFilterBySchemaIdEqual() throws Exception {
+        List<InternalRow> expectedRow = getExpectedResult(2L);
+        List<InternalRow> result = readWithFilter(manifestsTable, 
schemaIdEqual(0L));
+        assertThat(result).containsExactlyElementsOf(expectedRow);
+    }
+
+    @Test
+    public void testFilterBySchemaIdEqualNoMatch() throws Exception {
+        List<InternalRow> result = readWithFilter(manifestsTable, 
schemaIdEqual(999L));
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    public void testFilterBySchemaIdRange() throws Exception {
+        // field 4 >= 0 AND field 4 <= 0 (the same field index schemaIdEqual uses)
+        PredicateBuilder predicates = new PredicateBuilder(ManifestsTable.TABLE_TYPE);
+        Predicate lowerBound = predicates.greaterOrEqual(4, 0L);
+        Predicate upperBound = predicates.lessOrEqual(4, 0L);
+        Predicate closedRange = PredicateBuilder.and(lowerBound, upperBound);
+
+        List<InternalRow> expected = getExpectedResult(2L);
+        List<InternalRow> actual = readWithFilter(manifestsTable, closedRange);
+        assertThat(actual).containsExactlyElementsOf(expected);
+    }
+
+    @Test
+    public void testFilterBySchemaIdGreaterThan() throws Exception {
+        // field 4 > 0 is expected to match no manifest of the test table
+        Predicate aboveZero = new PredicateBuilder(ManifestsTable.TABLE_TYPE).greaterThan(4, 0L);
+        assertThat(readWithFilter(manifestsTable, aboveZero)).isEmpty();
+    }
+
+    @Test
+    public void testFilterBySchemaIdIn() throws Exception {
+        // IN (0, 1) on field 4 must return exactly the rows of getExpectedResult(2L)
+        Predicate zeroOrOne =
+                new PredicateBuilder(ManifestsTable.TABLE_TYPE).in(4, Arrays.asList(0L, 1L));
+        List<InternalRow> expected = getExpectedResult(2L);
+        List<InternalRow> actual = readWithFilter(manifestsTable, zeroOrOne);
+        assertThat(actual).containsExactlyElementsOf(expected);
+    }
+
+    @Test
+    public void testFilterBySchemaIdInNoMatch() throws Exception {
+        // IN (998, 999) on field 4 is expected to match no manifest of the test table
+        Predicate unknownIds =
+                new PredicateBuilder(ManifestsTable.TABLE_TYPE).in(4, Arrays.asList(998L, 999L));
+        assertThat(readWithFilter(manifestsTable, unknownIds)).isEmpty();
+    }
+
     @Test
     void testManifestCreationTimeTimestamp() throws Exception {
         Identifier identifier = identifier("T_CreationTime");
@@ -243,4 +296,19 @@ public class ManifestsTableTest extends TableTestBase {
         }
         return expectedRow;
     }
+
+    /** Builds an equality predicate on field index 4 of the manifests table row type. */
+    private Predicate schemaIdEqual(long schemaId) {
+        return new PredicateBuilder(ManifestsTable.TABLE_TYPE).equal(4, schemaId);
+    }
+
+    /** Reads every row of {@code table} through a read builder configured with {@code predicate}. */
+    private List<InternalRow> readWithFilter(Table table, Predicate predicate) throws Exception {
+        ReadBuilder filtered = table.newReadBuilder().withFilter(predicate);
+        RecordReader<InternalRow> rowReader =
+                filtered.newRead().createReader(filtered.newScan().plan());
+        InternalRowSerializer copier = new InternalRowSerializer(table.rowType());
+
+        // each row is copied before being kept, presumably because the reader may reuse row
+        // instances -- TODO confirm
+        List<InternalRow> collected = new ArrayList<>();
+        rowReader.forEachRemaining(row -> collected.add(copier.copy(row)));
+        return collected;
+    }
 }

Reply via email to