This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6fe06e3beb [core] Add comprehensive blob and data-evolution tests
6fe06e3beb is described below
commit 6fe06e3bebaf9c76d781a76bca84d72138b88026
Author: JingsongLi <[email protected]>
AuthorDate: Fri May 22 23:01:18 2026 +0800
[core] Add comprehensive blob and data-evolution tests
Add tests for MultipleBlobTableTest, BlobTableTest, and
DataEvolutionTableTest covering projection pushdown, null blobs, schema
evolution, compaction, multi-batch scenarios, multi-blob-field queries,
asymmetric compaction, and blob-only projection with row ranges.
---
.../org/apache/paimon/append/BlobTableTest.java | 673 +++++++++++++++++++++
.../paimon/append/MultipleBlobTableTest.java | 372 ++++++++++++
.../paimon/table/DataEvolutionTableTest.java | 471 ++++++++++++++
3 files changed, 1516 insertions(+)
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 38aabeda6a..d8fd0cce9b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.data.BlobView;
import org.apache.paimon.data.BlobViewStruct;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
@@ -48,6 +49,7 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.system.RowTrackingTable;
import org.apache.paimon.types.DataField;
@@ -885,6 +887,677 @@ public class BlobTableTest extends TableTestBase {
});
}
+    /**
+     * Writes a single row and reads it back projecting only f0 and f1 (indices 0, 1), checking
+     * that the INT and STRING values come back correctly when the blob column f2 is excluded.
+     */
+    @Test
+    public void testBlobProjectionExcludesBlobColumn() throws Exception {
+        createTableDefault();
+        writeDataDefault(
+                Collections.singletonList(
+                        GenericRow.of(
+                                42, BinaryString.fromString("hello"), new BlobData(blobBytes))));
+
+        Table table = getTableDefault();
+        // Project only f0 and f1 (indices 0, 1), excluding blob column f2
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1});
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    // Projected positions: 0 -> f0, 1 -> f1.
+                    assertThat(row.getInt(0)).isEqualTo(42);
+                    assertThat(row.getString(1).toString()).isEqualTo("hello");
+                });
+        // Exactly the one written row must be returned.
+        assertThat(count.get()).isEqualTo(1);
+    }
+
+    /**
+     * Writes a single row and reads it back projecting only the blob column f2 (index 2),
+     * checking that the blob bytes round-trip when no non-blob column is projected.
+     */
+    @Test
+    public void testBlobProjectionOnlyBlobColumn() throws Exception {
+        createTableDefault();
+        writeDataDefault(
+                Collections.singletonList(
+                        GenericRow.of(
+                                42, BinaryString.fromString("hello"), new BlobData(blobBytes))));
+
+        Table table = getTableDefault();
+        // Project only blob column f2 (index 2)
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {2});
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    // The blob is the only projected field, so it sits at position 0.
+                    assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes);
+                });
+        assertThat(count.get()).isEqualTo(1);
+    }
+
+    /**
+     * Writes five rows in which rows 1 and 3 carry a null blob, then reads the full table and
+     * checks that non-null blobs round-trip and null blobs are read back as null.
+     */
+    @Test
+    public void testNullBlobValues() throws Exception {
+        createTableDefault();
+        writeDataDefault(
+                Arrays.asList(
+                        GenericRow.of(0, BinaryString.fromString("a"), new BlobData(blobBytes)),
+                        GenericRow.of(1, BinaryString.fromString("b"), null),
+                        GenericRow.of(2, BinaryString.fromString("c"), new BlobData(blobBytes)),
+                        GenericRow.of(3, BinaryString.fromString("d"), null),
+                        GenericRow.of(4, BinaryString.fromString("e"), new BlobData(blobBytes))));
+
+        Table table = getTableDefault();
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+        // Rows are copied before being collected. NOTE(review): presumably because the reader
+        // may reuse row instances between records; confirm.
+        InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        assertThat(rows.size()).isEqualTo(5);
+
+        // Sort by f0 so list index == written id, independent of read order.
+        rows.sort((a, b) -> Integer.compare(a.getInt(0), b.getInt(0)));
+        // Non-null blobs
+        assertThat(rows.get(0).getBlob(2).toData()).isEqualTo(blobBytes);
+        assertThat(rows.get(2).getBlob(2).toData()).isEqualTo(blobBytes);
+        assertThat(rows.get(4).getBlob(2).toData()).isEqualTo(blobBytes);
+        // Null blobs
+        assertThat(rows.get(1).isNullAt(2)).isTrue();
+        assertThat(rows.get(3).isNullAt(2)).isTrue();
+    }
+
+    /**
+     * Writes one row with a raw blob, then reads it through a table copy with
+     * {@code BLOB_AS_DESCRIPTOR=true}. The returned blob must expose a non-null descriptor with a
+     * URI and positive length, and must still yield the original bytes via {@code toData()}.
+     */
+    @Test
+    public void testBlobAsDescriptorReadMode() throws Exception {
+        createTableDefault();
+        writeDataDefault(
+                Collections.singletonList(
+                        GenericRow.of(
+                                1, BinaryString.fromString("test"), new BlobData(blobBytes))));
+
+        FileStoreTable table = getTableDefault();
+        // Read-side option override: only the copied table sees BLOB_AS_DESCRIPTOR=true.
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true");
+        Table tableWithDescriptorMode = table.copy(options);
+
+        ReadBuilder readBuilder = tableWithDescriptorMode.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    Blob blob = row.getBlob(2);
+                    // In descriptor mode, toDescriptor() should work
+                    BlobDescriptor descriptor = blob.toDescriptor();
+                    assertThat(descriptor).isNotNull();
+                    assertThat(descriptor.uri()).isNotNull();
+                    assertThat(descriptor.length()).isGreaterThan(0);
+                    // The data should still be accessible via toData()
+                    assertThat(blob.toData()).isEqualTo(blobBytes);
+                });
+        assertThat(count.get()).isEqualTo(1);
+    }
+
+    /**
+     * Creates a data-evolution table with a single blob column f2 and a 25 MB blob target file
+     * size, writes data that produces more than one blob file, runs the blob compaction tasks
+     * planned by {@link DataEvolutionCompactCoordinator}, and then checks that:
+     *
+     * <ul>
+     *   <li>all 1000 rows still read back with the original blob bytes, and
+     *   <li>a fresh plan contains no further blob compaction tasks.
+     * </ul>
+     */
+    @Test
+    public void testBlobCompactionSingleField() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "1 GB");
+        schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        // Write multiple batches to create multiple blob files
+        commitDefault(writeDataDefault(50, 20));
+
+        FileStoreTable table = getTableDefault();
+        List<DataFileMeta> before =
+                table.store().newScan().plan().files().stream()
+                        .map(ManifestEntry::file)
+                        .collect(Collectors.toList());
+        long beforeBlobCount =
+                before.stream()
+                        .filter(
+                                file ->
+                                        org.apache.paimon.format.blob.BlobFileFormat.isBlobFile(
+                                                file.fileName()))
+                        .count();
+        // Precondition: compaction only has something to merge if there are >1 blob files.
+        assertThat(beforeBlobCount).isGreaterThan(1);
+
+        // Run blob compaction
+        // NOTE(review): the meaning of the two boolean constructor arguments is not visible
+        // here; confirm against DataEvolutionCompactCoordinator.
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, true, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        assertThat(tasks.stream().anyMatch(DataEvolutionCompactTask::isBlobTask)).isTrue();
+
+        List<CommitMessage> compactMessages = new ArrayList<>();
+        for (DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        commitDefault(compactMessages);
+
+        // Read and verify data correctness
+        AtomicInteger readCount = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    readCount.incrementAndGet();
+                    assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+                });
+        assertThat(readCount.get()).isEqualTo(1000);
+
+        // Verify no more blob compaction tasks needed
+        table = getTableDefault();
+        coordinator = new DataEvolutionCompactCoordinator(table, true, false);
+        List<DataEvolutionCompactTask> tasks2;
+        try {
+            tasks2 = coordinator.plan();
+        } catch (EndOfScanException e) {
+            // plan() may signal "nothing left to plan" by throwing; treat as no tasks.
+            tasks2 = Collections.emptyList();
+        }
+        assertThat(tasks2.stream().anyMatch(DataEvolutionCompactTask::isBlobTask)).isFalse();
+    }
+
+    /**
+     * Creates a table partitioned by {@code pt} with a blob column, writes two rows to partition
+     * p1 and one row to partition p2 (via two separate {@code writeRows} calls, each with its own
+     * random blob), and checks that a full read returns each row with its partition value and
+     * the blob written for that partition.
+     */
+    @Test
+    public void testPartitionedTableWithBlob() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("pt", DataTypes.STRING());
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.BLOB());
+        schemaBuilder.partitionKeys("pt");
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        FileStoreTable table = getTableDefault();
+        byte[] blobP1 = randomBytes();
+        byte[] blobP2 = randomBytes();
+
+        writeRows(
+                table,
+                Arrays.asList(
+                        GenericRow.of(BinaryString.fromString("p1"), 1, new BlobData(blobP1)),
+                        GenericRow.of(BinaryString.fromString("p1"), 2, new BlobData(blobP1))));
+        writeRows(
+                table,
+                Collections.singletonList(
+                        GenericRow.of(BinaryString.fromString("p2"), 3, new BlobData(blobP2))));
+
+        // Read all partitions
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        assertThat(rows.size()).isEqualTo(3);
+
+        // Field positions: 0 -> pt, 1 -> f0, 2 -> f1 (blob). Sort by f0 for stable order.
+        rows.sort((a, b) -> Integer.compare(a.getInt(1), b.getInt(1)));
+        // p1 rows
+        assertThat(rows.get(0).getString(0).toString()).isEqualTo("p1");
+        assertThat(rows.get(0).getBlob(2).toData()).isEqualTo(blobP1);
+        assertThat(rows.get(1).getString(0).toString()).isEqualTo("p1");
+        assertThat(rows.get(1).getBlob(2).toData()).isEqualTo(blobP1);
+        // p2 row
+        assertThat(rows.get(2).getString(0).toString()).isEqualTo("p2");
+        assertThat(rows.get(2).getBlob(2).toData()).isEqualTo(blobP2);
+    }
+
+    /**
+     * Writes many rows and reads back only the row range [50, 60] via
+     * {@code withRowRanges}, checking that exactly 11 rows are returned (so both range bounds
+     * are treated as inclusive) and that each carries the original blob bytes.
+     *
+     * <p>NOTE(review): the test name refers to multiple files, but the test never asserts that
+     * more than one blob file was actually produced.
+     */
+    @Test
+    public void testBlobWithRowRangeFilterMultipleFiles() throws Exception {
+        createTableDefault();
+
+        // Write enough rows to create multiple blob file rolls (100 rows per batch * 10 batches)
+        commitDefault(writeDataDefault(100, 10));
+
+        Table table = getTableDefault();
+        // Read only rows in range [50, 60] — a narrow subset
+        ReadBuilder readBuilder =
+                table.newReadBuilder()
+                        .withRowRanges(Collections.singletonList(new Range(50L, 60L)));
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+                });
+        // 60 - 50 + 1 = 11 rows for an inclusive range.
+        assertThat(count.get()).isEqualTo(11);
+    }
+
+    /**
+     * Registers a blob consumer on the batch writer, writes 5 rows with blobs, and verifies that
+     * the consumer is called once per blob. Per the in-test comment, the consumer's boolean
+     * return value is a flush request; it returns {@code true} on every second call, so 5 blobs
+     * yield 2 flush requests. Every collected descriptor must resolve back to the original bytes.
+     */
+    @Test
+    public void testBlobConsumerFlushBehavior() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        List<BlobDescriptor> descriptors = new ArrayList<>();
+        AtomicInteger flushCount = new AtomicInteger(0);
+
+        // Create the write and the commit from one builder, and let try-with-resources close
+        // the commit even if commit() throws.
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.withBlobConsumer(
+                    (blobFieldName, blobDescriptor) -> {
+                        descriptors.add(blobDescriptor);
+                        // Return true (flush) for every other blob
+                        boolean shouldFlush = descriptors.size() % 2 == 0;
+                        if (shouldFlush) {
+                            flushCount.incrementAndGet();
+                        }
+                        return shouldFlush;
+                    });
+            for (int i = 0; i < 5; i++) {
+                write.write(
+                        GenericRow.of(
+                                i, BinaryString.fromString("row" + i), new BlobData(blobBytes)));
+            }
+            commit.commit(write.prepareCommit());
+        }
+
+        // All 5 descriptors collected
+        assertThat(descriptors.size()).isEqualTo(5);
+        assertThat(flushCount.get()).isEqualTo(2);
+
+        // All descriptors should be valid and point to blob data
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        for (BlobDescriptor desc : descriptors) {
+            assertThat(desc).isNotNull();
+            assertThat(Blob.fromDescriptor(uriReader, desc).toData()).isEqualTo(blobBytes);
+        }
+    }
+
+    /**
+     * On the mixed-mode table (f2 = raw blob bytes, f3 = blob given as a descriptor to an
+     * external file, as written below), projects f0 and f2 and checks that the INT value and the
+     * raw blob bytes are returned while f1 and f3 are skipped.
+     */
+    @Test
+    public void testMultipleBlobFieldsProjection() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Prepare an external file whose contents are referenced by descriptor from f3.
+        byte[] descriptorBytes = randomBytes();
+        Path external = new Path(tempPath.resolve("upstream-proj-test.bin").toString());
+        writeFile(table.fileIO(), external, descriptorBytes);
+
+        BlobDescriptor descriptor =
+                new BlobDescriptor(external.toString(), 0, descriptorBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob blobRef = Blob.fromDescriptor(uriReader, descriptor);
+
+        writeDataDefault(
+                Collections.singletonList(
+                        GenericRow.of(
+                                1,
+                                BinaryString.fromString("proj"),
+                                new BlobData(blobBytes),
+                                blobRef)));
+
+        // Project f0 and f2 only (indices 0, 2), skipping f1 and f3
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 2});
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    assertThat(row.getInt(0)).isEqualTo(1);
+                    assertThat(row.getBlob(1).toData()).isEqualTo(blobBytes);
+                });
+        assertThat(count.get()).isEqualTo(1);
+    }
+
+    /**
+     * On the mixed-mode table, projects only f3 (the blob written from an external-file
+     * descriptor) and checks that it reads back with the same descriptor and the external
+     * file's bytes, without projecting the raw blob column f2.
+     */
+    @Test
+    public void testMultipleBlobFieldsProjectOnlyDescriptorBlob() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Prepare an external file whose contents are referenced by descriptor from f3.
+        byte[] descriptorBytes = randomBytes();
+        Path external = new Path(tempPath.resolve("upstream-proj-desc.bin").toString());
+        writeFile(table.fileIO(), external, descriptorBytes);
+
+        BlobDescriptor descriptor =
+                new BlobDescriptor(external.toString(), 0, descriptorBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob blobRef = Blob.fromDescriptor(uriReader, descriptor);
+
+        writeDataDefault(
+                Collections.singletonList(
+                        GenericRow.of(
+                                1,
+                                BinaryString.fromString("test"),
+                                new BlobData(blobBytes),
+                                blobRef)));
+
+        // Project only f3 (index 3) — the descriptor blob, skipping raw blob f2
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {3});
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    assertThat(row.getBlob(0).toDescriptor()).isEqualTo(descriptor);
+                    assertThat(row.getBlob(0).toData()).isEqualTo(descriptorBytes);
+                });
+        assertThat(count.get()).isEqualTo(1);
+    }
+
+    /**
+     * On the mixed-mode table, projects only the two blob columns f2 and f3 (no non-blob
+     * columns) and checks that f2 returns the raw bytes and f3 returns the original descriptor
+     * and the external file's bytes.
+     */
+    @Test
+    public void testMultipleBlobFieldsProjectBothBlobs() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Prepare an external file whose contents are referenced by descriptor from f3.
+        byte[] descriptorBytes = randomBytes();
+        Path external = new Path(tempPath.resolve("upstream-proj-both.bin").toString());
+        writeFile(table.fileIO(), external, descriptorBytes);
+
+        BlobDescriptor descriptor =
+                new BlobDescriptor(external.toString(), 0, descriptorBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob blobRef = Blob.fromDescriptor(uriReader, descriptor);
+
+        writeDataDefault(
+                Collections.singletonList(
+                        GenericRow.of(
+                                1,
+                                BinaryString.fromString("test"),
+                                new BlobData(blobBytes),
+                                blobRef)));
+
+        // Project only f2 and f3 (indices 2, 3) — both blob columns, no non-blob columns
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {2, 3});
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    // Projected positions: 0 -> f2 (raw blob), 1 -> f3 (descriptor blob).
+                    assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes);
+                    assertThat(row.getBlob(1).toDescriptor()).isEqualTo(descriptor);
+                    assertThat(row.getBlob(1).toData()).isEqualTo(descriptorBytes);
+                });
+        assertThat(count.get()).isEqualTo(1);
+    }
+
+    /**
+     * On the mixed-mode table, projects the STRING column f1 and the descriptor blob f3,
+     * skipping the raw blob f2, and checks both projected values.
+     */
+    @Test
+    public void testMultipleBlobFieldsProjectStringAndDescriptorBlob() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Prepare an external file whose contents are referenced by descriptor from f3.
+        byte[] descriptorBytes = randomBytes();
+        Path external = new Path(tempPath.resolve("upstream-proj-str-desc.bin").toString());
+        writeFile(table.fileIO(), external, descriptorBytes);
+
+        BlobDescriptor descriptor =
+                new BlobDescriptor(external.toString(), 0, descriptorBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob blobRef = Blob.fromDescriptor(uriReader, descriptor);
+
+        writeDataDefault(
+                Collections.singletonList(
+                        GenericRow.of(
+                                1,
+                                BinaryString.fromString("test"),
+                                new BlobData(blobBytes),
+                                blobRef)));
+
+        // Project f1 and f3 (indices 1, 3) — string + descriptor blob, skip raw blob
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {1, 3});
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    // Projected positions: 0 -> f1 (string), 1 -> f3 (descriptor blob).
+                    assertThat(row.getString(0).toString()).isEqualTo("test");
+                    assertThat(row.getBlob(1).toDescriptor()).isEqualTo(descriptor);
+                    assertThat(row.getBlob(1).toData()).isEqualTo(descriptorBytes);
+                });
+        assertThat(count.get()).isEqualTo(1);
+    }
+
+    /**
+     * On the mixed-mode table, writes four rows that cover every null combination of the two
+     * blob columns (f2 only, f3 only, both, neither) and checks each combination on a full read.
+     */
+    @Test
+    public void testMultipleBlobFieldsPartialNull() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Prepare an external file whose contents are referenced by descriptor from f3.
+        byte[] descriptorBytes = randomBytes();
+        Path external = new Path(tempPath.resolve("upstream-partial-null.bin").toString());
+        writeFile(table.fileIO(), external, descriptorBytes);
+
+        BlobDescriptor descriptor =
+                new BlobDescriptor(external.toString(), 0, descriptorBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob blobRef = Blob.fromDescriptor(uriReader, descriptor);
+
+        writeDataDefault(
+                Arrays.asList(
+                        // f2=blob, f3=null
+                        GenericRow.of(
+                                1, BinaryString.fromString("a"), new BlobData(blobBytes), null),
+                        // f2=null, f3=descriptor
+                        GenericRow.of(2, BinaryString.fromString("b"), null, blobRef),
+                        // both non-null
+                        GenericRow.of(
+                                3, BinaryString.fromString("c"), new BlobData(blobBytes), blobRef),
+                        // both null
+                        GenericRow.of(4, BinaryString.fromString("d"), null, null)));
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        assertThat(rows.size()).isEqualTo(4);
+        // Sort by f0 so list index i holds the row written with f0 == i + 1.
+        rows.sort((a, b) -> Integer.compare(a.getInt(0), b.getInt(0)));
+
+        // Row 1: f2=blob, f3=null
+        assertThat(rows.get(0).getBlob(2).toData()).isEqualTo(blobBytes);
+        assertThat(rows.get(0).isNullAt(3)).isTrue();
+
+        // Row 2: f2=null, f3=descriptor
+        assertThat(rows.get(1).isNullAt(2)).isTrue();
+        assertThat(rows.get(1).getBlob(3).toDescriptor()).isEqualTo(descriptor);
+
+        // Row 3: both non-null
+        assertThat(rows.get(2).getBlob(2).toData()).isEqualTo(blobBytes);
+        assertThat(rows.get(2).getBlob(3).toDescriptor()).isEqualTo(descriptor);
+
+        // Row 4: both null
+        assertThat(rows.get(3).isNullAt(2)).isTrue();
+        assertThat(rows.get(3).isNullAt(3)).isTrue();
+    }
+
+    /**
+     * On the mixed-mode table, writes 10 rows (f0 = 0..9) with both blob columns populated, reads
+     * the row range [3, 5], and checks that exactly rows 3, 4 and 5 come back with both blobs
+     * intact.
+     */
+    @Test
+    public void testMultipleBlobFieldsWithRowRangeFilter() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Prepare an external file whose contents are referenced by descriptor from f3.
+        byte[] descriptorBytes = randomBytes();
+        Path external = new Path(tempPath.resolve("upstream-range-multi.bin").toString());
+        writeFile(table.fileIO(), external, descriptorBytes);
+
+        BlobDescriptor descriptor =
+                new BlobDescriptor(external.toString(), 0, descriptorBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob blobRef = Blob.fromDescriptor(uriReader, descriptor);
+
+        List<InternalRow> inputRows = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            inputRows.add(
+                    GenericRow.of(
+                            i,
+                            BinaryString.fromString("row" + i),
+                            new BlobData(blobBytes),
+                            blobRef));
+        }
+        writeDataDefault(inputRows);
+
+        // Read only rows [3, 5]
+        ReadBuilder readBuilder =
+                table.newReadBuilder().withRowRanges(Collections.singletonList(new Range(3L, 5L)));
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        assertThat(rows.size()).isEqualTo(3);
+
+        // The assertions below expect row id N to carry f0 == N for this single write.
+        rows.sort((a, b) -> Integer.compare(a.getInt(0), b.getInt(0)));
+        for (int i = 0; i < 3; i++) {
+            assertThat(rows.get(i).getInt(0)).isEqualTo(i + 3);
+            assertThat(rows.get(i).getBlob(2).toData()).isEqualTo(blobBytes);
+            assertThat(rows.get(i).getBlob(3).toDescriptor()).isEqualTo(descriptor);
+        }
+    }
+
+    /**
+     * Creates a table with two raw blob columns (f1, f2), writes three rows, and reads them back
+     * under four projections: {f0}, {f1}, {f2} and {f0, f2}. Each projection must return all
+     * three rows with the expected values at the projected positions.
+     */
+    @Test
+    public void testMultipleRawBlobFieldsAllProjections() throws Exception {
+        // Create a table with two raw blob fields (both written to separate .blob files)
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.BLOB());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        FileStoreTable table = getTableDefault();
+        byte[] blob1 = randomBytes();
+        byte[] blob2 = randomBytes();
+
+        writeRows(
+                table,
+                Arrays.asList(
+                        GenericRow.of(1, new BlobData(blob1), new BlobData(blob2)),
+                        GenericRow.of(2, new BlobData(blob1), new BlobData(blob2)),
+                        GenericRow.of(3, new BlobData(blob1), new BlobData(blob2))));
+
+        // Projection: f0 only
+        ReadBuilder rb = table.newReadBuilder().withProjection(new int[] {0});
+        RecordReader<InternalRow> reader = rb.newRead().createReader(rb.newScan().plan());
+        // The serializer must match the projected row type, not the full table row type.
+        InternalRowSerializer ser0 =
+                new InternalRowSerializer(table.rowType().project(new int[] {0}));
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(ser0.copy(r)));
+        assertThat(rows.size()).isEqualTo(3);
+        rows.sort((a, b) -> Integer.compare(a.getInt(0), b.getInt(0)));
+        for (int i = 0; i < 3; i++) {
+            assertThat(rows.get(i).getInt(0)).isEqualTo(i + 1);
+        }
+
+        // Projection: f1 only (first blob)
+        rb = table.newReadBuilder().withProjection(new int[] {1});
+        reader = rb.newRead().createReader(rb.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    assertThat(row.getBlob(0).toData()).isEqualTo(blob1);
+                });
+        assertThat(count.get()).isEqualTo(3);
+
+        // Projection: f2 only (second blob)
+        rb = table.newReadBuilder().withProjection(new int[] {2});
+        reader = rb.newRead().createReader(rb.newScan().plan());
+        count.set(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    assertThat(row.getBlob(0).toData()).isEqualTo(blob2);
+                });
+        assertThat(count.get()).isEqualTo(3);
+
+        // Projection: f0 + f2 (skip f1 blob)
+        rb = table.newReadBuilder().withProjection(new int[] {0, 2});
+        reader = rb.newRead().createReader(rb.newScan().plan());
+        InternalRowSerializer ser02 =
+                new InternalRowSerializer(table.rowType().project(new int[] {0, 2}));
+        List<InternalRow> rows2 = new ArrayList<>();
+        reader.forEachRemaining(r -> rows2.add(ser02.copy(r)));
+        assertThat(rows2.size()).isEqualTo(3);
+        rows2.sort((a, b) -> Integer.compare(a.getInt(0), b.getInt(0)));
+        for (int i = 0; i < 3; i++) {
+            assertThat(rows2.get(i).getInt(0)).isEqualTo(i + 1);
+            assertThat(rows2.get(i).getBlob(1).toData()).isEqualTo(blob2);
+        }
+    }
+
+    /**
+     * On the mixed-mode table, reads with {@code BLOB_AS_DESCRIPTOR=true} and checks that:
+     *
+     * <ul>
+     *   <li>the raw blob f2 is returned with a descriptor whose URI contains ".blob" and that
+     *       still resolves to the original bytes, and
+     *   <li>the descriptor blob f3 returns the originally written descriptor and external bytes.
+     * </ul>
+     */
+    @Test
+    public void testMultipleBlobFieldsAsDescriptorReadMode() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Prepare an external file whose contents are referenced by descriptor from f3.
+        byte[] descriptorBytes = randomBytes();
+        Path external = new Path(tempPath.resolve("upstream-as-desc-multi.bin").toString());
+        writeFile(table.fileIO(), external, descriptorBytes);
+
+        BlobDescriptor descriptor =
+                new BlobDescriptor(external.toString(), 0, descriptorBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob blobRef = Blob.fromDescriptor(uriReader, descriptor);
+
+        writeDataDefault(
+                Collections.singletonList(
+                        GenericRow.of(
+                                1,
+                                BinaryString.fromString("desc-mode"),
+                                new BlobData(blobBytes),
+                                blobRef)));
+
+        // Read with BLOB_AS_DESCRIPTOR=true
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true");
+        Table tableDescMode = table.copy(options);
+        ReadBuilder readBuilder = tableDescMode.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    // f2 (raw blob) — should now return a descriptor (lazy reference)
+                    Blob blob2 = row.getBlob(2);
+                    BlobDescriptor desc2 = blob2.toDescriptor();
+                    assertThat(desc2).isNotNull();
+                    assertThat(desc2.uri()).contains(".blob");
+                    assertThat(blob2.toData()).isEqualTo(blobBytes);
+
+                    // f3 (descriptor blob) — already stored as descriptor, should work
+                    Blob blob3 = row.getBlob(3);
+                    assertThat(blob3.toDescriptor()).isEqualTo(descriptor);
+                    assertThat(blob3.toData()).isEqualTo(descriptorBytes);
+                });
+        assertThat(count.get()).isEqualTo(1);
+    }
+
+    /**
+     * Creates a downstream table whose "image" column is configured as both a blob field and a
+     * blob-view field, writes a {@link BlobViewStruct} pointing at a table that does not exist
+     * ({@code default.NonExistent}), and expects creating a reader over the table to throw.
+     *
+     * <p>NOTE(review): {@code isInstanceOf(RuntimeException.class)} accepts any unchecked
+     * exception, including unrelated ones; consider also asserting on the message or cause.
+     */
+    @Test
+    public void testBlobViewResolutionFailsOnMissingUpstream() throws Exception {
+        // Create a downstream table with blob-view-field
+        String downstreamName = "DownstreamViewFail";
+        Schema.Builder downstreamSchema = Schema.newBuilder();
+        downstreamSchema.column("id", DataTypes.INT());
+        downstreamSchema.column("image", DataTypes.BLOB());
+        downstreamSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        downstreamSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        downstreamSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        downstreamSchema.option(CoreOptions.BLOB_FIELD.key(), "image");
+        downstreamSchema.option(CoreOptions.BLOB_VIEW_FIELD.key(), "image");
+        catalog.createTable(identifier(downstreamName), downstreamSchema.build(), true);
+
+        FileStoreTable downstreamTable = getTable(identifier(downstreamName));
+
+        // Write a BlobView reference pointing to a non-existent upstream table
+        // NOTE(review): the meaning of the (2, 0L) arguments is not visible here — presumably
+        // an upstream field id and row id; confirm against BlobViewStruct.
+        BlobViewStruct viewStruct =
+                new BlobViewStruct(Identifier.fromString("default.NonExistent"), 2, 0L);
+        writeRows(
+                downstreamTable,
+                Collections.singletonList(GenericRow.of(1, Blob.fromView(viewStruct))));
+
+        // Reading should fail because the upstream table doesn't exist
+        ReadBuilder readBuilder = downstreamTable.newReadBuilder();
+        assertThatThrownBy(() -> readBuilder.newRead().createReader(readBuilder.newScan().plan()))
+                .isInstanceOf(RuntimeException.class);
+    }
+
private void createExternalStorageTable() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
index 7b5758909f..b3fa2450d2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -188,6 +189,377 @@ public class MultipleBlobTableTest extends TableTestBase {
new BlobData(blobBytes2));
}
+ @Test
+ public void testProjectOnlyTwoBlobFields() throws Exception {
+ createTableDefault();
+
+ commitDefault(writeDataDefault(100, 1));
+
+ FileStoreTable table = getTableDefault();
+
+ // Project only blob fields: f2 (index 2) and f3 (index 3)
+ List<InternalRow> rows = read(table, new int[] {2, 3});
+
+ assertThat(rows.size()).isEqualTo(100);
+ for (InternalRow row : rows) {
+ assertThat(row.getFieldCount()).isEqualTo(2);
+ assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes1);
+ assertThat(row.getBlob(1).toData()).isEqualTo(blobBytes2);
+ }
+ }
+
+    @Test
+    public void testProjectOneBlobAndOneNonBlob() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(100, 1));
+
+        // Project f0 (INT, index 0) + f2 (BLOB, index 2)
+        FileStoreTable table = getTableDefault();
+        List<InternalRow> rows = read(table, new int[] {0, 2});
+
+        assertThat(rows.size()).isEqualTo(100);
+        for (InternalRow row : rows) {
+            assertThat(row.getFieldCount()).isEqualTo(2);
+            // Verify the non-blob column survives a mixed projection too; the default data
+            // has a non-null f0 (testProjectOnlyNonBlobFields asserts the same).
+            assertThat(row.isNullAt(0)).isFalse();
+            assertThat(row.getBlob(1).toData()).isEqualTo(blobBytes1);
+        }
+    }
+
+ @Test
+ public void testProjectSingleBlobField() throws Exception {
+ createTableDefault();
+
+ commitDefault(writeDataDefault(100, 1));
+
+ // Project only f2 (BLOB, index 2)
+ FileStoreTable table = getTableDefault();
+ List<InternalRow> rows = read(table, new int[] {2});
+
+ assertThat(rows.size()).isEqualTo(100);
+ for (InternalRow row : rows) {
+ assertThat(row.getFieldCount()).isEqualTo(1);
+ assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes1);
+ }
+ }
+
+ @Test
+ public void testProjectOnlyNonBlobFields() throws Exception {
+ createTableDefault();
+
+ commitDefault(writeDataDefault(100, 1));
+
+ // Project only non-blob fields: f0 (index 0) and f1 (index 1)
+ FileStoreTable table = getTableDefault();
+ List<InternalRow> rows = read(table, new int[] {0, 1});
+
+ assertThat(rows.size()).isEqualTo(100);
+ for (InternalRow row : rows) {
+ assertThat(row.getFieldCount()).isEqualTo(2);
+ assertThat(row.isNullAt(0)).isFalse();
+ assertThat(row.isNullAt(1)).isFalse();
+ }
+ }
+
+ @Test
+ public void testProjectSecondBlobOnly() throws Exception {
+ createTableDefault();
+
+ commitDefault(writeDataDefault(100, 1));
+
+ // Project only f3 (BLOB, index 3) — the second blob field
+ FileStoreTable table = getTableDefault();
+ List<InternalRow> rows = read(table, new int[] {3});
+
+ assertThat(rows.size()).isEqualTo(100);
+ for (InternalRow row : rows) {
+ assertThat(row.getFieldCount()).isEqualTo(1);
+ assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes2);
+ }
+ }
+
+ @Test
+ public void testMultiBatchThenReadBothBlobs() throws Exception {
+ createTableDefault();
+
+ // Write multiple batches
+ for (int i = 0; i < 5; i++) {
+ commitDefault(writeDataDefault(100, 1));
+ }
+
+ AtomicInteger count = new AtomicInteger(0);
+ readDefault(
+ row -> {
+ count.incrementAndGet();
+ assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes1);
+ assertThat(row.getBlob(3).toData()).isEqualTo(blobBytes2);
+ });
+ assertThat(count.get()).isEqualTo(500);
+ }
+
+    @Test
+    public void testDropOneBlobColumnKeepOther() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(100, 1));
+
+        // Drop blob column f2 and keep blob column f3.
+        catalog.alterTable(
+                identifier(), Collections.singletonList(SchemaChange.dropColumn("f2")), false);
+
+        AtomicInteger count = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    count.incrementAndGet();
+                    // Remaining columns are f0, f1, f3, so f3 now sits at index 2.
+                    assertThat(row.getFieldCount()).isEqualTo(3);
+                    assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes2);
+                });
+        assertThat(count.get()).isEqualTo(100);
+    }
+
+    @Test
+    public void testNullBlobValues() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.column("f3", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "1 GB");
+        schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        FileStoreTable table = getTableDefault();
+        org.apache.paimon.table.sink.BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (org.apache.paimon.table.sink.BatchTableWrite write = builder.newWrite()) {
+            // Mix null and non-null blob values
+            write.write(
+                    GenericRow.of(
+                            1,
+                            BinaryString.fromString("a"),
+                            new BlobData(blobBytes1),
+                            null));
+            write.write(
+                    GenericRow.of(
+                            2,
+                            BinaryString.fromString("b"),
+                            null,
+                            new BlobData(blobBytes2)));
+            write.write(
+                    GenericRow.of(
+                            3,
+                            BinaryString.fromString("c"),
+                            new BlobData(blobBytes1),
+                            new BlobData(blobBytes2)));
+            // The commit is AutoCloseable; close it once used instead of leaking it.
+            try (org.apache.paimon.table.sink.BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        List<InternalRow> rows = read(table, new int[] {0, 2, 3});
+        assertThat(rows.size()).isEqualTo(3);
+
+        rows.sort((a, b) -> Integer.compare(a.getInt(0), b.getInt(0)));
+        // Row 1: f2=blobBytes1, f3=null
+        assertThat(rows.get(0).getBlob(1).toData()).isEqualTo(blobBytes1);
+        assertThat(rows.get(0).isNullAt(2)).isTrue();
+        // Row 2: f2=null, f3=blobBytes2
+        assertThat(rows.get(1).isNullAt(1)).isTrue();
+        assertThat(rows.get(1).getBlob(2).toData()).isEqualTo(blobBytes2);
+        // Row 3: f2=blobBytes1, f3=blobBytes2
+        assertThat(rows.get(2).getBlob(1).toData()).isEqualTo(blobBytes1);
+        assertThat(rows.get(2).getBlob(2).toData()).isEqualTo(blobBytes2);
+    }
+
+    @Test
+    public void testProjectTwoBlobsAfterCompaction() throws Exception {
+        createTableDefault();
+
+        // writeDataDefault(50, 20) yields 1000 rows in total (checked below); they are the
+        // input for the compaction coordinator.
+        commitDefault(writeDataDefault(50, 20));
+
+        FileStoreTable beforeCompaction = getTableDefault();
+        DataEvolutionCompactCoordinator planner =
+                new DataEvolutionCompactCoordinator(beforeCompaction, true, false);
+        List<CommitMessage> compacted = new ArrayList<>();
+        for (DataEvolutionCompactTask compactTask : planner.plan()) {
+            compacted.add(compactTask.doCompact(beforeCompaction, commitUser));
+        }
+        commitDefault(compacted);
+
+        // Reload the table after the compaction commit and read only the two blob columns.
+        List<InternalRow> projected = read(getTableDefault(), new int[] {2, 3});
+
+        assertThat(projected).hasSize(1000);
+        for (InternalRow current : projected) {
+            assertThat(current.getFieldCount()).isEqualTo(2);
+            assertThat(current.getBlob(0).toData()).isEqualTo(blobBytes1);
+            assertThat(current.getBlob(1).toData()).isEqualTo(blobBytes2);
+        }
+    }
+
+    @Test
+    public void testAddBlobColumnThenProjectBothBlobs() throws Exception {
+        // Start with table having only one blob column
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "1 GB");
+        schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        // Write data with only f2 blob
+        FileStoreTable table = getTableDefault();
+        org.apache.paimon.table.sink.BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (org.apache.paimon.table.sink.BatchTableWrite write = builder.newWrite()) {
+            for (int i = 0; i < 50; i++) {
+                write.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("row" + i),
+                                new BlobData(blobBytes1)));
+            }
+            // Close the commit as well as the write.
+            try (org.apache.paimon.table.sink.BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // Add new blob column f3
+        catalog.alterTable(
+                identifier(),
+                Collections.singletonList(SchemaChange.addColumn("f3", DataTypes.BLOB())),
+                false);
+
+        // Write more data with both f2 and f3
+        table = getTableDefault();
+        builder = table.newBatchWriteBuilder();
+        try (org.apache.paimon.table.sink.BatchTableWrite write = builder.newWrite()) {
+            for (int i = 50; i < 100; i++) {
+                write.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("row" + i),
+                                new BlobData(blobBytes1),
+                                new BlobData(blobBytes2)));
+            }
+            try (org.apache.paimon.table.sink.BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // Project only both blob fields: f2 (index 2) and f3 (index 3)
+        table = getTableDefault();
+        List<InternalRow> rows = read(table, new int[] {2, 3});
+
+        assertThat(rows.size()).isEqualTo(100);
+        // Read order is not relied upon: classify each row by whether its f3 value is null.
+        int nullF3Count = 0;
+        int nonNullF3Count = 0;
+        for (InternalRow row : rows) {
+            assertThat(row.getFieldCount()).isEqualTo(2);
+            assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes1);
+            if (row.isNullAt(1)) {
+                nullF3Count++;
+            } else {
+                assertThat(row.getBlob(1).toData()).isEqualTo(blobBytes2);
+                nonNullF3Count++;
+            }
+        }
+        // The 50 rows written before f3 was added have null f3
+        assertThat(nullF3Count).isEqualTo(50);
+        assertThat(nonNullF3Count).isEqualTo(50);
+    }
+
+ @Test
+ public void testAsymmetricCompactionThenProjectBothBlobs() throws
Exception {
+ // Use smaller blob target to produce more blob files per field
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+ schemaBuilder.column("f3", DataTypes.BLOB());
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "1 GB");
+ schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "25 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "3");
+ catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+ // Write multiple batches — each batch creates separate blob files
+ FileStoreTable table = getTableDefault();
+ org.apache.paimon.table.sink.BatchWriteBuilder builder =
table.newBatchWriteBuilder();
+ for (int batch = 0; batch < 6; batch++) {
+ try (org.apache.paimon.table.sink.BatchTableWrite write =
builder.newWrite()) {
+ for (int i = 0; i < 10; i++) {
+ write.write(
+ GenericRow.of(
+ batch * 10 + i,
+ BinaryString.fromString("v" + i),
+ new BlobData(blobBytes1),
+ new BlobData(blobBytes2)));
+ }
+ org.apache.paimon.table.sink.BatchTableCommit commit =
builder.newCommit();
+ commit.commit(write.prepareCommit());
+ }
+ }
+
+ table = getTableDefault();
+
+ // Run compaction
+ DataEvolutionCompactCoordinator coordinator =
+ new DataEvolutionCompactCoordinator(table, true, false);
+ List<DataEvolutionCompactTask> tasks = coordinator.plan();
+ if (!tasks.isEmpty()) {
+ List<CommitMessage> compactMessages = new ArrayList<>();
+ for (DataEvolutionCompactTask task : tasks) {
+ compactMessages.add(task.doCompact(table, commitUser));
+ }
+ commitDefault(compactMessages);
+ }
+
+ // Project only both blob fields after compaction
+ table = getTableDefault();
+ List<InternalRow> rows = read(table, new int[] {2, 3});
+ assertThat(rows.size()).isEqualTo(60);
+ for (InternalRow row : rows) {
+ assertThat(row.getFieldCount()).isEqualTo(2);
+ assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes1);
+ assertThat(row.getBlob(1).toData()).isEqualTo(blobBytes2);
+ }
+ }
+
+ @Test
+ public void testBlobOnlyProjectionWithRowRanges() throws Exception {
+ createTableDefault();
+
+ // Write 1000 rows (will create ~10 blob files per field)
+ commitDefault(writeDataDefault(1000, 1));
+
+ FileStoreTable table = getTableDefault();
+
+ // Read with row ranges [100, 199] — only blob projection
+ org.apache.paimon.table.source.ReadBuilder readBuilder =
table.newReadBuilder();
+ readBuilder.withProjection(new int[] {2, 3});
+ readBuilder.withRowRanges(
+ Arrays.asList(new org.apache.paimon.utils.Range(100, 199)));
+ org.apache.paimon.reader.RecordReader<InternalRow> reader =
+
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+ List<InternalRow> rows = new ArrayList<>();
+ reader.forEachRemaining(rows::add);
+
+ assertThat(rows.size()).isEqualTo(100);
+ for (InternalRow row : rows) {
+ assertThat(row.getFieldCount()).isEqualTo(2);
+ assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes1);
+ assertThat(row.getBlob(1).toData()).isEqualTo(blobBytes2);
+ }
+ }
+
@Override
protected byte[] randomBytes() {
byte[] binary = new byte[2 * 1024 * 124];
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 14b7f7a5c7..cc1b07fe6b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.BlobData;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.Path;
import org.apache.paimon.globalindex.IndexedSplit;
import org.apache.paimon.index.IndexFileMeta;
@@ -37,6 +38,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.DataEvolutionFileReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -45,6 +47,7 @@ import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -1070,6 +1073,474 @@ public class DataEvolutionTableTest extends
DataEvolutionTestBase {
assertThat(path4.toString()).isEqualTo(testExternalpath2);
}
+    @Test
+    public void testProjectionPushdown() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+        // Write f0 and f1 together
+        RowType writeType0 = schema.rowType().project(Arrays.asList("f0", "f1"));
+        try (BatchTableWrite write0 = builder.newWrite().withWriteType(writeType0)) {
+            write0.write(GenericRow.of(1, BinaryString.fromString("a")));
+            write0.write(GenericRow.of(2, BinaryString.fromString("b")));
+            // Close the commit as well as the write.
+            try (BatchTableCommit commit = builder.newCommit()) {
+                List<CommitMessage> commitables = write0.prepareCommit();
+                commit.commit(commitables);
+            }
+        }
+
+        // Write f2 separately, aligned to the same rows via firstRowId = 0
+        RowType writeType1 = schema.rowType().project(Collections.singletonList("f2"));
+        try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
+            write1.write(GenericRow.of(BinaryString.fromString("x")));
+            write1.write(GenericRow.of(BinaryString.fromString("y")));
+            try (BatchTableCommit commit = builder.newCommit()) {
+                List<CommitMessage> commitables = write1.prepareCommit();
+                setFirstRowId(commitables, 0L);
+                commit.commit(commitables);
+            }
+        }
+
+        // Project only f0 - should filter out the f2-only file
+        ReadBuilder readBuilder = getTableDefault().newReadBuilder().withProjection(new int[] {0});
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        DataSplit dataSplit = (DataSplit) plan.splits().get(0);
+        assertThat(dataSplit.dataFiles().size()).isEqualTo(1);
+        RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan);
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+        assertThat(rows.size()).isEqualTo(2);
+
+        // Project only f2 - should filter out the f0,f1-only file
+        readBuilder = getTableDefault().newReadBuilder().withProjection(new int[] {2});
+        plan = readBuilder.newScan().plan();
+        dataSplit = (DataSplit) plan.splits().get(0);
+        assertThat(dataSplit.dataFiles().size()).isEqualTo(1);
+        reader = readBuilder.newRead().createReader(plan);
+        rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+        assertThat(rows.size()).isEqualTo(2);
+
+        // Project f0 and f2 (skip f1) - needs both files
+        readBuilder = getTableDefault().newReadBuilder().withProjection(new int[] {0, 2});
+        plan = readBuilder.newScan().plan();
+        dataSplit = (DataSplit) plan.splits().get(0);
+        assertThat(dataSplit.dataFiles().size()).isEqualTo(2);
+        reader = readBuilder.newRead().createReader(plan);
+        RowType projectedType = schema.rowType().project(Arrays.asList("f0", "f2"));
+        InternalRowSerializer serializer = new InternalRowSerializer(projectedType);
+        List<InternalRow> projectedRows = new ArrayList<>();
+        reader.forEachRemaining(r -> projectedRows.add(serializer.copy(r)));
+        assertThat(projectedRows.size()).isEqualTo(2);
+        projectedRows.sort(Comparator.comparingInt(r -> r.getInt(0)));
+        assertThat(projectedRows.get(0).getInt(0)).isEqualTo(1);
+        assertThat(projectedRows.get(0).getString(1).toString()).isEqualTo("x");
+        assertThat(projectedRows.get(1).getInt(0)).isEqualTo(2);
+        assertThat(projectedRows.get(1).getString(1).toString()).isEqualTo("y");
+    }
+
+    @Test
+    public void testSequenceNumberConflictResolution() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+        // Write all columns first
+        try (BatchTableWrite write = builder.newWrite().withWriteType(schema.rowType())) {
+            write.write(
+                    GenericRow.of(1, BinaryString.fromString("a"), BinaryString.fromString("old")));
+            write.write(
+                    GenericRow.of(2, BinaryString.fromString("b"), BinaryString.fromString("old")));
+            // Close the commit as well as the write.
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // Overwrite f2 with a new value (higher sequence number)
+        RowType writeType1 = schema.rowType().project(Collections.singletonList("f2"));
+        try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
+            write1.write(GenericRow.of(BinaryString.fromString("new")));
+            write1.write(GenericRow.of(BinaryString.fromString("new")));
+            try (BatchTableCommit commit = builder.newCommit()) {
+                List<CommitMessage> commitables = write1.prepareCommit();
+                setFirstRowId(commitables, 0L);
+                commit.commit(commitables);
+            }
+        }
+
+        // Read and verify f2 shows the new value (higher seq number wins)
+        ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(schema.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        assertThat(rows.size()).isEqualTo(2);
+        rows.sort(Comparator.comparingInt(r -> r.getInt(0)));
+        assertThat(rows.get(0).getInt(0)).isEqualTo(1);
+        assertThat(rows.get(0).getString(1).toString()).isEqualTo("a");
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("new");
+        assertThat(rows.get(1).getInt(0)).isEqualTo(2);
+        assertThat(rows.get(1).getString(1).toString()).isEqualTo("b");
+        assertThat(rows.get(1).getString(2).toString()).isEqualTo("new");
+    }
+
+    @Test
+    public void testMultipleOverwritesSameColumn() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+        // Write initial data with all columns
+        try (BatchTableWrite write = builder.newWrite().withWriteType(schema.rowType())) {
+            write.write(
+                    GenericRow.of(
+                            1, BinaryString.fromString("a"), BinaryString.fromString("version1")));
+            // Close the commit as well as the write.
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // Overwrite f2 - second time
+        RowType writeType1 = schema.rowType().project(Collections.singletonList("f2"));
+        try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
+            write1.write(GenericRow.of(BinaryString.fromString("version2")));
+            try (BatchTableCommit commit = builder.newCommit()) {
+                List<CommitMessage> commitables = write1.prepareCommit();
+                setFirstRowId(commitables, 0L);
+                commit.commit(commitables);
+            }
+        }
+
+        // Overwrite f2 - third time
+        try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
+            write1.write(GenericRow.of(BinaryString.fromString("version3")));
+            try (BatchTableCommit commit = builder.newCommit()) {
+                List<CommitMessage> commitables = write1.prepareCommit();
+                setFirstRowId(commitables, 0L);
+                commit.commit(commitables);
+            }
+        }
+
+        // Read and verify only the latest version is visible
+        ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        // Copy each row before keeping it: the reader may reuse row instances, as the
+        // serializer-copy pattern used by the other tests in this class guards against.
+        InternalRowSerializer serializer = new InternalRowSerializer(schema.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        assertThat(rows.size()).isEqualTo(1);
+        assertThat(rows.get(0).getInt(0)).isEqualTo(1);
+        assertThat(rows.get(0).getString(1).toString()).isEqualTo("a");
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("version3");
+    }
+
+    @Test
+    public void testCompactThenReadCorrectness() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            write(100000L);
+        }
+        FileStoreTable table = getTableDefault();
+
+        // Run compaction until the coordinator has nothing more to plan
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false, false);
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        List<DataEvolutionCompactTask> tasks;
+        try {
+            while (!(tasks = coordinator.plan()).isEmpty()) {
+                for (DataEvolutionCompactTask task : tasks) {
+                    commitMessages.add(task.doCompact(table, "test-compact"));
+                }
+            }
+        } catch (EndOfScanException ignored) {
+            // plan() may signal exhaustion by throwing rather than returning an empty list.
+            // Either way planning is finished; the messages gathered so far are committed below.
+        }
+        assertThat(commitMessages.isEmpty()).isFalse();
+        try (BatchTableCommit compactCommit = table.newBatchWriteBuilder().newCommit()) {
+            compactCommit.commit(commitMessages);
+        }
+
+        // Verify data after compaction
+        Schema schema = schemaDefault();
+        table = getTableDefault();
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan);
+        InternalRowSerializer serializer = new InternalRowSerializer(schema.rowType());
+        List<InternalRow> rowsAfter = new ArrayList<>();
+        reader.forEachRemaining(r -> rowsAfter.add(serializer.copy(r)));
+        assertThat(rowsAfter.size()).isEqualTo(500000);
+
+        // Each write produces rows with f0=0..99999, so 5 writes give 5 copies of each
+        rowsAfter.sort(Comparator.comparingInt(r -> r.getInt(0)));
+        // First 5 rows should all have f0=0 with correct f1 and f2
+        for (int i = 0; i < 5; i++) {
+            assertThat(rowsAfter.get(i).getInt(0)).isEqualTo(0);
+            assertThat(rowsAfter.get(i).getString(1).toString()).isEqualTo("a0");
+            assertThat(rowsAfter.get(i).getString(2).toString()).isEqualTo("b0");
+        }
+        // Spot check other values
+        for (int i = 5; i < 10; i++) {
+            assertThat(rowsAfter.get(i).getInt(0)).isEqualTo(1);
+            assertThat(rowsAfter.get(i).getString(1).toString()).isEqualTo("a1");
+            assertThat(rowsAfter.get(i).getString(2).toString()).isEqualTo("b1");
+        }
+
+        // After compaction, only 1 file should remain
+        assertThat(plan.splits().size()).isEqualTo(1);
+        DataSplit dataSplit = (DataSplit) plan.splits().get(0);
+        assertThat(dataSplit.dataFiles().size()).isEqualTo(1);
+    }
+
+    @Test
+    public void testStreamingRead() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        FileStoreTable table = getTableDefault();
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        // Rows are copied before being kept, since the reader may reuse row instances.
+        InternalRowSerializer serializer = new InternalRowSerializer(schema.rowType());
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        // The stream scan is created before any commit exists; the assertions below expect
+        // its first plan() to return exactly the row from the first commit.
+        StreamTableScan streamScan = readBuilder.newStreamScan();
+
+        // Write first batch - full row
+        try (BatchTableWrite write = builder.newWrite().withWriteType(schema.rowType())) {
+            write.write(
+                    GenericRow.of(1, BinaryString.fromString("a"), BinaryString.fromString("b")));
+            // Close the commit as well as the write.
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // Streaming plan should return the new data
+        TableScan.Plan plan = streamScan.plan();
+        RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan);
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        assertThat(rows.size()).isEqualTo(1);
+        assertThat(rows.get(0).getInt(0)).isEqualTo(1);
+        assertThat(rows.get(0).getString(1).toString()).isEqualTo("a");
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("b");
+
+        // Write a partial-column append (f0, f1 only) for a new row at row id 1
+        RowType writeType0 = schema.rowType().project(Arrays.asList("f0", "f1"));
+        try (BatchTableWrite write0 = builder.newWrite().withWriteType(writeType0)) {
+            write0.write(GenericRow.of(2, BinaryString.fromString("c")));
+            try (BatchTableCommit commit = builder.newCommit()) {
+                List<CommitMessage> commitables = write0.prepareCommit();
+                setFirstRowId(commitables, 1L);
+                commit.commit(commitables);
+            }
+        }
+
+        // Streaming plan returns only the new incremental data
+        plan = streamScan.plan();
+        reader = readBuilder.newRead().createReader(plan);
+        List<InternalRow> rows2 = new ArrayList<>();
+        reader.forEachRemaining(r -> rows2.add(serializer.copy(r)));
+        assertThat(rows2.size()).isEqualTo(1);
+        assertThat(rows2.get(0).getInt(0)).isEqualTo(2);
+        assertThat(rows2.get(0).getString(1).toString()).isEqualTo("c");
+        assertThat(rows2.get(0).isNullAt(2)).isTrue();
+    }
+
+    @Test
+    public void testPartitionedTable() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("pt", DataTypes.STRING());
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.STRING());
+        schemaBuilder.partitionKeys("pt");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_PARTITION_GROUP_ON_COMMIT.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+        FileStoreTable table = getTableDefault();
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+
+        RowType fullType = table.rowType();
+        RowType writeType0 = fullType.project(Arrays.asList("pt", "f0", "f1"));
+        RowType writeType1 = fullType.project(Arrays.asList("pt", "f2"));
+
+        // Write f0, f1 for partition p1 only
+        try (BatchTableWrite write0 = builder.newWrite().withWriteType(writeType0)) {
+            write0.write(
+                    GenericRow.of(BinaryString.fromString("p1"), 1, BinaryString.fromString("a")));
+            write0.write(
+                    GenericRow.of(BinaryString.fromString("p1"), 2, BinaryString.fromString("b")));
+            // Close the commit as well as the write.
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write0.prepareCommit());
+            }
+        }
+
+        // 2 rows were just written for p1
+        long p1RowId = table.snapshotManager().latestSnapshot().nextRowId() - 2;
+
+        // Write f2 for p1 with matching firstRowId
+        try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
+            write1.write(
+                    GenericRow.of(BinaryString.fromString("p1"), BinaryString.fromString("x")));
+            write1.write(
+                    GenericRow.of(BinaryString.fromString("p1"), BinaryString.fromString("y")));
+            try (BatchTableCommit commit = builder.newCommit()) {
+                List<CommitMessage> commitables = write1.prepareCommit();
+                setFirstRowId(commitables, p1RowId);
+                commit.commit(commitables);
+            }
+        }
+
+        // Write f0, f1 for partition p2
+        try (BatchTableWrite write0 = builder.newWrite().withWriteType(writeType0)) {
+            write0.write(
+                    GenericRow.of(BinaryString.fromString("p2"), 3, BinaryString.fromString("c")));
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write0.prepareCommit());
+            }
+        }
+
+        // 1 row was just written for p2
+        long p2RowId = table.snapshotManager().latestSnapshot().nextRowId() - 1;
+
+        // Write f2 for p2 with matching firstRowId
+        try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
+            write1.write(
+                    GenericRow.of(BinaryString.fromString("p2"), BinaryString.fromString("z")));
+            try (BatchTableCommit commit = builder.newCommit()) {
+                List<CommitMessage> commitables = write1.prepareCommit();
+                setFirstRowId(commitables, p2RowId);
+                commit.commit(commitables);
+            }
+        }
+
+        // Read all and verify
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(fullType);
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        rows.sort(Comparator.comparingInt(r -> r.getInt(1)));
+
+        assertThat(rows.size()).isEqualTo(3);
+        // p1 rows
+        assertThat(rows.get(0).getString(0).toString()).isEqualTo("p1");
+        assertThat(rows.get(0).getInt(1)).isEqualTo(1);
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("a");
+        assertThat(rows.get(0).getString(3).toString()).isEqualTo("x");
+
+        assertThat(rows.get(1).getString(0).toString()).isEqualTo("p1");
+        assertThat(rows.get(1).getInt(1)).isEqualTo(2);
+        assertThat(rows.get(1).getString(2).toString()).isEqualTo("b");
+        assertThat(rows.get(1).getString(3).toString()).isEqualTo("y");
+
+        // p2 row
+        assertThat(rows.get(2).getString(0).toString()).isEqualTo("p2");
+        assertThat(rows.get(2).getInt(1)).isEqualTo(3);
+        assertThat(rows.get(2).getString(2).toString()).isEqualTo("c");
+        assertThat(rows.get(2).getString(3).toString()).isEqualTo("z");
+    }
+
+    @Test
+    public void testSchemaEvolution() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+        // Write initial data with original schema (f0, f1, f2)
+        try (BatchTableWrite write = builder.newWrite().withWriteType(schema.rowType())) {
+            write.write(
+                    GenericRow.of(1, BinaryString.fromString("a"), BinaryString.fromString("b")));
+            // Close the commit as well as the write.
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // Add a new column f3
+        catalog.alterTable(identifier(), SchemaChange.addColumn("f3", DataTypes.STRING()), false);
+
+        // Reload table to pick up new schema
+        FileStoreTable table = getTableDefault();
+        builder = table.newBatchWriteBuilder();
+
+        // Write data with new schema (f0, f1, f2, f3)
+        try (BatchTableWrite write = builder.newWrite()) {
+            write.write(
+                    GenericRow.of(
+                            2,
+                            BinaryString.fromString("c"),
+                            BinaryString.fromString("d"),
+                            BinaryString.fromString("e")));
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // Read and verify - old rows should have null for f3. Rows are copied (using the
+        // evolved 4-column type) before sorting, since the reader may reuse row instances.
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        rows.sort(Comparator.comparingInt(r -> r.getInt(0)));
+
+        assertThat(rows.size()).isEqualTo(2);
+        // Old row - f3 should be null
+        assertThat(rows.get(0).getInt(0)).isEqualTo(1);
+        assertThat(rows.get(0).getString(1).toString()).isEqualTo("a");
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("b");
+        assertThat(rows.get(0).isNullAt(3)).isTrue();
+
+        // New row - all columns present
+        assertThat(rows.get(1).getInt(0)).isEqualTo(2);
+        assertThat(rows.get(1).getString(1).toString()).isEqualTo("c");
+        assertThat(rows.get(1).getString(2).toString()).isEqualTo("d");
+        assertThat(rows.get(1).getString(3).toString()).isEqualTo("e");
+    }
+
+    @Test
+    public void testReadAfterMultipleAppendsToDifferentColumnSets() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+        // Commit 1: Write only f0 for row 0
+        RowType writeType0 = schema.rowType().project(Collections.singletonList("f0"));
+        try (BatchTableWrite write0 = builder.newWrite().withWriteType(writeType0)) {
+            write0.write(GenericRow.of(1));
+            // Close the commit as well as the write.
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write0.prepareCommit());
+            }
+        }
+
+        // Commit 2: Write only f1 for row 1 (different row)
+        RowType writeType1 = schema.rowType().project(Collections.singletonList("f1"));
+        try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
+            write1.write(GenericRow.of(BinaryString.fromString("a")));
+            try (BatchTableCommit commit = builder.newCommit()) {
+                List<CommitMessage> commitables = write1.prepareCommit();
+                setFirstRowId(commitables, 1L);
+                commit.commit(commitables);
+            }
+        }
+
+        // Commit 3: Write only f2 for row 2 (different row)
+        RowType writeType2 = schema.rowType().project(Collections.singletonList("f2"));
+        try (BatchTableWrite write2 = builder.newWrite().withWriteType(writeType2)) {
+            write2.write(GenericRow.of(BinaryString.fromString("b")));
+            try (BatchTableCommit commit = builder.newCommit()) {
+                List<CommitMessage> commitables = write2.prepareCommit();
+                setFirstRowId(commitables, 2L);
+                commit.commit(commitables);
+            }
+        }
+
+        // Read all rows - each row should have only its written column, others null.
+        // Copy rows before keeping them, since the reader may reuse row instances.
+        ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(schema.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+
+        assertThat(rows.size()).isEqualTo(3);
+
+        // The assertions below rely on the reader's output order (presumably row-id order;
+        // f0 cannot serve as a sort key because it is null for rows 1 and 2).
+        // Row 0: only f0 is set
+        assertThat(rows.get(0).getInt(0)).isEqualTo(1);
+        assertThat(rows.get(0).isNullAt(1)).isTrue();
+        assertThat(rows.get(0).isNullAt(2)).isTrue();
+
+        // Row 1: only f1 is set
+        assertThat(rows.get(1).isNullAt(0)).isTrue();
+        assertThat(rows.get(1).getString(1).toString()).isEqualTo("a");
+        assertThat(rows.get(1).isNullAt(2)).isTrue();
+
+        // Row 2: only f2 is set
+        assertThat(rows.get(2).isNullAt(0)).isTrue();
+        assertThat(rows.get(2).isNullAt(1)).isTrue();
+        assertThat(rows.get(2).getString(2).toString()).isEqualTo("b");
+    }
+
private Range assertContinuousRowIdRange(List<DataFileMeta> files) {
files.sort(Comparator.comparingLong(DataFileMeta::nonNullFirstRowId));
long start = files.get(0).nonNullFirstRowId();