This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fe06e3beb [core] Add comprehensive blob and data-evolution tests
6fe06e3beb is described below

commit 6fe06e3bebaf9c76d781a76bca84d72138b88026
Author: JingsongLi <[email protected]>
AuthorDate: Fri May 22 23:01:18 2026 +0800

    [core] Add comprehensive blob and data-evolution tests
    
    Add tests for MultipleBlobTableTest, BlobTableTest, and
    DataEvolutionTableTest covering projection pushdown, null blobs, schema
    evolution, compaction, multi-batch scenarios, multi-blob-field queries,
    asymmetric compaction, and blob-only projection with row ranges.
---
 .../org/apache/paimon/append/BlobTableTest.java    | 673 +++++++++++++++++++++
 .../paimon/append/MultipleBlobTableTest.java       | 372 ++++++++++++
 .../paimon/table/DataEvolutionTableTest.java       | 471 ++++++++++++++
 3 files changed, 1516 insertions(+)

diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 38aabeda6a..d8fd0cce9b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.data.BlobView;
 import org.apache.paimon.data.BlobViewStruct;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.SeekableInputStream;
@@ -48,6 +49,7 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.system.RowTrackingTable;
 import org.apache.paimon.types.DataField;
@@ -885,6 +887,677 @@ public class BlobTableTest extends TableTestBase {
                         });
     }
 
