This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d44333607 [core][python] Fix blob updates and compaction (#8077)
5d44333607 is described below

commit 5d4433360793ca48f34e90864983ef7c52d94bc9
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 2 23:29:43 2026 +0800

    [core][python] Fix blob updates and compaction (#8077)
    
    Fix BLOB column updates in data-evolution append tables across Java and
    Python, and make BLOB compaction handle updated multi-version BLOB
    files. Unchanged BLOB values are now represented with placeholders and
    resolved from older BLOB files during reads and compaction.
---
 .../append/DedicatedFormatRollingFileWriter.java   |  58 ++++----
 .../DataEvolutionCompactCoordinator.java           |  29 +++-
 .../dataevolution/DataEvolutionCompactTask.java    |  56 +++-----
 .../org/apache/paimon/append/BlobTableTest.java    | 103 ++++++++++++++
 .../DataEvolutionCompactCoordinatorTest.java       |  18 +++
 .../pypaimon/read/reader/concat_batch_reader.py    | 130 +++++++++++++++++-
 paimon-python/pypaimon/read/reader/field_bunch.py  |  52 +++++++
 paimon-python/pypaimon/read/split_read.py          |  50 ++++++-
 paimon-python/pypaimon/tests/blob_table_test.py    |  76 +++++++++++
 .../pypaimon/write/table_update_by_row_id.py       | 149 ++++++++++++++++-----
 .../pypaimon/write/writer/blob_file_writer.py      |  15 ++-
 paimon-python/pypaimon/write/writer/blob_writer.py |  11 ++
 12 files changed, 645 insertions(+), 102 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
index ed89ad8cc4..3be374ff9d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
@@ -103,7 +103,7 @@ public class DedicatedFormatRollingFileWriter
     private static final long CHECK_ROLLING_RECORD_CNT = 1000L;
 
     // Core components
-    private final Supplier<
+    private final @Nullable Supplier<
                     ProjectedFileWriter<SingleFileWriter<InternalRow, 
DataFileMeta>, DataFileMeta>>
             writerFactory;
     private final @Nullable Supplier<MultipleBlobFileWriter> blobWriterFactory;
@@ -168,21 +168,25 @@ public class DedicatedFormatRollingFileWriter
             }
         }
 
