This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new db86e31da4 [python] Fix DataBlobWriter KeyError for partial writes
with blob columns (#7850)
db86e31da4 is described below
commit db86e31da40527b401a01f89a3627ee772326bea
Author: Nicholas Jiang <[email protected]>
AuthorDate: Thu May 14 14:28:18 2026 +0800
[python] Fix DataBlobWriter KeyError for partial writes with blob columns
(#7850)
**Problem**: For tables with BLOB columns, pypaimon uses
`DataBlobWriter`, which splits each `pyarrow.RecordBatch` into “normal”
columns (written to Parquet/ORC/…) and blob-file columns (written via
`BlobWriter`). `_split_data` used the full table lists of normal and
blob-file column names when calling `RecordBatch.select(...)`.
**Regression**: When `TableWrite.with_write_type(...)` narrows the write
to a partial column list, validation ensures incoming batches only
contain those columns. `_split_data` still tried to select columns not
present in the batch (e.g. a normal column omitted from the partial
write), which caused PyArrow to raise KeyError.
**Fix**:
- Pass `write_cols` from `FileStoreWrite` into `DataBlobWriter` (same as
for `AppendOnlyDataWriter`), so the blob writer sees the narrowed column
set from `with_write_type`.
- In `DataBlobWriter.__init__`, derive `normal_column_names` and
`blob_file_column_names` from that subset when `write_cols` is set: keep only
the blob-file columns that appear in `write_cols`, and take the normal columns
to be `write_cols` minus the blob-file columns (order preserved from
`write_cols`). Only instantiate `BlobWriter` for blob-file columns in that
narrowed set.
- Keep full-table behavior when `write_cols` is `None` (full schema
write).
- This keeps `_split_data` consistent with the actual batch schema and
matches the intent of partial / data-evolution writes.
---
paimon-python/pypaimon/tests/blob_table_test.py | 127 +++++++++++++++++++++
paimon-python/pypaimon/write/file_store_write.py | 3 +-
.../pypaimon/write/writer/data_blob_writer.py | 34 ++++--
3 files changed, 155 insertions(+), 9 deletions(-)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py
b/paimon-python/pypaimon/tests/blob_table_test.py
index 7d0ad90ec9..56359c1d1f 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -242,6 +242,133 @@ class DataBlobWriterTest(unittest.TestCase):
result =
table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
self.assertEqual(result.num_rows, 3)
+ def test_data_blob_writer_partial_write_with_write_type(self):
+ """Partial write (normal + blob subset) via with_write_type: split
must match batch columns."""
+ from pypaimon import Schema
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.blob_partial_write_type', schema,
False)
+ table = self.catalog.get_table('test_db.blob_partial_write_type')
+
+ partial_schema = pa.schema([('id', pa.int32()), ('blob_data',
pa.large_binary())])
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'blob_data': [b'a', b'b'],
+ }, schema=partial_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write().with_write_type(['id', 'blob_data'])
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ self.assertGreater(len(commit_messages), 0)
+ all_files = [f for msg in commit_messages for f in msg.new_files]
+ parquet_files = [f for f in all_files if
f.file_name.endswith('.parquet')]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+ self.assertEqual(len(parquet_files), 1)
+ self.assertGreaterEqual(len(blob_files), 1)
+ self.assertEqual(parquet_files[0].write_cols, ['id'])
+ self.assertEqual(parquet_files[0].row_count, 2)
+ for bf in blob_files:
+ self.assertEqual(bf.write_cols, ['blob_data'])
+ self.assertEqual(bf.row_count, 2)
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ read_builder = table.new_read_builder()
+ out =
read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+ self.assertEqual(out.num_rows, 2)
+ self.assertEqual(out.column('id').to_pylist(), [1, 2])
+ self.assertEqual(out.column('blob_data').to_pylist(), [b'a', b'b'])
+ self.assertEqual(out.column('name').to_pylist(), [None, None])
+
+ def test_data_blob_writer_partial_write_normal_only_with_write_type(self):
+ """Partial write without blob columns in write_cols must not touch
blob split paths."""
+ from pypaimon import Schema
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.blob_partial_normal_only', schema,
False)
+ table = self.catalog.get_table('test_db.blob_partial_normal_only')
+
+ partial_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+ test_data = pa.Table.from_pydict({'id': [7], 'name': ['n']},
schema=partial_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write().with_write_type(['id', 'name'])
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ all_files = [f for msg in commit_messages for f in msg.new_files]
+ self.assertFalse(any(f.file_name.endswith('.blob') for f in all_files))
+ parquet_files = [f for f in all_files if
f.file_name.endswith('.parquet')]
+ self.assertEqual(len(parquet_files), 1)
+ self.assertEqual(parquet_files[0].write_cols, ['id', 'name'])
+ self.assertEqual(parquet_files[0].row_count, 1)
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ read_builder = table.new_read_builder()
+ out =
read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+ self.assertEqual(out.column('id').to_pylist(), [7])
+ self.assertEqual(out.column('name').to_pylist(), ['n'])
+ self.assertEqual(out.column('blob_data').to_pylist(), [None])
+
+ def
test_data_blob_writer_partial_write_single_blob_of_two_with_write_type(self):
+ """with_write_type lists only one blob column: only that column gets
.blob files."""
+ from pypaimon import Schema
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob1', pa.large_binary()),
+ ('blob2', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.blob_partial_one_of_two', schema,
False)
+ table = self.catalog.get_table('test_db.blob_partial_one_of_two')
+
+ partial_schema = pa.schema([('id', pa.int32()), ('blob1',
pa.large_binary())])
+ test_data = pa.Table.from_pydict({
+ 'id': [1],
+ 'blob1': [b'only_b1'],
+ }, schema=partial_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write().with_write_type(['id', 'blob1'])
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ all_files = [f for msg in commit_messages for f in msg.new_files]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+ self.assertEqual(len(blob_files), 1)
+ self.assertEqual(blob_files[0].write_cols, ['blob1'])
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
def test_data_blob_writer_write_operations(self):
"""Test DataBlobWriter write operations with real data."""
from pypaimon import Schema
diff --git a/paimon-python/pypaimon/write/file_store_write.py
b/paimon-python/pypaimon/write/file_store_write.py
index a98383d9c9..c33fca3792 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -69,7 +69,8 @@ class FileStoreWrite:
partition=partition,
bucket=bucket,
max_seq_number=0,
- options=options
+ options=options,
+ write_cols=self.write_cols,
)
elif self.table.is_primary_key_table:
return KeyValueDataWriter(
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py
b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 1006ec8289..e7a28cddb7 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -48,6 +48,8 @@ class DataBlobWriter(DataWriter):
- One normal data file may correspond to multiple blob data files
- Blob data is written immediately to disk to prevent memory corruption
- Blob file metadata is stored as separate DataFileMeta objects after
normal file metadata
+ - When TableWrite.with_write_type narrows columns, incoming batches only
carry that subset;
+ column lists are narrowed accordingly so splitting never selects missing
columns.
Rolling behavior:
- Normal data rolls: Both normal and blob writers are closed together,
blob metadata added after normal metadata
@@ -76,8 +78,9 @@ class DataBlobWriter(DataWriter):
# Constant for checking rolling condition periodically
CHECK_ROLLING_RECORD_CNT = 1000
- def __init__(self, table, partition: Tuple, bucket: int, max_seq_number:
int, options: CoreOptions = None):
- super().__init__(table, partition, bucket, max_seq_number, options)
+ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number:
int, options: CoreOptions = None,
+ write_cols: Optional[List[str]] = None):
+ super().__init__(table, partition, bucket, max_seq_number, options,
write_cols=write_cols)
# Determine blob columns from table schema
self.blob_column_names = self._get_blob_columns_from_schema()
@@ -93,16 +96,31 @@ class DataBlobWriter(DataWriter):
)
# Blob fields that should still be written to `.blob` files.
- self.blob_file_column_names = [
+ full_blob_file_column_names = [
col for col in self.blob_column_names if col not in
self.blob_descriptor_fields
]
-
+ full_blob_file_set = set(full_blob_file_column_names)
all_column_names = self.table.field_names
- self.normal_column_names = [
- col for col in all_column_names if col not in
self.blob_file_column_names
- ]
+
+ # Narrow columns when TableWrite.with_write_type(...) supplies a
partial column list.
+ # Incoming RecordBatches only contain those columns; selecting full
normal/blob lists
+ # would raise KeyError.
+ if write_cols is not None:
+ write_col_set = set(write_cols)
+ self.blob_file_column_names = [
+ col for col in full_blob_file_column_names if col in
write_col_set
+ ]
+ self.normal_column_names = [
+ col for col in write_cols if col not in full_blob_file_set
+ ]
+ else:
+ self.blob_file_column_names = list(full_blob_file_column_names)
+ self.normal_column_names = [
+ col for col in all_column_names if col not in
full_blob_file_set
+ ]
+ normal_name_set = set(self.normal_column_names)
self.normal_columns = [
- field for field in self.table.table_schema.fields if field.name in
self.normal_column_names
+ field for field in self.table.table_schema.fields if field.name in
normal_name_set
]
self.write_cols = self.normal_column_names