This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ce4e018b2 [core] introduce Placeholder for Blob File Format (#7889)
5ce4e018b2 is described below

commit 5ce4e018b2758040c46feb135e47f834fefd1bd5
Author: Faiz <[email protected]>
AuthorDate: Tue May 26 22:09:05 2026 +0800

    [core] introduce Placeholder for Blob File Format (#7889)
---
 .../org/apache/paimon/data/BlobPlaceholder.java    |  59 +++
 .../paimon/reader/DataEvolutionFileReader.java     |   2 +-
 .../paimon/operation/BlobFallbackRecordReader.java | 428 +++++++++++++++++
 .../paimon/operation/DataEvolutionSplitRead.java   | 270 ++++++++---
 .../org/apache/paimon/append/BlobUpdateTest.java   | 532 +++++++++++++++++++++
 .../operation/BlobFallbackRecordReaderTest.java    | 396 +++++++++++++++
 .../paimon/operation/DataEvolutionReadTest.java    | 340 ++++++++-----
 .../paimon/table/AppendOnlySimpleTableTest.java    |  21 +-
 .../paimon/table/PrimaryKeySimpleTableTest.java    |  20 +-
 .../apache/paimon/format/blob/BlobFileMeta.java    |   8 +-
 .../paimon/format/blob/BlobFormatReader.java       |   3 +
 .../paimon/format/blob/BlobFormatWriter.java       |   9 +-
 .../paimon/format/blob/BlobFileFormatTest.java     |  40 +-
 .../pypaimon/read/reader/format_blob_reader.py     |  18 +-
 paimon-python/pypaimon/table/row/blob.py           |  15 +
 paimon-python/pypaimon/tests/blob_test.py          |  75 ++-
 paimon-python/pypaimon/write/blob_format_writer.py |  10 +-
 17 files changed, 2002 insertions(+), 244 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/BlobPlaceholder.java 
b/paimon-common/src/main/java/org/apache/paimon/data/BlobPlaceholder.java
new file mode 100644
index 0000000000..7a693962cb
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobPlaceholder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.fs.SeekableInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The placeholder blob, mainly for blob update in data-evolution. It should 
never be exposed to
+ * users.
+ */
+public class BlobPlaceholder implements Blob, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final BlobPlaceholder INSTANCE = new BlobPlaceholder();
+
+    private BlobPlaceholder() {}
+
+    private Object readResolve() {
+        return INSTANCE;
+    }
+
+    @Override
+    public byte[] toData() {
+        throw new UnsupportedOperationException(
+                "Should never call this method for placeholder blob.");
+    }
+
+    @Override
+    public BlobDescriptor toDescriptor() {
+        throw new UnsupportedOperationException(
+                "Should never call this method for placeholder blob.");
+    }
+
+    @Override
+    public SeekableInputStream newInputStream() throws IOException {
+        throw new UnsupportedOperationException(
+                "Should never call this method for placeholder blob.");
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
index 1c50410fa1..53766440ff 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
@@ -66,7 +66,7 @@ public class DataEvolutionFileReader implements 
RecordReader<InternalRow> {
                 rowOffsets.length == fieldOffsets.length,
                 "Row offsets and field offsets must have the same length");
         checkArgument(rowOffsets.length > 0, "Row offsets must not be empty");
-        checkArgument(readers != null && readers.length > 1, "Readers should 
be more than 1");
+        checkArgument(readers != null && readers.length >= 1, "should not pass 
empty readers.");
         this.rowOffsets = rowOffsets;
         this.fieldOffsets = fieldOffsets;
         this.readers = readers;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java
new file mode 100644
index 0000000000..86f555748f
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.append.ForceSingleBatchReader;
+import org.apache.paimon.data.BlobPlaceholder;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static java.util.Collections.reverseOrder;
+import static java.util.Comparator.comparingLong;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Resolves blob placeholder rows by falling back through older sequence 
groups. The read logic is
+ * as below:
+ *
+ * <ol>
+ *   <li>Group files by max-seq, higher-seq files will have newer records.
+ *   <li>Sort files by range within each group, and create sequential readers 
for them. Note that
+ *       absent ranges will be read as all-placeholder rows.
+ *   <li>Sort by max-seq and merge read all readers, records at each index 
will be the first
+ *       non-placeholder blob.
+ * </ol>
+ */
+public class BlobFallbackRecordReader implements RecordReader<InternalRow> {
+
+    private final List<RecordReader<InternalRow>> groupReaders = new 
ArrayList<>();
+    private final int blobIndex;
+    private boolean returned;
+
+    BlobFallbackRecordReader(
+            List<DataFileMeta> files,
+            BlobFileReaderFactory readerFactory,
+            List<Range> rowRanges,
+            RowType readRowType,
+            int blobIndex) {
+        this.blobIndex = blobIndex;
+
+        checkArgument(!files.isEmpty(), "Blob bunch should not be empty.");
+        long firstRowId = Long.MAX_VALUE;
+        long lastRowId = Long.MIN_VALUE;
+
+        // sort group readers in descending order
+        Map<Long, List<DataFileMeta>> sequenceGroups = new 
TreeMap<>(reverseOrder());
+        for (DataFileMeta file : files) {
+            Range fileRange = file.nonNullRowIdRange();
+            firstRowId = Math.min(firstRowId, fileRange.from);
+            lastRowId = Math.max(lastRowId, fileRange.to);
+
+            sequenceGroups
+                    .computeIfAbsent(file.maxSequenceNumber(), ignored -> new 
ArrayList<>())
+                    .add(file);
+        }
+
+        for (Map.Entry<Long, List<DataFileMeta>> entry : 
sequenceGroups.entrySet()) {
+            // within each group, sort by first row id
+            List<DataFileMeta> groupFiles = entry.getValue();
+            groupFiles.sort(comparingLong(DataFileMeta::nonNullFirstRowId));
+
+            DataFileMeta current, next;
+            for (int i = 0; i < groupFiles.size() - 1; i++) {
+                current = groupFiles.get(i);
+                next = groupFiles.get(i + 1);
+
+                Preconditions.checkState(
+                        
!current.nonNullRowIdRange().hasIntersection(next.nonNullRowIdRange()),
+                        "Blob files within a same max_seq_num should not 
overlap. Find: %s, %s",
+                        current,
+                        next);
+            }
+
+            groupReaders.add(
+                    new ForceSingleBatchReader(
+                            new BlobSequenceGroupRecordReader(
+                                    groupFiles,
+                                    readerFactory,
+                                    rowRanges,
+                                    readRowType,
+                                    blobIndex,
+                                    firstRowId,
+                                    lastRowId)));
+        }
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<InternalRow> readBatch() throws IOException {
+        if (returned) {
+            return null;
+        }
+        returned = true;
+
+        // all readers are forced returning single batch
+        RecordIterator<InternalRow>[] iterators = new 
RecordIterator[groupReaders.size()];
+        for (int i = 0; i < groupReaders.size(); i++) {
+            RecordIterator<InternalRow> iterator = 
groupReaders.get(i).readBatch();
+            if (iterator == null) {
+                if (i != 0) {
+                    throw new IllegalStateException("All readers should be a 
single batch reader.");
+                }
+                for (int j = i + 1; j < groupReaders.size(); j++) {
+                    if (groupReaders.get(j).readBatch() != null) {
+                        throw new IllegalStateException(
+                                "All readers should be a single batch 
reader.");
+                    }
+                }
+                return null;
+            }
+            iterators[i] = iterator;
+        }
+
+        return new RecordIterator<InternalRow>() {
+            @Nullable
+            @Override
+            public InternalRow next() throws IOException {
+                InternalRow result = null;
+                // We should always move each iterator forward
+                // This may significantly increase memory usage and decrease 
read efficiency
+                // if `blob-as-descriptor` is disabled and many non-null blobs 
are updated
+                // TODO: Do not read stale records if there's a newer 
non-placeholder
+                //   record. e.g. introduce a discard method to directly 
discard the
+                //   next record?
+                for (int i = 0; i < iterators.length; i++) {
+                    RecordIterator<InternalRow> iterator = iterators[i];
+                    InternalRow row = iterator.next();
+                    if (row == null) {
+                        if (i != 0) {
+                            throw new IllegalStateException(
+                                    "All readers of each max_seq group should 
have the same number of records.");
+                        }
+                        for (int j = i + 1; j < iterators.length; j++) {
+                            if (iterators[j].next() != null) {
+                                throw new IllegalStateException(
+                                        "All readers of each max_seq group 
should have the same number of records.");
+                            }
+                        }
+                        return null;
+                    }
+                    // result is the first non-placeholder record
+                    if (result == null && !isPlaceHolder(row)) {
+                        result = row;
+                    }
+                }
+                if (result == null) {
+                    throw new IllegalStateException(
+                            "Invalid state: all blob files at the same row id 
store a placeholder, it's a bug.");
+                }
+                return result;
+            }
+
+            @Override
+            public void releaseBatch() {
+                for (RecordIterator<InternalRow> iterator : iterators) {
+                    iterator.releaseBatch();
+                }
+            }
+        };
+    }
+
+    private boolean isPlaceHolder(InternalRow row) {
+        return !row.isNullAt(blobIndex) && row.getBlob(blobIndex) == 
BlobPlaceholder.INSTANCE;
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOException exception = null;
+        for (RecordReader<InternalRow> reader : groupReaders) {
+            try {
+                reader.close();
+            } catch (IOException e) {
+                if (exception == null) {
+                    exception = e;
+                } else {
+                    exception.addSuppressed(e);
+                }
+            }
+        }
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+    /**
+     * Reads one blob sequence group (all blob files with the same 
max_seq_num) and emits
+     * placeholder rows for row id gaps. For example, if the full row range is 
[0, 100], but there's
+     * only one blob file with row range [20, 80], then the rows with row id 
[0, 19] and [81, 100]
+     * will be emitted as placeholder rows.
+     *
+     * <p>This reader should always be fully consumed, or the internal states 
may be broken.
+     *
+     * <p>Note that we can not simply concat all data files and read them, 
even though we guarantee
+     * writing the full-range data during data-evolution. The complexity is 
introduced by row-level
+     * compaction. For example, if we execute following operations:
+     *
+     * <ol>
+     *   <li>Write [0, 100], generate files [0, 50], [51, 100]
+     *   <li>Update [0, 100] files, generate files [0, 25], [26, 75], [76, 100]
+     *   <li>Insert new blobs for range [101, 200], generate files [101, 200]
+     *   <li>Update new blobs for range [101, 200], generate files [101, 150], 
[151, 200]
+     *   <li>Compact, merge [0, 100], [101, 200] to a single range
+     *   <li>Update the compacted files, generate files [0, 200]
+     * </ol>
+     *
+     * <p>The data files layout would be:
+     *
+     * <pre>
+     *         |<-----------------------  merged range: row 0 ~ 200  
--------------------------------->|
+     *         |                                                               
                        |
+     *         ┌─────────────────────────┐┌──────────────────────┐
+     *   seq1: │      file1 (0~50)       ││     file2 (51~100)   │           
(empty on 101~200)
+     *         └─────────────────────────┘└──────────────────────┘
+     *         ┌─────────────┐┌──────────────────────┐┌──────────┐
+     *   seq2: │ file3(0~25) ││    file4 (26~75)     ││f5(76~100)│           
(empty on 101~200)
+     *         └─────────────┘└──────────────────────┘└──────────┘
+     *                                                            
┌────────────────────────────────────┐
+     *   seq3:                (empty on 0~100)                    │         
file6 (101~200)            │
+     *                                                            
└────────────────────────────────────┘
+     *                                                            
┌─────────────────┐┌─────────────────┐
+     *   seq4:                (empty on 0~100)                    │ file7 
(101~150) ││ file8 (151~200) │
+     *                                                            
└─────────────────┘└─────────────────┘
+     *         
┌───────────────────────────────────────────────────────────────────────────────────────┐
+     *   seq6: │                              file9 (0~200)                    
                        │
+     *         
└───────────────────────────────────────────────────────────────────────────────────────┘
+     * </pre>
+     *
+     * <p>We treat all gaps as full-placeholders, and correctly resolve 
pushed-ranges.
+     */
+    public static class BlobSequenceGroupRecordReader implements 
RecordReader<InternalRow> {
+
+        private final List<DataFileMeta> files;
+        private final BlobFileReaderFactory readerFactory;
+        // pushed row ranges
+        private final List<Range> rowRanges;
+        private final RowType readRowType;
+        private final int blobIndex;
+        private final long lastRowId;
+
+        private RecordReader<InternalRow> currentReader;
+        private DataFileMeta currentFile;
+        private int nextFileIndex;
+        private int nextRowRangeIndex;
+        // expected next row id
+        private long nextRowId;
+
+        private InternalRow placeholderRow;
+
+        BlobSequenceGroupRecordReader(
+                List<DataFileMeta> files,
+                BlobFileReaderFactory readerFactory,
+                List<Range> rowRanges,
+                RowType readRowType,
+                int blobIndex,
+                long firstRowId,
+                long lastRowId) {
+            this.files = files;
+            this.readerFactory = readerFactory;
+            this.rowRanges = rowRanges == null ? null : 
Range.sortAndMergeOverlap(rowRanges);
+            this.readRowType = readRowType;
+            this.blobIndex = blobIndex;
+            this.lastRowId = lastRowId;
+
+            this.nextFileIndex = 0;
+            this.nextRowRangeIndex = 0;
+            setNextRowId(firstRowId);
+
+            this.placeholderRow = null;
+        }
+
+        @Nullable
+        @Override
+        public RecordIterator<InternalRow> readBatch() throws IOException {
+            while (true) {
+                if (currentReader != null) {
+                    RecordIterator<InternalRow> batch = 
currentReader.readBatch();
+                    if (batch != null) {
+                        return batch;
+                    }
+                    // row ranges have been pushed to readers
+                    // directly set nextRowId as the lastRowId + 1
+                    setNextRowId(lastRowId(currentFile) + 1);
+                    closeCurrentFileReader();
+                    continue;
+                }
+
+                if (nextRowId > lastRowId) {
+                    return null;
+                }
+
+                // skip files whose ranges are before nextRowId
+                while (nextFileIndex < files.size()
+                        && lastRowId(files.get(nextFileIndex)) < nextRowId) {
+                    nextFileIndex++;
+                }
+                if (nextFileIndex >= files.size()) {
+                    return placeHolderBatch(lastRowId);
+                }
+
+                DataFileMeta nextFile = files.get(nextFileIndex);
+                if (nextFile.nonNullFirstRowId() > nextRowId) {
+                    return placeHolderBatch(nextFile.nonNullFirstRowId() - 1);
+                }
+
+                createReader(nextFile);
+            }
+        }
+
+        /**
+         * Set nextRowId and try to move to the next selected row id. So the 
final nextRowId may be
+         * greater than the input value.
+         */
+        private void setNextRowId(long nextRowId) {
+            this.nextRowId = nextRowId;
+            tryMoveToSelectedRow();
+        }
+
+        private void tryMoveToSelectedRow() {
+            if (nextRowId > lastRowId || rowRanges == null) {
+                return;
+            }
+
+            while (nextRowRangeIndex < rowRanges.size()) {
+                Range range = rowRanges.get(nextRowRangeIndex);
+                if (nextRowId >= range.from && nextRowId <= range.to) {
+                    // if nextRowId is within the range, do not need to move
+                    return;
+                } else if (nextRowId < range.from) {
+                    // else if nextRowId < next range, move to next range's 
`from`
+                    nextRowId = range.from;
+                    return;
+                }
+                // else nextRowId > range.to, try next range
+                nextRowRangeIndex++;
+            }
+
+            // all ranges consumed, no need to read
+            nextRowId = lastRowId + 1;
+        }
+
+        private RecordIterator<InternalRow> placeHolderBatch(long endRowId) {
+            return new RecordIterator<InternalRow>() {
+                long rowId;
+
+                @Nullable
+                @Override
+                public InternalRow next() {
+                    rowId = nextRowId;
+                    if (rowId > endRowId) {
+                        return null;
+                    }
+                    setNextRowId(rowId + 1);
+                    return placeHolderRow();
+                }
+
+                @Override
+                public void releaseBatch() {
+                    // nothing to release
+                }
+            };
+        }
+
+        private InternalRow placeHolderRow() {
+            if (placeholderRow == null) {
+                GenericRow row = new GenericRow(readRowType.getFieldCount());
+                row.setField(blobIndex, BlobPlaceholder.INSTANCE);
+                placeholderRow = row;
+            }
+            return placeholderRow;
+        }
+
+        private long lastRowId(DataFileMeta file) {
+            return file.nonNullFirstRowId() + file.rowCount() - 1;
+        }
+
+        private void closeCurrentFileReader() throws IOException {
+            if (currentReader != null) {
+                currentReader.close();
+                currentReader = null;
+            }
+            currentFile = null;
+        }
+
+        private void createReader(DataFileMeta nextFile) throws IOException {
+            currentFile = nextFile;
+            currentReader = readerFactory.create(nextFile);
+            nextFileIndex++;
+        }
+
+        @Override
+        public void close() throws IOException {
+            closeCurrentFileReader();
+        }
+    }
+
+    /** Factory to create readers. */
+    interface BlobFileReaderFactory {
+        RecordReader<InternalRow> create(DataFileMeta file) throws IOException;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index 0df1b1428e..145fdc9ad4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -46,10 +46,12 @@ import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.FormatReaderMapping;
 import org.apache.paimon.utils.FormatReaderMapping.Builder;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.RangeHelper;
 import org.apache.paimon.utils.RoaringBitmap32;
@@ -228,7 +230,8 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                                             || 
isVectorStoreFile(file.fileName()),
                                     "Only blob/vector-store files need to call 
this method.");
                             return 
schemaFetcher.apply(file.schemaId()).logicalRowType();
-                        });
+                        },
+                        rowRanges != null);
 
         long rowCount = fieldsFiles.get(0).rowCount();
         long firstRowId = 
fieldsFiles.get(0).files().get(0).nonNullFirstRowId();
@@ -302,15 +305,16 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                                                 dataSchema,
                                                 readFields,
                                                 false));
+                RowType partialReadRowType = new RowType(readFields);
                 fileRecordReaders[i] =
                         new ForceSingleBatchReader(
-                                createFileReader(
+                                createFieldBunchReader(
                                         partition,
                                         bunch,
                                         dataFilePathFactory,
                                         formatReaderMapping,
                                         rowRanges,
-                                        readRowType));
+                                        partialReadRowType));
             }
         }
 
@@ -327,31 +331,7 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
         return new DataEvolutionFileReader(rowOffsets, fieldOffsets, 
fileRecordReaders);
     }
 
-    private FileRecordReader<InternalRow> createFileReader(
-            BinaryRow partition,
-            DataFilePathFactory dataFilePathFactory,
-            DataFileMeta file,
-            Builder formatBuilder,
-            List<Range> rowRanges,
-            RowType readRowType)
-            throws IOException {
-        String formatIdentifier = 
DataFilePathFactory.formatIdentifier(file.fileName());
-        long schemaId = file.schemaId();
-        FormatReaderMapping formatReaderMapping =
-                formatReaderMappings.computeIfAbsent(
-                        new FormatKey(file.schemaId(), formatIdentifier),
-                        key ->
-                                formatBuilder.build(
-                                        formatIdentifier,
-                                        schema,
-                                        schemaId == schema.id()
-                                                ? schema
-                                                : 
schemaFetcher.apply(schemaId)));
-        return createFileReader(
-                partition, file, dataFilePathFactory, formatReaderMapping, 
rowRanges, readRowType);
-    }
-
-    private RecordReader<InternalRow> createFileReader(
+    private RecordReader<InternalRow> createFieldBunchReader(
             BinaryRow partition,
             FieldBunch bunch,
             DataFilePathFactory dataFilePathFactory,
@@ -359,7 +339,8 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
             List<Range> rowRanges,
             RowType readRowType)
             throws IOException {
-        if (bunch.files().size() == 1) {
+        if (bunch instanceof DataBunch) {
+            // for data bunch, directly read the single file
             return createFileReader(
                     partition,
                     bunch.files().get(0),
@@ -367,9 +348,51 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                     formatReaderMapping,
                     rowRanges,
                     readRowType);
+        } else if (bunch instanceof VectorFileBunch) {
+            // for vector bunch, sequential read all data files and concat them
+            return sequentialReadFiles(
+                    bunch.files(), partition, dataFilePathFactory, 
formatReaderMapping, rowRanges);
+        } else if (bunch instanceof BlobFileBunch) {
+            // for blob bunch, fallback on placeholders
+
+            // fast path: only contains one max_seq group
+            if (((BlobFileBunch) bunch).sequentialReadOptimize()) {
+                return sequentialReadFiles(
+                        bunch.files(),
+                        partition,
+                        dataFilePathFactory,
+                        formatReaderMapping,
+                        rowRanges);
+            }
+            int blobIndex = findBlobFieldIndex(readRowType);
+            checkArgument(blobIndex >= 0, "Blob bunch read type should contain 
a blob field.");
+            return new BlobFallbackRecordReader(
+                    bunch.files(),
+                    file ->
+                            createFileReader(
+                                    partition,
+                                    file,
+                                    dataFilePathFactory,
+                                    formatReaderMapping,
+                                    rowRanges,
+                                    readRowType),
+                    rowRanges,
+                    readRowType,
+                    blobIndex);
+        } else {
+            throw new UnsupportedOperationException("Unsupported bunch type: " 
+ bunch);
         }
+    }
+
+    private RecordReader<InternalRow> sequentialReadFiles(
+            List<DataFileMeta> files,
+            BinaryRow partition,
+            DataFilePathFactory dataFilePathFactory,
+            FormatReaderMapping formatReaderMapping,
+            List<Range> rowRanges)
+            throws IOException {
         List<ReaderSupplier<InternalRow>> readerSuppliers = new ArrayList<>();
-        for (DataFileMeta file : bunch.files()) {
+        for (DataFileMeta file : files) {
             RoaringBitmap32 selection = file.toFileSelection(rowRanges);
             FormatReaderContext formatReaderContext =
                     new FormatReaderContext(
@@ -394,6 +417,39 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
         return ConcatRecordReader.create(readerSuppliers);
     }
 
+    private static int findBlobFieldIndex(RowType rowType) {
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    private FileRecordReader<InternalRow> createFileReader(
+            BinaryRow partition,
+            DataFilePathFactory dataFilePathFactory,
+            DataFileMeta file,
+            Builder formatBuilder,
+            List<Range> rowRanges,
+            RowType readRowType)
+            throws IOException {
+        String formatIdentifier = 
DataFilePathFactory.formatIdentifier(file.fileName());
+        long schemaId = file.schemaId();
+        FormatReaderMapping formatReaderMapping =
+                formatReaderMappings.computeIfAbsent(
+                        new FormatKey(file.schemaId(), formatIdentifier),
+                        key ->
+                                formatBuilder.build(
+                                        formatIdentifier,
+                                        schema,
+                                        schemaId == schema.id()
+                                                ? schema
+                                                : 
schemaFetcher.apply(schemaId)));
+        return createFileReader(
+                partition, file, dataFilePathFactory, formatReaderMapping, 
rowRanges, readRowType);
+    }
+
     private FileRecordReader<InternalRow> createFileReader(
             BinaryRow partition,
             DataFileMeta file,
@@ -433,8 +489,8 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
             Function<DataFileMeta, RowType> fileToRowType,
             boolean rowIdPushDown) {
         List<FieldBunch> fieldsFiles = new ArrayList<>();
-        Map<Integer, SpecialFieldBunch> blobBunchMap = new HashMap<>();
-        Map<VectorStoreBunchKey, SpecialFieldBunch> vectorStoreBunchMap = new 
TreeMap<>();
+        Map<Integer, BlobFileBunch> blobBunchMap = new HashMap<>();
+        Map<VectorStoreBunchKey, VectorFileBunch> vectorStoreBunchMap = new 
TreeMap<>();
         long rowCount = -1;
         for (DataFileMeta file : needMergeFiles) {
             if (isBlobFile(file.fileName())) {
@@ -443,8 +499,7 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                 final long expectedRowCount = rowCount;
                 blobBunchMap
                         .computeIfAbsent(
-                                fieldId,
-                                key -> new SpecialFieldBunch(expectedRowCount, 
rowIdPushDown))
+                                fieldId, key -> new 
BlobFileBunch(expectedRowCount, rowIdPushDown))
                         .add(file);
             } else if (isVectorStoreFile(file.fileName())) {
                 RowType rowType = fileToRowType.apply(file);
@@ -456,7 +511,7 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                 vectorStoreBunchMap
                         .computeIfAbsent(
                                 vectorStoreKey,
-                                key -> new SpecialFieldBunch(expectedRowCount, 
rowIdPushDown))
+                                key -> new VectorFileBunch(expectedRowCount, 
rowIdPushDown))
                         .add(file);
             } else {
                 // Normal file, just add it to the current merge split
@@ -496,8 +551,84 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
         }
     }
 
+    /**
+     * The {@link FieldBunch} for blobs. Compared to {@link VectorFileBunch} 
which only contains
+     * data files of the max max_seq number, {@link BlobFileBunch} contains 
all blob files.
+     */
     @VisibleForTesting
-    static class SpecialFieldBunch implements FieldBunch {
+    static class BlobFileBunch implements FieldBunch {
+
+        final List<DataFileMeta> files;
+        final List<Range> ranges;
+        final long expectedRowCount;
+        final boolean rowIdPushdown;
+
+        BlobFileBunch(long expectedRowCount, boolean rowIdPushdown) {
+            this.files = new ArrayList<>();
+            this.expectedRowCount = expectedRowCount;
+            this.ranges = new ArrayList<>();
+            this.rowIdPushdown = rowIdPushdown;
+        }
+
+        void add(DataFileMeta file) {
+            if (!isBlobFile(file.fileName())) {
+                throw new IllegalArgumentException("Only blob file can be 
added to this bunch.");
+            }
+            if (!files.isEmpty()) {
+                checkArgument(
+                        file.writeCols().equals(files.get(0).writeCols()),
+                        "All files in this bunch should have the same write 
columns.");
+            }
+
+            files.add(file);
+            ranges.add(file.nonNullRowIdRange());
+        }
+
+        @Override
+        public long rowCount() {
+            List<Range> merged = Range.sortAndMergeOverlap(ranges, true);
+            if (!rowIdPushdown) {
+                Preconditions.checkState(
+                        merged.size() == 1,
+                        "Blob file bunch should always contain a contiguous 
row range.");
+
+                long rowCount = merged.get(0).count();
+                if (expectedRowCount >= 0) {
+                    Preconditions.checkState(
+                            rowCount == expectedRowCount,
+                            "The merged rowCount %s of blob file bunch should 
be aligned with normal files %s.",
+                            rowCount,
+                            expectedRowCount);
+                }
+            }
+
+            return merged.stream().mapToLong(Range::count).sum();
+        }
+
+        public boolean sequentialReadOptimize() {
+            Preconditions.checkState(!files.isEmpty(), "Blob file bunch should 
not be empty.");
+
+            // If blob files share the same max_seq_num, we could sequentially 
read them.
+            // Files have already been sorted by first_row_id
+            long maxSeq = files.get(0).maxSequenceNumber();
+            for (int i = 1; i < files.size(); i++) {
+                if (files.get(i).maxSequenceNumber() != maxSeq) {
+                    return false;
+                }
+            }
+
+            return true;
+        }
+
+        @Override
+        public List<DataFileMeta> files() {
+            return files;
+        }
+    }
+
+    /** {@link FieldBunch} for vector-store files. */
+    @VisibleForTesting
+    static class VectorFileBunch implements FieldBunch {
 
         final List<DataFileMeta> files;
         final long expectedRowCount;
@@ -508,72 +639,67 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
         long latestMaxSequenceNumber = -1;
         long rowCount;
 
-        SpecialFieldBunch(long expectedRowCount, boolean rowIdPushDown) {
+        VectorFileBunch(long expectedRowCount, boolean rowIdPushDown) {
             this.files = new ArrayList<>();
-            this.rowCount = 0;
             this.expectedRowCount = expectedRowCount;
             this.rowIdPushDown = rowIdPushDown;
         }
 
         void add(DataFileMeta file) {
-            if (!isBlobFile(file.fileName()) && 
!isVectorStoreFile(file.fileName())) {
+            if (!isVectorStoreFile(file.fileName())) {
                 throw new IllegalArgumentException(
-                        "Only blob/vector-store file can be added to this 
bunch.");
+                        "Only vector-store file can be added to this bunch.");
             }
-
             if (file.nonNullFirstRowId() == latestFistRowId) {
                 if (file.maxSequenceNumber() >= latestMaxSequenceNumber) {
                     throw new IllegalArgumentException(
-                            "Blob/vector-store file with same first row id 
should have decreasing sequence number.");
+                            "Vector file with same first row id should have 
decreasing sequence number.");
                 }
                 return;
             }
+
             if (!files.isEmpty()) {
                 long firstRowId = file.nonNullFirstRowId();
-                if (rowIdPushDown) {
-                    if (firstRowId < expectedNextFirstRowId) {
-                        if (file.maxSequenceNumber() > 
latestMaxSequenceNumber) {
-                            DataFileMeta lastFile = files.remove(files.size() 
- 1);
-                            rowCount -= lastFile.rowCount();
-                        } else {
-                            return;
-                        }
-                    }
-                } else {
-                    if (firstRowId < expectedNextFirstRowId) {
-                        checkArgument(
-                                file.maxSequenceNumber() < 
latestMaxSequenceNumber,
-                                "Blob/vector-store file with overlapping row 
id should have decreasing sequence number.");
+                if (rowIdPushDown && firstRowId < expectedNextFirstRowId) {
+                    if (file.maxSequenceNumber() > latestMaxSequenceNumber) {
+                        DataFileMeta lastFile = files.remove(files.size() - 1);
+                        rowCount -= lastFile.rowCount();
+                    } else {
                         return;
-                    } else if (firstRowId > expectedNextFirstRowId) {
-                        throw new IllegalArgumentException(
-                                "Blob/vector-store file first row id should be 
continuous, expect "
-                                        + expectedNextFirstRowId
-                                        + " but got "
-                                        + firstRowId);
                     }
+                } else if (firstRowId < expectedNextFirstRowId) {
+                    checkArgument(
+                            file.maxSequenceNumber() < latestMaxSequenceNumber,
+                            "Vector file with overlapping row id should have 
decreasing sequence number.");
+                    return;
+                } else if (!rowIdPushDown && firstRowId > 
expectedNextFirstRowId) {
+                    throw new IllegalArgumentException(
+                            "Vector file first row id should be continuous, 
expect "
+                                    + expectedNextFirstRowId
+                                    + " but got "
+                                    + firstRowId);
                 }
+
                 if (!files.isEmpty()) {
-                    if (!isBlobFile(file.fileName())) {
-                        checkArgument(
-                                file.schemaId() == files.get(0).schemaId(),
-                                "All files in this bunch should have the same 
schema id.");
-                    }
+                    checkArgument(
+                            file.schemaId() == files.get(0).schemaId(),
+                            "All files in this bunch should have the same 
schema id.");
                     checkArgument(
                             file.writeCols().equals(files.get(0).writeCols()),
                             "All files in this bunch should have the same 
write columns.");
                 }
             }
+
             files.add(file);
             rowCount += file.rowCount();
-            if (expectedRowCount >= 0) {
+            if (expectedRowCount > 0) {
                 checkArgument(
                         rowCount <= expectedRowCount,
-                        "Blob/vector-store files row count exceed the expect " 
+ expectedRowCount);
+                        "Vector files row count exceed the expect " + 
expectedRowCount);
             }
-            this.latestMaxSequenceNumber = file.maxSequenceNumber();
-            this.latestFistRowId = file.nonNullFirstRowId();
-            this.expectedNextFirstRowId = latestFistRowId + file.rowCount();
+            latestMaxSequenceNumber = file.maxSequenceNumber();
+            latestFistRowId = file.nonNullFirstRowId();
+            expectedNextFirstRowId = latestFistRowId + file.rowCount();
         }
 
         @Override
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/BlobUpdateTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/BlobUpdateTest.java
new file mode 100644
index 0000000000..c8c2a7f90c
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobUpdateTest.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobPlaceholder;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.blob.BlobFileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.globalindex.IndexedSplit;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for blob update scenarios. */
+public class BlobUpdateTest extends TableTestBase {
+
+    private final byte[] blobBytes = randomBytes();
+
+    @Test
+    public void testReadBlobPlaceHolderFallback() throws Exception {
+        createTableDefault();
+        writeDataDefault(
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("first"), new 
BlobData(blobBytes)),
+                        GenericRow.of(
+                                2, BinaryString.fromString("second"), new 
BlobData(blobBytes)),
+                        GenericRow.of(
+                                3, BinaryString.fromString("third"), new 
BlobData(blobBytes))));
+
+        FileStoreTable table = getTableDefault();
+        byte[] updatedBytes = "updated-blob".getBytes();
+        DataFilePathFactory pathFactory =
+                
table.store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
+        DataFileMeta newBlobFile =
+                writeBlobFile(
+                        table.fileIO(),
+                        pathFactory.newBlobPath(),
+                        Arrays.asList(BlobPlaceholder.INSTANCE, new 
BlobData(updatedBytes)),
+                        0,
+                        2,
+                        table.schema().id(),
+                        Collections.singletonList("f2"));
+        commitBlobFiles(newBlobFile);
+
+        List<byte[]> actual = new ArrayList<>();
+        ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        reader.forEachRemaining(row -> actual.add(row.getBlob(2).toData()));
+
+        assertThat(actual.size()).isEqualTo(3);
+        assertThat(actual.get(0)).isEqualTo(blobBytes);
+        assertThat(actual.get(1)).isEqualTo(updatedBytes);
+        assertThat(actual.get(2)).isEqualTo(blobBytes);
+    }
+
+    @Test
+    public void testReadBlobPlaceHolderFallbackWithRowIdPushDown() throws 
Exception {
+        createTableDefault();
+
+        List<byte[]> originalBlobs = new ArrayList<>();
+        List<InternalRow> rows = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            byte[] bytes = fixedBlobBytes(i);
+            originalBlobs.add(bytes);
+            rows.add(GenericRow.of(i, BinaryString.fromString("row-" + i), new 
BlobData(bytes)));
+        }
+        writeDataDefault(rows);
+
+        FileStoreTable table = getTableDefault();
+        assertBlobFileRowIdRanges(
+                table,
+                Arrays.asList(
+                        new Range(0L, 2L),
+                        new Range(3L, 5L),
+                        new Range(6L, 8L),
+                        new Range(9L, 9L)));
+
+        DataFilePathFactory pathFactory =
+                
table.store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
+        byte[] updated4 = fixedBlobBytes(44);
+        byte[] updated9 = fixedBlobBytes(99);
+        DataFileMeta updateFile0 =
+                writeBlobFile(
+                        table.fileIO(),
+                        pathFactory.newBlobPath(),
+                        Arrays.asList(
+                                BlobPlaceholder.INSTANCE,
+                                BlobPlaceholder.INSTANCE,
+                                BlobPlaceholder.INSTANCE,
+                                BlobPlaceholder.INSTANCE,
+                                new BlobData(updated4)),
+                        0,
+                        2,
+                        table.schema().id(),
+                        Collections.singletonList("f2"));
+        DataFileMeta updateFile1 =
+                writeBlobFile(
+                        table.fileIO(),
+                        pathFactory.newBlobPath(),
+                        Arrays.asList(
+                                BlobPlaceholder.INSTANCE,
+                                BlobPlaceholder.INSTANCE,
+                                BlobPlaceholder.INSTANCE,
+                                BlobPlaceholder.INSTANCE,
+                                new BlobData(updated9)),
+                        5,
+                        2,
+                        table.schema().id(),
+                        Collections.singletonList("f2"));
+        commitBlobFiles(updateFile0, updateFile1);
+
+        FileStoreTable readTable = getTableDefault();
+        ReadBuilder readBuilder =
+                readTable
+                        .newReadBuilder()
+                        
.withReadType(readTable.rowType().project(Collections.singletonList("f2")))
+                        .withRowRanges(Arrays.asList(new Range(5L, 5L), new 
Range(9L, 9L)));
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        assertThat(plan.splits().size()).isEqualTo(1);
+        DataSplit dataSplit = ((IndexedSplit) 
plan.splits().get(0)).dataSplit();
+        assertThat(dataSplit.dataFiles().size()).isEqualTo(3);
+        RecordReader<InternalRow> reader = 
readBuilder.newRead().createReader(plan);
+
+        List<byte[]> actual = new ArrayList<>();
+        reader.forEachRemaining(row -> actual.add(row.getBlob(0).toData()));
+
+        assertThat(actual.size()).isEqualTo(2);
+        assertThat(actual.get(0)).isEqualTo(originalBlobs.get(5));
+        assertThat(actual.get(1)).isEqualTo(updated9);
+    }
+
+    /**
+     * This test manually simulates the compacted layout described in 
BlobSequenceGroupRecordReader,
+     * scaled down to row id [0, 9].
+     *
+     * <pre>
+     * row id:      0    1    2    3    4    5    6    7    8    9
+     * seq1:      [b0   b1   b2] [b3   b4]  .    .    .    .    .
+     * seq2:      [u20  P]  [P    u23][u24] .    .    .    .    .
+     * seq3:       .    .    .    .    .   [b5   b6   b7   b8   b9]
+     * seq4:       .    .    .    .    .   [P    u46  P]  [u48   P]
+     * seq6:      [P    u61  P    P    P    P    P    P    P   u69]
+     *
+     * RESULT:    u20   u61  b2   u23  u24  b5   u46  b7   u48  u69
+     * </pre>
+     */
+    @Test
+    public void testReadCompactedBlobSequenceGroups() throws Exception {
+        createTableDefault();
+
+        List<byte[]> baseBlobs = new ArrayList<>();
+        List<InternalRow> rows = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            byte[] bytes = fixedBlobBytes(i);
+            baseBlobs.add(bytes);
+            rows.add(GenericRow.of(i, BinaryString.fromString("row-" + i), new 
BlobData(bytes)));
+        }
+        writeDataDefault(rows);
+
+        FileStoreTable table = getTableDefault();
+        List<DataFileMeta> oldBlobFiles = blobFiles(table);
+        assertThat(oldBlobFiles.size()).isEqualTo(4);
+
+        byte[] seq2Row0 = fixedBlobBytes(20);
+        byte[] seq2Row3 = fixedBlobBytes(23);
+        byte[] seq2Row4 = fixedBlobBytes(24);
+        byte[] seq4Row6 = fixedBlobBytes(46);
+        byte[] seq4Row8 = fixedBlobBytes(48);
+        byte[] seq6Row1 = fixedBlobBytes(61);
+        byte[] seq6Row9 = fixedBlobBytes(69);
+
+        DataFilePathFactory pathFactory =
+                
table.store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
+        List<DataFileMeta> compactedLayout =
+                Arrays.asList(
+                        writeBlobFile(
+                                table.fileIO(),
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(
+                                        new BlobData(baseBlobs.get(0)),
+                                        new BlobData(baseBlobs.get(1)),
+                                        new BlobData(baseBlobs.get(2))),
+                                0,
+                                1,
+                                table.schema().id(),
+                                Collections.singletonList("f2")),
+                        writeBlobFile(
+                                table.fileIO(),
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(
+                                        new BlobData(baseBlobs.get(3)),
+                                        new BlobData(baseBlobs.get(4))),
+                                3,
+                                1,
+                                table.schema().id(),
+                                Collections.singletonList("f2")),
+                        writeBlobFile(
+                                table.fileIO(),
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(new BlobData(seq2Row0), 
BlobPlaceholder.INSTANCE),
+                                0,
+                                2,
+                                table.schema().id(),
+                                Collections.singletonList("f2")),
+                        writeBlobFile(
+                                table.fileIO(),
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(BlobPlaceholder.INSTANCE, new 
BlobData(seq2Row3)),
+                                2,
+                                2,
+                                table.schema().id(),
+                                Collections.singletonList("f2")),
+                        writeBlobFile(
+                                table.fileIO(),
+                                pathFactory.newBlobPath(),
+                                Collections.singletonList(new 
BlobData(seq2Row4)),
+                                4,
+                                2,
+                                table.schema().id(),
+                                Collections.singletonList("f2")),
+                        writeBlobFile(
+                                table.fileIO(),
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(
+                                        new BlobData(baseBlobs.get(5)),
+                                        new BlobData(baseBlobs.get(6)),
+                                        new BlobData(baseBlobs.get(7)),
+                                        new BlobData(baseBlobs.get(8)),
+                                        new BlobData(baseBlobs.get(9))),
+                                5,
+                                3,
+                                table.schema().id(),
+                                Collections.singletonList("f2")),
+                        writeBlobFile(
+                                table.fileIO(),
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(
+                                        BlobPlaceholder.INSTANCE,
+                                        new BlobData(seq4Row6),
+                                        BlobPlaceholder.INSTANCE),
+                                5,
+                                4,
+                                table.schema().id(),
+                                Collections.singletonList("f2")),
+                        writeBlobFile(
+                                table.fileIO(),
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(new BlobData(seq4Row8), 
BlobPlaceholder.INSTANCE),
+                                8,
+                                4,
+                                table.schema().id(),
+                                Collections.singletonList("f2")),
+                        writeBlobFile(
+                                table.fileIO(),
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(
+                                        BlobPlaceholder.INSTANCE,
+                                        new BlobData(seq6Row1),
+                                        BlobPlaceholder.INSTANCE,
+                                        BlobPlaceholder.INSTANCE,
+                                        BlobPlaceholder.INSTANCE,
+                                        BlobPlaceholder.INSTANCE,
+                                        BlobPlaceholder.INSTANCE,
+                                        BlobPlaceholder.INSTANCE,
+                                        BlobPlaceholder.INSTANCE,
+                                        new BlobData(seq6Row9)),
+                                0,
+                                6,
+                                table.schema().id(),
+                                Collections.singletonList("f2")));
+
+        commitCompactFiles(oldBlobFiles, compactedLayout);
+
+        FileStoreTable readTable = getTableDefault();
+        assertBlobFileLayout(
+                readTable,
+                Arrays.asList(
+                        "seq1:0-2",
+                        "seq1:3-4",
+                        "seq2:0-1",
+                        "seq2:2-3",
+                        "seq2:4-4",
+                        "seq3:5-9",
+                        "seq4:5-7",
+                        "seq4:8-9",
+                        "seq6:0-9"));
+
+        ReadBuilder readBuilder = readTable.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+        List<byte[]> actual = new ArrayList<>();
+        reader.forEachRemaining(row -> actual.add(row.getBlob(2).toData()));
+
+        List<byte[]> expected =
+                Arrays.asList(
+                        seq2Row0,
+                        seq6Row1,
+                        baseBlobs.get(2),
+                        seq2Row3,
+                        seq2Row4,
+                        baseBlobs.get(5),
+                        seq4Row6,
+                        baseBlobs.get(7),
+                        seq4Row8,
+                        seq6Row9);
+        assertThat(actual.size()).isEqualTo(expected.size());
+        for (int i = 0; i < expected.size(); i++) {
+            assertThat(actual.get(i)).isEqualTo(expected.get(i));
+        }
+
+        // additionally, assert row id pushdown
+        for (int i = 0; i < expected.size(); i++) {
+            assertRowRangeRead(readTable, i, expected.get(i));
+        }
+    }
+
+    private void commitBlobFiles(DataFileMeta... dataFiles) throws Exception {
+        commitDataFiles(Arrays.asList(dataFiles), Collections.emptyList());
+    }
+
+    private void commitDataFiles(List<DataFileMeta> newFiles, 
List<DataFileMeta> deletedFiles)
+            throws Exception {
+        commitDefault(
+                Collections.singletonList(
+                        new CommitMessageImpl(
+                                BinaryRow.EMPTY_ROW,
+                                0,
+                                null,
+                                new DataIncrement(newFiles, deletedFiles, 
Collections.emptyList()),
+                                CompactIncrement.emptyIncrement())));
+    }
+
+    private void commitCompactFiles(
+            List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) 
throws Exception {
+        commitDefault(
+                Collections.singletonList(
+                        new CommitMessageImpl(
+                                BinaryRow.EMPTY_ROW,
+                                0,
+                                null,
+                                DataIncrement.emptyIncrement(),
+                                new CompactIncrement(
+                                        compactBefore, compactAfter, 
Collections.emptyList()))));
+    }
+
+    private static List<DataFileMeta> blobFiles(FileStoreTable table) {
+        List<DataFileMeta> actual = new ArrayList<>();
+        for (ManifestEntry entry : table.store().newScan().plan().files()) {
+            DataFileMeta file = entry.file();
+            if (BlobFileFormat.isBlobFile(file.fileName())) {
+                actual.add(file);
+            }
+        }
+        return actual;
+    }
+
+    private static void assertBlobFileRowIdRanges(FileStoreTable table, 
List<Range> expected) {
+        List<Range> actual = new ArrayList<>();
+        for (DataFileMeta file : blobFiles(table)) {
+            actual.add(file.nonNullRowIdRange());
+        }
+        Collections.sort(actual, (left, right) -> Long.compare(left.from, 
right.from));
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private static void assertBlobFileLayout(FileStoreTable table, 
List<String> expected) {
+        List<DataFileMeta> files = blobFiles(table);
+        Collections.sort(
+                files,
+                (left, right) -> {
+                    int seqCompare =
+                            Long.compare(left.maxSequenceNumber(), 
right.maxSequenceNumber());
+                    if (seqCompare != 0) {
+                        return seqCompare;
+                    }
+                    int fromCompare =
+                            Long.compare(left.nonNullFirstRowId(), 
right.nonNullFirstRowId());
+                    if (fromCompare != 0) {
+                        return fromCompare;
+                    }
+                    return Long.compare(left.nonNullRowIdRange().to, 
right.nonNullRowIdRange().to);
+                });
+        List<String> actual = new ArrayList<>();
+        for (DataFileMeta file : files) {
+            Range range = file.nonNullRowIdRange();
+            actual.add("seq" + file.maxSequenceNumber() + ":" + range.from + 
"-" + range.to);
+        }
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private static void assertRowRangeRead(FileStoreTable table, long rowId, 
byte[] expected)
+            throws Exception {
+        ReadBuilder readBuilder =
+                table.newReadBuilder()
+                        .withRowRanges(Collections.singletonList(new 
Range(rowId, rowId)));
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+        List<byte[]> actual = new ArrayList<>();
+        reader.forEachRemaining(row -> actual.add(row.getBlob(2).toData()));
+
+        assertThat(actual.size()).isEqualTo(1);
+        assertThat(actual.get(0)).isEqualTo(expected);
+    }
+
+    private static byte[] fixedBlobBytes(int value) {
+        byte[] bytes = new byte[2 * 1024 * 124];
+        Arrays.fill(bytes, (byte) value);
+        return bytes;
+    }
+
+    private static DataFileMeta writeBlobFile(
+            FileIO fileIO,
+            Path path,
+            List<Blob> blobs,
+            long firstRowId,
+            long maxSequenceNumber,
+            long schemaId,
+            List<String> writeCols)
+            throws IOException {
+        try (PositionOutputStream out = fileIO.newOutputStream(path, false)) {
+            FormatWriter writer =
+                    new BlobFileFormat()
+                            .createWriterFactory(RowType.of(DataTypes.BLOB()))
+                            .create(out, "none");
+            for (Blob blob : blobs) {
+                writer.addElement(GenericRow.of(blob));
+            }
+            writer.close();
+        }
+        return DataFileMeta.create(
+                path.getName(),
+                fileIO.getFileSize(path),
+                blobs.size(),
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                maxSequenceNumber,
+                maxSequenceNumber,
+                schemaId,
+                DataFileMeta.DUMMY_LEVEL,
+                Collections.emptyList(),
+                Timestamp.fromEpochMillis(System.currentTimeMillis()),
+                0L,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                firstRowId,
+                writeCols);
+    }
+
+    @Override
+    protected Schema schemaDefault() {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "700 
KB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        return schemaBuilder.build();
+    }
+
+    @Override
+    protected InternalRow dataDefault(int time, int size) {
+        return GenericRow.of(
+                RANDOM.nextInt(), BinaryString.fromBytes(randomBytes()), new 
BlobData(blobBytes));
+    }
+
+    @Override
+    protected byte[] randomBytes() {
+        byte[] binary = new byte[2 * 1024 * 124];
+        RANDOM.nextBytes(binary);
+        return binary;
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java
new file mode 100644
index 0000000000..dcea0a6ff1
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobPlaceholder;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
+import 
org.apache.paimon.operation.BlobFallbackRecordReader.BlobSequenceGroupRecordReader;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReader.RecordIterator;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link BlobFallbackRecordReader}. */
+public class BlobFallbackRecordReaderTest {
+
+    private static final int BLOB_INDEX = 0;
+    private static final String BLOB_FIELD = "blob_col";
+    private static final RowType READ_ROW_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new DataField(BLOB_INDEX, BLOB_FIELD, 
DataTypes.BLOB()),
+                            new DataField(1, SpecialFields.ROW_ID.name(), 
DataTypes.BIGINT()),
+                            new DataField(
+                                    2, SpecialFields.SEQUENCE_NUMBER.name(), 
DataTypes.BIGINT())));
+
+    @Test
+    public void testBlobSequenceGroupReaderWithRowRanges() throws Exception {
+        List<DataFileMeta> files =
+                Arrays.asList(blobFile("blob1", 0, 3, 10), blobFile("blob2", 
5, 2, 10));
+        List<Range> rowRanges = ranges(1, 1, 3, 5);
+
+        ReadResult rows = readSequenceGroup(files, rowRanges, 0, 6, 10);
+
+        assertThat(rows.rowIds).containsExactly(1L, 5L);
+        assertThat(rows.placeholderRowCount).isEqualTo(2);
+        assertThat(rows.batchSizes).containsExactly(1, 2, 1);
+    }
+
+    @Test
+    public void testBlobSequenceGroupReaderWithMultipleRangesInFileAndGap() 
throws Exception {
+        DataFileMeta wideFile = blobFile("wide-file", 20, 31, 10);
+        List<Range> wideFileRanges = ranges(9, 10, 19, 20, 25, 30, 35, 40, 43, 
45, 48, 52, 55, 56);
+
+        ReadResult wideFileRows =
+                readSequenceGroup(Collections.singletonList(wideFile), 
wideFileRanges, 10, 60, 10);
+
+        List<Long> wideFileActualRowIds = rowIdsInRanges(20, 50, 
wideFileRanges);
+        
assertThat(wideFileRows.rowIds).containsExactlyElementsOf(wideFileActualRowIds);
+        assertThat(wideFileRows.placeholderRowCount)
+                .isEqualTo(
+                        rowIdsInRanges(10, 19, wideFileRanges).size()
+                                + rowIdsInRanges(51, 60, 
wideFileRanges).size());
+        assertThat(wideFileRows.batchSizes)
+                .containsExactlyElementsOf(batchSizes(2, 
wideFileActualRowIds.size(), 1, 4));
+
+        DataFileMeta firstFile = blobFile("first-file", 0, 11, 10);
+        DataFileMeta secondFile = blobFile("second-file", 50, 11, 10);
+        List<Range> gapRanges = ranges(0, 0, 9, 12, 25, 30, 35, 40, 43, 45, 
48, 52, 58, 62);
+
+        ReadResult gapRows =
+                readSequenceGroup(Arrays.asList(firstFile, secondFile), 
gapRanges, 0, 62, 10);
+
+        assertThat(gapRows.rowIds)
+                .containsExactlyElementsOf(
+                        concat(
+                                rowIdsInRanges(0, 10, gapRanges),
+                                rowIdsInRanges(50, 60, gapRanges)));
+        assertThat(gapRows.placeholderRowCount)
+                .isEqualTo(
+                        rowIdsInRanges(11, 49, gapRanges).size()
+                                + rowIdsInRanges(61, 62, gapRanges).size());
+        assertThat(gapRows.batchSizes).containsExactly(1, 1, 1, 19, 1, 1, 1, 
1, 1, 1, 2);
+    }
+
+    @Test
+    public void testBlobFallbackRecordReader() throws Exception {
+        DataFileMeta newFile = blobFile("new-file", 0, 3, 2);
+        DataFileMeta oldFile = blobFile("old-file", 0, 5, 1);
+
+        ReadResult rows =
+                readFallback(Arrays.asList(newFile, oldFile), null, 
placeholderRows(newFile, 1));
+
+        assertThat(rows.rowIds).containsExactly(0L, 1L, 2L, 3L, 4L);
+        assertThat(rows.sequenceNumbers).containsExactly(2L, 1L, 2L, 1L, 1L);
+    }
+
+    @Test
+    public void testBlobFallbackRecordReaderDerivesRowIdBoundsFromFiles() 
throws Exception {
+        DataFileMeta newFile = blobFile("new-file", 10, 2, 2);
+        DataFileMeta oldFile = blobFile("old-file", 8, 6, 1);
+
+        ReadResult rows =
+                readFallback(Arrays.asList(newFile, oldFile), null, 
placeholderRows(newFile, 10));
+
+        assertThat(rows.rowIds).containsExactly(8L, 9L, 10L, 11L, 12L, 13L);
+        assertThat(rows.sequenceNumbers).containsExactly(1L, 1L, 1L, 2L, 1L, 
1L);
+    }
+
+    @Test
+    public void testBlobFallbackRecordReaderThrowsIfAllRowsArePlaceholders() {
+        DataFileMeta newFile = blobFile("new-placeholder-file", 0, 1, 2);
+        DataFileMeta oldFile = blobFile("old-placeholder-file", 0, 1, 1);
+
+        assertThatThrownBy(
+                        () ->
+                                readFallback(
+                                        Arrays.asList(newFile, oldFile),
+                                        null,
+                                        placeholderRows(newFile, 0, oldFile, 
0)))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("all blob files at the same row id store 
a placeholder");
+    }
+
+    @Test
+    public void testBlobFallbackRecordReaderWithRowRanges() throws Exception {
+        DataFileMeta oldFile = blobFile("old-file", 20, 26, 1);
+        DataFileMeta newFile1 = blobFile("new-file-1", 20, 11, 2);
+        DataFileMeta newFile2 = blobFile("new-file-2", 40, 6, 2);
+        List<Range> rowRanges = ranges(15, 20, 25, 26, 29, 33, 35, 41);
+
+        ReadResult rows =
+                readFallback(
+                        Arrays.asList(newFile2, oldFile, newFile1),
+                        rowRanges,
+                        Collections.emptySet());
+
+        assertThat(rows.rowIds)
+                .containsExactly(
+                        20L, 25L, 26L, 29L, 30L, 31L, 32L, 33L, 35L, 36L, 37L, 
38L, 39L, 40L, 41L);
+        assertThat(rows.sequenceNumbers)
+                .containsExactly(2L, 2L, 2L, 2L, 2L, 1L, 1L, 1L, 1L, 1L, 1L, 
1L, 1L, 2L, 2L);
+    }
+
+    private static ReadResult readFallback(
+            List<DataFileMeta> files, List<Range> rowRanges, Set<String> 
placeholderRows)
+            throws Exception {
+        return ReadResult.read(
+                new BlobFallbackRecordReader(
+                        files,
+                        file -> oneRowPerBatchReader(fileRows(file, rowRanges, 
placeholderRows)),
+                        rowRanges,
+                        READ_ROW_TYPE,
+                        BLOB_INDEX));
+    }
+
+    private static ReadResult readSequenceGroup(
+            List<DataFileMeta> files,
+            List<Range> rowRanges,
+            long firstRowId,
+            long lastRowId,
+            long sequenceNumber)
+            throws Exception {
+        return ReadResult.read(
+                new BlobSequenceGroupRecordReader(
+                        files,
+                        file -> oneRowPerBatchReader(fileRows(file, 
rowRanges)),
+                        rowRanges,
+                        READ_ROW_TYPE,
+                        BLOB_INDEX,
+                        firstRowId,
+                        lastRowId));
+    }
+
+    private static DataFileMeta blobFile(
+            String fileName, long firstRowId, long rowCount, long 
maxSequenceNumber) {
+        return DataFileMeta.create(
+                fileName + ".blob",
+                rowCount,
+                rowCount,
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                0,
+                maxSequenceNumber,
+                0L,
+                DataFileMeta.DUMMY_LEVEL,
+                Collections.emptyList(),
+                Timestamp.fromEpochMillis(System.currentTimeMillis()),
+                rowCount,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                firstRowId,
+                Arrays.asList(BLOB_FIELD));
+    }
+
+    private static List<Range> ranges(long... bounds) {
+        if (bounds.length % 2 != 0) {
+            throw new IllegalArgumentException("Range bounds should be 
paired.");
+        }
+
+        List<Range> ranges = new ArrayList<>();
+        for (int i = 0; i < bounds.length; i += 2) {
+            ranges.add(new Range(bounds[i], bounds[i + 1]));
+        }
+        return ranges;
+    }
+
+    private static List<InternalRow> fileRows(DataFileMeta file, List<Range> 
rowRanges) {
+        return fileRows(file, rowRanges, Collections.emptySet());
+    }
+
+    private static List<InternalRow> fileRows(
+            DataFileMeta file, List<Range> rowRanges, Set<String> 
placeholderRows) {
+        List<InternalRow> rows = new ArrayList<>();
+        long lastRowId = file.nonNullFirstRowId() + file.rowCount() - 1;
+        for (long rowId = file.nonNullFirstRowId(); rowId <= lastRowId; 
rowId++) {
+            if (selected(rowId, rowRanges)) {
+                rows.add(
+                        blobRow(
+                                rowId,
+                                file.maxSequenceNumber(),
+                                placeholderRows.contains(rowKey(file, 
rowId))));
+            }
+        }
+        return rows;
+    }
+
+    private static boolean selected(long rowId, List<Range> rowRanges) {
+        if (rowRanges == null) {
+            return true;
+        }
+        for (Range range : rowRanges) {
+            if (rowId < range.from) {
+                return false;
+            }
+            if (rowId <= range.to) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static List<Long> rowIdsInRanges(long firstRowId, long lastRowId, 
List<Range> ranges) {
+        List<Long> rowIds = new ArrayList<>();
+        for (long rowId = firstRowId; rowId <= lastRowId; rowId++) {
+            if (selected(rowId, ranges)) {
+                rowIds.add(rowId);
+            }
+        }
+        return rowIds;
+    }
+
+    private static List<Long> concat(List<Long> first, List<Long> second) {
+        List<Long> all = new ArrayList<>(first);
+        all.addAll(second);
+        return all;
+    }
+
+    private static List<Integer> batchSizes(
+            int firstBatchSize, int repeatedBatchCount, int repeatedBatchSize, 
int lastBatchSize) {
+        List<Integer> batchSizes = new ArrayList<>();
+        batchSizes.add(firstBatchSize);
+        for (int i = 0; i < repeatedBatchCount; i++) {
+            batchSizes.add(repeatedBatchSize);
+        }
+        batchSizes.add(lastBatchSize);
+        return batchSizes;
+    }
+
+    private static Set<String> placeholderRows(DataFileMeta file, long rowId) {
+        Set<String> keys = new HashSet<>();
+        keys.add(rowKey(file, rowId));
+        return keys;
+    }
+
+    private static Set<String> placeholderRows(
+            DataFileMeta firstFile, long firstRowId, DataFileMeta secondFile, 
long secondRowId) {
+        Set<String> keys = placeholderRows(firstFile, firstRowId);
+        keys.add(rowKey(secondFile, secondRowId));
+        return keys;
+    }
+
+    private static String rowKey(DataFileMeta file, long rowId) {
+        return file.fileName() + "#" + rowId;
+    }
+
+    private static InternalRow blobRow(long rowId, long sequenceNumber, 
boolean placeholder) {
+        GenericRow row = new GenericRow(3);
+        row.setField(
+                BLOB_INDEX,
+                placeholder ? BlobPlaceholder.INSTANCE : new BlobData(new 
byte[] {(byte) rowId}));
+        row.setField(1, rowId);
+        row.setField(2, sequenceNumber);
+        return row;
+    }
+
+    private static RecordReader<InternalRow> 
oneRowPerBatchReader(List<InternalRow> rows) {
+        return new RecordReader<InternalRow>() {
+
+            int index;
+
+            @Override
+            public RecordIterator<InternalRow> readBatch() {
+                if (index >= rows.size()) {
+                    return null;
+                }
+                InternalRow row = rows.get(index++);
+                return new RecordIterator<InternalRow>() {
+
+                    boolean returned;
+
+                    @Override
+                    public InternalRow next() {
+                        if (returned) {
+                            return null;
+                        }
+                        returned = true;
+                        return row;
+                    }
+
+                    @Override
+                    public void releaseBatch() {}
+                };
+            }
+
+            @Override
+            public void close() {}
+        };
+    }
+
+    private static class ReadResult {
+        final List<Long> rowIds = new ArrayList<>();
+        final List<Long> sequenceNumbers = new ArrayList<>();
+        final List<Integer> batchSizes = new ArrayList<>();
+        int placeholderRowCount;
+
+        static ReadResult read(RecordReader<InternalRow> reader) throws 
Exception {
+            try {
+                ReadResult result = new ReadResult();
+                RecordIterator<InternalRow> batch;
+                while ((batch = reader.readBatch()) != null) {
+                    int batchSize = 0;
+                    InternalRow row;
+                    while ((row = batch.next()) != null) {
+                        result.add(row);
+                        batchSize++;
+                    }
+                    batch.releaseBatch();
+                    result.batchSizes.add(batchSize);
+                }
+                return result;
+            } finally {
+                reader.close();
+            }
+        }
+
+        private void add(InternalRow row) {
+            if (row.getBlob(BLOB_INDEX) == BlobPlaceholder.INSTANCE) {
+                placeholderRowCount++;
+            } else {
+                rowIds.add(row.getLong(1));
+                sequenceNumbers.add(row.getLong(2));
+            }
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
index d6e6998e6c..51a0a93aa2 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -21,8 +21,9 @@ package org.apache.paimon.operation;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.operation.DataEvolutionSplitRead.BlobFileBunch;
 import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch;
-import org.apache.paimon.operation.DataEvolutionSplitRead.SpecialFieldBunch;
+import org.apache.paimon.operation.DataEvolutionSplitRead.VectorFileBunch;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
@@ -42,150 +43,149 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link SpecialFieldBunch}. */
+/** Tests for blob and vector field bunches. */
 public class DataEvolutionReadTest {
 
-    private SpecialFieldBunch blobBunch;
+    private VectorFileBunch vectorBunch;
 
     @BeforeEach
     public void setUp() {
-        blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, false);
+        vectorBunch = new VectorFileBunch(Long.MAX_VALUE, false);
     }
 
     @Test
-    public void testAddSingleBlobEntry() {
-        DataFileMeta blobEntry = createBlobFile("blob1", 0L, 100L, 1L);
+    public void testAddSingleVectorEntry() {
+        DataFileMeta vectorEntry = createVectorFile("vector1", 0L, 100L, 1L);
 
-        blobBunch.add(blobEntry);
+        vectorBunch.add(vectorEntry);
 
-        assertThat(blobBunch.files).hasSize(1);
-        assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
-        assertThat(blobBunch.rowCount()).isEqualTo(100);
-        assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
-        
assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
+        assertThat(vectorBunch.files).hasSize(1);
+        assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry);
+        assertThat(vectorBunch.rowCount()).isEqualTo(100);
+        assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0);
+        
assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col"));
     }
 
     @Test
-    public void testAddBlobEntryAndTail() {
-        DataFileMeta blobEntry = createBlobFile("blob1", 0, 100, 1);
-        DataFileMeta blobTail = createBlobFile("blob2", 100, 200, 1);
-
-        blobBunch.add(blobEntry);
-        blobBunch.add(blobTail);
-
-        assertThat(blobBunch.files).hasSize(2);
-        assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
-        assertThat(blobBunch.files.get(1)).isEqualTo(blobTail);
-        assertThat(blobBunch.rowCount()).isEqualTo(300);
-        assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
-        
assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
-        assertThat(blobBunch.files.get(0).schemaId()).isEqualTo(0L);
+    public void testAddVectorEntryAndTail() {
+        DataFileMeta vectorEntry = createVectorFile("vector1", 0, 100, 1);
+        DataFileMeta vectorTail = createVectorFile("vector2", 100, 200, 1);
+
+        vectorBunch.add(vectorEntry);
+        vectorBunch.add(vectorTail);
+
+        assertThat(vectorBunch.files).hasSize(2);
+        assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry);
+        assertThat(vectorBunch.files.get(1)).isEqualTo(vectorTail);
+        assertThat(vectorBunch.rowCount()).isEqualTo(300);
+        assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0);
+        
assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col"));
+        assertThat(vectorBunch.files.get(0).schemaId()).isEqualTo(0L);
     }
 
     @Test
-    public void testAddNonBlobFileThrowsException() {
+    public void testAddNonVectorFileThrowsException() {
         DataFileMeta normalFile = createNormalFile("normal1.parquet", 0, 100, 
1, 0L);
 
-        assertThatThrownBy(() -> blobBunch.add(normalFile))
+        assertThatThrownBy(() -> vectorBunch.add(normalFile))
                 .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage("Only blob/vector-store file can be added to this 
bunch.");
+                .hasMessage("Only vector-store file can be added to this 
bunch.");
     }
 
     @Test
-    public void testAddBlobFileWithSameFirstRowId() {
-        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
-        DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 2);
+    public void testAddVectorFileWithSameFirstRowId() {
+        DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+        DataFileMeta vectorEntry2 = createVectorFile("vector2", 0, 50, 2);
 
-        blobBunch.add(blobEntry1);
+        vectorBunch.add(vectorEntry1);
         // Adding file with same firstRowId but higher sequence number should 
throw exception
-        assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+        assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(
-                        "Blob/vector-store file with same first row id should 
have decreasing sequence number.");
+                        "Vector file with same first row id should have 
decreasing sequence number.");
     }
 
     @Test
-    public void testAddBlobFileWithSameFirstRowIdAndLowerSequenceNumber() {
-        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2);
-        DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 1);
+    public void testAddVectorFileWithSameFirstRowIdAndLowerSequenceNumber() {
+        DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 2);
+        DataFileMeta vectorEntry2 = createVectorFile("vector2", 0, 50, 1);
 
-        blobBunch.add(blobEntry1);
+        vectorBunch.add(vectorEntry1);
         // Adding file with same firstRowId and lower sequence number should 
be ignored
-        blobBunch.add(blobEntry2);
+        vectorBunch.add(vectorEntry2);
 
-        assertThat(blobBunch.files).hasSize(1);
-        assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1);
+        assertThat(vectorBunch.files).hasSize(1);
+        assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry1);
     }
 
     @Test
-    public void testAddBlobFileWithOverlappingRowId() {
-        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2);
-        DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 1);
+    public void testAddVectorFileWithOverlappingRowId() {
+        DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 2);
+        DataFileMeta vectorEntry2 = createVectorFile("vector2", 50, 150, 1);
 
-        blobBunch.add(blobEntry1);
+        vectorBunch.add(vectorEntry1);
         // Adding file with overlapping row id and lower sequence number 
should be ignored
-        blobBunch.add(blobEntry2);
+        vectorBunch.add(vectorEntry2);
 
-        assertThat(blobBunch.files).hasSize(1);
-        assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1);
+        assertThat(vectorBunch.files).hasSize(1);
+        assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry1);
     }
 
     @Test
-    public void testAddBlobFileWithOverlappingRowIdAndHigherSequenceNumber() {
-        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
-        DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 2);
+    public void testAddVectorFileWithOverlappingRowIdAndHigherSequenceNumber() 
{
+        DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+        DataFileMeta vectorEntry2 = createVectorFile("vector2", 50, 150, 2);
 
-        blobBunch.add(blobEntry1);
+        vectorBunch.add(vectorEntry1);
         // Adding file with overlapping row id and higher sequence number 
should throw exception
-        assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+        assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(
-                        "Blob/vector-store file with overlapping row id should 
have decreasing sequence number.");
+                        "Vector file with overlapping row id should have 
decreasing sequence number.");
     }
 
     @Test
-    public void testAddBlobFileWithNonContinuousRowId() {
-        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
-        DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
+    public void testAddVectorFileWithNonContinuousRowId() {
+        DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+        DataFileMeta vectorEntry2 = createVectorFile("vector2", 200, 300, 1);
 
-        blobBunch.add(blobEntry1);
+        vectorBunch.add(vectorEntry1);
         // Adding file with non-continuous row id should throw exception
-        assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+        assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(
-                        "Blob/vector-store file first row id should be 
continuous, expect 100 but got 200");
+                        "Vector file first row id should be continuous, expect 
100 but got 200");
     }
 
     @Test
-    public void testAddBlobFileWithDifferentWriteCols() {
-        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
-        DataFileMeta blobEntry2 =
-                createBlobFileWithCols("blob2", 100, 200, 1, 
Arrays.asList("different_col"));
+    public void testAddVectorFileWithDifferentWriteCols() {
+        DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+        DataFileMeta vectorEntry2 =
+                createVectorFileWithCols("vector2", 100, 200, 1, 
Arrays.asList("different_col"));
 
-        blobBunch.add(blobEntry1);
+        vectorBunch.add(vectorEntry1);
         // Adding file with different write columns should throw exception
-        assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+        assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage("All files in this bunch should have the same 
write columns.");
     }
 
     @Test
-    public void testComplexBlobBunchScenario() {
-        // Create a complex scenario with multiple blob entries and a tail
-        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
-        DataFileMeta blobEntry2 = createBlobFile("blob2", 100, 200, 1);
-        DataFileMeta blobEntry3 = createBlobFile("blob3", 300, 300, 1);
-        DataFileMeta blobTail = createBlobFile("blob4", 600, 400, 1);
-
-        blobBunch.add(blobEntry1);
-        blobBunch.add(blobEntry2);
-        blobBunch.add(blobEntry3);
-        blobBunch.add(blobTail);
-
-        assertThat(blobBunch.files).hasSize(4);
-        assertThat(blobBunch.rowCount()).isEqualTo(1000);
-        assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
-        
assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
+    public void testComplexVectorBunchScenario() {
+        DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+        DataFileMeta vectorEntry2 = createVectorFile("vector2", 100, 200, 1);
+        DataFileMeta vectorEntry3 = createVectorFile("vector3", 300, 300, 1);
+        DataFileMeta vectorTail = createVectorFile("vector4", 600, 400, 1);
+
+        vectorBunch.add(vectorEntry1);
+        vectorBunch.add(vectorEntry2);
+        vectorBunch.add(vectorEntry3);
+        vectorBunch.add(vectorTail);
+
+        assertThat(vectorBunch.files).hasSize(4);
+        assertThat(vectorBunch.rowCount()).isEqualTo(1000);
+        assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0);
+        
assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col"));
     }
 
     @Test
@@ -209,26 +209,31 @@ public class DataEvolutionReadTest {
 
         List<DataFileMeta> batch = batches.get(0);
 
-        assertThat(batch.get(1).fileName()).contains("blob5"); // pick
-        assertThat(batch.get(2).fileName()).contains("blob2"); // skip
-        assertThat(batch.get(3).fileName()).contains("blob1"); // skip
-        assertThat(batch.get(4).fileName()).contains("blob9"); // pick
-        assertThat(batch.get(5).fileName()).contains("blob6"); // skip
-        assertThat(batch.get(6).fileName()).contains("blob3"); // skip
-        assertThat(batch.get(7).fileName()).contains("blob7"); // pick
-        assertThat(batch.get(8).fileName()).contains("blob4"); // skip
-        assertThat(batch.get(9).fileName()).contains("blob8"); // pick
+        assertThat(batch.get(1).fileName()).contains("blob5");
+        assertThat(batch.get(2).fileName()).contains("blob2");
+        assertThat(batch.get(3).fileName()).contains("blob1");
+        assertThat(batch.get(4).fileName()).contains("blob9");
+        assertThat(batch.get(5).fileName()).contains("blob6");
+        assertThat(batch.get(6).fileName()).contains("blob3");
+        assertThat(batch.get(7).fileName()).contains("blob7");
+        assertThat(batch.get(8).fileName()).contains("blob4");
+        assertThat(batch.get(9).fileName()).contains("blob8");
 
         List<FieldBunch> fieldBunches =
                 splitFieldBunches(batch, file -> 
makeBlobRowType(file.writeCols(), f -> 0));
         assertThat(fieldBunches.size()).isEqualTo(2);
 
-        SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1);
-        assertThat(blobBunch.files).hasSize(4);
+        BlobFileBunch blobBunch = (BlobFileBunch) fieldBunches.get(1);
+        assertThat(blobBunch.files).hasSize(9);
         assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
-        assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
-        assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
-        assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
+        assertThat(blobBunch.files.get(1).fileName()).contains("blob2");
+        assertThat(blobBunch.files.get(2).fileName()).contains("blob1");
+        assertThat(blobBunch.files.get(3).fileName()).contains("blob9");
+        assertThat(blobBunch.files.get(4).fileName()).contains("blob6");
+        assertThat(blobBunch.files.get(5).fileName()).contains("blob3");
+        assertThat(blobBunch.files.get(6).fileName()).contains("blob7");
+        assertThat(blobBunch.files.get(7).fileName()).contains("blob4");
+        assertThat(blobBunch.files.get(8).fileName()).contains("blob8");
     }
 
     @Test
@@ -275,19 +280,29 @@ public class DataEvolutionReadTest {
                         batch, file -> makeBlobRowType(file.writeCols(), 
String::hashCode));
         assertThat(fieldBunches.size()).isEqualTo(3);
 
-        SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1);
-        assertThat(blobBunch.files).hasSize(4);
+        BlobFileBunch blobBunch = (BlobFileBunch) fieldBunches.get(1);
+        assertThat(blobBunch.files).hasSize(9);
         assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
-        assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
-        assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
-        assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
-
-        blobBunch = (SpecialFieldBunch) fieldBunches.get(2);
-        assertThat(blobBunch.files).hasSize(4);
+        assertThat(blobBunch.files.get(1).fileName()).contains("blob2");
+        assertThat(blobBunch.files.get(2).fileName()).contains("blob1");
+        assertThat(blobBunch.files.get(3).fileName()).contains("blob9");
+        assertThat(blobBunch.files.get(4).fileName()).contains("blob6");
+        assertThat(blobBunch.files.get(5).fileName()).contains("blob3");
+        assertThat(blobBunch.files.get(6).fileName()).contains("blob7");
+        assertThat(blobBunch.files.get(7).fileName()).contains("blob4");
+        assertThat(blobBunch.files.get(8).fileName()).contains("blob8");
+
+        blobBunch = (BlobFileBunch) fieldBunches.get(2);
+        assertThat(blobBunch.files).hasSize(9);
         assertThat(blobBunch.files.get(0).fileName()).contains("blob15");
-        assertThat(blobBunch.files.get(1).fileName()).contains("blob19");
-        assertThat(blobBunch.files.get(2).fileName()).contains("blob17");
-        assertThat(blobBunch.files.get(3).fileName()).contains("blob18");
+        assertThat(blobBunch.files.get(1).fileName()).contains("blob12");
+        assertThat(blobBunch.files.get(2).fileName()).contains("blob11");
+        assertThat(blobBunch.files.get(3).fileName()).contains("blob19");
+        assertThat(blobBunch.files.get(4).fileName()).contains("blob16");
+        assertThat(blobBunch.files.get(5).fileName()).contains("blob13");
+        assertThat(blobBunch.files.get(6).fileName()).contains("blob17");
+        assertThat(blobBunch.files.get(7).fileName()).contains("blob14");
+        assertThat(blobBunch.files.get(8).fileName()).contains("blob18");
     }
 
     @Test
@@ -372,8 +387,81 @@ public class DataEvolutionReadTest {
                 writeCols);
     }
 
+    private DataFileMeta createVectorFile(
+            String fileName, long firstRowId, long rowCount, long 
maxSequenceNumber) {
+        return createVectorFileWithCols(
+                fileName, firstRowId, rowCount, maxSequenceNumber, 
Arrays.asList("vector_col"));
+    }
+
+    private DataFileMeta createVectorFileWithSchema(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long maxSequenceNumber,
+            long schemaId) {
+        return createFile(
+                fileName + ".vector.avro",
+                firstRowId,
+                rowCount,
+                maxSequenceNumber,
+                schemaId,
+                Arrays.asList("vector_col"));
+    }
+
+    private DataFileMeta createVectorFileWithCols(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long maxSequenceNumber,
+            List<String> writeCols) {
+        return createFile(
+                fileName + ".vector.avro", firstRowId, rowCount, 
maxSequenceNumber, 0L, writeCols);
+    }
+
+    private DataFileMeta createFile(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long maxSequenceNumber,
+            long schemaId,
+            List<String> writeCols) {
+        return DataFileMeta.create(
+                fileName,
+                rowCount,
+                rowCount,
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                0,
+                maxSequenceNumber,
+                schemaId,
+                DataFileMeta.DUMMY_LEVEL,
+                Collections.emptyList(),
+                Timestamp.fromEpochMillis(System.currentTimeMillis()),
+                rowCount,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                firstRowId,
+                writeCols);
+    }
+
+    @Test
+    void testAddVectorFilesWithDifferentSchemaId() {
+        DataFileMeta vectorEntry1 = createVectorFileWithSchema("vector1", 0, 
100, 1, 0L);
+        DataFileMeta vectorEntry2 = createVectorFileWithSchema("vector2", 100, 
200, 1, 1L);
+
+        vectorBunch.add(vectorEntry1);
+        assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("All files in this bunch should have the same 
schema id.");
+    }
+
     @Test
     void testAddBlobFilesWithDifferentSchemaId() {
+        BlobFileBunch blobBunch = new BlobFileBunch(300, false);
         DataFileMeta blobEntry1 = createBlobFileWithSchema("blob1", 0, 100, 1, 
0L);
         DataFileMeta blobEntry2 = createBlobFileWithSchema("blob2", 100, 200, 
1, 1L);
 
@@ -388,24 +476,24 @@ public class DataEvolutionReadTest {
 
     @Test
     public void testRowIdPushDown() {
-        SpecialFieldBunch blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, 
true);
-        DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
-        DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
-        blobBunch.add(blobEntry1);
-        SpecialFieldBunch finalBlobBunch = blobBunch;
-        DataFileMeta finalBlobEntry = blobEntry2;
-        assertThatCode(() -> 
finalBlobBunch.add(finalBlobEntry)).doesNotThrowAnyException();
-
-        blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true);
-        blobEntry1 = createBlobFile("blob1", 0, 100, 1);
-        blobEntry2 = createBlobFile("blob2", 50, 200, 2);
-        blobBunch.add(blobEntry1);
-        blobBunch.add(blobEntry2);
-        assertThat(blobBunch.files).containsExactlyInAnyOrder(blobEntry2);
-
-        SpecialFieldBunch finalBlobBunch2 = blobBunch;
-        DataFileMeta blobEntry3 = createBlobFile("blob2", 250, 100, 2);
-        assertThatCode(() -> 
finalBlobBunch2.add(blobEntry3)).doesNotThrowAnyException();
+        VectorFileBunch vectorBunch = new VectorFileBunch(Long.MAX_VALUE, 
true);
+        DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+        DataFileMeta vectorEntry2 = createVectorFile("vector2", 200, 300, 1);
+        vectorBunch.add(vectorEntry1);
+        VectorFileBunch finalVectorBunch = vectorBunch;
+        DataFileMeta finalVectorEntry = vectorEntry2;
+        assertThatCode(() -> 
finalVectorBunch.add(finalVectorEntry)).doesNotThrowAnyException();
+
+        vectorBunch = new VectorFileBunch(Long.MAX_VALUE, true);
+        vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+        vectorEntry2 = createVectorFile("vector2", 50, 200, 2);
+        vectorBunch.add(vectorEntry1);
+        vectorBunch.add(vectorEntry2);
+        assertThat(vectorBunch.files).containsExactlyInAnyOrder(vectorEntry2);
+
+        VectorFileBunch finalVectorBunch2 = vectorBunch;
+        DataFileMeta vectorEntry3 = createVectorFile("vector2", 250, 100, 2);
+        assertThatCode(() -> 
finalVectorBunch2.add(vectorEntry3)).doesNotThrowAnyException();
     }
 
     /** Creates a normal (non-blob) file for testing. */
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index c91dbb4c8e..e1bd365b23 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -79,8 +79,6 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BranchMergeHandler;
 import org.apache.paimon.utils.RoaringBitmap32;
 
-import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
-
 import org.apache.commons.math3.random.RandomDataGenerator;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -1031,10 +1029,9 @@ public class AppendOnlySimpleTableTest extends 
SimpleTableTestBase {
                                             + "."
                                             + CoreOptions.COLUMNS,
                                     "price");
-                            options.set(ParquetOutputFormat.BLOCK_SIZE, 
"1048576");
-                            options.set(
-                                    
ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
-                            
options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
+                            options.set("parquet.block.size", "1048576");
+                            options.set("parquet.page.size.row.check.min", 
"100");
+                            options.set("parquet.page.row.count.limit", "300");
                         });
 
         int bound = 300000;
@@ -1116,9 +1113,9 @@ public class AppendOnlySimpleTableTest extends 
SimpleTableTestBase {
                                     + "."
                                     + CoreOptions.COLUMNS,
                             "price");
-                    options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
-                    
options.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
-                    options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, 
"300");
+                    options.set("parquet.block.size", "1048576");
+                    options.set("parquet.page.size.row.check.min", "100");
+                    options.set("parquet.page.row.count.limit", "300");
                 };
         // in unaware-bucket mode, we split files into splits all the time
         FileStoreTable table = createUnawareBucketFileStoreTable(rowType, 
configure);
@@ -1215,9 +1212,9 @@ public class AppendOnlySimpleTableTest extends 
SimpleTableTestBase {
                     options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
                     options.set(WRITE_ONLY, true);
                     options.set(SOURCE_SPLIT_TARGET_SIZE, 
MemorySize.ofBytes(1));
-                    options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
-                    
options.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
-                    options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, 
"300");
+                    options.set("parquet.block.size", "1048576");
+                    options.set("parquet.page.size.row.check.min", "100");
+                    options.set("parquet.page.row.count.limit", "300");
                 };
         // in unaware-bucket mode, we split files into splits all the time
         FileStoreTable table = createUnawareBucketFileStoreTable(rowType, 
configure);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 80cfb975bd..82b097d894 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -82,8 +82,6 @@ import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.RoaringBitmap32;
 
-import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
-
 import org.apache.commons.math3.random.RandomDataGenerator;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -1139,9 +1137,9 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
                             conf.set(BUCKET, 1);
                             conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
                             conf.set(DELETION_VECTORS_ENABLED, true);
-                            conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288");
-                            
conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
-                            conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, 
"300");
+                            conf.set("parquet.block.size", "524288");
+                            conf.set("parquet.page.size.row.check.min", "100");
+                            conf.set("parquet.page.row.count.limit", "300");
                             conf.set("file-index.bitmap.columns", "b");
                         });
 
@@ -1248,9 +1246,9 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
                             conf.set(BUCKET, 1);
                             conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
                             conf.set(DELETION_VECTORS_ENABLED, true);
-                            conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288");
-                            
conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
-                            conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, 
"300");
+                            conf.set("parquet.block.size", "524288");
+                            conf.set("parquet.page.size.row.check.min", "100");
+                            conf.set("parquet.page.row.count.limit", "300");
                             conf.set("file-index.range-bitmap.columns", 
indexColumnName);
                         });
 
@@ -1335,9 +1333,9 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
                             conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
                             conf.set(DELETION_VECTORS_ENABLED, true);
                             conf.set(SOURCE_SPLIT_TARGET_SIZE, 
MemorySize.ofBytes(1));
-                            conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288");
-                            
conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
-                            conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, 
"300");
+                            conf.set("parquet.block.size", "524288");
+                            conf.set("parquet.page.size.row.check.min", "100");
+                            conf.set("parquet.page.row.count.limit", "300");
                         });
 
         int rowCount = 10000;
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
index 27a1ed5617..344e1325a3 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
@@ -55,7 +55,7 @@ public class BlobFileMeta {
         long[] blobOffsets = new long[blobLengths.length];
         long offset = 0;
         for (int i = 0; i < blobLengths.length; i++) {
-            if (blobLengths[i] == -1) {
+            if (blobLengths[i] < 0) {
                 blobOffsets[i] = -1;
             } else {
                 blobOffsets[i] = offset;
@@ -86,7 +86,11 @@ public class BlobFileMeta {
     }
 
     public boolean isNull(int i) {
-        return blobLengths[i] == -1;
+        return blobLengths[i] == BlobFormatWriter.NULL_LENGTH;
+    }
+
+    public boolean isPlaceHolder(int i) {
+        return blobLengths[i] == BlobFormatWriter.PLACE_HOLDER_LENGTH;
     }
 
     public long blobLength(int i) {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index 801964b862..73222ea575 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.blob;
 
 import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobPlaceholder;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
@@ -94,6 +95,8 @@ public class BlobFormatReader implements 
FileRecordReader<InternalRow> {
                 Blob blob;
                 if (fileMeta.isNull(currentPosition)) {
                     blob = null;
+                } else if (fileMeta.isPlaceHolder(currentPosition)) {
+                    blob = BlobPlaceholder.INSTANCE;
                 } else {
                     long offset = fileMeta.blobOffset(currentPosition) + 4;
                     long length = fileMeta.blobLength(currentPosition) - 16;
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
index 83d6c0eb91..f97158c171 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format.blob;
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobConsumer;
 import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.BlobPlaceholder;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileAwareFormatWriter;
 import org.apache.paimon.format.FormatWriter;
@@ -46,6 +47,8 @@ public class BlobFormatWriter implements 
FileAwareFormatWriter {
     public static final byte VERSION = 1;
     public static final int MAGIC_NUMBER = 1481511375;
     public static final byte[] MAGIC_NUMBER_BYTES = 
intToLittleEndian(MAGIC_NUMBER);
+    public static final long NULL_LENGTH = -1L;
+    public static final long PLACE_HOLDER_LENGTH = -2L;
 
     private final PositionOutputStream out;
     @Nullable private final BlobConsumer writeConsumer;
@@ -81,13 +84,17 @@ public class BlobFormatWriter implements 
FileAwareFormatWriter {
     public void addElement(InternalRow element) throws IOException {
         checkArgument(element.getFieldCount() == 1, "BlobFormatWriter only 
support one field.");
         if (element.isNullAt(0)) {
-            lengths.add(-1L);
+            lengths.add(NULL_LENGTH);
             if (writeConsumer != null) {
                 writeConsumer.accept(blobFieldName, null);
             }
             return;
         }
         Blob blob = element.getBlob(0);
+        if (blob == BlobPlaceholder.INSTANCE) {
+            lengths.add(PLACE_HOLDER_LENGTH);
+            return;
+        }
 
         long previousPos = out.getPos();
         crc32.reset();
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
index 0b66e15c2b..6c7542bfcb 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format.blob;
 
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobPlaceholder;
 import org.apache.paimon.data.BlobRef;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -79,16 +80,22 @@ public class BlobFileFormatTest {
 
         // write
         FormatWriterFactory writerFactory = 
format.createWriterFactory(rowType);
-        List<byte[]> blobs =
-                Arrays.asList("hello".getBytes(), null, "world".getBytes(), 
new byte[0]);
+        List<Object> blobs =
+                Arrays.asList(
+                        "hello".getBytes(),
+                        null,
+                        BlobPlaceholder.INSTANCE,
+                        "world".getBytes(),
+                        new byte[0]);
         try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
             FormatWriter formatWriter = writerFactory.create(out, null);
-            for (byte[] bytes : blobs) {
-                if (bytes == null) {
+            for (Object blob : blobs) {
+                if (blob == null) {
                     formatWriter.addElement(GenericRow.of((Object) null));
-                    continue;
+                } else if (blob == BlobPlaceholder.INSTANCE) {
+                    
formatWriter.addElement(GenericRow.of(BlobPlaceholder.INSTANCE));
                 } else {
-                    formatWriter.addElement(GenericRow.of(new 
BlobData(bytes)));
+                    formatWriter.addElement(GenericRow.of(new 
BlobData((byte[]) blob)));
                 }
             }
             formatWriter.close();
@@ -98,7 +105,7 @@ public class BlobFileFormatTest {
         FormatReaderFactory readerFactory = format.createReaderFactory(null, 
rowType, null);
         FormatReaderContext context =
                 new FormatReaderContext(fileIO, file, 
fileIO.getFileSize(file));
-        List<byte[]> result = new ArrayList<>();
+        List<Object> result = new ArrayList<>();
         readerFactory
                 .createReader(context)
                 .forEachRemaining(
@@ -107,7 +114,10 @@ public class BlobFileFormatTest {
                                 result.add(null);
                             } else {
                                 Blob blob = row.getBlob(0);
-                                if (blobAsDescriptor) {
+                                if (blob == BlobPlaceholder.INSTANCE) {
+                                    result.add(BlobPlaceholder.INSTANCE);
+                                    return;
+                                } else if (blobAsDescriptor) {
                                     
assertThat(blob).isInstanceOf(BlobRef.class);
                                 } else {
                                     
assertThat(blob).isInstanceOf(BlobData.class);
@@ -117,19 +127,23 @@ public class BlobFileFormatTest {
                         });
 
         // assert
-        assertThat(result).containsExactlyElementsOf(blobs);
+        assertThat(result).hasSize(blobs.size());
+        assertThat((byte[]) result.get(0)).isEqualTo((byte[]) blobs.get(0));
+        assertThat(result.get(1)).isNull();
+        assertThat(result.get(2)).isSameAs(BlobPlaceholder.INSTANCE);
+        assertThat((byte[]) result.get(3)).isEqualTo((byte[]) blobs.get(3));
+        assertThat((byte[]) result.get(4)).isEqualTo((byte[]) blobs.get(4));
 
         // read with selection
         RoaringBitmap32 selection = new RoaringBitmap32();
         selection.add(2);
         context = new FormatReaderContext(fileIO, file, 
fileIO.getFileSize(file), selection);
         result.clear();
-        readerFactory
-                .createReader(context)
-                .forEachRemaining(row -> result.add(row.getBlob(0).toData()));
+        readerFactory.createReader(context).forEachRemaining(row -> 
result.add(row.getBlob(0)));
 
         // assert
-        assertThat(result).containsOnly(blobs.get(2));
+        assertThat(result).hasSize(1);
+        assertThat(result.get(0)).isSameAs(BlobPlaceholder.INSTANCE);
     }
 
     @Test
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py 
b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index e7e65394aa..355fb36dc4 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -32,6 +32,8 @@ from pypaimon.table.row.row_kind import RowKind
 
 
 class FormatBlobReader(RecordBatchReader):
+    NULL_LENGTH = -1
+    PLACE_HOLDER_LENGTH = -2
 
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
                  full_fields: List[DataField], push_down_predicate: Any, 
blob_as_descriptor: bool,
@@ -97,6 +99,10 @@ class FormatBlobReader(RecordBatchReader):
                 for field_name in self._fields:
                     if blob is None:
                         pydict_data[field_name].append(None)
+                    elif blob is Blob.PLACE_HOLDER:
+                        raise RuntimeError(
+                            "Blob placeholder is not supported by 
FormatBlobReader yet."
+                        )
                     elif self._blob_as_descriptor:
                         
pydict_data[field_name].append(blob.to_descriptor().serialize())
                     else:
@@ -163,7 +169,7 @@ class FormatBlobReader(RecordBatchReader):
             blob_offsets = []
             offset = 0
             for length in blob_lengths:
-                if length == -1:
+                if length < 0:
                     blob_offsets.append(-1)
                 else:
                     blob_offsets.append(offset)
@@ -175,6 +181,8 @@ class FormatBlobReader(RecordBatchReader):
 class BlobRecordIterator:
     MAGIC_NUMBER_SIZE = 4
     METADATA_OVERHEAD = 16
+    NULL_LENGTH = -1
+    PLACE_HOLDER_LENGTH = -2
 
     def __init__(self, file_io: FileIO, file_path: str, blob_lengths: 
List[int],
                  blob_offsets: List[int], field_name: str):
@@ -192,13 +200,17 @@ class BlobRecordIterator:
         if self.current_position >= len(self.blob_lengths):
             raise StopIteration
         fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
-        if self.blob_lengths[self.current_position] == -1:
+        length = self.blob_lengths[self.current_position]
+        if length == self.NULL_LENGTH:
             self.current_position += 1
             return GenericRow([None], fields, RowKind.INSERT)
+        if length == self.PLACE_HOLDER_LENGTH:
+            self.current_position += 1
+            return GenericRow([Blob.PLACE_HOLDER], fields, RowKind.INSERT)
         # Create blob reference for the current blob
         # Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 
bytes) = 12 bytes
         blob_offset = self.blob_offsets[self.current_position] + 
self.MAGIC_NUMBER_SIZE  # Skip magic number
-        blob_length = self.blob_lengths[self.current_position] - 
self.METADATA_OVERHEAD
+        blob_length = length - self.METADATA_OVERHEAD
         blob = Blob.from_file(self.file_io, self.file_path, blob_offset, 
blob_length)
         self.current_position += 1
         return GenericRow([blob], fields, RowKind.INSERT)
diff --git a/paimon-python/pypaimon/table/row/blob.py 
b/paimon-python/pypaimon/table/row/blob.py
index 19a932a9f9..43391775bd 100644
--- a/paimon-python/pypaimon/table/row/blob.py
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -297,6 +297,21 @@ class Blob(ABC):
         return BlobData(data)
 
 
+class _PlaceholderBlob(Blob):
+
+    def to_data(self) -> bytes:
+        raise RuntimeError("Should never call this method for placeholder 
blob.")
+
+    def to_descriptor(self) -> BlobDescriptor:
+        raise RuntimeError("Should never call this method for placeholder 
blob.")
+
+    def new_input_stream(self) -> BinaryIO:
+        raise RuntimeError("Should never call this method for placeholder 
blob.")
+
+
+Blob.PLACE_HOLDER = _PlaceholderBlob()
+
+
 class BlobData(Blob):
 
     def __init__(self, data: Optional[Union[bytes, bytearray]] = None):
diff --git a/paimon-python/pypaimon/tests/blob_test.py 
b/paimon-python/pypaimon/tests/blob_test.py
index db0c639888..e6b856432b 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -29,7 +29,7 @@ from pypaimon import CatalogFactory
 from pypaimon.common.file_io import FileIO
 from pypaimon.filesystem.local_file_io import LocalFileIO
 from pypaimon.common.options import Options
-from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+from pypaimon.read.reader.format_blob_reader import BlobRecordIterator, 
FormatBlobReader
 from pypaimon.schema.data_types import AtomicType, DataField
 from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor
 from pypaimon.table.row.generic_row import GenericRowDeserializer, 
GenericRowSerializer, GenericRow
@@ -1296,6 +1296,79 @@ class BlobEndToEndTest(unittest.TestCase):
         self.assertEqual(desc2.uri, blob_file_path)
         reader.close()
 
+    def test_placeholder_blob_write_read(self):
+        from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+        file_io = LocalFileIO(self.temp_dir, Options({}))
+        blob_file_path = os.path.join(self.temp_dir, "placeholder_blob.blob")
+
+        output = open(blob_file_path, 'wb')
+        writer = BlobFormatWriter(output)
+        fields = [DataField(0, "blob_field", AtomicType("BLOB"))]
+        writer.add_element(GenericRow([BlobData(b"hello")], fields, 
RowKind.INSERT))
+        writer.add_element(GenericRow([Blob.PLACE_HOLDER], fields, 
RowKind.INSERT))
+        writer.add_element(GenericRow([None], fields, RowKind.INSERT))
+        writer.add_element(GenericRow([BlobData(b"world")], fields, 
RowKind.INSERT))
+        self.assertEqual(
+            writer.lengths[1:3],
+            [BlobFormatWriter.PLACE_HOLDER_LENGTH, 
BlobFormatWriter.NULL_LENGTH])
+        writer.close()
+
+        with open(blob_file_path, 'rb') as blob_file:
+            blob_file.seek(-1, os.SEEK_END)
+            self.assertEqual(blob_file.read(1), struct.pack('<B', 
BlobFormatWriter.VERSION))
+
+        blob_field_name = "blob_field"
+        read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+        reader = FormatBlobReader(
+            file_io=file_io,
+            file_path=blob_file_path,
+            read_fields=[blob_field_name],
+            full_fields=read_fields,
+            push_down_predicate=None,
+            blob_as_descriptor=False
+        )
+
+        self.assertEqual(reader.blob_lengths[1], 
BlobFormatWriter.PLACE_HOLDER_LENGTH)
+        iterator = BlobRecordIterator(
+            file_io, blob_file_path, reader.blob_lengths, reader.blob_offsets, 
blob_field_name)
+        self.assertEqual(next(iterator).values[0].to_data(), b"hello")
+        self.assertIs(next(iterator).values[0], Blob.PLACE_HOLDER)
+        self.assertIsNone(next(iterator).values[0])
+        self.assertEqual(next(iterator).values[0].to_data(), b"world")
+
+        with self.assertRaisesRegex(RuntimeError, "Blob placeholder is not 
supported"):
+            reader.read_arrow_batch()
+        reader.close()
+
+    def test_placeholder_blob_read_as_descriptor(self):
+        from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+        file_io = LocalFileIO(self.temp_dir, Options({}))
+        blob_file_path = os.path.join(self.temp_dir, "placeholder_desc.blob")
+
+        output = open(blob_file_path, 'wb')
+        writer = BlobFormatWriter(output)
+        fields = [DataField(0, "blob_field", AtomicType("BLOB"))]
+        writer.add_element(GenericRow([Blob.PLACE_HOLDER], fields, 
RowKind.INSERT))
+        writer.add_element(GenericRow([BlobData(b"world")], fields, 
RowKind.INSERT))
+        writer.close()
+
+        blob_field_name = "blob_field"
+        read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+        reader = FormatBlobReader(
+            file_io=file_io,
+            file_path=blob_file_path,
+            read_fields=[blob_field_name],
+            full_fields=read_fields,
+            push_down_predicate=None,
+            blob_as_descriptor=True
+        )
+
+        with self.assertRaisesRegex(RuntimeError, "Blob placeholder is not 
supported"):
+            reader.read_arrow_batch()
+        reader.close()
+
 
 class OffsetInputStreamTest(unittest.TestCase):
 
diff --git a/paimon-python/pypaimon/write/blob_format_writer.py 
b/paimon-python/pypaimon/write/blob_format_writer.py
index f019655c84..92257f4ca9 100644
--- a/paimon-python/pypaimon/write/blob_format_writer.py
+++ b/paimon-python/pypaimon/write/blob_format_writer.py
@@ -26,6 +26,8 @@ from pypaimon.common.delta_varint_compressor import 
DeltaVarintCompressor
 class BlobFormatWriter:
     VERSION = 1
     MAGIC_NUMBER = 1481511375
+    NULL_LENGTH = -1
+    PLACE_HOLDER_LENGTH = -2
     BUFFER_SIZE = 4096
     METADATA_SIZE = 12  # 8-byte length + 4-byte CRC
 
@@ -40,12 +42,16 @@ class BlobFormatWriter:
 
         blob_value = row.values[0]
         if blob_value is None:
-            self.lengths.append(-1)
+            self.lengths.append(self.NULL_LENGTH)
             return
 
         if not isinstance(blob_value, Blob):
             raise ValueError("Field must be Blob/BlobData instance")
 
+        if blob_value is Blob.PLACE_HOLDER:
+            self.lengths.append(self.PLACE_HOLDER_LENGTH)
+            return
+
         previous_pos = self.position
         crc32 = 0  # Initialize CRC32
 
@@ -97,7 +103,7 @@ class BlobFormatWriter:
         if col_data is None:
             if not is_blob:
                 raise RuntimeError("Null values are only supported for BLOB 
type fields")
-            self.lengths.append(-1)
+            self.lengths.append(self.NULL_LENGTH)
             return
 
         if is_blob:

Reply via email to