This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5dda7da972 [mosaic] Skip row group read for all-missing projection (#8157)
5dda7da972 is described below
commit 5dda7da972ab0f9c4a6a4c9a8c86438e33430df0
Author: QuakeWang <[email protected]>
AuthorDate: Mon Jun 8 12:09:10 2026 +0800
[mosaic] Skip row group read for all-missing projection (#8157)
When all projected columns are missing from a Mosaic file due to schema
evolution, the reader currently does not push any native projection. It
still calls `readRowGroup`, which may read physical file columns even
though the final result only contains NULL values.
This PR adds a Mosaic reader fast path for all-missing projections. The
reader still uses row group metadata for row counts and predicate
pruning, but returns an all-NULL batch directly when the row group
matches, avoiding the native row group read.
---
.../paimon/format/mosaic/MosaicRecordsReader.java | 42 ++++++++++++++++++++++
.../format/mosaic/MosaicRecordsReaderTest.java | 39 ++++++++++++++++++++
2 files changed, 81 insertions(+)
diff --git
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
index 6a81d8aad8..d31dc8c3e8 100644
---
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
+++
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
@@ -60,6 +60,8 @@ public class MosaicRecordsReader implements
FileRecordReader<InternalRow> {
private final BufferAllocator allocator;
private final int numRowGroups;
private final RowType dataSchemaRowType;
+ private final int projectedFieldCount;
+ private final boolean allProjectedColumnsMissing;
@Nullable private final List<Predicate> predicates;
private int currentRowGroup;
@@ -96,12 +98,14 @@ public class MosaicRecordsReader implements
FileRecordReader<InternalRow> {
this.filePath = filePath;
this.inputFileAdapter = inputFileAdapter;
this.dataSchemaRowType = dataSchemaRowType;
+ this.projectedFieldCount = projectedRowType.getFieldCount();
this.predicates = predicates;
this.allocator = allocator;
MosaicReader createdReader = null;
int createdNumRowGroups;
ArrowBatchReader createdArrowBatchReader;
+ boolean createdAllProjectedColumnsMissing = false;
try {
createdReader = nativeReaderOpener.open(inputFileAdapter,
fileSize, allocator);
@@ -117,6 +121,7 @@ public class MosaicRecordsReader implements
FileRecordReader<InternalRow> {
existingColumns.add(name);
}
}
+ createdAllProjectedColumnsMissing = existingColumns.isEmpty();
if (!existingColumns.isEmpty()) {
createdReader.project(existingColumns.toArray(new String[0]));
}
@@ -130,6 +135,7 @@ public class MosaicRecordsReader implements
FileRecordReader<InternalRow> {
this.reader = createdReader;
this.numRowGroups = createdNumRowGroups;
+ this.allProjectedColumnsMissing = createdAllProjectedColumnsMissing;
this.currentRowGroup = 0;
this.arrowBatchReader = createdArrowBatchReader;
}
@@ -147,6 +153,11 @@ public class MosaicRecordsReader implements
FileRecordReader<InternalRow> {
releaseCurrentVsr();
+ if (allProjectedColumnsMissing) {
+ currentRowGroup++;
+ return allNullIterator(numRows);
+ }
+
VectorSchemaRoot vsr = reader.readRowGroup(currentRowGroup,
allocator);
currentRowGroup++;
this.currentVsr = vsr;
@@ -183,6 +194,37 @@ public class MosaicRecordsReader implements
FileRecordReader<InternalRow> {
return null;
}
+ private FileRecordIterator<InternalRow> allNullIterator(int numRows) {
+ GenericRow row = new GenericRow(projectedFieldCount);
+ return new FileRecordIterator<InternalRow>() {
+ private int position;
+
+ @Override
+ public long returnedPosition() {
+ return returnedPosition;
+ }
+
+ @Override
+ public Path filePath() {
+ return filePath;
+ }
+
+ @Nullable
+ @Override
+ public InternalRow next() {
+ if (position < numRows) {
+ position++;
+ returnedPosition++;
+ return row;
+ }
+ return null;
+ }
+
+ @Override
+ public void releaseBatch() {}
+ };
+ }
+
private boolean matchesRowGroup(int rowGroupIndex, long rowCount) {
if (predicates == null || predicates.isEmpty()) {
return true;
diff --git
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
index f2725dc675..45bd36c46b 100644
---
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
+++
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
@@ -18,10 +18,12 @@
package org.apache.paimon.format.mosaic;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.mosaic.MosaicReader;
+import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -34,8 +36,11 @@ import java.util.Collections;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -163,6 +168,24 @@ class MosaicRecordsReaderTest {
assertThat(inputStream.closeCount()).isEqualTo(1);
}
+ @Test
+ void testAllProjectedColumnsMissingSkipsRowGroupRead() throws IOException {
+ CloseCountingSeekableInputStream inputStream = new
CloseCountingSeekableInputStream();
+ MosaicInputFileAdapter inputFileAdapter =
createInputFileAdapter(inputStream);
+ CloseCountingRootAllocator allocator = new
CloseCountingRootAllocator();
+ MosaicReader reader = createReader();
+ when(reader.numRowGroups()).thenReturn(1);
+ when(reader.rowGroupNumRows(0)).thenReturn(3);
+
+ MosaicRecordsReader recordsReader =
+ createRecordsReader(inputFileAdapter, allocator, reader);
+
+ assertThat(readerBatchSize(recordsReader)).isEqualTo(3);
+ verify(reader, never()).readRowGroup(anyInt(), any());
+
+ recordsReader.close();
+ }
+
private static MosaicInputFileAdapter createInputFileAdapter(
CloseCountingSeekableInputStream inputStream) throws IOException {
return new MosaicInputFileAdapter(
@@ -190,6 +213,22 @@ class MosaicRecordsReaderTest {
return reader;
}
+ private static int readerBatchSize(MosaicRecordsReader recordsReader)
throws IOException {
+ int count = 0;
+ while (true) {
+ FileRecordIterator<InternalRow> batch = recordsReader.readBatch();
+ if (batch == null) {
+ return count;
+ }
+ InternalRow row;
+ while ((row = batch.next()) != null) {
+ assertThat(row.isNullAt(0)).isTrue();
+ count++;
+ }
+ batch.releaseBatch();
+ }
+ }
+
private static RowType rowType() {
return DataTypes.ROW(DataTypes.INT());
}