This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5d44333607 [core][python] Fix blob updates and compaction (#8077)
5d44333607 is described below
commit 5d4433360793ca48f34e90864983ef7c52d94bc9
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 2 23:29:43 2026 +0800
[core][python] Fix blob updates and compaction (#8077)
Fix BLOB column updates in data-evolution append tables across Java and
Python, and make BLOB compaction handle updated multi-version BLOB
files. When a BLOB column is updated, rows whose value is unchanged are
now written as placeholders; reads and compaction resolve each
placeholder from an older BLOB file that covers the same row ID.
---
.../append/DedicatedFormatRollingFileWriter.java | 58 ++++----
.../DataEvolutionCompactCoordinator.java | 29 +++-
.../dataevolution/DataEvolutionCompactTask.java | 56 +++-----
.../org/apache/paimon/append/BlobTableTest.java | 103 ++++++++++++++
.../DataEvolutionCompactCoordinatorTest.java | 18 +++
.../pypaimon/read/reader/concat_batch_reader.py | 130 +++++++++++++++++-
paimon-python/pypaimon/read/reader/field_bunch.py | 52 +++++++
paimon-python/pypaimon/read/split_read.py | 50 ++++++-
paimon-python/pypaimon/tests/blob_table_test.py | 76 +++++++++++
.../pypaimon/write/table_update_by_row_id.py | 149 ++++++++++++++++-----
.../pypaimon/write/writer/blob_file_writer.py | 15 ++-
paimon-python/pypaimon/write/writer/blob_writer.py | 11 ++
12 files changed, 645 insertions(+), 102 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
index ed89ad8cc4..3be374ff9d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
@@ -103,7 +103,7 @@ public class DedicatedFormatRollingFileWriter
private static final long CHECK_ROLLING_RECORD_CNT = 1000L;
// Core components
- private final Supplier<
+ private final @Nullable Supplier<
ProjectedFileWriter<SingleFileWriter<InternalRow,
DataFileMeta>, DataFileMeta>>
writerFactory;
private final @Nullable Supplier<MultipleBlobFileWriter> blobWriterFactory;
@@ -168,21 +168,25 @@ public class DedicatedFormatRollingFileWriter
}
}
- this.writerFactory =
- createNormalWriterFactory(
- fileIO,
- schemaId,
- fileFormat,
- fieldsInNormalFile,
- writeSchema,
- pathFactory,
- seqNumCounterSupplier,
- fileCompression,
- statsCollectorFactories,
- fileIndexOptions,
- fileSource,
- asyncFileWrite,
- statsDenseStore);
+ if (fieldsInNormalFile.isEmpty()) {
+ this.writerFactory = null;
+ } else {
+ this.writerFactory =
+ createNormalWriterFactory(
+ fileIO,
+ schemaId,
+ fileFormat,
+ fieldsInNormalFile,
+ writeSchema,
+ pathFactory,
+ seqNumCounterSupplier,
+ fileCompression,
+ statsCollectorFactories,
+ fileIndexOptions,
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore);
+ }
if (context != null) {
this.blobWriterFactory =
@@ -353,7 +357,7 @@ public class DedicatedFormatRollingFileWriter
? externalStorageBlobWriter.transformRow(row)
: row;
- if (currentWriter == null) {
+ if (writerFactory != null && currentWriter == null) {
currentWriter = writerFactory.get();
}
if ((blobWriter == null) && (blobWriterFactory != null)) {
@@ -368,10 +372,12 @@ public class DedicatedFormatRollingFileWriter
if (vectorStoreWriter != null) {
vectorStoreWriter.write(transformedRow);
}
- currentWriter.write(transformedRow);
+ if (currentWriter != null) {
+ currentWriter.write(transformedRow);
+ }
recordCount++;
- if (rollingFile()) {
+ if (currentWriter != null && rollingFile()) {
closeCurrentWriter();
}
} catch (Throwable e) {
@@ -382,7 +388,7 @@ public class DedicatedFormatRollingFileWriter
/** Handles write exceptions by logging and cleaning up resources. */
private void handleWriteException(Throwable e) {
- String filePath = (currentWriter == null) ? null :
currentWriter.writer().path().toString();
+ String filePath = currentWriter == null ? null :
currentWriter.writer().path().toString();
LOG.warn("Exception occurs when writing file {}. Cleaning up.",
filePath, e);
abort();
}
@@ -451,12 +457,12 @@ public class DedicatedFormatRollingFileWriter
* @throws IOException if closing fails
*/
private void closeCurrentWriter() throws IOException {
- if (currentWriter == null) {
+ if (currentWriter == null && blobWriter == null && vectorStoreWriter
== null) {
return;
}
// Close main writer and get metadata
- DataFileMeta mainDataFileMeta = closeMainWriter();
+ DataFileMeta mainDataFileMeta = currentWriter == null ? null :
closeMainWriter();
// Close blob writer and process blob metadata
List<DataFileMeta> blobMetas = closeBlobWriter();
@@ -464,11 +470,13 @@ public class DedicatedFormatRollingFileWriter
// Close vector-store writer and process vector-store metadata
List<DataFileMeta> vectorStoreMetas = closeVectorStoreWriter();
- // Validate consistency between main and blob files
- validateFileConsistency(mainDataFileMeta, blobMetas, vectorStoreMetas);
+ if (mainDataFileMeta != null) {
+ // Validate consistency between main and blob files
+ validateFileConsistency(mainDataFileMeta, blobMetas,
vectorStoreMetas);
+ results.add(mainDataFileMeta);
+ }
// Add results to the results list
- results.add(mainDataFileMeta);
results.addAll(blobMetas);
results.addAll(vectorStoreMetas);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index fc65fe95b6..c41493564c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -434,11 +434,36 @@ public class DataEvolutionCompactCoordinator {
private List<List<DataFileMeta>>
fileGroupsToCompact(List<DataFileMeta> files) {
List<List<DataFileMeta>> result = new ArrayList<>();
List<DataFileMeta> sortedFiles = new ArrayList<>(files);
- sortedFiles.sort(comparingLong(DataFileMeta::nonNullFirstRowId));
+ sortedFiles.sort(
+ comparingLong(DataFileMeta::nonNullFirstRowId)
+
.thenComparingLong(DataFileMeta::maxSequenceNumber));
+
+ RangeHelper<DataFileMeta> rangeHelper =
+ new RangeHelper<>(DataFileMeta::nonNullRowIdRange);
+ List<DataFileMeta> smallFileCandidates = new ArrayList<>();
+ for (List<DataFileMeta> rowRangeGroup :
+ rangeHelper.mergeOverlappingRanges(sortedFiles)) {
+ if (rowRangeGroup.size() >= BLOB_COMPACT_MIN_FILE_NUM) {
+ rowRangeGroup.sort(
+ comparingLong(DataFileMeta::nonNullFirstRowId)
+
.thenComparingLong(DataFileMeta::maxSequenceNumber));
+ result.add(rowRangeGroup);
+ } else {
+ smallFileCandidates.add(rowRangeGroup.get(0));
+ }
+ }
+
+ result.addAll(smallFileGroupsToCompact(smallFileCandidates));
+ result.sort(comparingLong(group ->
group.get(0).nonNullFirstRowId()));
+ return result;
+ }
+
+ private List<List<DataFileMeta>>
smallFileGroupsToCompact(List<DataFileMeta> files) {
+ List<List<DataFileMeta>> result = new ArrayList<>();
List<DataFileMeta> continuousFiles = new ArrayList<>();
long expectedFirstRowId = -1;
- for (DataFileMeta file : sortedFiles) {
+ for (DataFileMeta file : files) {
if (file.fileSize() >= blobTargetFileSize) {
addFileGroupsToCompact(result, continuousFiles);
continuousFiles.clear();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
index e56fe32f82..f8bdc959c3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
@@ -45,6 +45,7 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SetUtils;
@@ -191,7 +192,7 @@ public class DataEvolutionCompactTask extends
AppendCompactTask {
CoreOptions options = table.coreOptions();
List<DataFileMeta> sortedCompactBefore =
sortedByFirstRowId(compactBefore);
DataField blobField = blobField(table, options, sortedCompactBefore);
- checkRowIdsContinuous(sortedCompactBefore);
+ Range compactBeforeRange =
checkContiguousRowRange(sortedCompactBefore);
checkArgument(
sortedCompactBefore.size() > 1,
"Blob compaction task %s should contain at least two files to
compact.",
@@ -232,12 +233,11 @@ public class DataEvolutionCompactTask extends
AppendCompactTask {
throw e;
}
- long firstRowId = sortedCompactBefore.get(0).nonNullFirstRowId();
long minSequenceId = minSequenceId(sortedCompactBefore);
long maxSequenceId = maxSequenceId(sortedCompactBefore);
DataFileMeta compactedFile =
writer.result()
- .assignFirstRowId(firstRowId)
+ .assignFirstRowId(compactBeforeRange.from)
.assignSequenceNumber(minSequenceId, maxSequenceId);
compactAfter.add(compactedFile);
checkArgument(compactAfter.size() == 1, "Blob file compaction should
produce one file.");
@@ -326,46 +326,30 @@ public class DataEvolutionCompactTask extends
AppendCompactTask {
return field;
}
- private void checkRowIdsContinuous(List<DataFileMeta> files) {
- checkArgument(!files.isEmpty(), "%s should not be empty.", "Blob
compact before files");
- long expectedFirstRowId = files.get(0).nonNullFirstRowId();
- for (DataFileMeta file : files) {
- long firstRowId = file.nonNullFirstRowId();
- checkArgument(
- firstRowId == expectedFirstRowId,
- "%s should be continuous and sorted by row id, expected %s
but got %s in file %s.",
- "Blob compact before files",
- expectedFirstRowId,
- firstRowId,
- file);
- expectedFirstRowId += file.rowCount();
- }
+ private Range checkContiguousRowRange(List<DataFileMeta> files) {
+ checkArgument(!files.isEmpty(), "%s should not be empty.", "Blob
compact files");
+ List<Range> ranges =
+
files.stream().map(DataFileMeta::nonNullRowIdRange).collect(Collectors.toList());
+ List<Range> merged = Range.sortAndMergeOverlap(ranges, true);
+ checkArgument(
+ merged.size() == 1,
+ "%s should have a contiguous row range, but got %s.",
+ "Blob compact files",
+ merged);
+ return merged.get(0);
}
private void checkSameRowRange(
List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter)
{
+ Range beforeRange = checkContiguousRowRange(compactBefore);
+ Range afterRange = checkContiguousRowRange(compactAfter);
checkArgument(
- !compactBefore.isEmpty(),
- "%s compact before files should not be empty.",
- "Blob compact files");
- checkArgument(
- !compactAfter.isEmpty(),
- "%s compact after files should not be empty.",
- "Blob compact files");
- long beforeFirstRowId = compactBefore.get(0).nonNullFirstRowId();
- long afterFirstRowId = compactAfter.get(0).nonNullFirstRowId();
- long beforeRowCount =
compactBefore.stream().mapToLong(DataFileMeta::rowCount).sum();
- long afterRowCount =
compactAfter.stream().mapToLong(DataFileMeta::rowCount).sum();
- checkArgument(
- beforeFirstRowId == afterFirstRowId && beforeRowCount ==
afterRowCount,
+ beforeRange.equals(afterRange),
"%s compact after files should have the same row range as
compact before files, "
- + "before first row id is %s with row count %s, "
- + "but after first row id is %s with row count %s.",
+ + "before range is %s, but after range is %s.",
"Blob compact files",
- beforeFirstRowId,
- beforeRowCount,
- afterFirstRowId,
- afterRowCount);
+ beforeRange,
+ afterRange);
}
private long minSequenceId(List<DataFileMeta> files) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 6a25d4a0b6..3f12ce039f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.BlobPlaceholder;
import org.apache.paimon.data.BlobView;
import org.apache.paimon.data.BlobViewStruct;
import org.apache.paimon.data.GenericRow;
@@ -47,6 +48,7 @@ import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.EndOfScanException;
@@ -152,6 +154,91 @@ public class BlobTableTest extends TableTestBase {
assertThat(integer.get()).isEqualTo(1000);
}
+ @Test
+ public void testUpdateBlobColumn() throws Exception {
+ createTableDefault();
+
+ byte[] blob0 = "blob-0".getBytes();
+ byte[] blob1 = "blob-1".getBytes();
+ byte[] blob2 = "blob-2".getBytes();
+ writeDataDefault(
+ Arrays.asList(
+ GenericRow.of(0, BinaryString.fromString("row-0"), new
BlobData(blob0)),
+ GenericRow.of(1, BinaryString.fromString("row-1"), new
BlobData(blob1)),
+ GenericRow.of(2, BinaryString.fromString("row-2"), new
BlobData(blob2))));
+
+ byte[] updatedBlob1 = "updated-blob-1".getBytes();
+ FileStoreTable table = getTableDefault();
+ RowType blobWriteType = table.schema().logicalRowType().project("f2");
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write =
builder.newWrite().withWriteType(blobWriteType);
+ BatchTableCommit commit = builder.newCommit()) {
+ write.write(GenericRow.of(BlobPlaceholder.INSTANCE));
+ write.write(GenericRow.of(new BlobData(updatedBlob1)));
+ write.write(GenericRow.of(BlobPlaceholder.INSTANCE));
+
+ List<CommitMessage> commitMessages = write.prepareCommit();
+ assignFirstRowId(commitMessages, 0L);
+ commit.commit(commitMessages);
+ }
+
+ Map<Integer, byte[]> actual = new HashMap<>();
+ readDefault(row -> actual.put(row.getInt(0), row.getBlob(2).toData()));
+
+ assertThat(actual.size()).isEqualTo(3);
+ assertThat(actual.get(0)).isEqualTo(blob0);
+ assertThat(actual.get(1)).isEqualTo(updatedBlob1);
+ assertThat(actual.get(2)).isEqualTo(blob2);
+ }
+
+ @Test
+ public void testCompactUpdatedBlobColumn() throws Exception {
+ createTableDefault();
+
+ byte[] blob0 = "blob-0".getBytes();
+ byte[] blob1 = "blob-1".getBytes();
+ byte[] blob2 = "blob-2".getBytes();
+ writeDataDefault(
+ Arrays.asList(
+ GenericRow.of(0, BinaryString.fromString("row-0"), new
BlobData(blob0)),
+ GenericRow.of(1, BinaryString.fromString("row-1"), new
BlobData(blob1)),
+ GenericRow.of(2, BinaryString.fromString("row-2"), new
BlobData(blob2))));
+
+ byte[] updatedBlob1 = "updated-blob-1".getBytes();
+ FileStoreTable table = getTableDefault();
+ RowType blobWriteType = table.schema().logicalRowType().project("f2");
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write =
builder.newWrite().withWriteType(blobWriteType);
+ BatchTableCommit commit = builder.newCommit()) {
+ write.write(GenericRow.of(BlobPlaceholder.INSTANCE));
+ write.write(GenericRow.of(new BlobData(updatedBlob1)));
+ write.write(GenericRow.of(BlobPlaceholder.INSTANCE));
+
+ List<CommitMessage> commitMessages = write.prepareCommit();
+ assignFirstRowId(commitMessages, 0L);
+ commit.commit(commitMessages);
+ }
+
+ DataEvolutionCompactCoordinator coordinator =
+ new DataEvolutionCompactCoordinator(table, true, false);
+ List<DataEvolutionCompactTask> tasks = coordinator.plan();
+
assertThat(tasks.stream().anyMatch(DataEvolutionCompactTask::isBlobTask)).isTrue();
+
+ List<CommitMessage> compactMessages = new ArrayList<>();
+ for (DataEvolutionCompactTask task : tasks) {
+ compactMessages.add(task.doCompact(table, commitUser));
+ }
+ commitDefault(compactMessages);
+
+ Map<Integer, byte[]> actual = new HashMap<>();
+ readDefault(row -> actual.put(row.getInt(0), row.getBlob(2).toData()));
+
+ assertThat(actual.size()).isEqualTo(3);
+ assertThat(actual.get(0)).isEqualTo(blob0);
+ assertThat(actual.get(1)).isEqualTo(updatedBlob1);
+ assertThat(actual.get(2)).isEqualTo(blob2);
+ }
+
@Test
public void testWriteByInputStream() throws Exception {
createTableDefault();
@@ -1730,6 +1817,22 @@ public class BlobTableTest extends TableTestBase {
commit.close();
}
+ private static void assignFirstRowId(List<CommitMessage> commitMessages,
long firstRowId) {
+ commitMessages.forEach(
+ commitMessage -> {
+ CommitMessageImpl impl = (CommitMessageImpl) commitMessage;
+ List<DataFileMeta> newFiles =
+ new
ArrayList<>(impl.newFilesIncrement().newFiles());
+ impl.newFilesIncrement().newFiles().clear();
+ impl.newFilesIncrement()
+ .newFiles()
+ .addAll(
+ newFiles.stream()
+ .map(file ->
file.assignFirstRowId(firstRowId))
+ .collect(Collectors.toList()));
+ });
+ }
+
private void createThreeTypeBlobTable() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
index f2f3a16756..fe1b2dfc3e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -191,6 +191,24 @@ public class DataEvolutionCompactCoordinatorTest {
entries.get(5).file());
}
+ @Test
+ public void testCompactPlannerWithUpdatedBlobFiles() {
+ List<ManifestEntry> entries = new ArrayList<>();
+ entries.add(makeEntry("file1.parquet", 0L, 3L, 100));
+ entries.add(makeBlobEntry("old.blob", 0L, 3L, 100, 0, "pic"));
+ entries.add(makeBlobEntry("updated.blob", 0L, 3L, 100, 1, "pic"));
+
+ DataEvolutionCompactCoordinator.CompactPlanner planner =
+ blobPlanner(1024, 1, 2, rowType(new DataField(1, "pic",
DataTypes.BLOB())));
+
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
+
+ assertThat(tasks).hasSize(1);
+ assertThat(tasks.get(0).isBlobTask()).isTrue();
+ assertThat(tasks.get(0).compactBefore())
+ .containsExactly(entries.get(1).file(), entries.get(2).file());
+ }
+
@Test
public void testCompactPlannerDoesNotCompactBlobFilesAcrossDataFiles() {
List<ManifestEntry> entries = new ArrayList<>();
diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index 3788687162..67d1a7c40a 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -16,13 +16,17 @@
# under the License.
import collections
-from typing import Callable, List, Optional
+from typing import Callable, Dict, List, Optional, Tuple
import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import RecordBatch
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.read.reader.format_blob_reader import BlobRecordIterator
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.table.row.blob import Blob
+from pypaimon.utils.range import Range
_MIN_BATCH_SIZE_TO_REFILL = 1024
@@ -229,3 +233,127 @@ class DataEvolutionMergeReader(RecordBatchReader):
reader.close()
except Exception as e:
raise IOError("Failed to close inner readers") from e
+
+
+class BlobFallbackBatchReader(RecordBatchReader):
+ """Resolve blob placeholders by falling back through older blob
versions."""
+
+ def __init__(self, file_reader_suppliers: List[Tuple[DataFileMeta,
Callable]],
+ field_name: str, output_type, row_ranges:
Optional[List[Range]] = None,
+ blob_as_descriptor: bool = False):
+ self._file_reader_suppliers = file_reader_suppliers
+ self._field_name = field_name
+ self._output_type = output_type
+ self._row_ranges = Range.sort_and_merge_overlap(row_ranges) if
row_ranges else None
+ self._blob_as_descriptor = blob_as_descriptor
+ self._returned = False
+ self._readers: List[RecordBatchReader] = []
+
+ def read_arrow_batch(self) -> Optional[RecordBatch]:
+ if self._returned:
+ return None
+ self._returned = True
+
+ groups: Dict[int, Dict[int, Tuple[object, bool]]] = {}
+ target_row_ids = self._target_row_ids()
+
+ for file, supplier in self._file_reader_suppliers:
+ row_ids = self._selected_row_ids(file)
+ blob_values = self._read_blob_values(file, supplier)
+ if len(blob_values) != len(row_ids):
+ raise ValueError(
+ "Blob fallback reader returned an unexpected row count "
+ f"for {file.file_name}: expect {len(row_ids)}, got
{len(blob_values)}."
+ )
+ if not row_ids:
+ continue
+ group = groups.setdefault(file.max_sequence_number, {})
+ for row_id, blob in zip(row_ids, blob_values):
+ if row_id in group:
+ raise ValueError(
+ "Blob files within the same max sequence should not
overlap."
+ )
+ if blob is None:
+ group[row_id] = (None, False)
+ elif blob is Blob.PLACE_HOLDER:
+ group[row_id] = (None, True)
+ else:
+ if self._blob_as_descriptor:
+ group[row_id] = (blob.to_descriptor().serialize(),
False)
+ else:
+ group[row_id] = (blob.to_data(), False)
+
+ if not groups:
+ return None
+
+ result = []
+ for row_id in target_row_ids:
+ found = False
+ for max_sequence_number in sorted(groups.keys(), reverse=True):
+ candidate = groups[max_sequence_number].get(row_id)
+ if candidate is None:
+ continue
+ value, is_placeholder = candidate
+ if not is_placeholder:
+ result.append(value)
+ found = True
+ break
+ if not found:
+ raise ValueError("All blob files at the same row id store a
placeholder.")
+
+ return pa.RecordBatch.from_arrays(
+ [pa.array(result, type=self._output_type)],
+ names=[self._field_name],
+ )
+
+ def _target_row_ids(self) -> List[int]:
+ file_ranges = [
+ file.row_id_range()
+ for file, _ in self._file_reader_suppliers
+ ]
+ ranges = [
+ Range(
+ min(row_range.from_ for row_range in file_ranges),
+ max(row_range.to for row_range in file_ranges),
+ )
+ ]
+ if self._row_ranges is not None:
+ ranges = Range.and_(ranges, self._row_ranges)
+ return self._expand_ranges(ranges)
+
+ def _selected_row_ids(self, file: DataFileMeta) -> List[int]:
+ ranges = [file.row_id_range()]
+ if self._row_ranges is not None:
+ ranges = Range.and_(ranges, self._row_ranges)
+ return self._expand_ranges(ranges)
+
+ @staticmethod
+ def _expand_ranges(ranges: List[Range]) -> List[int]:
+ return [
+ row_id
+ for row_range in ranges
+ for row_id in range(row_range.from_, row_range.to + 1)
+ ]
+
+ def _read_blob_values(self, file: DataFileMeta, supplier: Callable) ->
List[object]:
+ reader = supplier()
+ if reader is None:
+ return []
+ self._readers.append(reader)
+ try:
+ iterator = BlobRecordIterator(
+ reader._file_io,
+ reader.file_path,
+ reader.blob_lengths,
+ reader.blob_offsets,
+ self._field_name,
+ reader._input_stream,
+ )
+ return [row.values[0] for row in iterator]
+ except AttributeError as e:
+ raise TypeError("Blob fallback reader expects FormatBlobReader
suppliers.") from e
+
+ def close(self) -> None:
+ for reader in self._readers:
+ reader.close()
+ self._readers = []
diff --git a/paimon-python/pypaimon/read/reader/field_bunch.py
b/paimon-python/pypaimon/read/reader/field_bunch.py
index 74162cbc83..f2e0ae756c 100644
--- a/paimon-python/pypaimon/read/reader/field_bunch.py
+++ b/paimon-python/pypaimon/read/reader/field_bunch.py
@@ -23,7 +23,9 @@ supporting both regular data files and blob files.
"""
from abc import ABC
from typing import List
+
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.utils.range import Range
class FieldBunch(ABC):
@@ -75,6 +77,12 @@ class _SpecialFieldBunch(FieldBunch):
f"Only {self._file_type_label()} file can be added to "
f"a {self._file_type_label()} bunch.")
+ if self._files and file.write_cols != self._files[0].write_cols:
+ raise ValueError(
+ f"All files in a {self._file_type_label()} bunch should "
+ f"have the same write columns."
+ )
+
if file.first_row_id == self.latest_first_row_id:
if file.max_sequence_number >= self.latest_max_sequence_number:
raise ValueError(
@@ -136,6 +144,50 @@ class _SpecialFieldBunch(FieldBunch):
class BlobBunch(_SpecialFieldBunch):
"""Files for partial field (blob files)."""
+ def add(self, file: DataFileMeta) -> None:
+ if not self._is_special_file(file.file_name):
+ raise ValueError("Only blob file can be added to a blob bunch.")
+ if self._files and file.write_cols != self._files[0].write_cols:
+ raise ValueError("All files in a blob bunch should have the same
write columns.")
+
+ self._files.append(file)
+ merged = Range.sort_and_merge_overlap(
+ [blob_file.row_id_range() for blob_file in self._files],
+ True,
+ True,
+ )
+ self._row_count = sum(row_range.count() for row_range in merged)
+ if self.expected_row_count >= 0 and self._row_count >
self.expected_row_count:
+ raise ValueError(
+ f"Blob files row count exceed the expect
{self.expected_row_count}"
+ )
+
+ def row_count(self) -> int:
+ merged = Range.sort_and_merge_overlap(
+ [blob_file.row_id_range() for blob_file in self._files],
+ True,
+ True,
+ )
+ row_count = sum(row_range.count() for row_range in merged)
+ if not self.row_id_push_down:
+ if len(merged) != 1:
+ raise ValueError("Blob file bunch should always contain a
contiguous row range.")
+ if self.expected_row_count >= 0 and row_count !=
self.expected_row_count:
+ raise ValueError(
+ "The merged row count of blob file bunch should be aligned
"
+ f"with normal files, expect {self.expected_row_count}, got
{row_count}."
+ )
+ return row_count
+
+ def sequential_read_optimize(self) -> bool:
+ if not self._files:
+ raise ValueError("Blob bunch should not be empty.")
+ max_sequence_number = self._files[0].max_sequence_number
+ return all(
+ file.max_sequence_number == max_sequence_number
+ for file in self._files
+ )
+
def _is_special_file(self, file_name: str) -> bool:
return DataFileMeta.is_blob_file(file_name)
diff --git a/paimon-python/pypaimon/read/split_read.py
b/paimon-python/pypaimon/read/split_read.py
index dc5f5e6100..ddb349e0ca 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -29,8 +29,9 @@ from pypaimon.manifest.schema.data_file_meta import
DataFileMeta
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.partition_info import PartitionInfo
from pypaimon.read.push_down_utils import rewrite_predicate_indices,
trim_predicate_by_fields
-from pypaimon.read.reader.concat_batch_reader import (ConcatBatchReader,
- MergeAllBatchReader,
DataEvolutionMergeReader)
+from pypaimon.read.reader.concat_batch_reader import (
+ BlobFallbackBatchReader, ConcatBatchReader,
+ MergeAllBatchReader, DataEvolutionMergeReader)
from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
@@ -950,6 +951,27 @@ class DataEvolutionSplitRead(SplitRead):
bunch.files()[0], read_field_names
): r]
file_record_readers[i] = MergeAllBatchReader(suppliers,
batch_size=batch_size)
+ elif DataFileMeta.is_blob_file(first_file.file_name):
+ file_reader_suppliers = [
+ (
+ file,
+ partial(
+ self._create_raw_blob_file_reader,
+ file=file,
+ read_fields=read_field_names,
+ ),
+ )
+ for file in bunch.files()
+ ]
+ file_record_readers[i] = BlobFallbackBatchReader(
+ file_reader_suppliers,
+ read_fields[0].name,
+ PyarrowFieldParser.from_paimon_schema(
+ [read_fields[0]]
+ ).field(0).type,
+ self.row_ranges,
+ CoreOptions.blob_as_descriptor(self.table.options),
+ )
else:
# Create concatenated reader for multiple files
suppliers = [
@@ -977,6 +999,30 @@ class DataEvolutionSplitRead(SplitRead):
row_tracking_enabled=True,
row_ranges=self.row_ranges)
+ def _create_raw_blob_file_reader(
+ self, file: DataFileMeta, read_fields: [str]) ->
Optional[FormatBlobReader]:
+ row_indices = None
+ if self.row_ranges is not None:
+ row_indices = [
+ row_id - file.first_row_id
+ for row_range in Range.and_([file.row_id_range()],
self.row_ranges)
+ for row_id in range(row_range.from_, row_range.to + 1)
+ ]
+ if not row_indices:
+ return None
+
+ file_path = file.external_path if file.external_path else
file.file_path
+ return FormatBlobReader(
+ self.table.file_io,
+ file_path,
+ read_fields,
+ self.read_fields,
+ None,
+ CoreOptions.blob_as_descriptor(self.table.options),
+ batch_size=self.table.options.read_batch_size(),
+ row_indices=row_indices,
+ )
+
def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) ->
List[FieldBunch]:
"""Split files into field bunches."""
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py
b/paimon-python/pypaimon/tests/blob_table_test.py
index 5eaa57b056..ca9161bd14 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1062,6 +1062,82 @@ class DedicatedFormatWriterTest(unittest.TestCase):
[b'first_blob', None, b'third_blob', None, b'fifth_blob'],
)
+ def test_update_blob_column(self):
+ from pypaimon import Schema
+ from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+ from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.blob_update_column', schema, False)
+ table = self.catalog.get_table('test_db.blob_update_column')
+
+ initial = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['a', 'b', 'c'],
+ 'blob_data': [b'blob-1', b'blob-2', b'blob-3'],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(initial)
+ write_builder.new_commit().commit(writer.prepare_commit())
+ writer.close()
+
+ update_builder = table.new_batch_write_builder()
+ table_update =
update_builder.new_update().with_update_type(['blob_data'])
+ update_data = pa.Table.from_pydict({
+ '_ROW_ID': pa.array([1], type=pa.int64()),
+ 'blob_data': pa.array([b'updated-blob-2'], type=pa.large_binary()),
+ })
+ update_messages = table_update.update_by_arrow_with_row_id(update_data)
+ update_builder.new_commit().commit(update_messages)
+
+ update_files = [f for msg in update_messages for f in msg.new_files]
+ update_blob_files = [f for f in update_files if
f.file_name.endswith('.blob')]
+ self.assertGreater(len(update_blob_files), 0)
+ self.assertTrue(all(f.write_cols == ['blob_data'] for f in
update_files))
+ update_blob_lengths = []
+ blob_fields = [field for field in table.fields if field.name ==
'blob_data']
+ for blob_file in update_blob_files:
+ blob_reader = FormatBlobReader(
+ file_io=table.file_io,
+ file_path=blob_file.file_path,
+ read_fields=['blob_data'],
+ full_fields=blob_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=False,
+ )
+ update_blob_lengths.extend(blob_reader.blob_lengths)
+ blob_reader.close()
+ self.assertEqual(
+ update_blob_lengths.count(BlobFormatWriter.PLACE_HOLDER_LENGTH),
+ 2,
+ )
+
+ read_builder = table.new_read_builder()
+ result =
read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+ by_id = {
+ row['id']: row['blob_data']
+ for row in result.select(['id', 'blob_data']).to_pylist()
+ }
+ self.assertEqual(by_id, {
+ 1: b'blob-1',
+ 2: b'updated-blob-2',
+ 3: b'blob-3',
+ })
+
def test_blob_write_read_partition(self):
"""Test complete end-to-end blob functionality: write blob data and
read it back to verify correctness."""
from pypaimon import Schema
diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py b/paimon-python/pypaimon/write/table_update_by_row_id.py
index e4c448be14..7220f1718d 100644
--- a/paimon-python/pypaimon/write/table_update_by_row_id.py
+++ b/paimon-python/pypaimon/write/table_update_by_row_id.py
@@ -25,12 +25,14 @@ import pyarrow.compute as pc
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.split import DataSplit
from pypaimon.read.table_read import TableRead
-from pypaimon.utils.range import Range
from pypaimon.schema.data_types import DataField
+from pypaimon.table.row.blob import Blob
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.special_fields import SpecialFields
+from pypaimon.utils.range import Range
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.file_store_write import FileStoreWrite
+from pypaimon.write.writer.blob_writer import BlobWriter
@dataclass(frozen=True)
@@ -99,15 +101,35 @@ class TableUpdateByRowId:
index: Dict[int, Tuple[DataSplit, List[DataFileMeta]]] = {}
row_id_ranges: List[Range] = []
for split in splits:
+ files_with_row_id = [
+ file for file in split.files if file.first_row_id is not None
+ ]
+ data_files = [
+ file for file in files_with_row_id
+ if not DataFileMeta.is_blob_file(file.file_name)
+ ]
for file in split.files:
- if file.first_row_id is None or file.file_name.endswith('.blob'):
+ if file.first_row_id is None or DataFileMeta.is_blob_file(file.file_name):
continue
row_id_ranges.append(file.row_id_range())
+ for file in data_files:
+ target_files = [
+ target_file
+ for target_file in files_with_row_id
+ if self._overlaps(file.row_id_range(), target_file.row_id_range())
+ ]
+
entry = index.get(file.first_row_id)
if entry is None:
- index[file.first_row_id] = (split, [file])
+ index[file.first_row_id] = (split, target_files)
else:
- entry[1].append(file)
+ existing_files = entry[1]
+ existing_names = {existing.file_name for existing in existing_files}
+ existing_files.extend(
+ target_file
+ for target_file in target_files
+ if target_file.file_name not in existing_names
+ )
# Multiple physical files may share the same first_row_id (data evolution);
# summing row_count per file would over-count logical rows and widen
@@ -126,6 +148,10 @@ class TableUpdateByRowId:
total_row_count=total_row_count,
)
+ @staticmethod
+ def _overlaps(left: Range, right: Range) -> bool:
+ return left.from_ <= right.to and right.from_ <= left.to
+
def update_columns(self, data: pa.Table, column_names: List[str]) -> List[CommitMessage]:
"""
Add or update columns in the table.
@@ -226,7 +252,7 @@ class TableUpdateByRowId:
"""
wanted = set(column_names)
read_fields: List[DataField] = [
- field for field in self.table.fields if field.name in wanted
+ table_field for table_field in self.table.fields if table_field.name in wanted
]
if not read_fields:
return None
@@ -245,8 +271,12 @@ class TableUpdateByRowId:
table_read = TableRead(self.table, predicate=None,
read_type=read_fields)
return table_read.to_arrow([origin_split])
- def _merge_update_with_original(self, original_data: Optional[pa.Table], update_data: pa.Table,
- column_names: List[str], first_row_id: int) -> pa.Table:
+ def _merge_update_with_original(
+ self,
+ original_data: Optional[pa.Table],
+ update_data: pa.Table,
+ column_names: List[str],
+ first_row_id: int) -> Tuple[Optional[pa.Table], Dict[str, List[object]]]:
"""Merge update data with original data, preserving row order.
For rows that have updates, use the update values.
@@ -259,7 +289,7 @@ class TableUpdateByRowId:
first_row_id: The first_row_id of this file group
Returns:
- Merged PyArrow Table with all rows
+ Normal merged PyArrow Table and blob values to write row-by-row.
"""
# Get the _ROW_ID values from update_data to determine which rows are being updated
@@ -274,18 +304,40 @@ class TableUpdateByRowId:
# Build the merged table column by column
merged_columns = {}
+ blob_columns = {}
+ update_by_col = {
+ col_name: update_data[col_name].combine_chunks()
+ for col_name in column_names
+ }
+ update_positions = {
+ int(relative_index.as_py()): idx
+ for idx, relative_index in enumerate(relative_indices)
+ }
for col_name in column_names:
- update_col = update_data[col_name].combine_chunks()
+ update_col = update_by_col[col_name]
original_col = original_data[col_name].combine_chunks()
- # replace_with_mask fills mask=True positions with update values in order
- merged_columns[col_name] = pc.replace_with_mask(
- original_col, mask, update_col.cast(original_col.type)
- )
-
- # Create the merged table
- merged_table = pa.table(merged_columns)
-
- return merged_table
+ if self._is_blob_column(col_name):
+ blob_columns[col_name] = [
+ update_col[update_positions[i]].as_py()
+ if i in update_positions
+ else Blob.PLACE_HOLDER
+ for i in range(original_data.num_rows)
+ ]
+ else:
+ # replace_with_mask fills mask=True positions with update values in order
+ merged_columns[col_name] = pc.replace_with_mask(
+ original_col, mask, update_col.cast(original_col.type)
+ )
+
+ merged_table = pa.table(merged_columns) if merged_columns else None
+
+ return merged_table, blob_columns
+
+ def _is_blob_column(self, column_name: str) -> bool:
+ for table_field in self.table.fields:
+ if table_field.name == column_name:
+ return getattr(table_field.type, 'type', None) == 'BLOB'
+ return False
def _write_group(self, partition: GenericRow, first_row_id: int,
data: pa.Table, column_names: List[str]):
@@ -295,25 +347,54 @@ class TableUpdateByRowId:
writes a single output file (rolling disabled) for the group.
"""
original_data = self._read_original_file_data(first_row_id,
column_names)
- merged_data = self._merge_update_with_original(
+ merged_data, blob_columns = self._merge_update_with_original(
original_data, data, column_names, first_row_id,
)
- file_store_write = FileStoreWrite(self.table, self.commit_user)
+ partition_tuple = tuple(partition.values)
+ new_files = []
+ file_store_write = None
+ blob_writers = []
try:
- file_store_write.disable_rolling()
- file_store_write.write_cols = column_names
-
- partition_tuple = tuple(partition.values)
- for batch in merged_data.to_batches():
- file_store_write.write(partition_tuple, 0, batch)
-
- new_messages = file_store_write.prepare_commit(self.commit_identifier)
- for msg in new_messages:
- msg.check_from_snapshot = self.snapshot_id
- for file in msg.new_files:
+ if merged_data is not None:
+ file_store_write = FileStoreWrite(self.table, self.commit_user)
+ file_store_write.disable_rolling()
+ file_store_write.write_cols = list(merged_data.column_names)
+ for batch in merged_data.to_batches():
+ file_store_write.write(partition_tuple, 0, batch)
+ new_messages = file_store_write.prepare_commit(self.commit_identifier)
+ for msg in new_messages:
+ new_files.extend(msg.new_files)
+
+ for column_name, values in blob_columns.items():
+ blob_writer = BlobWriter(
+ self.table,
+ partition_tuple,
+ 0,
+ 0,
+ column_name,
+ self.table.options,
+ )
+ blob_writers.append(blob_writer)
+ arrow_type = original_data.schema.field(column_name).type
+ for value in values:
+ blob_writer.write_blob(value, arrow_type)
+ new_files.extend(blob_writer.prepare_commit())
+
+ if new_files:
+ for file in new_files:
file.first_row_id = first_row_id
- file.write_cols = column_names
- self.commit_messages.extend(new_messages)
+ file.write_cols = file.write_cols or column_names
+ self.commit_messages.append(
+ CommitMessage(
+ partition=partition_tuple,
+ bucket=0,
+ new_files=new_files,
+ check_from_snapshot=self.snapshot_id,
+ )
+ )
finally:
- file_store_write.close()
+ if file_store_write is not None:
+ file_store_write.close()
+ for blob_writer in blob_writers:
+ blob_writer.close()
diff --git a/paimon-python/pypaimon/write/writer/blob_file_writer.py b/paimon-python/pypaimon/write/writer/blob_file_writer.py
index 31d945a4ed..fe911586be 100644
--- a/paimon-python/pypaimon/write/writer/blob_file_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_file_writer.py
@@ -52,8 +52,17 @@ class BlobFileWriter:
blob_data = self._to_blob(col_data)
- # Create GenericRow
- fields = [DataField(0, field_name, PyarrowFieldParser.to_paimon_type(row_data.schema[0].type, False))]
+ self.write_blob(field_name, row_data.schema[0].type, blob_data)
+
+ def write_blob(self, field_name: str, arrow_type, blob_data):
+ blob_data = self._to_blob(blob_data)
+ fields = [
+ DataField(
+ 0,
+ field_name,
+ PyarrowFieldParser.to_paimon_type(arrow_type, False),
+ )
+ ]
row = GenericRow([blob_data], fields, RowKind.INSERT)
# Write to blob format writer
@@ -61,6 +70,8 @@ class BlobFileWriter:
self.row_count += 1
def _to_blob(self, col_data) -> Optional[Blob]:
+ if col_data is Blob.PLACE_HOLDER:
+ return Blob.PLACE_HOLDER
if hasattr(col_data, 'as_py'):
col_data = col_data.as_py()
if col_data is None:
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py b/paimon-python/pypaimon/write/writer/blob_writer.py
index 4ebed16785..adc86e8486 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -81,6 +81,17 @@ class BlobWriter(AppendOnlyDataWriter):
# This ensures each row has a unique sequence number for data versioning and consistency
self.sequence_generator.next()
+ def write_blob(self, value, arrow_type=pa.large_binary()):
+ if self.current_writer is None:
+ self.open_current_writer()
+
+ self.current_writer.write_blob(self.blob_column, arrow_type, value)
+ self.sequence_generator.next()
+ self.record_count += 1
+
+ if self.rolling_file():
+ self.close_current_writer()
+
def open_current_writer(self):
file_name = (f"{CoreOptions.data_file_prefix(self.options)}"
f"{self.file_uuid}-{self.file_count}.{self.file_format}")