This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 917ec03678 [core] Support consumer_id predicate pushdown for
ConsumersTable (#7329)
917ec03678 is described below
commit 917ec0367837b99841654802e4f510747d072688
Author: xuzifu666 <[email protected]>
AuthorDate: Mon Mar 2 15:54:24 2026 +0800
[core] Support consumer_id predicate pushdown for ConsumersTable (#7329)
---
.../apache/paimon/table/system/ConsumersTable.java | 56 ++++++++++++++++++++-
.../paimon/table/system/ConsumersTableTest.java | 57 ++++++++++++++++++++++
2 files changed, 112 insertions(+), 1 deletion(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
index 74c160da96..7865ff37a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
@@ -26,6 +26,13 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.And;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.InPredicateVisitor;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
+import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
@@ -47,8 +54,10 @@ import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -171,6 +180,7 @@ public class ConsumersTable implements ReadonlyTable {
private final FileIO fileIO;
private RowType readType;
+ private final List<String> consumerIds = new ArrayList<>();
public ConsumersRead(FileIO fileIO) {
this.fileIO = fileIO;
@@ -178,9 +188,41 @@ public class ConsumersTable implements ReadonlyTable {
@Override
public InnerTableRead withFilter(Predicate predicate) {
+ if (predicate == null) {
+ return this;
+ }
+
+ String leafName = "consumer_id";
+ if (predicate instanceof CompoundPredicate) {
+ CompoundPredicate compoundPredicate = (CompoundPredicate)
predicate;
+ if ((compoundPredicate.function()) instanceof Or) {
+ // optimize for IN filter
+ InPredicateVisitor.extractInElements(predicate, leafName)
+ .ifPresent(
+ leafs ->
+ leafs.forEach(
+ leaf ->
consumerIds.add(leaf.toString())));
+ } else if ((compoundPredicate.function()) instanceof And) {
+ List<Predicate> children = compoundPredicate.children();
+ for (Predicate leaf : children) {
+ handleLeafPredicate(leaf, leafName);
+ }
+ }
+ } else {
+ handleLeafPredicate(predicate, leafName);
+ }
+
return this;
}
+ public void handleLeafPredicate(Predicate predicate, String leafName) {
+ LeafPredicate consumerPred =
+
predicate.visit(LeafPredicateExtractor.INSTANCE).get(leafName);
+ if (consumerPred != null && consumerPred.function() instanceof
Equal) {
+ consumerIds.add(consumerPred.literals().get(0).toString());
+ }
+ }
+
@Override
public InnerTableRead withReadType(RowType readType) {
this.readType = readType;
@@ -198,7 +240,19 @@ public class ConsumersTable implements ReadonlyTable {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
Path location = ((ConsumersTable.ConsumersSplit) split).location;
- Map<String, Long> consumers = new ConsumerManager(fileIO,
location, branch).consumers();
+ Map<String, Long> consumers;
+ if (!consumerIds.isEmpty()) {
+ consumers = new HashMap<>();
+ ConsumerManager consumerManager = new ConsumerManager(fileIO,
location, branch);
+ for (String consumerId : consumerIds) {
+ consumerManager
+ .consumer(consumerId)
+ .ifPresent(
+ consumer -> consumers.put(consumerId,
consumer.nextSnapshot()));
+ }
+ } else {
+ consumers = new ConsumerManager(fileIO, location,
branch).consumers();
+ }
Iterator<InternalRow> rows =
Iterators.transform(consumers.entrySet().iterator(),
this::toRow);
if (readType != null) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
index 6412db9df0..249d570991 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
@@ -25,15 +25,23 @@ import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -75,6 +83,40 @@ public class ConsumersTableTest extends TableTestBase {
assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
}
+ @Test
+ public void testFilterByConsumerIdEqual() throws Exception {
+ Predicate predicate = consumerIdEqual("id1");
+ List<InternalRow> expectedRow =
+ Arrays.asList(GenericRow.of(BinaryString.fromString("id1"),
5L));
+ List<InternalRow> result = readWithFilter(consumersTable, predicate);
+ assertThat(result).containsExactlyElementsOf(expectedRow);
+ }
+
+ @Test
+ public void testFilterByConsumerIdEqualNoMatch() throws Exception {
+ Predicate predicate = consumerIdEqual("id999");
+ List<InternalRow> result = readWithFilter(consumersTable, predicate);
+ assertThat(result).isEmpty();
+ }
+
+ @Test
+ public void testFilterByConsumerIdIn() throws Exception {
+ PredicateBuilder builder = new
PredicateBuilder(ConsumersTable.TABLE_TYPE);
+ Predicate predicate = builder.in(0, Arrays.asList("id1", "id999"));
+ List<InternalRow> expectedRow =
+ Arrays.asList(GenericRow.of(BinaryString.fromString("id1"),
5L));
+ List<InternalRow> result = readWithFilter(consumersTable, predicate);
+ assertThat(result).containsExactlyElementsOf(expectedRow);
+ }
+
+ @Test
+ public void testFilterByConsumerIdInNoMatch() throws Exception {
+ PredicateBuilder builder = new
PredicateBuilder(ConsumersTable.TABLE_TYPE);
+ Predicate predicate = builder.in(0, Arrays.asList("id998", "id999"));
+ List<InternalRow> result = readWithFilter(consumersTable, predicate);
+ assertThat(result).isEmpty();
+ }
+
private List<InternalRow> getExpectedResult() throws IOException {
Map<String, Long> consumers = manager.consumers();
return consumers.entrySet().stream()
@@ -84,4 +126,19 @@ public class ConsumersTableTest extends TableTestBase {
BinaryString.fromString(entry.getKey()), entry.getValue()))
.collect(Collectors.toList());
}
+
+ private Predicate consumerIdEqual(String consumerId) {
+ PredicateBuilder builder = new
PredicateBuilder(ConsumersTable.TABLE_TYPE);
+ return builder.equal(0, BinaryString.fromString(consumerId));
+ }
+
+ private List<InternalRow> readWithFilter(Table table, Predicate predicate)
throws Exception {
+ ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
+ RecordReader<InternalRow> reader =
+
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+ InternalRowSerializer serializer = new
InternalRowSerializer(table.rowType());
+ List<InternalRow> rows = new ArrayList<>();
+ reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
+ return rows;
+ }
}