This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e54917b06 [core] Fix DataEvolutionFileStoreScan schema-evolution 
filtering (#8084)
4e54917b06 is described below

commit 4e54917b06cd603d9e6f3cbb79fec0694b3ae69e
Author: duanyyyyyyy <[email protected]>
AuthorDate: Thu Jun 4 13:53:43 2026 +0800

    [core] Fix DataEvolutionFileStoreScan schema-evolution filtering (#8084)
---
 .../apache/paimon/reader/DataEvolutionArray.java   |  30 ++
 .../operation/DataEvolutionFileStoreScan.java      | 119 +++++--
 .../paimon/table/DataEvolutionTableTest.java       | 376 +++++++++++++++++++++
 3 files changed, 490 insertions(+), 35 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java 
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
index 8b89c1ce7f..0995e84e96 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
@@ -31,14 +31,31 @@ import org.apache.paimon.data.variant.Variant;
 /** The array which is made up by several rows. */
 public class DataEvolutionArray implements InternalArray {
 
+    /** Sentinel for "no fallback"; positions with rowOffsets[pos] &lt; 0 stay 
null. */
+    public static final long NO_MISSING_FIELD_FALLBACK = Long.MIN_VALUE;
+
     private final InternalArray[] rows;
     private final int[] rowOffsets;
     private final int[] fieldOffsets;
 
+    /**
+     * Value to return from getLong(pos) when {@code rowOffsets[pos] == -1}. 
Used by data-evolution
+     * null-count arrays to encode "field not physically present in any file 
in the group" as "all
+     * rowCount rows are null" instead of "unknown stats". {@link 
#NO_MISSING_FIELD_FALLBACK}
+     * disables the fallback.
+     */
+    private final long missingFieldLong;
+
     public DataEvolutionArray(int rowNumber, int[] rowOffsets, int[] 
fieldOffsets) {
+        this(rowNumber, rowOffsets, fieldOffsets, NO_MISSING_FIELD_FALLBACK);
+    }
+
+    public DataEvolutionArray(
+            int rowNumber, int[] rowOffsets, int[] fieldOffsets, long 
missingFieldLong) {
         this.rows = new InternalArray[rowNumber];
         this.rowOffsets = rowOffsets;
         this.fieldOffsets = fieldOffsets;
+        this.missingFieldLong = missingFieldLong;
     }
 
     public void setRow(int pos, InternalArray row) {
@@ -73,6 +90,16 @@ public class DataEvolutionArray implements InternalArray {
 
     @Override
     public boolean isNullAt(int pos) {
+        // rowOffsets[pos] == -1: field is absent from every file in the 
group, so every
+        //   logical row is null for it; with missingFieldLong set, isNullAt 
returns false
+        //   and getLong reports that known null count instead of "unknown 
stats", so
+        //   non-IS-NULL predicates can prune the file.
+        // rowOffsets[pos] == -2: field exists in a file but its stats were 
not captured
+        //   (e.g. valueStatsCols did not include it). Treat as unknown stats 
so callers
+        //   stay conservative.
+        if (rowOffsets[pos] == -1 && missingFieldLong != 
NO_MISSING_FIELD_FALLBACK) {
+            return false;
+        }
         if (rowOffsets[pos] < 0) {
             return true;
         }
@@ -101,6 +128,9 @@ public class DataEvolutionArray implements InternalArray {
 
     @Override
     public long getLong(int pos) {
+        if (rowOffsets[pos] == -1 && missingFieldLong != 
NO_MISSING_FIELD_FALLBACK) {
+            return missingFieldLong;
+        }
         return chooseArray(pos).getLong(offsetInRow(pos));
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 514afff296..38ab9b0db0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -46,11 +46,13 @@ import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Function;
@@ -60,16 +62,18 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId;
 import static org.apache.paimon.types.VectorType.isVectorStoreFile;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** {@link FileStoreScan} for data-evolution enabled table. */
 public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
 
-    private final ConcurrentMap<Pair<Long, List<String>>, List<String>> 
fileFields;
-
     private boolean dropStats = false;
     @Nullable private RowType readType;
 
+    // Cache file's physical field id set per (schemaId, writeCols) to avoid 
recomputing during
+    // per-file column pruning in postFilterManifestEntries.
+    private final ConcurrentMap<Pair<Long, List<String>>, Set<Integer>> 
fileFieldIdsCache =
+            new ConcurrentHashMap<>();
+
     public DataEvolutionFileStoreScan(
             ManifestsReader manifestsReader,
             BucketSelectConverter bucketSelectConverter,
@@ -90,8 +94,6 @@ public class DataEvolutionFileStoreScan extends 
AppendOnlyFileStoreScan {
                 false,
                 deletionVectorsEnabled,
                 true);
-
-        this.fileFields = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -175,21 +177,22 @@ public class DataEvolutionFileStoreScan extends 
AppendOnlyFileStoreScan {
 
     @Override
     protected boolean postFilterManifestEntriesEnabled() {
-        return inputFilter != null;
+        // Enabled whenever a filter or a read type is set. The list 
filterByStats prunes by
+        // predicate and pruneByReadType drops files whose columns are not 
requested — both
+        // need row-id-range grouping that single filterByStats(ManifestEntry) 
cannot see.
+        return inputFilter != null || readType != null;
     }
 
     @Override
     protected List<ManifestEntry> 
postFilterManifestEntries(List<ManifestEntry> entries) {
-        checkNotNull(inputFilter);
-
         // group by row id range
         RangeHelper<ManifestEntry> rangeHelper =
                 new RangeHelper<>(e -> e.file().nonNullRowIdRange());
         List<List<ManifestEntry>> splitByRowId = 
rangeHelper.mergeOverlappingRanges(entries);
 
         return splitByRowId.stream()
-                .filter(this::filterByStats)
-                .flatMap(Collection::stream)
+                .filter(group -> inputFilter == null || filterByStats(group))
+                .flatMap(group -> pruneByReadType(group).stream())
                 .map(entry -> dropStats ? dropStats(entry) : entry)
                 .collect(Collectors.toList());
     }
@@ -200,6 +203,62 @@ public class DataEvolutionFileStoreScan extends 
AppendOnlyFileStoreScan {
                 stats.rowCount(), stats.minValues(), stats.maxValues(), 
stats.nullCounts());
     }
 
+    /**
+     * Per-file column pruning within a row-id-range group: drop files whose 
physical columns have
+     * no overlap with the query's {@code readType}. Necessary for 
columnar-split DE scenarios where
+     * a logical row is reconstructed from multiple files in the same row id 
range — a query that
+     * does not reference a file's columns has no reason to read it.
+     *
+     * <p>When every file in the group lacks a requested column (e.g. an ADD 
COLUMN projection over
+     * a row-disjoint pre-ALTER group), one file is kept as a row-count 
representative so the reader
+     * can emit the right number of NULL-filled rows.
+     */
+    private List<ManifestEntry> pruneByReadType(List<ManifestEntry> group) {
+        if (readType == null || group.size() <= 1) {
+            return group;
+        }
+        Set<Integer> readFieldIds = new HashSet<>();
+        for (DataField f : readType.getFields()) {
+            readFieldIds.add(f.id());
+        }
+        List<ManifestEntry> kept = new ArrayList<>(group.size());
+        for (ManifestEntry entry : group) {
+            Set<Integer> fileIds = fileFieldIdsForEntry(entry);
+            for (int id : readFieldIds) {
+                if (fileIds.contains(id)) {
+                    kept.add(entry);
+                    break;
+                }
+            }
+        }
+        // Group must contribute at least one file so the reader sees rowCount 
and can NULL-fill
+        // missing columns for the projection's rows.
+        return kept.isEmpty() ? Collections.singletonList(group.get(0)) : kept;
+    }
+
+    private Set<Integer> fileFieldIdsForEntry(ManifestEntry entry) {
+        return fileFieldIdsCache.computeIfAbsent(
+                Pair.of(entry.file().schemaId(), entry.file().writeCols()),
+                pair -> computeFileFieldIds(this::scanTableSchema, 
entry.file()));
+    }
+
+    /**
+     * Field ids of the columns physically present in {@code file}, resolved 
through the file's own
+     * schema (i.e. the schema the file was written under). Field id, not 
field name, is the stable
+     * identity across schemas — necessary so a renamed column matches an old 
file written under the
+     * pre-rename name.
+     */
+    @VisibleForTesting
+    static Set<Integer> computeFileFieldIds(
+            Function<Long, TableSchema> scanTableSchema, DataFileMeta file) {
+        Set<Integer> ids = new HashSet<>();
+        for (DataField f :
+                
scanTableSchema.apply(file.schemaId()).project(file.writeCols()).fields()) {
+            ids.add(f.id());
+        }
+        return ids;
+    }
+
     /** TODO: Optimize implementation of this method. */
     @VisibleForTesting
     static EvolutionStats evolutionStats(
@@ -279,16 +338,20 @@ public class DataEvolutionFileStoreScan extends 
AppendOnlyFileStoreScan {
             }
         }
 
+        long groupRowCount = metas.get(0).file().rowCount();
         DataEvolutionRow finalMin = new DataEvolutionRow(metas.size(), 
rowOffsets, fieldOffsets);
         DataEvolutionRow finalMax = new DataEvolutionRow(metas.size(), 
rowOffsets, fieldOffsets);
+        // For null-count specifically, a field absent from every file in the 
group means every
+        // logical row is null for that field — encode as groupRowCount so 
stats predicates can
+        // prune non-null comparisons (e.g. `extra2 = 'x'`) instead of falling 
back to
+        // "unknown stats -> keep" in LeafPredicate.test.
         DataEvolutionArray finalNullCounts =
-                new DataEvolutionArray(metas.size(), rowOffsets, fieldOffsets);
+                new DataEvolutionArray(metas.size(), rowOffsets, fieldOffsets, 
groupRowCount);
 
         finalMin.setRows(min);
         finalMax.setRows(max);
         finalNullCounts.setRows(nullCounts);
-        return new EvolutionStats(
-                metas.get(0).file().rowCount(), finalMin, finalMax, 
finalNullCounts);
+        return new EvolutionStats(groupRowCount, finalMin, finalMax, 
finalNullCounts);
     }
 
     /** Note: Keep this thread-safe. */
@@ -296,27 +359,13 @@ public class DataEvolutionFileStoreScan extends 
AppendOnlyFileStoreScan {
     protected boolean filterByStats(ManifestEntry entry) {
         DataFileMeta file = entry.file();
 
-        if (readType != null) {
-            boolean containsReadCol = false;
-            List<String> fileFieldNmes =
-                    fileFields.computeIfAbsent(
-                            Pair.of(file.schemaId(), file.writeCols()),
-                            pair ->
-                                    scanTableSchema(file.schemaId())
-                                            .project(file.writeCols())
-                                            .logicalRowType()
-                                            .getFieldNames());
-
-            for (String field : readType.getFieldNames()) {
-                if (fileFieldNmes.contains(field)) {
-                    containsReadCol = true;
-                    break;
-                }
-            }
-            if (!containsReadCol) {
-                return false;
-            }
-        }
+        // Do not drop a file based on read-column intersection. For 
data-evolution
+        // tables a field absent from a file is an implicit NULL across 
rowCount()
+        // rows, and predicates such as `new_col IS NULL` should still match 
those
+        // rows. Predicate-based stats pruning runs in
+        // filterByStats(List<ManifestEntry>), which evolves stats per file via
+        // DataEvolutionRow / DataEvolutionArray and correctly reports missing
+        // fields as null.
 
         // If rowRanges is null, all entries should be kept
         if (this.rowRangeIndex == null) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index cc1b07fe6b..0b1799ac81 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -1541,6 +1541,382 @@ public class DataEvolutionTableTest extends 
DataEvolutionTestBase {
         assertThat(rows.get(2).getString(2).toString()).isEqualTo("b");
     }
 
+    /**
+     * Central repro for the ADD COLUMN bug fixed in this change. Pre-ALTER 
files do not carry the
+     * new column physically; {@code WHERE new_col IS NULL} must match every 
pre-ALTER row. Before
+     * the fix, the single-entry filterByStats dropped pre-ALTER files at the 
manifest layer and the
+     * predicate returned zero rows.
+     */
+    @Test
+    public void testAddColumnIsNullKeepsPreAlterRows() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+
+        // Pre-ALTER write: only (f0, f1).
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        RowType writeF0F1 = schema.rowType().project(Arrays.asList("f0", 
"f1"));
+        try (BatchTableWrite write = 
builder.newWrite().withWriteType(writeF0F1)) {
+            for (int i = 0; i < 5; i++) {
+                write.write(GenericRow.of(i, BinaryString.fromString("a" + 
i)));
+            }
+            builder.newCommit().commit(write.prepareCommit());
+        }
+
+        // ADD COLUMN f3 (post-ALTER) and write a full-schema row at a fresh 
row id.
+        catalog.alterTable(identifier(), SchemaChange.addColumn("f3", 
DataTypes.STRING()), false);
+        FileStoreTable table = getTableDefault();
+        builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = builder.newWrite()) {
+            for (int i = 5; i < 10; i++) {
+                write.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("a" + i),
+                                BinaryString.fromString("c" + i),
+                                BinaryString.fromString("e" + i)));
+            }
+            builder.newCommit().commit(write.prepareCommit());
+        }
+
+        // WHERE f3 IS NULL -> pre-ALTER rows (5 of them).
+        PredicateBuilder pb = new PredicateBuilder(table.rowType());
+        int f3Idx = table.rowType().getFieldIndex("f3");
+        ReadBuilder rb = table.newReadBuilder().withFilter(pb.isNull(f3Idx));
+        assertThat(countMatchingRows(rb)).isEqualTo(5);
+    }
+
+    /**
+     * Predicate-aware stats pruning for ADD COLUMN: WHERE new_col = 
'something' cannot match
+     * pre-ALTER rows (their new_col is implicit NULL), so the pre-ALTER 
manifest must be pruned at
+     * planning time. The all-NULL encoding in EvolutionStats / 
DataEvolutionArray makes
+     * LeafPredicate.test drop the file via the leaf's normal decision instead 
of falling back to
+     * "unknown stats -> keep".
+     */
+    @Test
+    public void testAddColumnEqualityPredicatePrunesPreAlterFiles() throws 
Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+
+        // Pre-ALTER write: only (f0, f1).
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        RowType writeF0F1 = schema.rowType().project(Arrays.asList("f0", 
"f1"));
+        try (BatchTableWrite write = 
builder.newWrite().withWriteType(writeF0F1)) {
+            for (int i = 0; i < 5; i++) {
+                write.write(GenericRow.of(i, BinaryString.fromString("a" + 
i)));
+            }
+            builder.newCommit().commit(write.prepareCommit());
+        }
+
+        catalog.alterTable(identifier(), SchemaChange.addColumn("f3", 
DataTypes.STRING()), false);
+        FileStoreTable table = getTableDefault();
+        builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = builder.newWrite()) {
+            for (int i = 5; i < 10; i++) {
+                write.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("a" + i),
+                                BinaryString.fromString("c" + i),
+                                BinaryString.fromString("e" + i)));
+            }
+            builder.newCommit().commit(write.prepareCommit());
+        }
+
+        // Total files on the table.
+        assertThat(plannedFileCount(table, null, null)).isEqualTo(2);
+
+        // WHERE f3 = 'e7' -> only the post-ALTER file can match. The 
pre-ALTER file is
+        // pruned at planning because EvolutionStats encodes its missing f3 as 
all-NULL,
+        // letting LeafPredicate.test evaluate Equal against (min=null, 
max=null,
+        // nullCount=rowCount) and return false instead of falling through to
+        // "unknown stats -> keep".
+        PredicateBuilder pb = new PredicateBuilder(table.rowType());
+        int f3Idx = table.rowType().getFieldIndex("f3");
+        Predicate filter = pb.equal(f3Idx, BinaryString.fromString("e7"));
+        assertThat(plannedFileCount(table, null, filter)).isEqualTo(1);
+    }
+
+    /**
+     * Central repro for the RENAME COLUMN bug fixed in this change. The 
renamed field's id is
+     * preserved across schemas, so a predicate on the latest name must still 
match rows in the
+     * pre-rename file (whose physical writeCols carry the old name). Before 
the fix, the
+     * single-entry filterByStats compared by name and dropped pre-rename 
files at the manifest
+     * layer.
+     */
+    @Test
+    public void testRenameColumnPredicateKeepsPreRenameRows() throws Exception 
{
+        createTableDefault();
+        Schema schema = schemaDefault();
+
+        // Pre-rename write: f2 carries the values that will later be queried 
as f3.
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        try (BatchTableWrite write = 
builder.newWrite().withWriteType(schema.rowType())) {
+            for (int i = 0; i < 5; i++) {
+                write.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("a" + i),
+                                BinaryString.fromString("preR_" + i)));
+            }
+            builder.newCommit().commit(write.prepareCommit());
+        }
+
+        catalog.alterTable(identifier(), SchemaChange.renameColumn("f2", 
"f3"), false);
+        FileStoreTable table = getTableDefault();
+        builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = builder.newWrite()) {
+            for (int i = 5; i < 10; i++) {
+                write.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("a" + i),
+                                BinaryString.fromString("postR_" + i)));
+            }
+            builder.newCommit().commit(write.prepareCommit());
+        }
+
+        // WHERE f3 LIKE 'preR_%' -> rows from the pre-rename file (5 rows).
+        PredicateBuilder pb = new PredicateBuilder(table.rowType());
+        int f3Idx = table.rowType().getFieldIndex("f3");
+        ReadBuilder rb =
+                table.newReadBuilder()
+                        .withFilter(pb.startsWith(f3Idx, 
BinaryString.fromString("preR_")));
+        assertThat(countMatchingRows(rb)).isEqualTo(5);
+    }
+
+    /**
+     * Columnar-split: two files cover the same row id range, each carrying a 
different subset of
+     * columns. A query that projects only columns owned by one file should 
not read the other.
+     */
+    @Test
+    public void testNoFilterProjectionPrunesColumnarSplitFiles() throws 
Exception {
+        write(5);
+        FileStoreTable table = getTableDefault();
+        Schema schema = schemaDefault();
+        assertThat(plannedFileCount(table, null, null)).isEqualTo(2);
+
+        RowType readF0 = 
schema.rowType().project(Collections.singletonList("f0"));
+        assertThat(plannedFileCount(table, readF0, null)).isEqualTo(1);
+
+        RowType readF1 = 
schema.rowType().project(Collections.singletonList("f1"));
+        assertThat(plannedFileCount(table, readF1, null)).isEqualTo(1);
+
+        RowType readF2 = 
schema.rowType().project(Collections.singletonList("f2"));
+        assertThat(plannedFileCount(table, readF2, null)).isEqualTo(1);
+
+        RowType readF0F2 = schema.rowType().project(Arrays.asList("f0", "f2"));
+        assertThat(plannedFileCount(table, readF0F2, null)).isEqualTo(2);
+
+        assertThat(plannedFileCount(table, schema.rowType(), 
null)).isEqualTo(2);
+    }
+
+    /**
+     * Row-disjoint pre-ALTER files must not be dropped by the column-pruning 
logic — the reader
+     * needs them to emit rowCount NULL-filled rows for the projection.
+     */
+    @Test
+    public void testNoFilterProjectionKeepsRowDisjointFiles() throws Exception 
{
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        RowType writeType = schema.rowType().project(Arrays.asList("f0", 
"f1"));
+        try (BatchTableWrite write = 
builder.newWrite().withWriteType(writeType)) {
+            for (int i = 0; i < 5; i++) {
+                write.write(GenericRow.of(i, BinaryString.fromString("a" + 
i)));
+            }
+            builder.newCommit().commit(write.prepareCommit());
+        }
+        builder = getTableDefault().newBatchWriteBuilder();
+        try (BatchTableWrite write = 
builder.newWrite().withWriteType(schema.rowType())) {
+            for (int i = 5; i < 10; i++) {
+                write.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("a" + i),
+                                BinaryString.fromString("b" + i)));
+            }
+            builder.newCommit().commit(write.prepareCommit());
+        }
+        FileStoreTable table = getTableDefault();
+
+        assertThat(plannedFileCount(table, null, null)).isEqualTo(2);
+
+        // Projecting f2 must still keep the pre-ALTER file as a row-count 
witness so
+        // the reader emits 5 NULL-filled rows for the pre-ALTER range.
+        RowType readF2 = 
schema.rowType().project(Collections.singletonList("f2"));
+        assertThat(plannedFileCount(table, readF2, null)).isEqualTo(2);
+    }
+
+    /**
+     * Columnar split + predicate on the file-A column: stats prune through 
file A's column, column
+     * pruning then drops file B from the kept group.
+     */
+    @Test
+    public void testColumnarSplitWithPredicateOnFileAColumn() throws Exception 
{
+        write(10);
+        FileStoreTable table = getTableDefault();
+        Schema schema = schemaDefault();
+        PredicateBuilder pb = new PredicateBuilder(table.rowType());
+        int f0Idx = table.rowType().getFieldIndex("f0");
+        RowType readF0 = 
schema.rowType().project(Collections.singletonList("f0"));
+        assertThat(plannedFileCount(table, readF0, pb.greaterThan(f0Idx, 
5))).isEqualTo(1);
+        assertThat(plannedFileCount(table, readF0, pb.greaterThan(f0Idx, 
1000))).isEqualTo(0);
+    }
+
+    /**
+     * Columnar split + predicate on the file-B column: stats prune through 
file B's column, column
+     * pruning then drops file A from the kept group.
+     */
+    @Test
+    public void testColumnarSplitWithPredicateOnFileBColumn() throws Exception 
{
+        write(10);
+        FileStoreTable table = getTableDefault();
+        Schema schema = schemaDefault();
+        PredicateBuilder pb = new PredicateBuilder(table.rowType());
+        int f2Idx = table.rowType().getFieldIndex("f2");
+        RowType readF2 = 
schema.rowType().project(Collections.singletonList("f2"));
+        assertThat(plannedFileCount(table, readF2, pb.equal(f2Idx, 
BinaryString.fromString("b5"))))
+                .isEqualTo(1);
+    }
+
+    /**
+     * Three-way columnar split: fileA{f0}, fileB{f1}, fileC{f2} share a row 
id range. A query that
+     * touches one column should retain exactly that one file.
+     */
+    @Test
+    public void testThreeWayColumnarSplitPruning() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+        RowType writeF0 = 
schema.rowType().project(Collections.singletonList("f0"));
+        try (BatchTableWrite write = 
builder.newWrite().withWriteType(writeF0)) {
+            for (int i = 0; i < 5; i++) {
+                write.write(GenericRow.of(i));
+            }
+            builder.newCommit().commit(write.prepareCommit());
+        }
+
+        builder = getTableDefault().newBatchWriteBuilder();
+        RowType writeF1 = 
schema.rowType().project(Collections.singletonList("f1"));
+        try (BatchTableWrite write = 
builder.newWrite().withWriteType(writeF1)) {
+            for (int i = 0; i < 5; i++) {
+                write.write(GenericRow.of(BinaryString.fromString("f1_" + i)));
+            }
+            List<CommitMessage> msgs = write.prepareCommit();
+            setFirstRowId(msgs, 0L);
+            builder.newCommit().commit(msgs);
+        }
+
+        builder = getTableDefault().newBatchWriteBuilder();
+        RowType writeF2 = 
schema.rowType().project(Collections.singletonList("f2"));
+        try (BatchTableWrite write = 
builder.newWrite().withWriteType(writeF2)) {
+            for (int i = 0; i < 5; i++) {
+                write.write(GenericRow.of(BinaryString.fromString("f2_" + i)));
+            }
+            List<CommitMessage> msgs = write.prepareCommit();
+            setFirstRowId(msgs, 0L);
+            builder.newCommit().commit(msgs);
+        }
+
+        FileStoreTable table = getTableDefault();
+        assertThat(plannedFileCount(table, null, null)).isEqualTo(3);
+        assertThat(
+                        plannedFileCount(
+                                table,
+                                
schema.rowType().project(Collections.singletonList("f0")),
+                                null))
+                .isEqualTo(1);
+        assertThat(
+                        plannedFileCount(
+                                table,
+                                
schema.rowType().project(Collections.singletonList("f1")),
+                                null))
+                .isEqualTo(1);
+        assertThat(
+                        plannedFileCount(
+                                table,
+                                
schema.rowType().project(Collections.singletonList("f2")),
+                                null))
+                .isEqualTo(1);
+        assertThat(
+                        plannedFileCount(
+                                table, 
schema.rowType().project(Arrays.asList("f0", "f2")), null))
+                .isEqualTo(2);
+        assertThat(
+                        plannedFileCount(
+                                table, 
schema.rowType().project(Arrays.asList("f1", "f2")), null))
+                .isEqualTo(2);
+    }
+
+    /**
+     * A columnar-split group covering rows 0..4 (file A {f0,f1} + file B 
{f2}), plus a row-disjoint
+     * group at rows 5..9 (file C with the full schema). Per-group column 
pruning composes correctly
+     * across the two topologies.
+     */
+    @Test
+    public void testMixedColumnarSplitAndRowDisjoint() throws Exception {
+        write(5);
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        try (BatchTableWrite write = 
builder.newWrite().withWriteType(schema.rowType())) {
+            for (int i = 5; i < 10; i++) {
+                write.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("a" + i),
+                                BinaryString.fromString("c" + i)));
+            }
+            builder.newCommit().commit(write.prepareCommit());
+        }
+        FileStoreTable table = getTableDefault();
+
+        assertThat(plannedFileCount(table, null, null)).isEqualTo(3);
+        RowType readF0 = 
schema.rowType().project(Collections.singletonList("f0"));
+        assertThat(plannedFileCount(table, readF0, null)).isEqualTo(2);
+        RowType readF2 = 
schema.rowType().project(Collections.singletonList("f2"));
+        assertThat(plannedFileCount(table, readF2, null)).isEqualTo(2);
+    }
+
+    /**
+     * System-field-only projection is filtered out of readType in
+     * DataEvolutionFileStoreScan.withReadType — readType stays null and
+     * postFilterManifestEntriesEnabled returns false. The column-pruning path 
is not entered, so
+     * every file in every group flows through unchanged.
+     */
+    @Test
+    public void testSystemFieldOnlyProjectionIsNotPruned() throws Exception {
+        write(5);
+        FileStoreTable table = getTableDefault();
+        assertThat(plannedFileCount(table, null, null)).isEqualTo(2);
+        assertThat(plannedFileCount(table, RowType.of(SpecialFields.ROW_ID), 
null)).isEqualTo(2);
+    }
+
+    private static int plannedFileCount(FileStoreTable table, RowType 
readType, Predicate filter) {
+        ReadBuilder rb = table.newReadBuilder();
+        if (readType != null) {
+            rb = rb.withReadType(readType);
+        }
+        if (filter != null) {
+            rb = rb.withFilter(filter);
+        }
+        return rb.newScan().plan().splits().stream()
+                .mapToInt(
+                        s ->
+                                s instanceof DataSplit
+                                        ? ((DataSplit) s).dataFiles().size()
+                                        : ((IndexedSplit) 
s).dataSplit().dataFiles().size())
+                .sum();
+    }
+
+    private static long countMatchingRows(ReadBuilder rb) throws Exception {
+        RecordReader<InternalRow> reader = 
rb.newRead().createReader(rb.newScan().plan());
+        AtomicInteger cnt = new AtomicInteger(0);
+        reader.forEachRemaining(r -> cnt.incrementAndGet());
+        reader.close();
+        return cnt.get();
+    }
+
     private Range assertContinuousRowIdRange(List<DataFileMeta> files) {
         files.sort(Comparator.comparingLong(DataFileMeta::nonNullFirstRowId));
         long start = files.get(0).nonNullFirstRowId();

Reply via email to