+    @Test
+    public void testBlobProjectionExcludesBlobColumn() throws Exception {
+        createTableDefault();
+        writeDataDefault(
+                Collections.singletonList(
+                        GenericRow.of(
+                                42, BinaryString.fromString("hello"), new 
BlobData(blobBytes))));
+
+        Table table = getTableDefault();
+        // Project only f0 and f1 (indices 0, 1), excluding blob column f2
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new 
int[] {0, 1});
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    assertThat(row.getInt(0)).isEqualTo(42);
+                    assertThat(row.getString(1).toString()).isEqualTo("hello");
+                });
+        assertThat(count.get()).isEqualTo(1);
+    }
+
+    @Test
+    public void testBlobProjectionOnlyBlobColumn() throws Exception {
+        createTableDefault();
+        writeDataDefault(
+                Collections.singletonList(
+                        GenericRow.of(
+                                42, BinaryString.fromString("hello"), new 
BlobData(blobBytes))));
+
+        Table table = getTableDefault();
+        // Project only blob column f2 (index 2)
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new 
int[] {2});
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    count.incrementAndGet();
+                    assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes);
+                });
+        assertThat(count.get()).isEqualTo(1);
+    }
+
+    @Test
+    public void testNullBlobValues() throws Exception {
+        createTableDefault();
+        writeDataDefault(
+                Arrays.asList(
+                        GenericRow.of(0, BinaryString.fromString("a"), new 
BlobData(blobBytes)),
+                        GenericRow.of(1, BinaryString.fromString("b"), null),
+                        GenericRow.of(2, BinaryString.fromString("c"), new 
BlobData(blobBytes)),
+                        GenericRow.of(3, BinaryString.fromString("d"), null),
+                        GenericRow.of(4, BinaryString.fromString("e"), new 
BlobData(blobBytes))));
+
+        Table table = getTableDefault();
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+        InternalRowSerializer serializer = new 
InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        assertThat(rows.size()).isEqualTo(5);
+
+        rows.sort((a, b) -> Integer.compare(a.getInt(0), b.getInt(0)));
+        // Non-null blobs
+        assertThat(rows.get(0).getBlob(2).toData()).isEqualTo(blobBytes);
+        assertThat(rows.get(2).getBlob(2).toData()).isEqualTo(blobBytes);
+        assertThat(rows.get(4).getBlob(2).toData()).isEqualTo(blobBytes);
+        // Null blobs
+        assertThat(rows.get(1).isNullAt(2)).isTrue();
+        assertThat(rows.get(3).isNullAt(2)).isTrue();
+    }
+
+    /** Reads a raw blob through a table copy that has BLOB_AS_DESCRIPTOR set to true. */
+    @Test
+    public void testBlobAsDescriptorReadMode() throws Exception {
+        createTableDefault();
+        InternalRow input =
+                GenericRow.of(1, BinaryString.fromString("test"), new BlobData(blobBytes));
+        writeDataDefault(Collections.singletonList(input));
+
+        // Copy the table with the descriptor read option switched on.
+        FileStoreTable baseTable = getTableDefault();
+        Map<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true");
+        Table descriptorTable = baseTable.copy(dynamicOptions);
+
+        ReadBuilder readBuilder = descriptorTable.newReadBuilder();
+        RecordReader<InternalRow> records =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+        AtomicInteger seen = new AtomicInteger(0);
+        records.forEachRemaining(
+                actual -> {
+                    seen.incrementAndGet();
+                    Blob blob = actual.getBlob(2);
+                    // The blob must expose a populated descriptor ...
+                    BlobDescriptor blobDescriptor = blob.toDescriptor();
+                    assertThat(blobDescriptor).isNotNull();
+                    assertThat(blobDescriptor.uri()).isNotNull();
+                    assertThat(blobDescriptor.length()).isGreaterThan(0);
+                    // ... and toData() must still return the bytes that were written.
+                    assertThat(blob.toData()).isEqualTo(blobBytes);
+                });
+        assertThat(seen.get()).isEqualTo(1);
+    }
+
+    /** Compacts the blob files of a table with one blob field and re-reads every row. */
+    @Test
+    public void testBlobCompactionSingleField() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.STRING())
+                        .column("f2", DataTypes.BLOB())
+                        .option(CoreOptions.TARGET_FILE_SIZE.key(), "1 GB")
+                        .option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "25 MB")
+                        .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
+                        .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
+                        .build();
+        catalog.createTable(identifier(), schema, true);
+
+        // Write multiple batches to create multiple blob files (1000 rows are read back below).
+        commitDefault(writeDataDefault(50, 20));
+
+        FileStoreTable table = getTableDefault();
+        long blobFilesBefore =
+                table.store().newScan().plan().files().stream()
+                        .map(ManifestEntry::file)
+                        .map(DataFileMeta::fileName)
+                        .filter(org.apache.paimon.format.blob.BlobFileFormat::isBlobFile)
+                        .count();
+        assertThat(blobFilesBefore).isGreaterThan(1);
+
+        // Plan the compaction and require at least one blob task before running it.
+        List<DataEvolutionCompactTask> firstPlan =
+                new DataEvolutionCompactCoordinator(table, true, false).plan();
+        assertThat(firstPlan.stream().anyMatch(DataEvolutionCompactTask::isBlobTask)).isTrue();
+
+        List<CommitMessage> compacted = new ArrayList<>();
+        for (DataEvolutionCompactTask compactTask : firstPlan) {
+            compacted.add(compactTask.doCompact(table, commitUser));
+        }
+        commitDefault(compacted);
+
+        // Every row must still return the original blob bytes after compaction.
+        AtomicInteger rowsRead = new AtomicInteger(0);
+        readDefault(
+                actual -> {
+                    rowsRead.incrementAndGet();
+                    assertThat(actual.getBlob(2).toData()).isEqualTo(blobBytes);
+                });
+        assertThat(rowsRead.get()).isEqualTo(1000);
+
+        // A fresh plan must hold no blob task; EndOfScanException is treated as an empty plan.
+        DataEvolutionCompactCoordinator secondCoordinator =
+                new DataEvolutionCompactCoordinator(getTableDefault(), true, false);
+        List<DataEvolutionCompactTask> secondPlan;
+        try {
+            secondPlan = secondCoordinator.plan();
+        } catch (EndOfScanException e) {
+            secondPlan = Collections.emptyList();
+        }
+        assertThat(secondPlan.stream().anyMatch(DataEvolutionCompactTask::isBlobTask)).isFalse();
+    }
+
+    /** Writes blob rows into partitions p1 and p2 and reads every partition back. */
+    @Test
+    public void testPartitionedTableWithBlob() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.STRING())
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.BLOB())
+                        .partitionKeys("pt")
+                        .option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB")
+                        .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
+                        .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
+                        .build();
+        catalog.createTable(identifier(), schema, true);
+
+        FileStoreTable table = getTableDefault();
+        byte[] blobP1 = randomBytes();
+        byte[] blobP2 = randomBytes();
+        BinaryString p1 = BinaryString.fromString("p1");
+        BinaryString p2 = BinaryString.fromString("p2");
+
+        // Two rows go to p1, then a single row goes to p2 in a second writeRows call.
+        writeRows(
+                table,
+                Arrays.asList(
+                        GenericRow.of(p1, 1, new BlobData(blobP1)),
+                        GenericRow.of(p1, 2, new BlobData(blobP1))));
+        writeRows(table, Collections.singletonList(GenericRow.of(p2, 3, new BlobData(blobP2))));
+
+        // Read all partitions.
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> records =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        records.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        // hasSize reports the actual list content on failure, unlike comparing size() to an int.
+        assertThat(rows).hasSize(3);
+
+        // Order by f0 (position 1) so the rows line up with the expectations below.
+        rows.sort((left, right) -> Integer.compare(left.getInt(1), right.getInt(1)));
+        String[] expectedPartitions = {"p1", "p1", "p2"};
+        byte[][] expectedBlobs = {blobP1, blobP1, blobP2};
+        for (int i = 0; i < expectedPartitions.length; i++) {
+            InternalRow row = rows.get(i);
+            assertThat(row.getString(0).toString()).isEqualTo(expectedPartitions[i]);
+            assertThat(row.getBlob(2).toData()).isEqualTo(expectedBlobs[i]);
+        }
+    }
+
+    /** Reads an inclusive row range from a table whose rows were written in several batches. */
+    @Test
+    public void testBlobWithRowRangeFilterMultipleFiles() throws Exception {
+        createTableDefault();
+
+        // 100 rows per batch * 10 batches, enough to create multiple blob file rolls.
+        commitDefault(writeDataDefault(100, 10));
+
+        // Range [50, 60] is a narrow subset; both bounds are included, hence 11 rows below.
+        List<Range> rowRanges = Collections.singletonList(new Range(50L, 60L));
+        Table table = getTableDefault();
+        ReadBuilder rangedRead = table.newReadBuilder().withRowRanges(rowRanges);
+        RecordReader<InternalRow> records =
+                rangedRead.newRead().createReader(rangedRead.newScan().plan());
+
+        AtomicInteger seen = new AtomicInteger(0);
+        records.forEachRemaining(
+                actual -> {
+                    seen.incrementAndGet();
+                    assertThat(actual.getBlob(2).toData()).isEqualTo(blobBytes);
+                });
+        assertThat(seen.get()).isEqualTo(11);
+    }
+
+    /**
+     * Registers a blob consumer that returns true (flush) for every second blob and checks that
+     * each collected descriptor resolves to the written bytes.
+     */
+    @Test
+    public void testBlobConsumerFlushBehavior() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+        List<BlobDescriptor> descriptors = new ArrayList<>();
+        AtomicInteger flushCount = new AtomicInteger(0);
+
+        // One builder creates both the write and the commit.
+        // NOTE(review): presumably a builder ties both sides to one commit user; confirm.
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite()) {
+            write.withBlobConsumer(
+                    (fieldName, consumed) -> {
+                        descriptors.add(consumed);
+                        // Return true (flush) for every other blob.
+                        if (descriptors.size() % 2 != 0) {
+                            return false;
+                        }
+                        flushCount.incrementAndGet();
+                        return true;
+                    });
+            for (int i = 0; i < 5; i++) {
+                write.write(
+                        GenericRow.of(
+                                i, BinaryString.fromString("row" + i), new BlobData(blobBytes)));
+            }
+            // try-with-resources closes the commit even when commit() throws.
+            try (BatchTableCommit commit = writeBuilder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // All 5 descriptors were collected; blobs 2 and 4 asked for a flush.
+        assertThat(descriptors).hasSize(5);
+        assertThat(flushCount.get()).isEqualTo(2);
+
+        // All descriptors should be valid and point to blob data.
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        for (BlobDescriptor collected : descriptors) {
+            assertThat(collected).isNotNull();
+            assertThat(Blob.fromDescriptor(uriReader, collected).toData()).isEqualTo(blobBytes);
+        }
+    }
+
+    /** Projects f0 and the raw blob f2 out of the table built by createMixedModeTable(). */
+    @Test
+    public void testMultipleBlobFieldsProjection() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Stage an external file and reference it through a descriptor for column f3.
+        byte[] externalBytes = randomBytes();
+        Path externalFile = new Path(tempPath.resolve("upstream-proj-test.bin").toString());
+        writeFile(table.fileIO(), externalFile, externalBytes);
+        BlobDescriptor externalDescriptor =
+                new BlobDescriptor(externalFile.toString(), 0, externalBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob externalBlob = Blob.fromDescriptor(uriReader, externalDescriptor);
+
+        InternalRow input =
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("proj"),
+                        new BlobData(blobBytes),
+                        externalBlob);
+        writeDataDefault(Collections.singletonList(input));
+
+        // Indices 0 and 2 select f0 and f2; f1 and f3 are skipped.
+        int[] projection = {0, 2};
+        ReadBuilder projectedRead = table.newReadBuilder().withProjection(projection);
+        RecordReader<InternalRow> records =
+                projectedRead.newRead().createReader(projectedRead.newScan().plan());
+
+        AtomicInteger seen = new AtomicInteger(0);
+        records.forEachRemaining(
+                actual -> {
+                    seen.incrementAndGet();
+                    assertThat(actual.getInt(0)).isEqualTo(1);
+                    assertThat(actual.getBlob(1).toData()).isEqualTo(blobBytes);
+                });
+        assertThat(seen.get()).isEqualTo(1);
+    }
+
+    /** Projects only f3, the blob that was written from an external descriptor. */
+    @Test
+    public void testMultipleBlobFieldsProjectOnlyDescriptorBlob() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Stage an external file and reference it through a descriptor for column f3.
+        byte[] externalBytes = randomBytes();
+        Path externalFile = new Path(tempPath.resolve("upstream-proj-desc.bin").toString());
+        writeFile(table.fileIO(), externalFile, externalBytes);
+        BlobDescriptor externalDescriptor =
+                new BlobDescriptor(externalFile.toString(), 0, externalBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob externalBlob = Blob.fromDescriptor(uriReader, externalDescriptor);
+
+        InternalRow input =
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("test"),
+                        new BlobData(blobBytes),
+                        externalBlob);
+        writeDataDefault(Collections.singletonList(input));
+
+        // Index 3 is f3, the descriptor blob; the raw blob f2 is skipped.
+        int[] projection = {3};
+        ReadBuilder projectedRead = table.newReadBuilder().withProjection(projection);
+        RecordReader<InternalRow> records =
+                projectedRead.newRead().createReader(projectedRead.newScan().plan());
+
+        AtomicInteger seen = new AtomicInteger(0);
+        records.forEachRemaining(
+                actual -> {
+                    seen.incrementAndGet();
+                    assertThat(actual.getBlob(0).toDescriptor()).isEqualTo(externalDescriptor);
+                    assertThat(actual.getBlob(0).toData()).isEqualTo(externalBytes);
+                });
+        assertThat(seen.get()).isEqualTo(1);
+    }
+
+    /** Projects the two blob columns f2 and f3 without any non-blob column. */
+    @Test
+    public void testMultipleBlobFieldsProjectBothBlobs() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Stage an external file and reference it through a descriptor for column f3.
+        byte[] externalBytes = randomBytes();
+        Path externalFile = new Path(tempPath.resolve("upstream-proj-both.bin").toString());
+        writeFile(table.fileIO(), externalFile, externalBytes);
+        BlobDescriptor externalDescriptor =
+                new BlobDescriptor(externalFile.toString(), 0, externalBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob externalBlob = Blob.fromDescriptor(uriReader, externalDescriptor);
+
+        InternalRow input =
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("test"),
+                        new BlobData(blobBytes),
+                        externalBlob);
+        writeDataDefault(Collections.singletonList(input));
+
+        // Indices 2 and 3 are both blob columns; no non-blob column is requested.
+        int[] projection = {2, 3};
+        ReadBuilder projectedRead = table.newReadBuilder().withProjection(projection);
+        RecordReader<InternalRow> records =
+                projectedRead.newRead().createReader(projectedRead.newScan().plan());
+
+        AtomicInteger seen = new AtomicInteger(0);
+        records.forEachRemaining(
+                actual -> {
+                    seen.incrementAndGet();
+                    assertThat(actual.getBlob(0).toData()).isEqualTo(blobBytes);
+                    assertThat(actual.getBlob(1).toDescriptor()).isEqualTo(externalDescriptor);
+                    assertThat(actual.getBlob(1).toData()).isEqualTo(externalBytes);
+                });
+        assertThat(seen.get()).isEqualTo(1);
+    }
+
+    /** Projects the string column f1 together with the descriptor blob f3. */
+    @Test
+    public void testMultipleBlobFieldsProjectStringAndDescriptorBlob() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Stage an external file and reference it through a descriptor for column f3.
+        byte[] externalBytes = randomBytes();
+        Path externalFile = new Path(tempPath.resolve("upstream-proj-str-desc.bin").toString());
+        writeFile(table.fileIO(), externalFile, externalBytes);
+        BlobDescriptor externalDescriptor =
+                new BlobDescriptor(externalFile.toString(), 0, externalBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob externalBlob = Blob.fromDescriptor(uriReader, externalDescriptor);
+
+        InternalRow input =
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("test"),
+                        new BlobData(blobBytes),
+                        externalBlob);
+        writeDataDefault(Collections.singletonList(input));
+
+        // Indices 1 and 3 select the string f1 and the descriptor blob f3; raw blob f2 is skipped.
+        int[] projection = {1, 3};
+        ReadBuilder projectedRead = table.newReadBuilder().withProjection(projection);
+        RecordReader<InternalRow> records =
+                projectedRead.newRead().createReader(projectedRead.newScan().plan());
+
+        AtomicInteger seen = new AtomicInteger(0);
+        records.forEachRemaining(
+                actual -> {
+                    seen.incrementAndGet();
+                    assertThat(actual.getString(0).toString()).isEqualTo("test");
+                    assertThat(actual.getBlob(1).toDescriptor()).isEqualTo(externalDescriptor);
+                    assertThat(actual.getBlob(1).toData()).isEqualTo(externalBytes);
+                });
+        assertThat(seen.get()).isEqualTo(1);
+    }
+
+    /** Round-trips rows covering every null / non-null combination of the two blob columns. */
+    @Test
+    public void testMultipleBlobFieldsPartialNull() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Stage an external file and reference it through a descriptor for column f3.
+        byte[] externalBytes = randomBytes();
+        Path externalFile = new Path(tempPath.resolve("upstream-partial-null.bin").toString());
+        writeFile(table.fileIO(), externalFile, externalBytes);
+        BlobDescriptor externalDescriptor =
+                new BlobDescriptor(externalFile.toString(), 0, externalBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob externalBlob = Blob.fromDescriptor(uriReader, externalDescriptor);
+
+        List<InternalRow> input = new ArrayList<>();
+        // id 1: f2 = blob, f3 = null
+        input.add(GenericRow.of(1, BinaryString.fromString("a"), new BlobData(blobBytes), null));
+        // id 2: f2 = null, f3 = descriptor
+        input.add(GenericRow.of(2, BinaryString.fromString("b"), null, externalBlob));
+        // id 3: both non-null
+        input.add(
+                GenericRow.of(
+                        3, BinaryString.fromString("c"), new BlobData(blobBytes), externalBlob));
+        // id 4: both null
+        input.add(GenericRow.of(4, BinaryString.fromString("d"), null, null));
+        writeDataDefault(input);
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> records =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        records.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        // hasSize reports the actual list content on failure, unlike comparing size() to an int.
+        assertThat(rows).hasSize(4);
+        rows.sort((left, right) -> Integer.compare(left.getInt(0), right.getInt(0)));
+
+        // id 1: f2 = blob, f3 = null
+        InternalRow blobOnly = rows.get(0);
+        assertThat(blobOnly.getBlob(2).toData()).isEqualTo(blobBytes);
+        assertThat(blobOnly.isNullAt(3)).isTrue();
+
+        // id 2: f2 = null, f3 = descriptor
+        InternalRow descriptorOnly = rows.get(1);
+        assertThat(descriptorOnly.isNullAt(2)).isTrue();
+        assertThat(descriptorOnly.getBlob(3).toDescriptor()).isEqualTo(externalDescriptor);
+
+        // id 3: both non-null
+        InternalRow bothSet = rows.get(2);
+        assertThat(bothSet.getBlob(2).toData()).isEqualTo(blobBytes);
+        assertThat(bothSet.getBlob(3).toDescriptor()).isEqualTo(externalDescriptor);
+
+        // id 4: both null
+        InternalRow bothNull = rows.get(3);
+        assertThat(bothNull.isNullAt(2)).isTrue();
+        assertThat(bothNull.isNullAt(3)).isTrue();
+    }
+
+    /** Reads an inclusive row range from a table whose rows carry two blob columns. */
+    @Test
+    public void testMultipleBlobFieldsWithRowRangeFilter() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Stage an external file and reference it through a descriptor for column f3.
+        byte[] externalBytes = randomBytes();
+        Path externalFile = new Path(tempPath.resolve("upstream-range-multi.bin").toString());
+        writeFile(table.fileIO(), externalFile, externalBytes);
+        BlobDescriptor externalDescriptor =
+                new BlobDescriptor(externalFile.toString(), 0, externalBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob externalBlob = Blob.fromDescriptor(uriReader, externalDescriptor);
+
+        // Ten rows with ids 0..9, each with a raw blob in f2 and the descriptor blob in f3.
+        List<InternalRow> input = new ArrayList<>();
+        for (int id = 0; id < 10; id++) {
+            input.add(
+                    GenericRow.of(
+                            id,
+                            BinaryString.fromString("row" + id),
+                            new BlobData(blobBytes),
+                            externalBlob));
+        }
+        writeDataDefault(input);
+
+        // Read only rows [3, 5]; both bounds are included, hence 3 rows below.
+        List<Range> rowRanges = Collections.singletonList(new Range(3L, 5L));
+        ReadBuilder rangedRead = table.newReadBuilder().withRowRanges(rowRanges);
+        RecordReader<InternalRow> records =
+                rangedRead.newRead().createReader(rangedRead.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        records.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        // hasSize reports the actual list content on failure, unlike comparing size() to an int.
+        assertThat(rows).hasSize(3);
+
+        rows.sort((left, right) -> Integer.compare(left.getInt(0), right.getInt(0)));
+        int expectedId = 3;
+        for (InternalRow row : rows) {
+            assertThat(row.getInt(0)).isEqualTo(expectedId);
+            assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+            assertThat(row.getBlob(3).toDescriptor()).isEqualTo(externalDescriptor);
+            expectedId++;
+        }
+    }
+
+    /** Reads a table with two raw blob columns through four different projections. */
+    @Test
+    public void testMultipleRawBlobFieldsAllProjections() throws Exception {
+        // Create a table with two raw blob fields (both written to separate .blob files).
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.BLOB())
+                        .column("f2", DataTypes.BLOB())
+                        .option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB")
+                        .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
+                        .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
+                        .build();
+        catalog.createTable(identifier(), schema, true);
+
+        FileStoreTable table = getTableDefault();
+        byte[] blob1 = randomBytes();
+        byte[] blob2 = randomBytes();
+
+        writeRows(
+                table,
+                Arrays.asList(
+                        GenericRow.of(1, new BlobData(blob1), new BlobData(blob2)),
+                        GenericRow.of(2, new BlobData(blob1), new BlobData(blob2)),
+                        GenericRow.of(3, new BlobData(blob1), new BlobData(blob2))));
+
+        // Projection: f0 only.
+        int[] onlyF0 = {0};
+        ReadBuilder f0Read = table.newReadBuilder().withProjection(onlyF0);
+        RecordReader<InternalRow> f0Records =
+                f0Read.newRead().createReader(f0Read.newScan().plan());
+        InternalRowSerializer f0Serializer =
+                new InternalRowSerializer(table.rowType().project(onlyF0));
+        List<InternalRow> f0Rows = new ArrayList<>();
+        f0Records.forEachRemaining(r -> f0Rows.add(f0Serializer.copy(r)));
+        // hasSize reports the actual list content on failure, unlike comparing size() to an int.
+        assertThat(f0Rows).hasSize(3);
+        f0Rows.sort((left, right) -> Integer.compare(left.getInt(0), right.getInt(0)));
+        for (int i = 0; i < 3; i++) {
+            assertThat(f0Rows.get(i).getInt(0)).isEqualTo(i + 1);
+        }
+
+        // Projection: f1 only (first blob).
+        ReadBuilder f1Read = table.newReadBuilder().withProjection(new int[] {1});
+        RecordReader<InternalRow> f1Records =
+                f1Read.newRead().createReader(f1Read.newScan().plan());
+        AtomicInteger f1Seen = new AtomicInteger(0);
+        f1Records.forEachRemaining(
+                actual -> {
+                    f1Seen.incrementAndGet();
+                    assertThat(actual.getBlob(0).toData()).isEqualTo(blob1);
+                });
+        assertThat(f1Seen.get()).isEqualTo(3);
+
+        // Projection: f2 only (second blob).
+        ReadBuilder f2Read = table.newReadBuilder().withProjection(new int[] {2});
+        RecordReader<InternalRow> f2Records =
+                f2Read.newRead().createReader(f2Read.newScan().plan());
+        AtomicInteger f2Seen = new AtomicInteger(0);
+        f2Records.forEachRemaining(
+                actual -> {
+                    f2Seen.incrementAndGet();
+                    assertThat(actual.getBlob(0).toData()).isEqualTo(blob2);
+                });
+        assertThat(f2Seen.get()).isEqualTo(3);
+
+        // Projection: f0 + f2 (skip f1 blob).
+        int[] f0AndF2 = {0, 2};
+        ReadBuilder mixedRead = table.newReadBuilder().withProjection(f0AndF2);
+        RecordReader<InternalRow> mixedRecords =
+                mixedRead.newRead().createReader(mixedRead.newScan().plan());
+        InternalRowSerializer mixedSerializer =
+                new InternalRowSerializer(table.rowType().project(f0AndF2));
+        List<InternalRow> mixedRows = new ArrayList<>();
+        mixedRecords.forEachRemaining(r -> mixedRows.add(mixedSerializer.copy(r)));
+        assertThat(mixedRows).hasSize(3);
+        mixedRows.sort((left, right) -> Integer.compare(left.getInt(0), right.getInt(0)));
+        for (int i = 0; i < 3; i++) {
+            assertThat(mixedRows.get(i).getInt(0)).isEqualTo(i + 1);
+            assertThat(mixedRows.get(i).getBlob(1).toData()).isEqualTo(blob2);
+        }
+    }
+
+    /** Reads both blob columns through a table copy that has BLOB_AS_DESCRIPTOR set to true. */
+    @Test
+    public void testMultipleBlobFieldsAsDescriptorReadMode() throws Exception {
+        createMixedModeTable();
+        FileStoreTable table = getTableDefault();
+
+        // Stage an external file and reference it through a descriptor for column f3.
+        byte[] externalBytes = randomBytes();
+        Path externalFile = new Path(tempPath.resolve("upstream-as-desc-multi.bin").toString());
+        writeFile(table.fileIO(), externalFile, externalBytes);
+        BlobDescriptor externalDescriptor =
+                new BlobDescriptor(externalFile.toString(), 0, externalBytes.length);
+        UriReader uriReader = UriReader.fromFile(table.fileIO());
+        Blob externalBlob = Blob.fromDescriptor(uriReader, externalDescriptor);
+
+        InternalRow input =
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("desc-mode"),
+                        new BlobData(blobBytes),
+                        externalBlob);
+        writeDataDefault(Collections.singletonList(input));
+
+        // Read with BLOB_AS_DESCRIPTOR=true.
+        Map<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true");
+        Table descriptorTable = table.copy(dynamicOptions);
+        ReadBuilder readBuilder = descriptorTable.newReadBuilder();
+        RecordReader<InternalRow> records =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+        AtomicInteger seen = new AtomicInteger(0);
+        records.forEachRemaining(
+                actual -> {
+                    seen.incrementAndGet();
+                    // f2 (raw blob): should now return a descriptor (lazy reference).
+                    Blob rawBlob = actual.getBlob(2);
+                    BlobDescriptor rawDescriptor = rawBlob.toDescriptor();
+                    assertThat(rawDescriptor).isNotNull();
+                    assertThat(rawDescriptor.uri()).contains(".blob");
+                    assertThat(rawBlob.toData()).isEqualTo(blobBytes);
+
+                    // f3 (descriptor blob): already stored as a descriptor, should work.
+                    Blob descriptorBlob = actual.getBlob(3);
+                    assertThat(descriptorBlob.toDescriptor()).isEqualTo(externalDescriptor);
+                    assertThat(descriptorBlob.toData()).isEqualTo(externalBytes);
+                });
+        assertThat(seen.get()).isEqualTo(1);
+    }
+
+    /**
+     * Stores a blob view that references a table named "default.NonExistent" and expects
+     * creating the reader to throw a RuntimeException.
+     */
+    @Test
+    public void testBlobViewResolutionFailsOnMissingUpstream() throws Exception {
+        // Create a downstream table whose "image" column is both blob field and blob view field.
+        String downstreamName = "DownstreamViewFail";
+        Schema downstreamSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("image", DataTypes.BLOB())
+                        .option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB")
+                        .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
+                        .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
+                        .option(CoreOptions.BLOB_FIELD.key(), "image")
+                        .option(CoreOptions.BLOB_VIEW_FIELD.key(), "image")
+                        .build();
+        catalog.createTable(identifier(downstreamName), downstreamSchema, true);
+
+        FileStoreTable downstreamTable = getTable(identifier(downstreamName));
+
+        // Write a BlobView reference pointing to a non-existent upstream table.
+        Identifier missingUpstream = Identifier.fromString("default.NonExistent");
+        BlobViewStruct viewStruct = new BlobViewStruct(missingUpstream, 2, 0L);
+        writeRows(
+                downstreamTable,
+                Collections.singletonList(GenericRow.of(1, Blob.fromView(viewStruct))));
+
+        // Reading should fail because the upstream table doesn't exist.
+        ReadBuilder viewRead = downstreamTable.newReadBuilder();
+        assertThatThrownBy(() -> viewRead.newRead().createReader(viewRead.newScan().plan()))
+                .isInstanceOf(RuntimeException.class);
+    }
+
     private void createExternalStorageTable() throws Exception {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
index 7b5758909f..b3fa2450d2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -188,6 +189,377 @@ public class MultipleBlobTableTest extends TableTestBase {
                 new BlobData(blobBytes2));
     }
 
