This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2902aa2bdf [core] Add tests and implement withFilter for StatisticTable (#7893)
2902aa2bdf is described below

commit 2902aa2bdfeaf518abf0a46e4078d5ce73d10bd5
Author: Silas <[email protected]>
AuthorDate: Sat May 23 22:50:18 2026 +0800

    [core] Add tests and implement withFilter for StatisticTable (#7893)
---
 .../apache/paimon/table/system/StatisticTable.java |  43 ++++---
 .../paimon/table/system/StatisticTableTest.java    | 132 +++++++++++++++++++++
 2 files changed, 158 insertions(+), 17 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
index b1d305ffa7..2c8c5759ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
@@ -47,6 +47,8 @@ import org.apache.paimon.utils.SerializationUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -119,7 +121,6 @@ public class StatisticTable implements ReadonlyTable {
 
         @Override
         public InnerTableScan withFilter(Predicate predicate) {
-            // TODO
             return this;
         }
 
@@ -168,6 +169,8 @@ public class StatisticTable implements ReadonlyTable {
 
         private RowType readType;
 
+        @Nullable private Predicate postFilter;
+
         private final FileStoreTable dataTable;
 
         public StatisticRead(FileStoreTable dataTable) {
@@ -176,7 +179,7 @@ public class StatisticTable implements ReadonlyTable {
 
         @Override
         public InnerTableRead withFilter(Predicate predicate) {
-            // TODO
+            this.postFilter = predicate;
             return this;
         }
 
@@ -198,23 +201,29 @@ public class StatisticTable implements ReadonlyTable {
             }
 
             Optional<Statistics> statisticsOptional = dataTable.statistics();
-            if (statisticsOptional.isPresent()) {
-                Statistics statistics = statisticsOptional.get();
-                Iterator<Statistics> statisticsIterator =
-                        Collections.singletonList(statistics).iterator();
-                Iterator<InternalRow> rows = 
Iterators.transform(statisticsIterator, this::toRow);
-                if (readType != null) {
-                    rows =
-                            Iterators.transform(
-                                    rows,
-                                    row ->
-                                            ProjectedRow.from(readType, 
StatisticTable.TABLE_TYPE)
-                                                    .replaceRow(row));
-                }
-                return new IteratorRecordReader<>(rows);
-            } else {
+            if (!statisticsOptional.isPresent()) {
                 return new EmptyRecordReader<>();
             }
+
+            Iterator<InternalRow> rows =
+                    Iterators.transform(
+                            
Collections.singletonList(statisticsOptional.get()).iterator(),
+                            this::toRow);
+
+            if (postFilter != null) {
+                rows = Iterators.filter(rows, postFilter::test);
+            }
+
+            if (readType != null) {
+                rows =
+                        Iterators.transform(
+                                rows,
+                                row ->
+                                        ProjectedRow.from(readType, 
StatisticTable.TABLE_TYPE)
+                                                .replaceRow(row));
+            }
+
+            return new IteratorRecordReader<>(rows);
         }
 
         private InternalRow toRow(Statistics statistics) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/StatisticTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/StatisticTableTest.java
new file mode 100644
index 0000000000..750ffd35db
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/StatisticTableTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link StatisticTable}. */
+class StatisticTableTest extends TableTestBase {
+
+    private static final String tableName = "MyTable";
+
+    private FileStoreTable table;
+    private StatisticTable statisticTable;
+
+    @BeforeEach
+    void before() throws Exception {
+        Identifier identifier = identifier(tableName);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("product_id", DataTypes.INT())
+                        .column("price", DataTypes.INT())
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        table = (FileStoreTable) catalog.getTable(identifier);
+        write(table, GenericRow.of(1, 10), GenericRow.of(2, 20));
+        statisticTable = (StatisticTable) 
catalog.getTable(identifier(tableName + "$statistics"));
+    }
+
+    @Test
+    void testEmptyStatistics() throws Exception {
+        assertThat(read(statisticTable)).isEmpty();
+    }
+
+    @Test
+    void testReadStatistics() throws Exception {
+        long writtenSnapshotId = commitStatistics(10L, 1000L);
+
+        List<InternalRow> rows = read(statisticTable);
+        assertThat(rows).hasSize(1);
+        InternalRow row = rows.get(0);
+        assertThat(row.getLong(0)).isEqualTo(writtenSnapshotId);
+        assertThat(row.getLong(2)).isEqualTo(10L);
+        assertThat(row.getLong(3)).isEqualTo(1000L);
+    }
+
+    @Test
+    void testReadWithSnapshotIdEqualHit() throws Exception {
+        long writtenSnapshotId = commitStatistics(10L, 1000L);
+
+        PredicateBuilder builder = new 
PredicateBuilder(statisticTable.rowType());
+        List<InternalRow> rows = readWith(builder.equal(0, writtenSnapshotId));
+        assertThat(rows).hasSize(1);
+        assertThat(rows.get(0).getLong(0)).isEqualTo(writtenSnapshotId);
+    }
+
+    @Test
+    void testReadWithSnapshotIdEqualMiss() throws Exception {
+        commitStatistics(10L, 1000L);
+
+        PredicateBuilder builder = new 
PredicateBuilder(statisticTable.rowType());
+        assertThat(readWith(builder.equal(0, Long.MAX_VALUE))).isEmpty();
+    }
+
+    @Test
+    void testReadWithMergedRecordCountFilter() throws Exception {
+        commitStatistics(10L, 1000L);
+
+        PredicateBuilder builder = new 
PredicateBuilder(statisticTable.rowType());
+        assertThat(readWith(builder.greaterThan(2, 5L))).hasSize(1);
+        assertThat(readWith(builder.greaterThan(2, 100L))).isEmpty();
+    }
+
+    private long commitStatistics(long recordCount, long recordSize) throws 
Exception {
+        long snapshotId = table.snapshotManager().latestSnapshot().id();
+        long schemaId = table.snapshotManager().latestSnapshot().schemaId();
+        Statistics stats = new Statistics(snapshotId, schemaId, recordCount, 
recordSize);
+        try (TableCommitImpl commit = table.newCommit(commitUser)) {
+            commit.updateStatistics(stats);
+        }
+        return snapshotId;
+    }
+
+    private List<InternalRow> readWith(Predicate predicate) throws IOException 
{
+        ReadBuilder readBuilder = statisticTable.newReadBuilder();
+        if (predicate != null) {
+            readBuilder = readBuilder.withFilter(predicate);
+        }
+        List<InternalRow> result = new ArrayList<>();
+        try (RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan())) {
+            reader.forEachRemaining(result::add);
+        }
+        return result;
+    }
+}

Reply via email to