-        this.writerFactory =
-                createNormalWriterFactory(
-                        fileIO,
-                        schemaId,
-                        fileFormat,
-                        fieldsInNormalFile,
-                        writeSchema,
-                        pathFactory,
-                        seqNumCounterSupplier,
-                        fileCompression,
-                        statsCollectorFactories,
-                        fileIndexOptions,
-                        fileSource,
-                        asyncFileWrite,
-                        statsDenseStore);
+        if (fieldsInNormalFile.isEmpty()) {
+            this.writerFactory = null;
+        } else {
+            this.writerFactory =
+                    createNormalWriterFactory(
+                            fileIO,
+                            schemaId,
+                            fileFormat,
+                            fieldsInNormalFile,
+                            writeSchema,
+                            pathFactory,
+                            seqNumCounterSupplier,
+                            fileCompression,
+                            statsCollectorFactories,
+                            fileIndexOptions,
+                            fileSource,
+                            asyncFileWrite,
+                            statsDenseStore);
+        }
 
         if (context != null) {
             this.blobWriterFactory =
@@ -353,7 +357,7 @@ public class DedicatedFormatRollingFileWriter
                             ? externalStorageBlobWriter.transformRow(row)
                             : row;
 
-            if (currentWriter == null) {
+            if (writerFactory != null && currentWriter == null) {
                 currentWriter = writerFactory.get();
             }
             if ((blobWriter == null) && (blobWriterFactory != null)) {
@@ -368,10 +372,12 @@ public class DedicatedFormatRollingFileWriter
             if (vectorStoreWriter != null) {
                 vectorStoreWriter.write(transformedRow);
             }
-            currentWriter.write(transformedRow);
+            if (currentWriter != null) {
+                currentWriter.write(transformedRow);
+            }
             recordCount++;
 
-            if (rollingFile()) {
+            if (currentWriter != null && rollingFile()) {
                 closeCurrentWriter();
             }
         } catch (Throwable e) {
@@ -382,7 +388,7 @@ public class DedicatedFormatRollingFileWriter
 
     /** Handles write exceptions by logging and cleaning up resources. */
     private void handleWriteException(Throwable e) {
-        String filePath = (currentWriter == null) ? null : 
currentWriter.writer().path().toString();
+        String filePath = currentWriter == null ? null : 
currentWriter.writer().path().toString();
         LOG.warn("Exception occurs when writing file {}. Cleaning up.", 
filePath, e);
         abort();
     }
@@ -451,12 +457,12 @@ public class DedicatedFormatRollingFileWriter
      * @throws IOException if closing fails
      */
     private void closeCurrentWriter() throws IOException {
-        if (currentWriter == null) {
+        if (currentWriter == null && blobWriter == null && vectorStoreWriter 
== null) {
             return;
         }
 
         // Close main writer and get metadata
-        DataFileMeta mainDataFileMeta = closeMainWriter();
+        DataFileMeta mainDataFileMeta = currentWriter == null ? null : 
closeMainWriter();
 
         // Close blob writer and process blob metadata
         List<DataFileMeta> blobMetas = closeBlobWriter();
@@ -464,11 +470,13 @@ public class DedicatedFormatRollingFileWriter
         // Close vector-store writer and process vector-store metadata
         List<DataFileMeta> vectorStoreMetas = closeVectorStoreWriter();
 
-        // Validate consistency between main and blob files
-        validateFileConsistency(mainDataFileMeta, blobMetas, vectorStoreMetas);
+        if (mainDataFileMeta != null) {
+            // Validate consistency between main and blob files
+            validateFileConsistency(mainDataFileMeta, blobMetas, 
vectorStoreMetas);
+            results.add(mainDataFileMeta);
+        }
 
         // Add results to the results list
-        results.add(mainDataFileMeta);
         results.addAll(blobMetas);
         results.addAll(vectorStoreMetas);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index fc65fe95b6..c41493564c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -434,11 +434,36 @@ public class DataEvolutionCompactCoordinator {
         private List<List<DataFileMeta>> 
fileGroupsToCompact(List<DataFileMeta> files) {
             List<List<DataFileMeta>> result = new ArrayList<>();
             List<DataFileMeta> sortedFiles = new ArrayList<>(files);
-            sortedFiles.sort(comparingLong(DataFileMeta::nonNullFirstRowId));
+            sortedFiles.sort(
+                    comparingLong(DataFileMeta::nonNullFirstRowId)
+                            
.thenComparingLong(DataFileMeta::maxSequenceNumber));
+
+            RangeHelper<DataFileMeta> rangeHelper =
+                    new RangeHelper<>(DataFileMeta::nonNullRowIdRange);
+            List<DataFileMeta> smallFileCandidates = new ArrayList<>();
+            for (List<DataFileMeta> rowRangeGroup :
+                    rangeHelper.mergeOverlappingRanges(sortedFiles)) {
+                if (rowRangeGroup.size() >= BLOB_COMPACT_MIN_FILE_NUM) {
+                    rowRangeGroup.sort(
+                            comparingLong(DataFileMeta::nonNullFirstRowId)
+                                    
.thenComparingLong(DataFileMeta::maxSequenceNumber));
+                    result.add(rowRangeGroup);
+                } else {
+                    smallFileCandidates.add(rowRangeGroup.get(0));
+                }
+            }
+
+            result.addAll(smallFileGroupsToCompact(smallFileCandidates));
+            result.sort(comparingLong(group -> 
group.get(0).nonNullFirstRowId()));
+            return result;
+        }
+
+        private List<List<DataFileMeta>> 
smallFileGroupsToCompact(List<DataFileMeta> files) {
+            List<List<DataFileMeta>> result = new ArrayList<>();
 
             List<DataFileMeta> continuousFiles = new ArrayList<>();
             long expectedFirstRowId = -1;
-            for (DataFileMeta file : sortedFiles) {
+            for (DataFileMeta file : files) {
                 if (file.fileSize() >= blobTargetFileSize) {
                     addFileGroupsToCompact(result, continuousFiles);
                     continuousFiles.clear();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
index e56fe32f82..f8bdc959c3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
@@ -45,6 +45,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SetUtils;
 
@@ -191,7 +192,7 @@ public class DataEvolutionCompactTask extends 
AppendCompactTask {
         CoreOptions options = table.coreOptions();
         List<DataFileMeta> sortedCompactBefore = 
sortedByFirstRowId(compactBefore);
         DataField blobField = blobField(table, options, sortedCompactBefore);
-        checkRowIdsContinuous(sortedCompactBefore);
+        Range compactBeforeRange = 
checkContiguousRowRange(sortedCompactBefore);
         checkArgument(
                 sortedCompactBefore.size() > 1,
                 "Blob compaction task %s should contain at least two files to 
compact.",
@@ -232,12 +233,11 @@ public class DataEvolutionCompactTask extends 
AppendCompactTask {
             throw e;
         }
 
-        long firstRowId = sortedCompactBefore.get(0).nonNullFirstRowId();
         long minSequenceId = minSequenceId(sortedCompactBefore);
         long maxSequenceId = maxSequenceId(sortedCompactBefore);
         DataFileMeta compactedFile =
                 writer.result()
-                        .assignFirstRowId(firstRowId)
+                        .assignFirstRowId(compactBeforeRange.from)
                         .assignSequenceNumber(minSequenceId, maxSequenceId);
         compactAfter.add(compactedFile);
         checkArgument(compactAfter.size() == 1, "Blob file compaction should 
produce one file.");
@@ -326,46 +326,30 @@ public class DataEvolutionCompactTask extends 
AppendCompactTask {
         return field;
     }
 
-    private void checkRowIdsContinuous(List<DataFileMeta> files) {
-        checkArgument(!files.isEmpty(), "%s should not be empty.", "Blob 
compact before files");
-        long expectedFirstRowId = files.get(0).nonNullFirstRowId();
-        for (DataFileMeta file : files) {
-            long firstRowId = file.nonNullFirstRowId();
-            checkArgument(
-                    firstRowId == expectedFirstRowId,
-                    "%s should be continuous and sorted by row id, expected %s 
but got %s in file %s.",
-                    "Blob compact before files",
-                    expectedFirstRowId,
-                    firstRowId,
-                    file);
-            expectedFirstRowId += file.rowCount();
-        }
+    private Range checkContiguousRowRange(List<DataFileMeta> files) {
+        checkArgument(!files.isEmpty(), "%s should not be empty.", "Blob 
compact files");
+        List<Range> ranges =
+                
files.stream().map(DataFileMeta::nonNullRowIdRange).collect(Collectors.toList());
+        List<Range> merged = Range.sortAndMergeOverlap(ranges, true);
+        checkArgument(
+                merged.size() == 1,
+                "%s should have a contiguous row range, but got %s.",
+                "Blob compact files",
+                merged);
+        return merged.get(0);
     }
 
     private void checkSameRowRange(
             List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) 
{
+        Range beforeRange = checkContiguousRowRange(compactBefore);
+        Range afterRange = checkContiguousRowRange(compactAfter);
         checkArgument(
-                !compactBefore.isEmpty(),
-                "%s compact before files should not be empty.",
-                "Blob compact files");
-        checkArgument(
-                !compactAfter.isEmpty(),
-                "%s compact after files should not be empty.",
-                "Blob compact files");
-        long beforeFirstRowId = compactBefore.get(0).nonNullFirstRowId();
-        long afterFirstRowId = compactAfter.get(0).nonNullFirstRowId();
-        long beforeRowCount = 
compactBefore.stream().mapToLong(DataFileMeta::rowCount).sum();
-        long afterRowCount = 
compactAfter.stream().mapToLong(DataFileMeta::rowCount).sum();
-        checkArgument(
-                beforeFirstRowId == afterFirstRowId && beforeRowCount == 
afterRowCount,
+                beforeRange.equals(afterRange),
                 "%s compact after files should have the same row range as 
compact before files, "
-                        + "before first row id is %s with row count %s, "
-                        + "but after first row id is %s with row count %s.",
+                        + "before range is %s, but after range is %s.",
                 "Blob compact files",
-                beforeFirstRowId,
-                beforeRowCount,
-                afterFirstRowId,
-                afterRowCount);
+                beforeRange,
+                afterRange);
     }
 
     private long minSequenceId(List<DataFileMeta> files) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 6a25d4a0b6..3f12ce039f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobData;
 import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.BlobPlaceholder;
 import org.apache.paimon.data.BlobView;
 import org.apache.paimon.data.BlobViewStruct;
 import org.apache.paimon.data.GenericRow;
@@ -47,6 +48,7 @@ import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.EndOfScanException;
@@ -152,6 +154,91 @@ public class BlobTableTest extends TableTestBase {
         assertThat(integer.get()).isEqualTo(1000);
     }
 
+    @Test
+    public void testUpdateBlobColumn() throws Exception {
+        createTableDefault();
+
+        byte[] blob0 = "blob-0".getBytes();
+        byte[] blob1 = "blob-1".getBytes();
+        byte[] blob2 = "blob-2".getBytes();
+        writeDataDefault(
+                Arrays.asList(
+                        GenericRow.of(0, BinaryString.fromString("row-0"), new 
BlobData(blob0)),
+                        GenericRow.of(1, BinaryString.fromString("row-1"), new 
BlobData(blob1)),
+                        GenericRow.of(2, BinaryString.fromString("row-2"), new 
BlobData(blob2))));
+
+        byte[] updatedBlob1 = "updated-blob-1".getBytes();
+        FileStoreTable table = getTableDefault();
+        RowType blobWriteType = table.schema().logicalRowType().project("f2");
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = 
builder.newWrite().withWriteType(blobWriteType);
+                BatchTableCommit commit = builder.newCommit()) {
+            write.write(GenericRow.of(BlobPlaceholder.INSTANCE));
+            write.write(GenericRow.of(new BlobData(updatedBlob1)));
+            write.write(GenericRow.of(BlobPlaceholder.INSTANCE));
+
+            List<CommitMessage> commitMessages = write.prepareCommit();
+            assignFirstRowId(commitMessages, 0L);
+            commit.commit(commitMessages);
+        }
+
+        Map<Integer, byte[]> actual = new HashMap<>();
+        readDefault(row -> actual.put(row.getInt(0), row.getBlob(2).toData()));
+
+        assertThat(actual.size()).isEqualTo(3);
+        assertThat(actual.get(0)).isEqualTo(blob0);
+        assertThat(actual.get(1)).isEqualTo(updatedBlob1);
+        assertThat(actual.get(2)).isEqualTo(blob2);
+    }
+
+    @Test
+    public void testCompactUpdatedBlobColumn() throws Exception {
+        createTableDefault();
+
+        byte[] blob0 = "blob-0".getBytes();
+        byte[] blob1 = "blob-1".getBytes();
+        byte[] blob2 = "blob-2".getBytes();
+        writeDataDefault(
+                Arrays.asList(
+                        GenericRow.of(0, BinaryString.fromString("row-0"), new 
BlobData(blob0)),
+                        GenericRow.of(1, BinaryString.fromString("row-1"), new 
BlobData(blob1)),
+                        GenericRow.of(2, BinaryString.fromString("row-2"), new 
BlobData(blob2))));
+
+        byte[] updatedBlob1 = "updated-blob-1".getBytes();
+        FileStoreTable table = getTableDefault();
+        RowType blobWriteType = table.schema().logicalRowType().project("f2");
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = 
builder.newWrite().withWriteType(blobWriteType);
+                BatchTableCommit commit = builder.newCommit()) {
+            write.write(GenericRow.of(BlobPlaceholder.INSTANCE));
+            write.write(GenericRow.of(new BlobData(updatedBlob1)));
+            write.write(GenericRow.of(BlobPlaceholder.INSTANCE));
+
+            List<CommitMessage> commitMessages = write.prepareCommit();
+            assignFirstRowId(commitMessages, 0L);
+            commit.commit(commitMessages);
+        }
+
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, true, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        
assertThat(tasks.stream().anyMatch(DataEvolutionCompactTask::isBlobTask)).isTrue();
+
+        List<CommitMessage> compactMessages = new ArrayList<>();
+        for (DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        commitDefault(compactMessages);
+
+        Map<Integer, byte[]> actual = new HashMap<>();
+        readDefault(row -> actual.put(row.getInt(0), row.getBlob(2).toData()));
+
+        assertThat(actual.size()).isEqualTo(3);
+        assertThat(actual.get(0)).isEqualTo(blob0);
+        assertThat(actual.get(1)).isEqualTo(updatedBlob1);
+        assertThat(actual.get(2)).isEqualTo(blob2);
+    }
+
     @Test
     public void testWriteByInputStream() throws Exception {
         createTableDefault();
@@ -1730,6 +1817,22 @@ public class BlobTableTest extends TableTestBase {
         commit.close();
     }
 
+    private static void assignFirstRowId(List<CommitMessage> commitMessages, 
long firstRowId) {
+        commitMessages.forEach(
+                commitMessage -> {
+                    CommitMessageImpl impl = (CommitMessageImpl) commitMessage;
+                    List<DataFileMeta> newFiles =
+                            new 
ArrayList<>(impl.newFilesIncrement().newFiles());
+                    impl.newFilesIncrement().newFiles().clear();
+                    impl.newFilesIncrement()
+                            .newFiles()
+                            .addAll(
+                                    newFiles.stream()
+                                            .map(file -> 
file.assignFirstRowId(firstRowId))
+                                            .collect(Collectors.toList()));
+                });
+    }
+
     private void createThreeTypeBlobTable() throws Exception {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
index f2f3a16756..fe1b2dfc3e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -191,6 +191,24 @@ public class DataEvolutionCompactCoordinatorTest {
                         entries.get(5).file());
     }
 
+    @Test
+    public void testCompactPlannerWithUpdatedBlobFiles() {
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 3L, 100));
+        entries.add(makeBlobEntry("old.blob", 0L, 3L, 100, 0, "pic"));
+        entries.add(makeBlobEntry("updated.blob", 0L, 3L, 100, 1, "pic"));
+
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                blobPlanner(1024, 1, 2, rowType(new DataField(1, "pic", 
DataTypes.BLOB())));
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).isBlobTask()).isTrue();
+        assertThat(tasks.get(0).compactBefore())
+                .containsExactly(entries.get(1).file(), entries.get(2).file());
+    }
+
     @Test
     public void testCompactPlannerDoesNotCompactBlobFilesAcrossDataFiles() {
         List<ManifestEntry> entries = new ArrayList<>();
diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py 
b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index 3788687162..67d1a7c40a 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -16,13 +16,17 @@
 # under the License.
 
 import collections
-from typing import Callable, List, Optional
+from typing import Callable, Dict, List, Optional, Tuple
 
 import pyarrow as pa
 import pyarrow.dataset as ds
 from pyarrow import RecordBatch
 
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.read.reader.format_blob_reader import BlobRecordIterator
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.table.row.blob import Blob
+from pypaimon.utils.range import Range
 
 _MIN_BATCH_SIZE_TO_REFILL = 1024
 
@@ -229,3 +233,127 @@ class DataEvolutionMergeReader(RecordBatchReader):
                     reader.close()
         except Exception as e:
             raise IOError("Failed to close inner readers") from e
+
+
+class BlobFallbackBatchReader(RecordBatchReader):
+    """Resolve blob placeholders by falling back through older blob 
versions."""
+
+    def __init__(self, file_reader_suppliers: List[Tuple[DataFileMeta, 
Callable]],
+                 field_name: str, output_type, row_ranges: 
Optional[List[Range]] = None,
+                 blob_as_descriptor: bool = False):
+        self._file_reader_suppliers = file_reader_suppliers
+        self._field_name = field_name
+        self._output_type = output_type
+        self._row_ranges = Range.sort_and_merge_overlap(row_ranges) if 
row_ranges else None
+        self._blob_as_descriptor = blob_as_descriptor
+        self._returned = False
+        self._readers: List[RecordBatchReader] = []
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        if self._returned:
+            return None
+        self._returned = True
+
+        groups: Dict[int, Dict[int, Tuple[object, bool]]] = {}
+        target_row_ids = self._target_row_ids()
+
+        for file, supplier in self._file_reader_suppliers:
+            row_ids = self._selected_row_ids(file)
+            blob_values = self._read_blob_values(file, supplier)
+            if len(blob_values) != len(row_ids):
+                raise ValueError(
+                    "Blob fallback reader returned an unexpected row count "
+                    f"for {file.file_name}: expect {len(row_ids)}, got 
{len(blob_values)}."
+                )
+            if not row_ids:
+                continue
+            group = groups.setdefault(file.max_sequence_number, {})
+            for row_id, blob in zip(row_ids, blob_values):
+                if row_id in group:
+                    raise ValueError(
+                        "Blob files within the same max sequence should not 
overlap."
+                    )
+                if blob is None:
+                    group[row_id] = (None, False)
+                elif blob is Blob.PLACE_HOLDER:
+                    group[row_id] = (None, True)
+                else:
+                    if self._blob_as_descriptor:
+                        group[row_id] = (blob.to_descriptor().serialize(), 
False)
+                    else:
+                        group[row_id] = (blob.to_data(), False)
+
+        if not groups:
+            return None
+
+        result = []
+        for row_id in target_row_ids:
+            found = False
+            for max_sequence_number in sorted(groups.keys(), reverse=True):
+                candidate = groups[max_sequence_number].get(row_id)
+                if candidate is None:
+                    continue
+                value, is_placeholder = candidate
+                if not is_placeholder:
+                    result.append(value)
+                    found = True
+                    break
+            if not found:
+                raise ValueError("All blob files at the same row id store a 
placeholder.")
+
+        return pa.RecordBatch.from_arrays(
+            [pa.array(result, type=self._output_type)],
+            names=[self._field_name],
+        )
+
+    def _target_row_ids(self) -> List[int]:
+        file_ranges = [
+            file.row_id_range()
+            for file, _ in self._file_reader_suppliers
+        ]
+        ranges = [
+            Range(
+                min(row_range.from_ for row_range in file_ranges),
+                max(row_range.to for row_range in file_ranges),
+            )
+        ]
+        if self._row_ranges is not None:
+            ranges = Range.and_(ranges, self._row_ranges)
+        return self._expand_ranges(ranges)
+
+    def _selected_row_ids(self, file: DataFileMeta) -> List[int]:
+        ranges = [file.row_id_range()]
+        if self._row_ranges is not None:
+            ranges = Range.and_(ranges, self._row_ranges)
+        return self._expand_ranges(ranges)
+
+    @staticmethod
+    def _expand_ranges(ranges: List[Range]) -> List[int]:
+        return [
+            row_id
+            for row_range in ranges
+            for row_id in range(row_range.from_, row_range.to + 1)
+        ]
+
+    def _read_blob_values(self, file: DataFileMeta, supplier: Callable) -> 
List[object]:
+        reader = supplier()
+        if reader is None:
+            return []
+        self._readers.append(reader)
+        try:
+            iterator = BlobRecordIterator(
+                reader._file_io,
+                reader.file_path,
+                reader.blob_lengths,
+                reader.blob_offsets,
+                self._field_name,
+                reader._input_stream,
+            )
+            return [row.values[0] for row in iterator]
+        except AttributeError as e:
+            raise TypeError("Blob fallback reader expects FormatBlobReader 
suppliers.") from e
+
+    def close(self) -> None:
+        for reader in self._readers:
+            reader.close()
+        self._readers = []
diff --git a/paimon-python/pypaimon/read/reader/field_bunch.py 
b/paimon-python/pypaimon/read/reader/field_bunch.py
index 74162cbc83..f2e0ae756c 100644
--- a/paimon-python/pypaimon/read/reader/field_bunch.py
+++ b/paimon-python/pypaimon/read/reader/field_bunch.py
@@ -23,7 +23,9 @@ supporting both regular data files and blob files.
 """
 from abc import ABC
 from typing import List
+
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.utils.range import Range
 
 
 class FieldBunch(ABC):
@@ -75,6 +77,12 @@ class _SpecialFieldBunch(FieldBunch):
                 f"Only {self._file_type_label()} file can be added to "
                 f"a {self._file_type_label()} bunch.")
 
+        if self._files and file.write_cols != self._files[0].write_cols:
+            raise ValueError(
+                f"All files in a {self._file_type_label()} bunch should "
+                f"have the same write columns."
+            )
+
         if file.first_row_id == self.latest_first_row_id:
             if file.max_sequence_number >= self.latest_max_sequence_number:
                 raise ValueError(
@@ -136,6 +144,50 @@ class _SpecialFieldBunch(FieldBunch):
 class BlobBunch(_SpecialFieldBunch):
     """Files for partial field (blob files)."""
 
+    def add(self, file: DataFileMeta) -> None:
+        if not self._is_special_file(file.file_name):
+            raise ValueError("Only blob file can be added to a blob bunch.")
+        if self._files and file.write_cols != self._files[0].write_cols:
+            raise ValueError("All files in a blob bunch should have the same 
write columns.")
+
+        self._files.append(file)
+        merged = Range.sort_and_merge_overlap(
+            [blob_file.row_id_range() for blob_file in self._files],
+            True,
+            True,
+        )
+        self._row_count = sum(row_range.count() for row_range in merged)
+        if self.expected_row_count >= 0 and self._row_count > 
self.expected_row_count:
+            raise ValueError(
+                f"Blob files row count exceed the expect 
{self.expected_row_count}"
+            )
+
+    def row_count(self) -> int:
+        merged = Range.sort_and_merge_overlap(
+            [blob_file.row_id_range() for blob_file in self._files],
+            True,
+            True,
+        )
+        row_count = sum(row_range.count() for row_range in merged)
+        if not self.row_id_push_down:
+            if len(merged) != 1:
+                raise ValueError("Blob file bunch should always contain a 
contiguous row range.")
+            if self.expected_row_count >= 0 and row_count != 
self.expected_row_count:
+                raise ValueError(
+                    "The merged row count of blob file bunch should be aligned 
"
+                    f"with normal files, expect {self.expected_row_count}, got 
{row_count}."
+                )
+        return row_count
+
+    def sequential_read_optimize(self) -> bool:
+        if not self._files:
+            raise ValueError("Blob bunch should not be empty.")
+        max_sequence_number = self._files[0].max_sequence_number
+        return all(
+            file.max_sequence_number == max_sequence_number
+            for file in self._files
+        )
+
     def _is_special_file(self, file_name: str) -> bool:
         return DataFileMeta.is_blob_file(file_name)
 
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index dc5f5e6100..ddb349e0ca 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -29,8 +29,9 @@ from pypaimon.manifest.schema.data_file_meta import 
DataFileMeta
 from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.partition_info import PartitionInfo
 from pypaimon.read.push_down_utils import rewrite_predicate_indices, 
trim_predicate_by_fields
-from pypaimon.read.reader.concat_batch_reader import (ConcatBatchReader,
-                                                      MergeAllBatchReader, 
DataEvolutionMergeReader)
+from pypaimon.read.reader.concat_batch_reader import (
+    BlobFallbackBatchReader, ConcatBatchReader,
+    MergeAllBatchReader, DataEvolutionMergeReader)
 from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
 
 from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
@@ -950,6 +951,27 @@ class DataEvolutionSplitRead(SplitRead):
                         bunch.files()[0], read_field_names
                     ): r]
                     file_record_readers[i] = MergeAllBatchReader(suppliers, 
batch_size=batch_size)
+                elif DataFileMeta.is_blob_file(first_file.file_name):
+                    file_reader_suppliers = [
+                        (
+                            file,
+                            partial(
+                                self._create_raw_blob_file_reader,
+                                file=file,
+                                read_fields=read_field_names,
+                            ),
+                        )
+                        for file in bunch.files()
+                    ]
+                    file_record_readers[i] = BlobFallbackBatchReader(
+                        file_reader_suppliers,
+                        read_fields[0].name,
+                        PyarrowFieldParser.from_paimon_schema(
+                            [read_fields[0]]
+                        ).field(0).type,
+                        self.row_ranges,
+                        CoreOptions.blob_as_descriptor(self.table.options),
+                    )
                 else:
                     # Create concatenated reader for multiple files
                     suppliers = [
@@ -977,6 +999,30 @@ class DataEvolutionSplitRead(SplitRead):
             row_tracking_enabled=True,
             row_ranges=self.row_ranges)
 
+    def _create_raw_blob_file_reader(
+            self, file: DataFileMeta, read_fields: List[str]) -> Optional[FormatBlobReader]:
+        """Create a ``FormatBlobReader`` for one blob file, limited to the selected rows.
+
+        Args:
+            file: Metadata of the blob file to read.
+            read_fields: Names of the fields to read from the file.
+
+        Returns:
+            A reader for the file, or ``None`` when ``self.row_ranges`` is set
+            and none of its rows fall inside this file's row-id range.
+        """
+        row_indices = None
+        if self.row_ranges is not None:
+            # Convert the selected row ids into offsets relative to the file's
+            # first row id; both range bounds are treated as inclusive.
+            row_indices = [
+                row_id - file.first_row_id
+                for row_range in Range.and_([file.row_id_range()], self.row_ranges)
+                for row_id in range(row_range.from_, row_range.to + 1)
+            ]
+            if not row_indices:
+                return None
+
+        # Prefer the external path when the file has one.
+        file_path = file.external_path if file.external_path else file.file_path
+        return FormatBlobReader(
+            self.table.file_io,
+            file_path,
+            read_fields,
+            self.read_fields,
+            None,
+            CoreOptions.blob_as_descriptor(self.table.options),
+            batch_size=self.table.options.read_batch_size(),
+            row_indices=row_indices,
+        )
+
     def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> 
List[FieldBunch]:
         """Split files into field bunches."""
 
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py 
b/paimon-python/pypaimon/tests/blob_table_test.py
index 5eaa57b056..ca9161bd14 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1062,6 +1062,82 @@ class DedicatedFormatWriterTest(unittest.TestCase):
             [b'first_blob', None, b'third_blob', None, b'fifth_blob'],
         )
 
+    def test_update_blob_column(self):
+        """Update one row of a blob column by row id and verify files and read result.
+
+        Checks that the update only writes files for ``blob_data``, that the
+        two rows not being updated are stored with the placeholder length,
+        and that a full read returns the updated value next to the original
+        values of the other rows.
+        """
+        from pypaimon import Schema
+        from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+        from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_update_column', schema, False)
+        table = self.catalog.get_table('test_db.blob_update_column')
+
+        initial = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['a', 'b', 'c'],
+            'blob_data': [b'blob-1', b'blob-2', b'blob-3'],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(initial)
+        write_builder.new_commit().commit(writer.prepare_commit())
+        writer.close()
+
+        # Update only _ROW_ID 1; the final assertion expects this to be id == 2.
+        update_builder = table.new_batch_write_builder()
+        table_update = update_builder.new_update().with_update_type(['blob_data'])
+        update_data = pa.Table.from_pydict({
+            '_ROW_ID': pa.array([1], type=pa.int64()),
+            'blob_data': pa.array([b'updated-blob-2'], type=pa.large_binary()),
+        })
+        update_messages = table_update.update_by_arrow_with_row_id(update_data)
+        update_builder.new_commit().commit(update_messages)
+
+        update_files = [f for msg in update_messages for f in msg.new_files]
+        update_blob_files = [f for f in update_files if f.file_name.endswith('.blob')]
+        self.assertGreater(len(update_blob_files), 0)
+        self.assertTrue(all(f.write_cols == ['blob_data'] for f in update_files))
+        # Read the raw blob lengths of the newly written files.
+        update_blob_lengths = []
+        blob_fields = [field for field in table.fields if field.name == 'blob_data']
+        for blob_file in update_blob_files:
+            blob_reader = FormatBlobReader(
+                file_io=table.file_io,
+                file_path=blob_file.file_path,
+                read_fields=['blob_data'],
+                full_fields=blob_fields,
+                push_down_predicate=None,
+                blob_as_descriptor=False,
+            )
+            update_blob_lengths.extend(blob_reader.blob_lengths)
+            blob_reader.close()
+        # Two of the three rows were not updated, so two placeholder entries are expected.
+        self.assertEqual(
+            update_blob_lengths.count(BlobFormatWriter.PLACE_HOLDER_LENGTH),
+            2,
+        )
+
+        read_builder = table.new_read_builder()
+        result = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+        by_id = {
+            row['id']: row['blob_data']
+            for row in result.select(['id', 'blob_data']).to_pylist()
+        }
+        self.assertEqual(by_id, {
+            1: b'blob-1',
+            2: b'updated-blob-2',
+            3: b'blob-3',
+        })
+
     def test_blob_write_read_partition(self):
         """Test complete end-to-end blob functionality: write blob data and 
read it back to verify correctness."""
         from pypaimon import Schema
diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py 
b/paimon-python/pypaimon/write/table_update_by_row_id.py
index e4c448be14..7220f1718d 100644
--- a/paimon-python/pypaimon/write/table_update_by_row_id.py
+++ b/paimon-python/pypaimon/write/table_update_by_row_id.py
@@ -25,12 +25,14 @@ import pyarrow.compute as pc
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.read.split import DataSplit
 from pypaimon.read.table_read import TableRead
-from pypaimon.utils.range import Range
 from pypaimon.schema.data_types import DataField
+from pypaimon.table.row.blob import Blob
 from pypaimon.table.row.generic_row import GenericRow
 from pypaimon.table.special_fields import SpecialFields
+from pypaimon.utils.range import Range
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.file_store_write import FileStoreWrite
+from pypaimon.write.writer.blob_writer import BlobWriter
 
 
 @dataclass(frozen=True)
@@ -99,15 +101,35 @@ class TableUpdateByRowId:
         index: Dict[int, Tuple[DataSplit, List[DataFileMeta]]] = {}
         row_id_ranges: List[Range] = []
         for split in splits:
+            files_with_row_id = [
+                file for file in split.files if file.first_row_id is not None
+            ]
+            data_files = [
+                file for file in files_with_row_id
+                if not DataFileMeta.is_blob_file(file.file_name)
+            ]
             for file in split.files:
-                if file.first_row_id is None or 
file.file_name.endswith('.blob'):
+                if file.first_row_id is None or 
DataFileMeta.is_blob_file(file.file_name):
                     continue
                 row_id_ranges.append(file.row_id_range())
+            for file in data_files:
+                target_files = [
+                    target_file
+                    for target_file in files_with_row_id
+                    if self._overlaps(file.row_id_range(), 
target_file.row_id_range())
+                ]
+
                 entry = index.get(file.first_row_id)
                 if entry is None:
-                    index[file.first_row_id] = (split, [file])
+                    index[file.first_row_id] = (split, target_files)
                 else:
-                    entry[1].append(file)
+                    existing_files = entry[1]
+                    existing_names = {existing.file_name for existing in 
existing_files}
+                    existing_files.extend(
+                        target_file
+                        for target_file in target_files
+                        if target_file.file_name not in existing_names
+                    )
 
         # Multiple physical files may share the same first_row_id (data 
evolution);
         # summing row_count per file would over-count logical rows and widen
@@ -126,6 +148,10 @@ class TableUpdateByRowId:
             total_row_count=total_row_count,
         )
 
+    @staticmethod
+    def _overlaps(left: Range, right: Range) -> bool:
+        """Return True when the two ranges, compared with inclusive bounds, intersect."""
+        if left.from_ > right.to:
+            # ``left`` starts after ``right`` ends.
+            return False
+        return right.from_ <= left.to
+
     def update_columns(self, data: pa.Table, column_names: List[str]) -> 
List[CommitMessage]:
         """
         Add or update columns in the table.
@@ -226,7 +252,7 @@ class TableUpdateByRowId:
         """
         wanted = set(column_names)
         read_fields: List[DataField] = [
-            field for field in self.table.fields if field.name in wanted
+            table_field for table_field in self.table.fields if 
table_field.name in wanted
         ]
         if not read_fields:
             return None
@@ -245,8 +271,12 @@ class TableUpdateByRowId:
         table_read = TableRead(self.table, predicate=None, 
read_type=read_fields)
         return table_read.to_arrow([origin_split])
 
-    def _merge_update_with_original(self, original_data: Optional[pa.Table], 
update_data: pa.Table,
-                                    column_names: List[str], first_row_id: 
int) -> pa.Table:
+    def _merge_update_with_original(
+            self,
+            original_data: Optional[pa.Table],
+            update_data: pa.Table,
+            column_names: List[str],
+            first_row_id: int) -> Tuple[Optional[pa.Table], Dict[str, 
List[object]]]:
         """Merge update data with original data, preserving row order.
 
         For rows that have updates, use the update values.
@@ -259,7 +289,7 @@ class TableUpdateByRowId:
             first_row_id: The first_row_id of this file group
 
         Returns:
-            Merged PyArrow Table with all rows
+            Normal merged PyArrow Table and blob values to write row-by-row.
         """
 
         # Get the _ROW_ID values from update_data to determine which rows are 
being updated
@@ -274,18 +304,40 @@ class TableUpdateByRowId:
 
         # Build the merged table column by column
         merged_columns = {}
+        blob_columns = {}
+        update_by_col = {
+            col_name: update_data[col_name].combine_chunks()
+            for col_name in column_names
+        }
+        update_positions = {
+            int(relative_index.as_py()): idx
+            for idx, relative_index in enumerate(relative_indices)
+        }
         for col_name in column_names:
-            update_col = update_data[col_name].combine_chunks()
+            update_col = update_by_col[col_name]
             original_col = original_data[col_name].combine_chunks()
-            # replace_with_mask fills mask=True positions with update values 
in order
-            merged_columns[col_name] = pc.replace_with_mask(
-                original_col, mask, update_col.cast(original_col.type)
-            )
-
-        # Create the merged table
-        merged_table = pa.table(merged_columns)
-
-        return merged_table
+            if self._is_blob_column(col_name):
+                blob_columns[col_name] = [
+                    update_col[update_positions[i]].as_py()
+                    if i in update_positions
+                    else Blob.PLACE_HOLDER
+                    for i in range(original_data.num_rows)
+                ]
+            else:
+                # replace_with_mask fills mask=True positions with update 
values in order
+                merged_columns[col_name] = pc.replace_with_mask(
+                    original_col, mask, update_col.cast(original_col.type)
+                )
+
+        merged_table = pa.table(merged_columns) if merged_columns else None
+
+        return merged_table, blob_columns
+
+    def _is_blob_column(self, column_name: str) -> bool:
+        """Check whether the table field named ``column_name`` has type ``'BLOB'``.
+
+        Only the first field with a matching name is inspected; a name that
+        matches no table field yields False.
+        """
+        matched = next(
+            (candidate for candidate in self.table.fields if candidate.name == column_name),
+            None,
+        )
+        if matched is None:
+            return False
+        # The field type may lack a ``type`` attribute, hence the getattr default.
+        return getattr(matched.type, 'type', None) == 'BLOB'
 
     def _write_group(self, partition: GenericRow, first_row_id: int,
                      data: pa.Table, column_names: List[str]):
@@ -295,25 +347,54 @@ class TableUpdateByRowId:
         writes a single output file (rolling disabled) for the group.
         """
         original_data = self._read_original_file_data(first_row_id, 
column_names)
-        merged_data = self._merge_update_with_original(
+        merged_data, blob_columns = self._merge_update_with_original(
             original_data, data, column_names, first_row_id,
         )
 
-        file_store_write = FileStoreWrite(self.table, self.commit_user)
+        partition_tuple = tuple(partition.values)
+        new_files = []
+        file_store_write = None
+        blob_writers = []
         try:
-            file_store_write.disable_rolling()
-            file_store_write.write_cols = column_names
-
-            partition_tuple = tuple(partition.values)
-            for batch in merged_data.to_batches():
-                file_store_write.write(partition_tuple, 0, batch)
-
-            new_messages = 
file_store_write.prepare_commit(self.commit_identifier)
-            for msg in new_messages:
-                msg.check_from_snapshot = self.snapshot_id
-                for file in msg.new_files:
+            if merged_data is not None:
+                file_store_write = FileStoreWrite(self.table, self.commit_user)
+                file_store_write.disable_rolling()
+                file_store_write.write_cols = list(merged_data.column_names)
+                for batch in merged_data.to_batches():
+                    file_store_write.write(partition_tuple, 0, batch)
+                new_messages = 
file_store_write.prepare_commit(self.commit_identifier)
+                for msg in new_messages:
+                    new_files.extend(msg.new_files)
+
+            for column_name, values in blob_columns.items():
+                blob_writer = BlobWriter(
+                    self.table,
+                    partition_tuple,
+                    0,
+                    0,
+                    column_name,
+                    self.table.options,
+                )
+                blob_writers.append(blob_writer)
+                arrow_type = original_data.schema.field(column_name).type
+                for value in values:
+                    blob_writer.write_blob(value, arrow_type)
+                new_files.extend(blob_writer.prepare_commit())
+
+            if new_files:
+                for file in new_files:
                     file.first_row_id = first_row_id
-                    file.write_cols = column_names
-            self.commit_messages.extend(new_messages)
+                    file.write_cols = file.write_cols or column_names
+                self.commit_messages.append(
+                    CommitMessage(
+                        partition=partition_tuple,
+                        bucket=0,
+                        new_files=new_files,
+                        check_from_snapshot=self.snapshot_id,
+                    )
+                )
         finally:
-            file_store_write.close()
+            if file_store_write is not None:
+                file_store_write.close()
+            for blob_writer in blob_writers:
+                blob_writer.close()
diff --git a/paimon-python/pypaimon/write/writer/blob_file_writer.py 
b/paimon-python/pypaimon/write/writer/blob_file_writer.py
index 31d945a4ed..fe911586be 100644
--- a/paimon-python/pypaimon/write/writer/blob_file_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_file_writer.py
@@ -52,8 +52,17 @@ class BlobFileWriter:
 
         blob_data = self._to_blob(col_data)
 
-        # Create GenericRow
-        fields = [DataField(0, field_name, 
PyarrowFieldParser.to_paimon_type(row_data.schema[0].type, False))]
+        self.write_blob(field_name, row_data.schema[0].type, blob_data)
+
+    def write_blob(self, field_name: str, arrow_type, blob_data):
+        blob_data = self._to_blob(blob_data)
+        fields = [
+            DataField(
+                0,
+                field_name,
+                PyarrowFieldParser.to_paimon_type(arrow_type, False),
+            )
+        ]
         row = GenericRow([blob_data], fields, RowKind.INSERT)
 
         # Write to blob format writer
@@ -61,6 +70,8 @@ class BlobFileWriter:
         self.row_count += 1
 
     def _to_blob(self, col_data) -> Optional[Blob]:
+        if col_data is Blob.PLACE_HOLDER:
+            return Blob.PLACE_HOLDER
         if hasattr(col_data, 'as_py'):
             col_data = col_data.as_py()
         if col_data is None:
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py 
b/paimon-python/pypaimon/write/writer/blob_writer.py
index 4ebed16785..adc86e8486 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -81,6 +81,17 @@ class BlobWriter(AppendOnlyDataWriter):
         # This ensures each row has a unique sequence number for data 
versioning and consistency
         self.sequence_generator.next()
 
+    def write_blob(self, value, arrow_type=pa.large_binary()):
+        """Write a single value for this writer's blob column.
+
+        Args:
+            value: The value to write; forwarded unchanged to the current
+                file writer's ``write_blob``.
+            arrow_type: Arrow type forwarded to the file writer together with
+                the blob column name. Defaults to ``pa.large_binary()``.
+        """
+        # Open a file writer if none is currently open.
+        if self.current_writer is None:
+            self.open_current_writer()
+
+        self.current_writer.write_blob(self.blob_column, arrow_type, value)
+        # Advance the sequence generator and the record counter once per written value.
+        self.sequence_generator.next()
+        self.record_count += 1
+
+        # Close the current file as soon as the rolling check reports true.
+        if self.rolling_file():
+            self.close_current_writer()
+
     def open_current_writer(self):
         file_name = (f"{CoreOptions.data_file_prefix(self.options)}"
                      f"{self.file_uuid}-{self.file_count}.{self.file_format}")

Reply via email to