+    @Test
+    public void testProjectOnlyTwoBlobFields() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(100, 1));
+
+        FileStoreTable table = getTableDefault();
+
+        // Project only blob fields: f2 (index 2) and f3 (index 3)
+        List<InternalRow> rows = read(table, new int[] {2, 3});
+
+        assertThat(rows.size()).isEqualTo(100);
+        for (InternalRow row : rows) {
+            assertThat(row.getFieldCount()).isEqualTo(2);
+            assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes1);
+            assertThat(row.getBlob(1).toData()).isEqualTo(blobBytes2);
+        }
+    }
+
+    /**
+     * Projects the INT column f0 together with the blob column f2 and checks that both
+     * projected positions are populated on every row.
+     */
+    @Test
+    public void testProjectOneBlobAndOneNonBlob() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(100, 1));
+
+        // Project f0 (INT, index 0) + f2 (BLOB, index 2)
+        FileStoreTable table = getTableDefault();
+        List<InternalRow> rows = read(table, new int[] {0, 2});
+
+        assertThat(rows.size()).isEqualTo(100);
+        for (InternalRow row : rows) {
+            assertThat(row.getFieldCount()).isEqualTo(2);
+            // The non-blob column must survive the mixed projection too, not only the blob.
+            assertThat(row.isNullAt(0)).isFalse();
+            assertThat(row.getBlob(1).toData()).isEqualTo(blobBytes1);
+        }
+    }
+
+    @Test
+    public void testProjectSingleBlobField() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(100, 1));
+
+        // Project only f2 (BLOB, index 2)
+        FileStoreTable table = getTableDefault();
+        List<InternalRow> rows = read(table, new int[] {2});
+
+        assertThat(rows.size()).isEqualTo(100);
+        for (InternalRow row : rows) {
+            assertThat(row.getFieldCount()).isEqualTo(1);
+            assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes1);
+        }
+    }
+
+    @Test
+    public void testProjectOnlyNonBlobFields() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(100, 1));
+
+        // Project only non-blob fields: f0 (index 0) and f1 (index 1)
+        FileStoreTable table = getTableDefault();
+        List<InternalRow> rows = read(table, new int[] {0, 1});
+
+        assertThat(rows.size()).isEqualTo(100);
+        for (InternalRow row : rows) {
+            assertThat(row.getFieldCount()).isEqualTo(2);
+            assertThat(row.isNullAt(0)).isFalse();
+            assertThat(row.isNullAt(1)).isFalse();
+        }
+    }
+
+    @Test
+    public void testProjectSecondBlobOnly() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(100, 1));
+
+        // Project only f3 (BLOB, index 3) — the second blob field
+        FileStoreTable table = getTableDefault();
+        List<InternalRow> rows = read(table, new int[] {3});
+
+        assertThat(rows.size()).isEqualTo(100);
+        for (InternalRow row : rows) {
+            assertThat(row.getFieldCount()).isEqualTo(1);
+            assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes2);
+        }
+    }
+
+    @Test
+    public void testMultiBatchThenReadBothBlobs() throws Exception {
+        createTableDefault();
+
+        // Write multiple batches
+        for (int i = 0; i < 5; i++) {
+            commitDefault(writeDataDefault(100, 1));
+        }
+
+        AtomicInteger count = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    count.incrementAndGet();
+                    assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes1);
+                    assertThat(row.getBlob(3).toData()).isEqualTo(blobBytes2);
+                });
+        assertThat(count.get()).isEqualTo(500);
+    }
+
+    /**
+     * Drops the blob column f2 after data was written and checks that a default read returns
+     * three-field rows with the f3 payload at position 2.
+     */
+    @Test
+    public void testDropOneBlobColumnKeepOther() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(100, 1));
+
+        // Drop f2, keep f3
+        catalog.alterTable(
+                identifier(), Collections.singletonList(SchemaChange.dropColumn("f2")), false);
+
+        AtomicInteger count = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    count.incrementAndGet();
+                    assertThat(row.getFieldCount()).isEqualTo(3);
+                    // f0, f1, f3 (f2 dropped)
+                    assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes2);
+                });
+        assertThat(count.get()).isEqualTo(100);
+    }
+
+    @Test
+    public void testNullBlobValues() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.column("f3", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "1 GB");
+        schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        FileStoreTable table = getTableDefault();
+        org.apache.paimon.table.sink.BatchWriteBuilder builder = 
table.newBatchWriteBuilder();
+        try (org.apache.paimon.table.sink.BatchTableWrite write = 
builder.newWrite()) {
+            // Mix null and non-null blob values
+            write.write(
+                    GenericRow.of(
+                            1,
+                            BinaryString.fromString("a"),
+                            new BlobData(blobBytes1),
+                            null));
+            write.write(
+                    GenericRow.of(
+                            2,
+                            BinaryString.fromString("b"),
+                            null,
+                            new BlobData(blobBytes2)));
+            write.write(
+                    GenericRow.of(
+                            3,
+                            BinaryString.fromString("c"),
+                            new BlobData(blobBytes1),
+                            new BlobData(blobBytes2)));
+            org.apache.paimon.table.sink.BatchTableCommit commit = 
builder.newCommit();
+            commit.commit(write.prepareCommit());
+        }
+
+        List<InternalRow> rows = read(table, new int[] {0, 2, 3});
+        assertThat(rows.size()).isEqualTo(3);
+
+        rows.sort((a, b) -> Integer.compare(a.getInt(0), b.getInt(0)));
+        // Row 1: f2=blobBytes1, f3=null
+        assertThat(rows.get(0).getBlob(1).toData()).isEqualTo(blobBytes1);
+        assertThat(rows.get(0).isNullAt(2)).isTrue();
+        // Row 2: f2=null, f3=blobBytes2
+        assertThat(rows.get(1).isNullAt(1)).isTrue();
+        assertThat(rows.get(1).getBlob(2).toData()).isEqualTo(blobBytes2);
+        // Row 3: f2=blobBytes1, f3=blobBytes2
+        assertThat(rows.get(2).getBlob(1).toData()).isEqualTo(blobBytes1);
+        assertThat(rows.get(2).getBlob(2).toData()).isEqualTo(blobBytes2);
+    }
+
+    /**
+     * Commits data, runs every planned data-evolution compaction task, commits the results and
+     * then expects a blob-only projection to return 1000 rows with both payloads.
+     */
+    @Test
+    public void testProjectTwoBlobsAfterCompaction() throws Exception {
+        createTableDefault();
+        commitDefault(writeDataDefault(50, 20));
+
+        // Plan against the freshly loaded table and execute each task in planning order.
+        FileStoreTable source = getTableDefault();
+        DataEvolutionCompactCoordinator planner =
+                new DataEvolutionCompactCoordinator(source, true, false);
+        List<CommitMessage> compacted = new ArrayList<>();
+        for (DataEvolutionCompactTask planned : planner.plan()) {
+            compacted.add(planned.doCompact(source, commitUser));
+        }
+        commitDefault(compacted);
+
+        // Reload the table and read only f2 (index 2) and f3 (index 3).
+        List<InternalRow> projected = read(getTableDefault(), new int[] {2, 3});
+
+        assertThat(projected.size()).isEqualTo(1000);
+        for (InternalRow blobPair : projected) {
+            assertThat(blobPair.getFieldCount()).isEqualTo(2);
+            assertThat(blobPair.getBlob(0).toData()).isEqualTo(blobBytes1);
+            assertThat(blobPair.getBlob(1).toData()).isEqualTo(blobBytes2);
+        }
+    }
+
+    @Test
+    public void testAddBlobColumnThenProjectBothBlobs() throws Exception {
+        // Start with table having only one blob column
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "1 GB");
+        schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        // Write data with only f2 blob
+        FileStoreTable table = getTableDefault();
+        org.apache.paimon.table.sink.BatchWriteBuilder builder = 
table.newBatchWriteBuilder();
+        try (org.apache.paimon.table.sink.BatchTableWrite write = 
builder.newWrite()) {
+            for (int i = 0; i < 50; i++) {
+                write.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("row" + i),
+                                new BlobData(blobBytes1)));
+            }
+            org.apache.paimon.table.sink.BatchTableCommit commit = 
builder.newCommit();
+            commit.commit(write.prepareCommit());
+        }
+
+        // Add new blob column f3
+        catalog.alterTable(
+                identifier(),
+                Collections.singletonList(SchemaChange.addColumn("f3", 
DataTypes.BLOB())),
+                false);
+
+        // Write more data with both f2 and f3
+        table = getTableDefault();
+        builder = table.newBatchWriteBuilder();
+        try (org.apache.paimon.table.sink.BatchTableWrite write = 
builder.newWrite()) {
+            for (int i = 50; i < 100; i++) {
+                write.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("row" + i),
+                                new BlobData(blobBytes1),
+                                new BlobData(blobBytes2)));
+            }
+            org.apache.paimon.table.sink.BatchTableCommit commit = 
builder.newCommit();
+            commit.commit(write.prepareCommit());
+        }
+
+        // Project only both blob fields: f2 (index 2) and f3 (index 3)
+        table = getTableDefault();
+        List<InternalRow> rows = read(table, new int[] {2, 3});
+
+        assertThat(rows.size()).isEqualTo(100);
+        // Sort by checking which rows have null f3
+        int nullF3Count = 0;
+        int nonNullF3Count = 0;
+        for (InternalRow row : rows) {
+            assertThat(row.getFieldCount()).isEqualTo(2);
+            assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes1);
+            if (row.isNullAt(1)) {
+                nullF3Count++;
+            } else {
+                assertThat(row.getBlob(1).toData()).isEqualTo(blobBytes2);
+                nonNullF3Count++;
+            }
+        }
+        // First 50 rows have null f3 (added before schema evolution)
+        assertThat(nullF3Count).isEqualTo(50);
+        assertThat(nonNullF3Count).isEqualTo(50);
+    }
+
+    @Test
+    public void testAsymmetricCompactionThenProjectBothBlobs() throws 
Exception {
+        // Use smaller blob target to produce more blob files per field
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.column("f3", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "1 GB");
+        schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "3");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        // Write multiple batches — each batch creates separate blob files
+        FileStoreTable table = getTableDefault();
+        org.apache.paimon.table.sink.BatchWriteBuilder builder = 
table.newBatchWriteBuilder();
+        for (int batch = 0; batch < 6; batch++) {
+            try (org.apache.paimon.table.sink.BatchTableWrite write = 
builder.newWrite()) {
+                for (int i = 0; i < 10; i++) {
+                    write.write(
+                            GenericRow.of(
+                                    batch * 10 + i,
+                                    BinaryString.fromString("v" + i),
+                                    new BlobData(blobBytes1),
+                                    new BlobData(blobBytes2)));
+                }
+                org.apache.paimon.table.sink.BatchTableCommit commit = 
builder.newCommit();
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        table = getTableDefault();
+
+        // Run compaction
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, true, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        if (!tasks.isEmpty()) {
+            List<CommitMessage> compactMessages = new ArrayList<>();
+            for (DataEvolutionCompactTask task : tasks) {
+                compactMessages.add(task.doCompact(table, commitUser));
+            }
+            commitDefault(compactMessages);
+        }
+
+        // Project only both blob fields after compaction
+        table = getTableDefault();
+        List<InternalRow> rows = read(table, new int[] {2, 3});
+        assertThat(rows.size()).isEqualTo(60);
+        for (InternalRow row : rows) {
+            assertThat(row.getFieldCount()).isEqualTo(2);
+            assertThat(row.getBlob(0).toData()).isEqualTo(blobBytes1);
+            assertThat(row.getBlob(1).toData()).isEqualTo(blobBytes2);
+        }
+    }
+
+    /** Limits a blob-only projection to Range(100, 199) and expects exactly 100 rows back. */
+    @Test
+    public void testBlobOnlyProjectionWithRowRanges() throws Exception {
+        createTableDefault();
+        commitDefault(writeDataDefault(1000, 1));
+
+        FileStoreTable table = getTableDefault();
+
+        // Blob-only projection (f2, f3) combined with a single row range.
+        org.apache.paimon.table.source.ReadBuilder rangedRead = table.newReadBuilder();
+        rangedRead.withProjection(new int[] {2, 3});
+        org.apache.paimon.utils.Range wanted = new org.apache.paimon.utils.Range(100, 199);
+        rangedRead.withRowRanges(Arrays.asList(wanted));
+
+        org.apache.paimon.reader.RecordReader<InternalRow> rangedReader =
+                rangedRead.newRead().createReader(rangedRead.newScan().plan());
+        List<InternalRow> ranged = new ArrayList<>();
+        rangedReader.forEachRemaining(ranged::add);
+
+        assertThat(ranged.size()).isEqualTo(100);
+        for (InternalRow blobPair : ranged) {
+            assertThat(blobPair.getFieldCount()).isEqualTo(2);
+            assertThat(blobPair.getBlob(0).toData()).isEqualTo(blobBytes1);
+            assertThat(blobPair.getBlob(1).toData()).isEqualTo(blobBytes2);
+        }
+    }
+
     @Override
     protected byte[] randomBytes() {
         byte[] binary = new byte[2 * 1024 * 124];
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 14b7f7a5c7..cc1b07fe6b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.BlobData;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.globalindex.IndexedSplit;
 import org.apache.paimon.index.IndexFileMeta;
@@ -37,6 +38,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.DataEvolutionFileReader;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -45,6 +47,7 @@ import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -1070,6 +1073,474 @@ public class DataEvolutionTableTest extends DataEvolutionTestBase {
         assertThat(path4.toString()).isEqualTo(testExternalpath2);
     }
 
+    @Test
+    public void testProjectionPushdown() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+        // Write f0 and f1 together
+        RowType writeType0 = schema.rowType().project(Arrays.asList("f0", 
"f1"));
+        try (BatchTableWrite write0 = 
builder.newWrite().withWriteType(writeType0)) {
+            write0.write(GenericRow.of(1, BinaryString.fromString("a")));
+            write0.write(GenericRow.of(2, BinaryString.fromString("b")));
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write0.prepareCommit();
+            commit.commit(commitables);
+        }
+
+        // Write f2 separately with same firstRowId
+        RowType writeType1 = 
schema.rowType().project(Collections.singletonList("f2"));
+        try (BatchTableWrite write1 = 
builder.newWrite().withWriteType(writeType1)) {
+            write1.write(GenericRow.of(BinaryString.fromString("x")));
+            write1.write(GenericRow.of(BinaryString.fromString("y")));
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write1.prepareCommit();
+            setFirstRowId(commitables, 0L);
+            commit.commit(commitables);
+        }
+
+        // Project only f0 - should filter out the f2-only file
+        ReadBuilder readBuilder = 
getTableDefault().newReadBuilder().withProjection(new int[] {0});
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        DataSplit dataSplit = (DataSplit) plan.splits().get(0);
+        assertThat(dataSplit.dataFiles().size()).isEqualTo(1);
+        RecordReader<InternalRow> reader = 
readBuilder.newRead().createReader(plan);
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+        assertThat(rows.size()).isEqualTo(2);
+
+        // Project only f2 - should filter out the f0,f1-only file
+        readBuilder = getTableDefault().newReadBuilder().withProjection(new 
int[] {2});
+        plan = readBuilder.newScan().plan();
+        dataSplit = (DataSplit) plan.splits().get(0);
+        assertThat(dataSplit.dataFiles().size()).isEqualTo(1);
+        reader = readBuilder.newRead().createReader(plan);
+        rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+        assertThat(rows.size()).isEqualTo(2);
+
+        // Project f0 and f2 (skip f1) - needs both files
+        readBuilder = getTableDefault().newReadBuilder().withProjection(new 
int[] {0, 2});
+        plan = readBuilder.newScan().plan();
+        dataSplit = (DataSplit) plan.splits().get(0);
+        assertThat(dataSplit.dataFiles().size()).isEqualTo(2);
+        reader = readBuilder.newRead().createReader(plan);
+        RowType projectedType = schema.rowType().project(Arrays.asList("f0", 
"f2"));
+        InternalRowSerializer serializer = new 
InternalRowSerializer(projectedType);
+        List<InternalRow> projectedRows = new ArrayList<>();
+        reader.forEachRemaining(r -> projectedRows.add(serializer.copy(r)));
+        assertThat(projectedRows.size()).isEqualTo(2);
+        projectedRows.sort(Comparator.comparingInt(r -> r.getInt(0)));
+        assertThat(projectedRows.get(0).getInt(0)).isEqualTo(1);
+        
assertThat(projectedRows.get(0).getString(1).toString()).isEqualTo("x");
+        assertThat(projectedRows.get(1).getInt(0)).isEqualTo(2);
+        
assertThat(projectedRows.get(1).getString(1).toString()).isEqualTo("y");
+    }
+
+    @Test
+    public void testSequenceNumberConflictResolution() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+        // Write all columns first
+        try (BatchTableWrite write = 
builder.newWrite().withWriteType(schema.rowType())) {
+            write.write(
+                    GenericRow.of(1, BinaryString.fromString("a"), 
BinaryString.fromString("old")));
+            write.write(
+                    GenericRow.of(2, BinaryString.fromString("b"), 
BinaryString.fromString("old")));
+            BatchTableCommit commit = builder.newCommit();
+            commit.commit(write.prepareCommit());
+        }
+
+        // Overwrite f2 with a new value (higher sequence number)
+        RowType writeType1 = 
schema.rowType().project(Collections.singletonList("f2"));
+        try (BatchTableWrite write1 = 
builder.newWrite().withWriteType(writeType1)) {
+            write1.write(GenericRow.of(BinaryString.fromString("new")));
+            write1.write(GenericRow.of(BinaryString.fromString("new")));
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write1.prepareCommit();
+            setFirstRowId(commitables, 0L);
+            commit.commit(commitables);
+        }
+
+        // Read and verify f2 shows the new value (higher seq number wins)
+        ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new 
InternalRowSerializer(schema.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        assertThat(rows.size()).isEqualTo(2);
+        rows.sort(Comparator.comparingInt(r -> r.getInt(0)));
+        assertThat(rows.get(0).getInt(0)).isEqualTo(1);
+        assertThat(rows.get(0).getString(1).toString()).isEqualTo("a");
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("new");
+        assertThat(rows.get(1).getInt(0)).isEqualTo(2);
+        assertThat(rows.get(1).getString(1).toString()).isEqualTo("b");
+        assertThat(rows.get(1).getString(2).toString()).isEqualTo("new");
+    }
+
+    /**
+     * Writes one full row, then rewrites f2 for the same row id twice, and expects a read to
+     * return a single row carrying the value from the last rewrite.
+     */
+    @Test
+    public void testMultipleOverwritesSameColumn() throws Exception {
+        createTableDefault();
+        Schema tableSchema = schemaDefault();
+        BatchWriteBuilder writeBuilder = getTableDefault().newBatchWriteBuilder();
+
+        // Initial commit with all columns; f2 starts as "version1".
+        try (BatchTableWrite fullWrite =
+                writeBuilder.newWrite().withWriteType(tableSchema.rowType())) {
+            fullWrite.write(
+                    GenericRow.of(
+                            1, BinaryString.fromString("a"), BinaryString.fromString("version1")));
+            BatchTableCommit fullCommit = writeBuilder.newCommit();
+            fullCommit.commit(fullWrite.prepareCommit());
+        }
+
+        // Two successive f2-only commits, each with the first row id set to 0.
+        RowType f2Only = tableSchema.rowType().project(Collections.singletonList("f2"));
+        for (String version : new String[] {"version2", "version3"}) {
+            try (BatchTableWrite f2Write = writeBuilder.newWrite().withWriteType(f2Only)) {
+                f2Write.write(GenericRow.of(BinaryString.fromString(version)));
+                BatchTableCommit f2Commit = writeBuilder.newCommit();
+                List<CommitMessage> f2Messages = f2Write.prepareCommit();
+                setFirstRowId(f2Messages, 0L);
+                f2Commit.commit(f2Messages);
+            }
+        }
+
+        // Only the value from the last f2 commit may be visible.
+        ReadBuilder fullRead = getTableDefault().newReadBuilder();
+        RecordReader<InternalRow> fullReader =
+                fullRead.newRead().createReader(fullRead.newScan().plan());
+        List<InternalRow> merged = new ArrayList<>();
+        fullReader.forEachRemaining(merged::add);
+        assertThat(merged.size()).isEqualTo(1);
+        InternalRow only = merged.get(0);
+        assertThat(only.getInt(0)).isEqualTo(1);
+        assertThat(only.getString(1).toString()).isEqualTo("a");
+        assertThat(only.getString(2).toString()).isEqualTo("version3");
+    }
+
+    @Test
+    public void testCompactThenReadCorrectness() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            write(100000L);
+        }
+        FileStoreTable table = getTableDefault();
+
+        // Run compaction
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false, false);
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        List<DataEvolutionCompactTask> tasks;
+        try {
+            while (!(tasks = coordinator.plan()).isEmpty()) {
+                for (DataEvolutionCompactTask task : tasks) {
+                    commitMessages.add(task.doCompact(table, "test-compact"));
+                }
+            }
+        } catch (EndOfScanException ignore) {
+        }
+        assertThat(commitMessages.isEmpty()).isFalse();
+        table.newBatchWriteBuilder().newCommit().commit(commitMessages);
+
+        // Verify data after compaction
+        Schema schema = schemaDefault();
+        table = getTableDefault();
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        RecordReader<InternalRow> reader = 
readBuilder.newRead().createReader(plan);
+        InternalRowSerializer serializer = new 
InternalRowSerializer(schema.rowType());
+        List<InternalRow> rowsAfter = new ArrayList<>();
+        reader.forEachRemaining(r -> rowsAfter.add(serializer.copy(r)));
+        assertThat(rowsAfter.size()).isEqualTo(500000);
+
+        // Each write produces rows with f0=0..99999, so 5 writes gives 5 
copies of each
+        rowsAfter.sort(Comparator.comparingInt(r -> r.getInt(0)));
+        // First 5 rows should all have f0=0 with correct f1 and f2
+        for (int i = 0; i < 5; i++) {
+            assertThat(rowsAfter.get(i).getInt(0)).isEqualTo(0);
+            
assertThat(rowsAfter.get(i).getString(1).toString()).isEqualTo("a0");
+            
assertThat(rowsAfter.get(i).getString(2).toString()).isEqualTo("b0");
+        }
+        // Spot check other values
+        for (int i = 5; i < 10; i++) {
+            assertThat(rowsAfter.get(i).getInt(0)).isEqualTo(1);
+            
assertThat(rowsAfter.get(i).getString(1).toString()).isEqualTo("a1");
+            
assertThat(rowsAfter.get(i).getString(2).toString()).isEqualTo("b1");
+        }
+
+        // After compaction, only 1 file should remain
+        assertThat(plan.splits().size()).isEqualTo(1);
+        DataSplit dataSplit = (DataSplit) plan.splits().get(0);
+        assertThat(dataSplit.dataFiles().size()).isEqualTo(1);
+    }
+
+    /**
+     * Creates a stream scan on an empty table, then alternates commits with {@code plan()}
+     * calls and expects each plan to yield only the row committed since the previous one.
+     */
+    @Test
+    public void testStreamingRead() throws Exception {
+        createTableDefault();
+        Schema tableSchema = schemaDefault();
+        FileStoreTable table = getTableDefault();
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+        // The stream scan is created before anything is committed; its first plan() is only
+        // requested after the first commit below.
+        ReadBuilder readBuilder = table.newReadBuilder();
+        StreamTableScan incrementalScan = readBuilder.newStreamScan();
+
+        // First commit: one full row.
+        try (BatchTableWrite fullWrite =
+                writeBuilder.newWrite().withWriteType(tableSchema.rowType())) {
+            fullWrite.write(
+                    GenericRow.of(1, BinaryString.fromString("a"), BinaryString.fromString("b")));
+            BatchTableCommit fullCommit = writeBuilder.newCommit();
+            fullCommit.commit(fullWrite.prepareCommit());
+        }
+
+        // First plan: must contain exactly that row.
+        TableScan.Plan firstPlan = incrementalScan.plan();
+        RecordReader<InternalRow> firstReader = readBuilder.newRead().createReader(firstPlan);
+        List<InternalRow> firstRows = new ArrayList<>();
+        firstReader.forEachRemaining(firstRows::add);
+        assertThat(firstRows.size()).isEqualTo(1);
+        InternalRow firstRow = firstRows.get(0);
+        assertThat(firstRow.getInt(0)).isEqualTo(1);
+        assertThat(firstRow.getString(1).toString()).isEqualTo("a");
+        assertThat(firstRow.getString(2).toString()).isEqualTo("b");
+
+        // Second commit: a new row with only f0 and f1, first row id set to 1.
+        RowType partialType = tableSchema.rowType().project(Arrays.asList("f0", "f1"));
+        try (BatchTableWrite partialWrite = writeBuilder.newWrite().withWriteType(partialType)) {
+            partialWrite.write(GenericRow.of(2, BinaryString.fromString("c")));
+            BatchTableCommit partialCommit = writeBuilder.newCommit();
+            List<CommitMessage> partialMessages = partialWrite.prepareCommit();
+            setFirstRowId(partialMessages, 1L);
+            partialCommit.commit(partialMessages);
+        }
+
+        // Second plan: only the new row, with the unwritten f2 reading as null.
+        TableScan.Plan secondPlan = incrementalScan.plan();
+        RecordReader<InternalRow> secondReader = readBuilder.newRead().createReader(secondPlan);
+        List<InternalRow> secondRows = new ArrayList<>();
+        secondReader.forEachRemaining(secondRows::add);
+        assertThat(secondRows.size()).isEqualTo(1);
+        InternalRow secondRow = secondRows.get(0);
+        assertThat(secondRow.getInt(0)).isEqualTo(2);
+        assertThat(secondRow.getString(1).toString()).isEqualTo("c");
+        assertThat(secondRow.isNullAt(2)).isTrue();
+    }
+
+    @Test
+    public void testPartitionedTable() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("pt", DataTypes.STRING());
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.STRING());
+        schemaBuilder.partitionKeys("pt");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        
schemaBuilder.option(CoreOptions.ROW_TRACKING_PARTITION_GROUP_ON_COMMIT.key(), 
"true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+        FileStoreTable table = getTableDefault();
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+
+        RowType fullType = table.rowType();
+        RowType writeType0 = fullType.project(Arrays.asList("pt", "f0", "f1"));
+        RowType writeType1 = fullType.project(Arrays.asList("pt", "f2"));
+
+        // Write f0, f1 for partition p1 only
+        try (BatchTableWrite write0 = 
builder.newWrite().withWriteType(writeType0)) {
+            write0.write(
+                    GenericRow.of(BinaryString.fromString("p1"), 1, 
BinaryString.fromString("a")));
+            write0.write(
+                    GenericRow.of(BinaryString.fromString("p1"), 2, 
BinaryString.fromString("b")));
+            BatchTableCommit commit = builder.newCommit();
+            commit.commit(write0.prepareCommit());
+        }
+
+        long p1RowId =
+                table.snapshotManager().latestSnapshot().nextRowId() - 2; // 2 
rows written for p1
+
+        // Write f2 for p1 with matching firstRowId
+        try (BatchTableWrite write1 = 
builder.newWrite().withWriteType(writeType1)) {
+            write1.write(
+                    GenericRow.of(BinaryString.fromString("p1"), 
BinaryString.fromString("x")));
+            write1.write(
+                    GenericRow.of(BinaryString.fromString("p1"), 
BinaryString.fromString("y")));
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write1.prepareCommit();
+            setFirstRowId(commitables, p1RowId);
+            commit.commit(commitables);
+        }
+
+        // Write f0, f1 for partition p2
+        try (BatchTableWrite write0 = 
builder.newWrite().withWriteType(writeType0)) {
+            write0.write(
+                    GenericRow.of(BinaryString.fromString("p2"), 3, 
BinaryString.fromString("c")));
+            BatchTableCommit commit = builder.newCommit();
+            commit.commit(write0.prepareCommit());
+        }
+
+        long p2RowId =
+                table.snapshotManager().latestSnapshot().nextRowId() - 1; // 1 
row written to p2
+
+        // Write f2 for p2 with matching firstRowId
+        try (BatchTableWrite write1 = 
builder.newWrite().withWriteType(writeType1)) {
+            write1.write(
+                    GenericRow.of(BinaryString.fromString("p2"), 
BinaryString.fromString("z")));
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write1.prepareCommit();
+            setFirstRowId(commitables, p2RowId);
+            commit.commit(commitables);
+        }
+
+        // Read all and verify
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(fullType);
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        rows.sort(Comparator.comparingInt(r -> r.getInt(1)));
+
+        assertThat(rows.size()).isEqualTo(3);
+        // p1 rows
+        assertThat(rows.get(0).getString(0).toString()).isEqualTo("p1");
+        assertThat(rows.get(0).getInt(1)).isEqualTo(1);
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("a");
+        assertThat(rows.get(0).getString(3).toString()).isEqualTo("x");
+
+        assertThat(rows.get(1).getString(0).toString()).isEqualTo("p1");
+        assertThat(rows.get(1).getInt(1)).isEqualTo(2);
+        assertThat(rows.get(1).getString(2).toString()).isEqualTo("b");
+        assertThat(rows.get(1).getString(3).toString()).isEqualTo("y");
+
+        // p2 row
+        assertThat(rows.get(2).getString(0).toString()).isEqualTo("p2");
+        assertThat(rows.get(2).getInt(1)).isEqualTo(3);
+        assertThat(rows.get(2).getString(2).toString()).isEqualTo("c");
+        assertThat(rows.get(2).getString(3).toString()).isEqualTo("z");
+    }
+
+    /**
+     * Checks reads across an added column: one row is written with the original schema, column
+     * {@code f3} is added through the catalog, and a second row is written with the reloaded
+     * table. The first row must read back with {@code f3} null, the second with every column set.
+     */
+    @Test
+    public void testSchemaEvolution() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+        // Write initial data with original schema (f0, f1, f2)
+        try (BatchTableWrite write = builder.newWrite().withWriteType(schema.rowType())) {
+            write.write(
+                    GenericRow.of(1, BinaryString.fromString("a"), BinaryString.fromString("b")));
+            BatchTableCommit commit = builder.newCommit();
+            commit.commit(write.prepareCommit());
+        }
+
+        // Add a new column f3
+        catalog.alterTable(identifier(), SchemaChange.addColumn("f3", DataTypes.STRING()), false);
+
+        // Reload table to pick up new schema
+        FileStoreTable table = getTableDefault();
+        builder = table.newBatchWriteBuilder();
+
+        // Write data with new schema (f0, f1, f2, f3)
+        try (BatchTableWrite write = builder.newWrite()) {
+            write.write(
+                    GenericRow.of(
+                            2,
+                            BinaryString.fromString("c"),
+                            BinaryString.fromString("d"),
+                            BinaryString.fromString("e")));
+            BatchTableCommit commit = builder.newCommit();
+            commit.commit(write.prepareCommit());
+        }
+
+        // Read and verify - old rows should have null for f3
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        // Copy each row before keeping it (as testPartitionedTable does), so the collected list
+        // does not depend on the reader handing out a distinct row instance per record.
+        InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+        // Order by f0 (values 1, 2) so the positional assertions below are deterministic.
+        rows.sort(Comparator.comparingInt(r -> r.getInt(0)));
+
+        assertThat(rows).hasSize(2);
+        // Old row - f3 should be null
+        assertThat(rows.get(0).getInt(0)).isEqualTo(1);
+        assertThat(rows.get(0).getString(1).toString()).isEqualTo("a");
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("b");
+        assertThat(rows.get(0).isNullAt(3)).isTrue();
+
+        // New row - all columns present
+        assertThat(rows.get(1).getInt(0)).isEqualTo(2);
+        assertThat(rows.get(1).getString(1).toString()).isEqualTo("c");
+        assertThat(rows.get(1).getString(2).toString()).isEqualTo("d");
+        assertThat(rows.get(1).getString(3).toString()).isEqualTo("e");
+    }
+
+    /**
+     * Checks reads after three commits that each write a single, different column ({@code f0},
+     * {@code f1}, {@code f2}) for a different row. The second and third commits are passed through
+     * {@code setFirstRowId} with 1 and 2. Each of the three rows read back must carry only the
+     * column that was written for it, with the other two columns null.
+     */
+    @Test
+    public void testReadAfterMultipleAppendsToDifferentColumnSets() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+        // Commit 1: Write only f0 for row 0
+        RowType writeType0 = schema.rowType().project(Collections.singletonList("f0"));
+        try (BatchTableWrite write0 = builder.newWrite().withWriteType(writeType0)) {
+            write0.write(GenericRow.of(1));
+            BatchTableCommit commit = builder.newCommit();
+            commit.commit(write0.prepareCommit());
+        }
+
+        // Commit 2: Write only f1 for row 1 (different row)
+        RowType writeType1 = schema.rowType().project(Collections.singletonList("f1"));
+        try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
+            write1.write(GenericRow.of(BinaryString.fromString("a")));
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write1.prepareCommit();
+            setFirstRowId(commitables, 1L);
+            commit.commit(commitables);
+        }
+
+        // Commit 3: Write only f2 for row 2 (different row)
+        RowType writeType2 = schema.rowType().project(Collections.singletonList("f2"));
+        try (BatchTableWrite write2 = builder.newWrite().withWriteType(writeType2)) {
+            write2.write(GenericRow.of(BinaryString.fromString("b")));
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write2.prepareCommit();
+            setFirstRowId(commitables, 2L);
+            commit.commit(commitables);
+        }
+
+        // Read all rows - each row should have only its written column with others null
+        FileStoreTable table = getTableDefault();
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        // Copy each row before keeping it (as testPartitionedTable does), so the collected list
+        // does not depend on the reader handing out a distinct row instance per record.
+        InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(r -> rows.add(serializer.copy(r)));
+
+        // NOTE(review): no sort is applied here, so the positional assertions below assume the
+        // read returns rows in commit / row-id order - confirm that ordering is guaranteed.
+        assertThat(rows).hasSize(3);
+
+        // Row 0: only f0 is set
+        assertThat(rows.get(0).getInt(0)).isEqualTo(1);
+        assertThat(rows.get(0).isNullAt(1)).isTrue();
+        assertThat(rows.get(0).isNullAt(2)).isTrue();
+
+        // Row 1: only f1 is set
+        assertThat(rows.get(1).isNullAt(0)).isTrue();
+        assertThat(rows.get(1).getString(1).toString()).isEqualTo("a");
+        assertThat(rows.get(1).isNullAt(2)).isTrue();
+
+        // Row 2: only f2 is set
+        assertThat(rows.get(2).isNullAt(0)).isTrue();
+        assertThat(rows.get(2).isNullAt(1)).isTrue();
+        assertThat(rows.get(2).getString(2).toString()).isEqualTo("b");
+    }
+
     private Range assertContinuousRowIdRange(List<DataFileMeta> files) {
         files.sort(Comparator.comparingLong(DataFileMeta::nonNullFirstRowId));
         long start = files.get(0).nonNullFirstRowId();

Reply via email to