This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 917ec03678 [core] Support consumer_id predicate pushdown for 
ConsumersTable (#7329)
917ec03678 is described below

commit 917ec0367837b99841654802e4f510747d072688
Author: xuzifu666 <[email protected]>
AuthorDate: Mon Mar 2 15:54:24 2026 +0800

    [core] Support consumer_id predicate pushdown for ConsumersTable (#7329)
---
 .../apache/paimon/table/system/ConsumersTable.java | 56 ++++++++++++++++++++-
 .../paimon/table/system/ConsumersTableTest.java    | 57 ++++++++++++++++++++++
 2 files changed, 112 insertions(+), 1 deletion(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
index 74c160da96..7865ff37a3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
@@ -26,6 +26,13 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.And;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.InPredicateVisitor;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
+import org.apache.paimon.predicate.Or;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
@@ -47,8 +54,10 @@ import org.apache.paimon.utils.SerializationUtils;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -171,6 +180,7 @@ public class ConsumersTable implements ReadonlyTable {
 
         private final FileIO fileIO;
         private RowType readType;
+        private final List<String> consumerIds = new ArrayList<>();
 
         public ConsumersRead(FileIO fileIO) {
             this.fileIO = fileIO;
@@ -178,9 +188,41 @@ public class ConsumersTable implements ReadonlyTable {
 
         @Override
         public InnerTableRead withFilter(Predicate predicate) { // Collects consumer_id values found in the filter into consumerIds; always returns this.
+            if (predicate == null) { // no filter given: consumerIds is left untouched
+                return this;
+            }
+
+            String leafName = "consumer_id"; // only predicates on this field name are inspected
+            if (predicate instanceof CompoundPredicate) {
+                CompoundPredicate compoundPredicate = (CompoundPredicate) 
predicate;
+                if ((compoundPredicate.function()) instanceof Or) {
+                    // OR branch (presumably an IN list expressed as OR-ed equalities - confirm): add each extracted element's toString()
+                    InPredicateVisitor.extractInElements(predicate, leafName)
+                            .ifPresent(
+                                    leafs ->
+                                            leafs.forEach(
+                                                    leaf -> 
consumerIds.add(leaf.toString()))); // nothing is added when the Optional is empty
+                } else if ((compoundPredicate.function()) instanceof And) { // AND branch: each direct child is inspected on its own
+                    List<Predicate> children = compoundPredicate.children();
+                    for (Predicate leaf : children) {
+                        handleLeafPredicate(leaf, leafName); // NOTE(review): ids from several children are all appended (a union) - confirm this is intended for AND
+                    }
+                }
+            } else { // not a CompoundPredicate: handled as a single predicate
+                handleLeafPredicate(predicate, leafName);
+            }
+
             return this; // NOTE(review): ids are only appended here, never cleared, so repeated calls accumulate
         }
 
+        public void handleLeafPredicate(Predicate predicate, String leafName) { // Appends the first literal of an Equal predicate on leafName to consumerIds; otherwise does nothing.
+            LeafPredicate consumerPred =
+                    
predicate.visit(LeafPredicateExtractor.INSTANCE).get(leafName); // may be null (checked below) when nothing was extracted for leafName
+            if (consumerPred != null && consumerPred.function() instanceof 
Equal) { // predicates with any other function are ignored
+                consumerIds.add(consumerPred.literals().get(0).toString()); // NOTE(review): assumes at least one non-null literal - confirm
+            }
+        }
+
         @Override
         public InnerTableRead withReadType(RowType readType) {
             this.readType = readType;
@@ -198,7 +240,19 @@ public class ConsumersTable implements ReadonlyTable {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
             Path location = ((ConsumersTable.ConsumersSplit) split).location;
-            Map<String, Long> consumers = new ConsumerManager(fileIO, 
location, branch).consumers();
+            Map<String, Long> consumers;
+            if (!consumerIds.isEmpty()) {
+                consumers = new HashMap<>();
+                ConsumerManager consumerManager = new ConsumerManager(fileIO, 
location, branch);
+                for (String consumerId : consumerIds) {
+                    consumerManager
+                            .consumer(consumerId)
+                            .ifPresent(
+                                    consumer -> consumers.put(consumerId, 
consumer.nextSnapshot()));
+                }
+            } else {
+                consumers = new ConsumerManager(fileIO, location, 
branch).consumers();
+            }
             Iterator<InternalRow> rows =
                     Iterators.transform(consumers.entrySet().iterator(), 
this::toRow);
             if (readType != null) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
index 6412db9df0..249d570991 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
@@ -25,15 +25,23 @@ import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataTypes;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -75,6 +83,40 @@ public class ConsumersTableTest extends TableTestBase {
         assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
     }
 
+    @Test
+    public void testFilterByConsumerIdEqual() throws Exception { // An equality filter on consumer id "id1" must yield exactly one row.
+        Predicate predicate = consumerIdEqual("id1");
+        List<InternalRow> expectedRow =
+                Arrays.asList(GenericRow.of(BinaryString.fromString("id1"), 
5L)); // expected row ("id1", 5L); the fixture that creates it is outside this hunk
+        List<InternalRow> result = readWithFilter(consumersTable, predicate);
+        assertThat(result).containsExactlyElementsOf(expectedRow);
+    }
+
+    @Test
+    public void testFilterByConsumerIdEqualNoMatch() throws Exception { // An equality filter on "id999" must yield no rows.
+        Predicate predicate = consumerIdEqual("id999"); // presumably an id the fixture never creates - setup is outside this hunk
+        List<InternalRow> result = readWithFilter(consumersTable, predicate);
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    public void testFilterByConsumerIdIn() throws Exception { // An IN filter over "id1" and "id999" must yield only the "id1" row.
+        PredicateBuilder builder = new 
PredicateBuilder(ConsumersTable.TABLE_TYPE);
+        Predicate predicate = builder.in(0, Arrays.asList("id1", "id999")); // field index 0 is the same index consumerIdEqual filters on
+        List<InternalRow> expectedRow =
+                Arrays.asList(GenericRow.of(BinaryString.fromString("id1"), 
5L)); // expected row ("id1", 5L); the fixture that creates it is outside this hunk
+        List<InternalRow> result = readWithFilter(consumersTable, predicate);
+        assertThat(result).containsExactlyElementsOf(expectedRow);
+    }
+
+    @Test
+    public void testFilterByConsumerIdInNoMatch() throws Exception { // An IN filter over "id998" and "id999" must yield no rows.
+        PredicateBuilder builder = new 
PredicateBuilder(ConsumersTable.TABLE_TYPE);
+        Predicate predicate = builder.in(0, Arrays.asList("id998", "id999")); // presumably ids the fixture never creates - setup is outside this hunk
+        List<InternalRow> result = readWithFilter(consumersTable, predicate);
+        assertThat(result).isEmpty();
+    }
+
     private List<InternalRow> getExpectedResult() throws IOException {
         Map<String, Long> consumers = manager.consumers();
         return consumers.entrySet().stream()
@@ -84,4 +126,19 @@ public class ConsumersTableTest extends TableTestBase {
                                         
BinaryString.fromString(entry.getKey()), entry.getValue()))
                 .collect(Collectors.toList());
     }
+
+    private Predicate consumerIdEqual(String consumerId) { // Builds an equality predicate on field index 0 of ConsumersTable.TABLE_TYPE.
+        PredicateBuilder builder = new 
PredicateBuilder(ConsumersTable.TABLE_TYPE);
+        return builder.equal(0, BinaryString.fromString(consumerId)); // the literal is passed as a BinaryString rather than a java.lang.String
+    }
+
+    private List<InternalRow> readWithFilter(Table table, Predicate predicate) 
throws Exception { // Reads every row of the table through a ReadBuilder carrying the given filter.
+        ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan()); // no filtering is applied in this helper after the read
+        InternalRowSerializer serializer = new 
InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(row -> rows.add(serializer.copy(row))); // each row is copied before storing; presumably the reader may reuse row objects - confirm
+        return rows;
+    }
 }

Reply via email to