This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4e54917b06 [core] Fix DataEvolutionFileStoreScan schema-evolution filtering (#8084)
4e54917b06 is described below
commit 4e54917b06cd603d9e6f3cbb79fec0694b3ae69e
Author: duanyyyyyyy <[email protected]>
AuthorDate: Thu Jun 4 13:53:43 2026 +0800
[core] Fix DataEvolutionFileStoreScan schema-evolution filtering (#8084)
---
.../apache/paimon/reader/DataEvolutionArray.java | 30 ++
.../operation/DataEvolutionFileStoreScan.java | 119 +++++--
.../paimon/table/DataEvolutionTableTest.java | 376 +++++++++++++++++++++
3 files changed, 490 insertions(+), 35 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
index 8b89c1ce7f..0995e84e96 100644
---
a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
+++
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
@@ -31,14 +31,31 @@ import org.apache.paimon.data.variant.Variant;
/** The array which is made up by several rows. */
public class DataEvolutionArray implements InternalArray {
+ /**
+ * Sentinel meaning "no fallback": {@link #isNullAt} then reports every position with {@code
+ * rowOffsets[pos] < 0} as null.
+ */
+ public static final long NO_MISSING_FIELD_FALLBACK = Long.MIN_VALUE;
+
private final InternalArray[] rows;
private final int[] rowOffsets;
private final int[] fieldOffsets;
+ /**
+ * Value returned by {@link #getLong} for positions with {@code rowOffsets[pos] == -1}, i.e.
+ * fields absent from every file in the group; {@link #isNullAt} reports such positions as
+ * non-null. Data-evolution null-count arrays pass the group's row count here so that "field not
+ * physically present in any file in the group" is encoded as "all rowCount rows are null"
+ * instead of "unknown stats". Other negative offsets (e.g. {@code -2}, field present but stats
+ * not captured) are not affected and are still reported as null. {@link
+ * #NO_MISSING_FIELD_FALLBACK} disables the fallback entirely.
+ */
+ private final long missingFieldLong;
+
public DataEvolutionArray(int rowNumber, int[] rowOffsets, int[] fieldOffsets) {
+ this(rowNumber, rowOffsets, fieldOffsets, NO_MISSING_FIELD_FALLBACK);
+ }
+
+ /**
+ * Creates an array made up of {@code rowNumber} child rows (filled later via setRow/setRows).
+ *
+ * @param missingFieldLong value reported for positions whose field is absent from every file
+ *     ({@code rowOffsets[pos] == -1}), or {@link #NO_MISSING_FIELD_FALLBACK} to report those
+ *     positions as null
+ */
+ public DataEvolutionArray(
+ int rowNumber, int[] rowOffsets, int[] fieldOffsets, long missingFieldLong) {
this.rows = new InternalArray[rowNumber];
this.rowOffsets = rowOffsets;
this.fieldOffsets = fieldOffsets;
+ this.missingFieldLong = missingFieldLong;
}
public void setRow(int pos, InternalArray row) {
@@ -73,6 +90,16 @@ public class DataEvolutionArray implements InternalArray {
@Override
public boolean isNullAt(int pos) {
+ // rowOffsets[pos] == -1: field is absent from every file in the
group, so every
+ // logical row is null for it; with missingFieldLong set this is
encoded as a
+ // known count rather than "unknown stats" (isNullAt=false), so
non-IS-NULL
+ // predicates can prune the file.
+ // rowOffsets[pos] == -2: field exists in a file but its stats were
not captured
+ // (e.g. valueStatsCols did not include it). Treat as unknown stats
so callers
+ // stay conservative.
+ if (rowOffsets[pos] == -1 && missingFieldLong !=
NO_MISSING_FIELD_FALLBACK) {
+ return false;
+ }
if (rowOffsets[pos] < 0) {
return true;
}
@@ -101,6 +128,9 @@ public class DataEvolutionArray implements InternalArray {
@Override
public long getLong(int pos) {
+ if (rowOffsets[pos] == -1 && missingFieldLong !=
NO_MISSING_FIELD_FALLBACK) {
+ return missingFieldLong;
+ }
return chooseArray(pos).getLong(offsetInRow(pos));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 514afff296..38ab9b0db0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -46,11 +46,13 @@ import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
@@ -60,16 +62,18 @@ import java.util.stream.Collectors;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId;
import static org.apache.paimon.types.VectorType.isVectorStoreFile;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** {@link FileStoreScan} for data-evolution enabled table. */
public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
- private final ConcurrentMap<Pair<Long, List<String>>, List<String>>
fileFields;
-
private boolean dropStats = false;
@Nullable private RowType readType;
+ // Physical field-id set of each file, keyed by (schemaId, writeCols) — files sharing both
+ // resolve to the same ids — so per-file column pruning in postFilterManifestEntries does not
+ // recompute it for every manifest entry.
+ private final ConcurrentMap<Pair<Long, List<String>>, Set<Integer>>
fileFieldIdsCache =
+ new ConcurrentHashMap<>();
+
public DataEvolutionFileStoreScan(
ManifestsReader manifestsReader,
BucketSelectConverter bucketSelectConverter,
@@ -90,8 +94,6 @@ public class DataEvolutionFileStoreScan extends
AppendOnlyFileStoreScan {
false,
deletionVectorsEnabled,
true);
-
- this.fileFields = new ConcurrentHashMap<>();
}
@Override
@@ -175,21 +177,22 @@ public class DataEvolutionFileStoreScan extends
AppendOnlyFileStoreScan {
@Override
protected boolean postFilterManifestEntriesEnabled() {
- return inputFilter != null;
+ // Post-filter whenever there is a predicate to evaluate or a projection to prune by; with
+ // neither, there is nothing to do. The list-based filterByStats handles predicate pruning and
+ // pruneByReadType drops files whose physical columns are not requested. Both need
+ // row-id-range grouping, which the single-entry filterByStats(ManifestEntry) cannot see.
+ return inputFilter != null || readType != null;
}
@Override
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
- checkNotNull(inputFilter);
-
// group by row id range
RangeHelper<ManifestEntry> rangeHelper =
new RangeHelper<>(e -> e.file().nonNullRowIdRange());
List<List<ManifestEntry>> splitByRowId = rangeHelper.mergeOverlappingRanges(entries);
return splitByRowId.stream()
- .filter(this::filterByStats)
- .flatMap(Collection::stream)
+ // Stats pruning only runs when there is a predicate; column pruning then runs per group.
+ .filter(group -> inputFilter == null || filterByStats(group))
+ .map(this::pruneByReadType)
+ .flatMap(List::stream)
.map(entry -> dropStats ? dropStats(entry) : entry)
.collect(Collectors.toList());
}
@@ -200,6 +203,62 @@ public class DataEvolutionFileStoreScan extends
AppendOnlyFileStoreScan {
stats.rowCount(), stats.minValues(), stats.maxValues(),
stats.nullCounts());
}
+ /**
+ * Per-file column pruning within a row-id-range group: drops files whose physical columns have
+ * no overlap with the query's {@code readType}. This matters for columnar-split data-evolution
+ * layouts, where one logical row is reconstructed from several files covering the same row id
+ * range; a query that references none of a file's columns has no reason to read it.
+ *
+ * <p>When no file in the group carries any requested column (e.g. an ADD COLUMN projection over
+ * a row-disjoint group written before the ALTER), the group's first file is kept as a row-count
+ * representative so the reader can emit the right number of NULL-filled rows.
+ *
+ * @param group manifest entries whose row id ranges overlap
+ * @return {@code group} itself when there is no read type or the group has at most one file;
+ *     otherwise the files carrying at least one requested field id, never empty
+ */
+ private List<ManifestEntry> pruneByReadType(List<ManifestEntry> group) {
+ if (readType == null || group.size() <= 1) {
+ return group;
+ }
+ Set<Integer> readFieldIds = new HashSet<>();
+ for (DataField f : readType.getFields()) {
+ readFieldIds.add(f.id());
+ }
+ List<ManifestEntry> kept = new ArrayList<>(group.size());
+ for (ManifestEntry entry : group) {
+ // Compare by field id, not name, so a renamed column still matches older files.
+ if (!Collections.disjoint(fileFieldIdsForEntry(entry), readFieldIds)) {
+ kept.add(entry);
+ }
+ }
+ // Group must contribute at least one file so the reader sees rowCount and can NULL-fill
+ // missing columns for the projection's rows.
+ return kept.isEmpty() ? Collections.singletonList(group.get(0)) : kept;
+ }
+
+ /** Returns the physical field ids of {@code entry}'s file, computing and caching them once. */
+ private Set<Integer> fileFieldIdsForEntry(ManifestEntry entry) {
+ DataFileMeta file = entry.file();
+ Pair<Long, List<String>> cacheKey = Pair.of(file.schemaId(), file.writeCols());
+ return fileFieldIdsCache.computeIfAbsent(
+ cacheKey, ignored -> computeFileFieldIds(this::scanTableSchema, file));
+ }
+
+ /**
+ * Returns the ids of the fields physically written to {@code file}, resolved through the schema
+ * the file was written with ({@code file.schemaId()}) projected onto its {@code writeCols}.
+ * Ids rather than names are used because the id is the identity that survives schema changes,
+ * so a renamed column still matches a file written under its old name.
+ */
+ @VisibleForTesting
+ static Set<Integer> computeFileFieldIds(
+ Function<Long, TableSchema> scanTableSchema, DataFileMeta file) {
+ TableSchema fileSchema = scanTableSchema.apply(file.schemaId());
+ return fileSchema.project(file.writeCols()).fields().stream()
+ .map(DataField::id)
+ .collect(Collectors.toCollection(HashSet::new));
+ }
+
/** TODO: Optimize implementation of this method. */
@VisibleForTesting
static EvolutionStats evolutionStats(
@@ -279,16 +338,20 @@ public class DataEvolutionFileStoreScan extends
AppendOnlyFileStoreScan {
}
}
+ long groupRowCount = metas.get(0).file().rowCount();
DataEvolutionRow finalMin = new DataEvolutionRow(metas.size(),
rowOffsets, fieldOffsets);
DataEvolutionRow finalMax = new DataEvolutionRow(metas.size(),
rowOffsets, fieldOffsets);
+ // For null-count specifically, a field absent from every file in the
group means every
+ // logical row is null for that field — encode as groupRowCount so
stats predicates can
+ // prune non-null comparisons (e.g. `extra2 = 'x'`) instead of falling
back to
+ // "unknown stats -> keep" in LeafPredicate.test.
DataEvolutionArray finalNullCounts =
- new DataEvolutionArray(metas.size(), rowOffsets, fieldOffsets);
+ new DataEvolutionArray(metas.size(), rowOffsets, fieldOffsets,
groupRowCount);
finalMin.setRows(min);
finalMax.setRows(max);
finalNullCounts.setRows(nullCounts);
- return new EvolutionStats(
- metas.get(0).file().rowCount(), finalMin, finalMax,
finalNullCounts);
+ return new EvolutionStats(groupRowCount, finalMin, finalMax,
finalNullCounts);
}
/** Note: Keep this thread-safe. */
@@ -296,27 +359,13 @@ public class DataEvolutionFileStoreScan extends
AppendOnlyFileStoreScan {
protected boolean filterByStats(ManifestEntry entry) {
DataFileMeta file = entry.file();
- if (readType != null) {
- boolean containsReadCol = false;
- List<String> fileFieldNmes =
- fileFields.computeIfAbsent(
- Pair.of(file.schemaId(), file.writeCols()),
- pair ->
- scanTableSchema(file.schemaId())
- .project(file.writeCols())
- .logicalRowType()
- .getFieldNames());
-
- for (String field : readType.getFieldNames()) {
- if (fileFieldNmes.contains(field)) {
- containsReadCol = true;
- break;
- }
- }
- if (!containsReadCol) {
- return false;
- }
- }
+ // Do not drop a file based on read-column intersection. For
data-evolution
+ // tables a field absent from a file is an implicit NULL across
rowCount()
+ // rows, and predicates such as `new_col IS NULL` should still match
those
+ // rows. Predicate-based stats pruning runs in
+ // filterByStats(List<ManifestEntry>), which evolves stats per file via
+ // DataEvolutionRow / DataEvolutionArray and correctly reports missing
+ // fields as null.
// If rowRanges is null, all entries should be kept
if (this.rowRangeIndex == null) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index cc1b07fe6b..0b1799ac81 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -1541,6 +1541,382 @@ public class DataEvolutionTableTest extends
DataEvolutionTestBase {
assertThat(rows.get(2).getString(2).toString()).isEqualTo("b");
}
+ /**
+ * Regression test for ADD COLUMN: files written before the ALTER do not physically carry the
+ * new column, so {@code WHERE new_col IS NULL} must match every pre-ALTER row. Before the fix,
+ * the single-entry filterByStats dropped pre-ALTER files at the manifest layer and the
+ * predicate returned zero rows.
+ *
+ * <p>Writes 5 rows with only (f0, f1), adds column f3, writes 5 full-schema rows, then expects
+ * exactly the 5 pre-ALTER rows for {@code f3 IS NULL}.
+ */
+ @Test
+ public void testAddColumnIsNullKeepsPreAlterRows() throws Exception {
+ createTableDefault();
+ Schema schema = schemaDefault();
+
+ // Pre-ALTER write: only (f0, f1).
+ BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+ RowType writeF0F1 = schema.rowType().project(Arrays.asList("f0",
"f1"));
+ try (BatchTableWrite write =
builder.newWrite().withWriteType(writeF0F1)) {
+ for (int i = 0; i < 5; i++) {
+ write.write(GenericRow.of(i, BinaryString.fromString("a" +
i)));
+ }
+ builder.newCommit().commit(write.prepareCommit());
+ }
+
+ // ADD COLUMN f3 (post-ALTER), then write 5 full-schema rows starting at a fresh
row id.
+ catalog.alterTable(identifier(), SchemaChange.addColumn("f3",
DataTypes.STRING()), false);
+ FileStoreTable table = getTableDefault();
+ builder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = builder.newWrite()) {
+ for (int i = 5; i < 10; i++) {
+ write.write(
+ GenericRow.of(
+ i,
+ BinaryString.fromString("a" + i),
+ BinaryString.fromString("c" + i),
+ BinaryString.fromString("e" + i)));
+ }
+ builder.newCommit().commit(write.prepareCommit());
+ }
+
+ // WHERE f3 IS NULL -> pre-ALTER rows (5 of them); post-ALTER rows all have a non-null f3.
+ PredicateBuilder pb = new PredicateBuilder(table.rowType());
+ int f3Idx = table.rowType().getFieldIndex("f3");
+ ReadBuilder rb = table.newReadBuilder().withFilter(pb.isNull(f3Idx));
+ assertThat(countMatchingRows(rb)).isEqualTo(5);
+ }
+
+ /**
+ * Predicate-aware stats pruning for ADD COLUMN: {@code WHERE new_col = 'something'} cannot
+ * match pre-ALTER rows (their new_col is an implicit NULL), so the pre-ALTER file must be
+ * pruned at planning time. The all-NULL encoding in EvolutionStats / DataEvolutionArray lets
+ * LeafPredicate.test drop the file through the leaf's normal decision instead of falling back
+ * to "unknown stats -> keep".
+ *
+ * <p>Expects 2 planned files without a filter and 1 with {@code f3 = 'e7'}.
+ */
+ @Test
+ public void testAddColumnEqualityPredicatePrunesPreAlterFiles() throws
Exception {
+ createTableDefault();
+ Schema schema = schemaDefault();
+
+ // Pre-ALTER write: only (f0, f1).
+ BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+ RowType writeF0F1 = schema.rowType().project(Arrays.asList("f0",
"f1"));
+ try (BatchTableWrite write =
builder.newWrite().withWriteType(writeF0F1)) {
+ for (int i = 0; i < 5; i++) {
+ write.write(GenericRow.of(i, BinaryString.fromString("a" +
i)));
+ }
+ builder.newCommit().commit(write.prepareCommit());
+ }
+
+ catalog.alterTable(identifier(), SchemaChange.addColumn("f3",
DataTypes.STRING()), false);
+ FileStoreTable table = getTableDefault();
+ builder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = builder.newWrite()) {
+ for (int i = 5; i < 10; i++) {
+ write.write(
+ GenericRow.of(
+ i,
+ BinaryString.fromString("a" + i),
+ BinaryString.fromString("c" + i),
+ BinaryString.fromString("e" + i)));
+ }
+ builder.newCommit().commit(write.prepareCommit());
+ }
+
+ // Total files on the table: one pre-ALTER file plus one post-ALTER file.
+ assertThat(plannedFileCount(table, null, null)).isEqualTo(2);
+
+ // WHERE f3 = 'e7' -> only the post-ALTER file can match. The
pre-ALTER file is
+ // pruned at planning because EvolutionStats encodes its missing f3 as
all-NULL,
+ // letting LeafPredicate.test evaluate Equal against (min=null,
max=null,
+ // nullCount=rowCount) and return false instead of falling through to
+ // "unknown stats -> keep".
+ PredicateBuilder pb = new PredicateBuilder(table.rowType());
+ int f3Idx = table.rowType().getFieldIndex("f3");
+ Predicate filter = pb.equal(f3Idx, BinaryString.fromString("e7"));
+ assertThat(plannedFileCount(table, null, filter)).isEqualTo(1);
+ }
+
+ /**
+ * Regression test for RENAME COLUMN. The renamed field keeps its id across schemas, so a
+ * predicate on the latest name must still match rows in the pre-rename file, whose physical
+ * writeCols carry the old name. Before the fix, the single-entry filterByStats compared by
+ * name and dropped pre-rename files at the manifest layer.
+ *
+ * <p>Writes 5 rows with f2 = "preR_i", renames f2 to f3, writes 5 rows with "postR_i", and
+ * expects exactly the 5 pre-rename rows for {@code f3 startsWith "preR_"}.
+ */
+ @Test
+ public void testRenameColumnPredicateKeepsPreRenameRows() throws Exception
{
+ createTableDefault();
+ Schema schema = schemaDefault();
+
+ // Pre-rename write: f2 carries the values that will later be queried
as f3.
+ BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+ try (BatchTableWrite write =
builder.newWrite().withWriteType(schema.rowType())) {
+ for (int i = 0; i < 5; i++) {
+ write.write(
+ GenericRow.of(
+ i,
+ BinaryString.fromString("a" + i),
+ BinaryString.fromString("preR_" + i)));
+ }
+ builder.newCommit().commit(write.prepareCommit());
+ }
+
+ catalog.alterTable(identifier(), SchemaChange.renameColumn("f2",
"f3"), false);
+ FileStoreTable table = getTableDefault();
+ builder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = builder.newWrite()) {
+ for (int i = 5; i < 10; i++) {
+ write.write(
+ GenericRow.of(
+ i,
+ BinaryString.fromString("a" + i),
+ BinaryString.fromString("postR_" + i)));
+ }
+ builder.newCommit().commit(write.prepareCommit());
+ }
+
+ // WHERE f3 LIKE 'preR_%' (startsWith) -> rows from the pre-rename file (5 rows).
+ PredicateBuilder pb = new PredicateBuilder(table.rowType());
+ int f3Idx = table.rowType().getFieldIndex("f3");
+ ReadBuilder rb =
+ table.newReadBuilder()
+ .withFilter(pb.startsWith(f3Idx,
BinaryString.fromString("preR_")));
+ assertThat(countMatchingRows(rb)).isEqualTo(5);
+ }
+
+ /**
+ * Columnar split: two files cover the same row id range, each carrying a different subset of
+ * columns. A query that projects only columns owned by one file should not read the other.
+ *
+ * <p>The layout comes from the {@code write(5)} helper. The assertions pin it down: f0 and f2
+ * live in different files, f1 lives in exactly one of them, and the full row type needs both.
+ */
+ @Test
+ public void testNoFilterProjectionPrunesColumnarSplitFiles() throws
Exception {
+ write(5);
+ FileStoreTable table = getTableDefault();
+ Schema schema = schemaDefault();
+ assertThat(plannedFileCount(table, null, null)).isEqualTo(2);
+
+ RowType readF0 =
schema.rowType().project(Collections.singletonList("f0"));
+ assertThat(plannedFileCount(table, readF0, null)).isEqualTo(1);
+
+ RowType readF1 =
schema.rowType().project(Collections.singletonList("f1"));
+ assertThat(plannedFileCount(table, readF1, null)).isEqualTo(1);
+
+ RowType readF2 =
schema.rowType().project(Collections.singletonList("f2"));
+ assertThat(plannedFileCount(table, readF2, null)).isEqualTo(1);
+
+ RowType readF0F2 = schema.rowType().project(Arrays.asList("f0", "f2"));
+ assertThat(plannedFileCount(table, readF0F2, null)).isEqualTo(2);
+
+ assertThat(plannedFileCount(table, schema.rowType(),
null)).isEqualTo(2);
+ }
+
+ /**
+ * A row-disjoint file that carries none of the projected columns must not be dropped by the
+ * column-pruning logic — the reader needs it to emit rowCount NULL-filled rows for the
+ * projection. The first commit writes only (f0, f1); the second commit writes the full schema
+ * to its own row id range. Projecting f2 must therefore still plan both files.
+ */
+ @Test
+ public void testNoFilterProjectionKeepsRowDisjointFiles() throws Exception
{
+ createTableDefault();
+ Schema schema = schemaDefault();
+ BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+ RowType writeType = schema.rowType().project(Arrays.asList("f0",
"f1"));
+ try (BatchTableWrite write =
builder.newWrite().withWriteType(writeType)) {
+ for (int i = 0; i < 5; i++) {
+ write.write(GenericRow.of(i, BinaryString.fromString("a" +
i)));
+ }
+ builder.newCommit().commit(write.prepareCommit());
+ }
+ builder = getTableDefault().newBatchWriteBuilder();
+ try (BatchTableWrite write =
builder.newWrite().withWriteType(schema.rowType())) {
+ for (int i = 5; i < 10; i++) {
+ write.write(
+ GenericRow.of(
+ i,
+ BinaryString.fromString("a" + i),
+ BinaryString.fromString("b" + i)));
+ }
+ builder.newCommit().commit(write.prepareCommit());
+ }
+ FileStoreTable table = getTableDefault();
+
+ assertThat(plannedFileCount(table, null, null)).isEqualTo(2);
+
+ // Projecting f2 must still keep the (f0, f1)-only file as a row-count
witness so
+ // the reader emits 5 NULL-filled rows for that file's row id range.
+ RowType readF2 =
schema.rowType().project(Collections.singletonList("f2"));
+ assertThat(plannedFileCount(table, readF2, null)).isEqualTo(2);
+ }
+
+ /**
+ * Columnar split + predicate on the file-A column: stats prune through file A's column, then
+ * column pruning drops file B from the kept group. {@code f0 > 5} must plan exactly one file,
+ * and {@code f0 > 1000} must prune everything via stats.
+ */
+ @Test
+ public void testColumnarSplitWithPredicateOnFileAColumn() throws Exception
{
+ write(10);
+ FileStoreTable table = getTableDefault();
+ Schema schema = schemaDefault();
+ PredicateBuilder pb = new PredicateBuilder(table.rowType());
+ int f0Idx = table.rowType().getFieldIndex("f0");
+ RowType readF0 =
schema.rowType().project(Collections.singletonList("f0"));
+ assertThat(plannedFileCount(table, readF0, pb.greaterThan(f0Idx,
5))).isEqualTo(1);
+ assertThat(plannedFileCount(table, readF0, pb.greaterThan(f0Idx,
1000))).isEqualTo(0);
+ }
+
+ /**
+ * Columnar split + predicate on the file-B column: stats prune through file B's column, then
+ * column pruning drops file A from the kept group, so {@code f2 = 'b5'} projected on f2 must
+ * plan exactly one file.
+ */
+ @Test
+ public void testColumnarSplitWithPredicateOnFileBColumn() throws Exception
{
+ write(10);
+ FileStoreTable table = getTableDefault();
+ Schema schema = schemaDefault();
+ PredicateBuilder pb = new PredicateBuilder(table.rowType());
+ int f2Idx = table.rowType().getFieldIndex("f2");
+ RowType readF2 =
schema.rowType().project(Collections.singletonList("f2"));
+ assertThat(plannedFileCount(table, readF2, pb.equal(f2Idx,
BinaryString.fromString("b5"))))
+ .isEqualTo(1);
+ }
+
+ /**
+ * Three-way columnar split: fileA{f0}, fileB{f1} and fileC{f2} share a row id range. The second
+ * and third commits are pinned to first row id 0 via setFirstRowId, so all three files cover
+ * the same 5 rows. A query that touches one column should retain exactly that one file, and a
+ * two-column query exactly the two owning files.
+ */
+ @Test
+ public void testThreeWayColumnarSplitPruning() throws Exception {
+ createTableDefault();
+ Schema schema = schemaDefault();
+ BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+ RowType writeF0 =
schema.rowType().project(Collections.singletonList("f0"));
+ try (BatchTableWrite write =
builder.newWrite().withWriteType(writeF0)) {
+ for (int i = 0; i < 5; i++) {
+ write.write(GenericRow.of(i));
+ }
+ builder.newCommit().commit(write.prepareCommit());
+ }
+
+ builder = getTableDefault().newBatchWriteBuilder();
+ RowType writeF1 =
schema.rowType().project(Collections.singletonList("f1"));
+ try (BatchTableWrite write =
builder.newWrite().withWriteType(writeF1)) {
+ for (int i = 0; i < 5; i++) {
+ write.write(GenericRow.of(BinaryString.fromString("f1_" + i)));
+ }
+ List<CommitMessage> msgs = write.prepareCommit();
+ setFirstRowId(msgs, 0L);
+ builder.newCommit().commit(msgs);
+ }
+
+ builder = getTableDefault().newBatchWriteBuilder();
+ RowType writeF2 =
schema.rowType().project(Collections.singletonList("f2"));
+ try (BatchTableWrite write =
builder.newWrite().withWriteType(writeF2)) {
+ for (int i = 0; i < 5; i++) {
+ write.write(GenericRow.of(BinaryString.fromString("f2_" + i)));
+ }
+ List<CommitMessage> msgs = write.prepareCommit();
+ setFirstRowId(msgs, 0L);
+ builder.newCommit().commit(msgs);
+ }
+
+ FileStoreTable table = getTableDefault();
+ assertThat(plannedFileCount(table, null, null)).isEqualTo(3);
+ assertThat(
+ plannedFileCount(
+ table,
+
schema.rowType().project(Collections.singletonList("f0")),
+ null))
+ .isEqualTo(1);
+ assertThat(
+ plannedFileCount(
+ table,
+
schema.rowType().project(Collections.singletonList("f1")),
+ null))
+ .isEqualTo(1);
+ assertThat(
+ plannedFileCount(
+ table,
+
schema.rowType().project(Collections.singletonList("f2")),
+ null))
+ .isEqualTo(1);
+ assertThat(
+ plannedFileCount(
+ table,
schema.rowType().project(Arrays.asList("f0", "f2")), null))
+ .isEqualTo(2);
+ assertThat(
+ plannedFileCount(
+ table,
schema.rowType().project(Arrays.asList("f1", "f2")), null))
+ .isEqualTo(2);
+ }
+
+ /**
+ * A columnar-split group covering rows 0..4 (file A {f0,f1} + file B {f2}, as produced by the
+ * {@code write(5)} helper), plus a row-disjoint group for rows 5..9 (file C with the full
+ * schema). Per-group column pruning must compose across both topologies. Expects 3 files in
+ * total, and 2 (one per group) when projecting only f0 or only f2.
+ */
+ @Test
+ public void testMixedColumnarSplitAndRowDisjoint() throws Exception {
+ write(5);
+ Schema schema = schemaDefault();
+ BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+ try (BatchTableWrite write =
builder.newWrite().withWriteType(schema.rowType())) {
+ for (int i = 5; i < 10; i++) {
+ write.write(
+ GenericRow.of(
+ i,
+ BinaryString.fromString("a" + i),
+ BinaryString.fromString("c" + i)));
+ }
+ builder.newCommit().commit(write.prepareCommit());
+ }
+ FileStoreTable table = getTableDefault();
+
+ assertThat(plannedFileCount(table, null, null)).isEqualTo(3);
+ RowType readF0 =
schema.rowType().project(Collections.singletonList("f0"));
+ assertThat(plannedFileCount(table, readF0, null)).isEqualTo(2);
+ RowType readF2 =
schema.rowType().project(Collections.singletonList("f2"));
+ assertThat(plannedFileCount(table, readF2, null)).isEqualTo(2);
+ }
+
+ /**
+ * A projection containing only a system field (the row id) must not trigger column pruning.
+ * Per the scan's design, DataEvolutionFileStoreScan.withReadType filters system fields out of
+ * readType, so readType stays null and, with no filter, postFilterManifestEntriesEnabled
+ * returns false. The column-pruning path is never entered, so every file in every group flows
+ * through unchanged (2 files, same as the unprojected plan).
+ */
+ @Test
+ public void testSystemFieldOnlyProjectionIsNotPruned() throws Exception {
+ write(5);
+ FileStoreTable table = getTableDefault();
+ assertThat(plannedFileCount(table, null, null)).isEqualTo(2);
+ assertThat(plannedFileCount(table, RowType.of(SpecialFields.ROW_ID),
null)).isEqualTo(2);
+ }
+
+ /**
+ * Plans a scan with the optional projection and filter, and returns the total number of data
+ * files across all planned splits (DataSplit or IndexedSplit).
+ */
+ private static int plannedFileCount(FileStoreTable table, RowType readType, Predicate filter) {
+ ReadBuilder builder = table.newReadBuilder();
+ builder = readType == null ? builder : builder.withReadType(readType);
+ builder = filter == null ? builder : builder.withFilter(filter);
+ return builder.newScan().plan().splits().stream()
+ .mapToInt(
+ split ->
+ split instanceof DataSplit
+ ? ((DataSplit) split).dataFiles().size()
+ : ((IndexedSplit) split).dataSplit().dataFiles().size())
+ .sum();
+ }
+
+ /** Reads every row of the builder's planned scan and returns how many were produced. */
+ private static long countMatchingRows(ReadBuilder rb) throws Exception {
+ AtomicInteger cnt = new AtomicInteger(0);
+ // try-with-resources so the reader is closed even when iteration throws.
+ try (RecordReader<InternalRow> reader =
+ rb.newRead().createReader(rb.newScan().plan())) {
+ reader.forEachRemaining(r -> cnt.incrementAndGet());
+ }
+ return cnt.get();
+ }
+
private Range assertContinuousRowIdRange(List<DataFileMeta> files) {
files.sort(Comparator.comparingLong(DataFileMeta::nonNullFirstRowId));
long start = files.get(0).nonNullFirstRowId();