This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ef660c0631 [test] Add file skipping test in ClusteringTableTest
ef660c0631 is described below
commit ef660c0631b03f9e2e1541e19e007572672147c0
Author: JingsongLi <[email protected]>
AuthorDate: Fri Mar 20 12:28:31 2026 +0800
[test] Add file skipping test in ClusteringTableTest
---
.../paimon/separated/ClusteringTableTest.java | 68 ++++++++++++++++++++++
.../format/parquet/ParquetReaderFactory.java | 6 ++
.../format/parquet/ParquetSchemaCacheTest.java | 9 +--
3 files changed, 76 insertions(+), 7 deletions(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index ed8dd87ef5..f69f5fc6ae 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -26,11 +26,14 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.CloseableIterator;
@@ -333,6 +336,61 @@ class ClusteringTableTest {
.containsExactlyInAnyOrder(GenericRow.of(1, 50),
GenericRow.of(2, 60));
}
+ // ==================== Clustering Column Filter Tests ====================
+
+    /** Equality filter on clustering column must skip irrelevant files and keep matching ones. */
+    @Test
+    public void testClusteringColumnEqualityFilterSkipsFiles() throws Exception {
+        // Write 3 commits with widely separated, non-overlapping b ranges
+        writeRows(Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)));
+        writeRows(Arrays.asList(GenericRow.of(3, 100), GenericRow.of(4, 110)));
+        writeRows(Arrays.asList(GenericRow.of(5, 1000), GenericRow.of(6, 1010)));
+
+        // Need >= 2 files with disjoint b ranges, otherwise no file can be skipped
+        int totalFiles = countFiles(table, null);
+        assertThat(totalFiles).isGreaterThanOrEqualTo(2);
+
+        PredicateBuilder pb = new PredicateBuilder(table.rowType());
+
+        // b = 1005 → must keep file(s) covering [1000, 1010], skip file(s) with smaller b
+        assertThat(countFiles(table, pb.equal(1, 1005))).isBetween(1, totalFiles - 1);
+
+        // b = 15 → must keep file(s) covering [10, 20], skip file(s) with larger b
+        assertThat(countFiles(table, pb.equal(1, 15))).isBetween(1, totalFiles - 1);
+
+        // b = 5000 → no file covers this value, should return 0 files
+        assertThat(countFiles(table, pb.equal(1, 5000))).isZero();
+    }
+
+    /** Range filter on clustering column must skip irrelevant files and keep matching ones. */
+    @Test
+    public void testClusteringColumnRangeFilterSkipsFiles() throws Exception {
+        // Write 3 commits with widely separated, non-overlapping b ranges
+        writeRows(Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)));
+        writeRows(Arrays.asList(GenericRow.of(3, 100), GenericRow.of(4, 110)));
+        writeRows(Arrays.asList(GenericRow.of(5, 1000), GenericRow.of(6, 1010)));
+
+        // Need >= 2 files with disjoint b ranges, otherwise no file can be skipped
+        int totalFiles = countFiles(table, null);
+        assertThat(totalFiles).isGreaterThanOrEqualTo(2);
+
+        PredicateBuilder pb = new PredicateBuilder(table.rowType());
+
+        // b > 500 → must keep file(s) covering [1000, 1010], skip the rest
+        assertThat(countFiles(table, pb.greaterThan(1, 500))).isBetween(1, totalFiles - 1);
+
+        // b < 50 → must keep file(s) covering [10, 20], skip the rest
+        assertThat(countFiles(table, pb.lessThan(1, 50))).isBetween(1, totalFiles - 1);
+
+        // 50 <= b <= 150 → must keep file(s) covering [100, 110], skip the rest
+        Predicate rangeFilter =
+                PredicateBuilder.and(pb.greaterOrEqual(1, 50), pb.lessOrEqual(1, 150));
+        assertThat(countFiles(table, rangeFilter)).isBetween(1, totalFiles - 1);
+
+        // b > 5 → all files match (all b values are > 5)
+        assertThat(countFiles(table, pb.greaterThan(1, 5))).isEqualTo(totalFiles);
+    }
+
// ==================== First-Row Mode Tests ====================
/** Test first-row mode keeps the first record when same key is written
multiple times. */
@@ -709,4 +767,14 @@ class ClusteringTableTest {
}
return result;
}
+
+ private int countFiles(Table targetTable, Predicate filter) {
+ ReadBuilder readBuilder = targetTable.newReadBuilder();
+ if (filter != null) {
+ readBuilder.withFilter(filter);
+ }
+ return readBuilder.newScan().plan().splits().stream()
+ .mapToInt(split -> ((DataSplit) split).dataFiles().size())
+ .sum();
+ }
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 6da6ccc933..9a741a63ed 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -18,6 +18,7 @@
package org.apache.paimon.format.parquet;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
@@ -103,6 +104,11 @@ public class ParquetReaderFactory implements FormatReaderFactory {
this.filter = filter;
}
+    /**
+     * Returns a read-only view of the requested-schema cache so tests can inspect it (e.g. its
+     * size) without being able to mutate the factory's internal state.
+     */
+    @VisibleForTesting
+    Map<MessageType, RequestedSchema> requestedSchemaCache() {
+        return java.util.Collections.unmodifiableMap(requestedSchemaCache);
+    }
+
@Override
public FileRecordReader<InternalRow>
createReader(FormatReaderFactory.Context context)
throws IOException {
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaCacheTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaCacheTest.java
index 0d5b0ca58a..146391021a 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaCacheTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaCacheTest.java
@@ -35,8 +35,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
-import java.lang.reflect.Field;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@@ -198,10 +196,7 @@ public class ParquetSchemaCacheTest {
return cnt.get();
}
- private int getSchemaCacheSize(ParquetReaderFactory factory) throws
Exception {
- Field field =
ParquetReaderFactory.class.getDeclaredField("schemaCache");
- field.setAccessible(true);
- Map<?, ?> cache = (Map<?, ?>) field.get(factory);
- return cache.size();
+ /** Reads the cache size via the package-private accessor, so no reflection on a private field. */
+ private int getSchemaCacheSize(ParquetReaderFactory factory) {
+ return factory.requestedSchemaCache().size();
}
}