This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new db86e31da4 [python] Fix DataBlobWriter KeyError for partial writes 
with blob columns (#7850)
db86e31da4 is described below

commit db86e31da40527b401a01f89a3627ee772326bea
Author: Nicholas Jiang <[email protected]>
AuthorDate: Thu May 14 14:28:18 2026 +0800

    [python] Fix DataBlobWriter KeyError for partial writes with blob columns 
(#7850)
    
    **Problem**: For tables with BLOB columns, pypaimon uses
    `DataBlobWriter`, which splits each `pyarrow.RecordBatch` into “normal”
    columns (written to Parquet/ORC/…) and blob-file columns (written via
    `BlobWriter`). `_split_data` used the full table lists of normal and
    blob-file column names when calling `RecordBatch.select(...)`.
    
    **Regression**: When `TableWrite.with_write_type(...)` narrows the write
    to a partial column list, validation ensures incoming batches only
    contain those columns. `_split_data` still tried to select columns not
    present in the batch (e.g. a normal column omitted from the partial
    write), which caused PyArrow to raise KeyError.
    
    **Fix**:
    
    - Pass `write_cols` from `FileStoreWrite` into `DataBlobWriter` (same as
    for `AppendOnlyDataWriter`), so the blob writer sees the narrowed column
    set from `with_write_type`.
    - In `DataBlobWriter.__init__`, derive `normal_column_names` and
    `blob_file_column_names` from that subset when `write_cols` is set: only
    blob-file columns that appear in `write_cols`, and normal columns =
    `write_cols` minus blob-file columns (order preserved from `write_cols`).
    Only instantiate `BlobWriter` for blob-file columns in that narrowed
    set.
    - Keep full-table behavior when `write_cols` is None (full schema
    write).
    - This keeps `_split_data` consistent with the actual batch schema and
    matches the intent of partial / data-evolution writes.
---
 paimon-python/pypaimon/tests/blob_table_test.py    | 127 +++++++++++++++++++++
 paimon-python/pypaimon/write/file_store_write.py   |   3 +-
 .../pypaimon/write/writer/data_blob_writer.py      |  34 ++++--
 3 files changed, 155 insertions(+), 9 deletions(-)

diff --git a/paimon-python/pypaimon/tests/blob_table_test.py 
b/paimon-python/pypaimon/tests/blob_table_test.py
index 7d0ad90ec9..56359c1d1f 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -242,6 +242,133 @@ class DataBlobWriterTest(unittest.TestCase):
         result = 
table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
         self.assertEqual(result.num_rows, 3)
 
+    def test_data_blob_writer_partial_write_with_write_type(self):
+        """Partial write (normal + blob subset) via with_write_type: split 
must match batch columns."""
+        from pypaimon import Schema
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_partial_write_type', schema, 
False)
+        table = self.catalog.get_table('test_db.blob_partial_write_type')
+
+        partial_schema = pa.schema([('id', pa.int32()), ('blob_data', 
pa.large_binary())])
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2],
+            'blob_data': [b'a', b'b'],
+        }, schema=partial_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write().with_write_type(['id', 'blob_data'])
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        self.assertGreater(len(commit_messages), 0)
+        all_files = [f for msg in commit_messages for f in msg.new_files]
+        parquet_files = [f for f in all_files if 
f.file_name.endswith('.parquet')]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+        self.assertEqual(len(parquet_files), 1)
+        self.assertGreaterEqual(len(blob_files), 1)
+        self.assertEqual(parquet_files[0].write_cols, ['id'])
+        self.assertEqual(parquet_files[0].row_count, 2)
+        for bf in blob_files:
+            self.assertEqual(bf.write_cols, ['blob_data'])
+            self.assertEqual(bf.row_count, 2)
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        read_builder = table.new_read_builder()
+        out = 
read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+        self.assertEqual(out.num_rows, 2)
+        self.assertEqual(out.column('id').to_pylist(), [1, 2])
+        self.assertEqual(out.column('blob_data').to_pylist(), [b'a', b'b'])
+        self.assertEqual(out.column('name').to_pylist(), [None, None])
+
+    def test_data_blob_writer_partial_write_normal_only_with_write_type(self):
+        """Partial write without blob columns in write_cols must not touch 
blob split paths."""
+        from pypaimon import Schema
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_partial_normal_only', schema, 
False)
+        table = self.catalog.get_table('test_db.blob_partial_normal_only')
+
+        partial_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+        test_data = pa.Table.from_pydict({'id': [7], 'name': ['n']}, 
schema=partial_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write().with_write_type(['id', 'name'])
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        all_files = [f for msg in commit_messages for f in msg.new_files]
+        self.assertFalse(any(f.file_name.endswith('.blob') for f in all_files))
+        parquet_files = [f for f in all_files if 
f.file_name.endswith('.parquet')]
+        self.assertEqual(len(parquet_files), 1)
+        self.assertEqual(parquet_files[0].write_cols, ['id', 'name'])
+        self.assertEqual(parquet_files[0].row_count, 1)
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        read_builder = table.new_read_builder()
+        out = 
read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+        self.assertEqual(out.column('id').to_pylist(), [7])
+        self.assertEqual(out.column('name').to_pylist(), ['n'])
+        self.assertEqual(out.column('blob_data').to_pylist(), [None])
+
+    def 
test_data_blob_writer_partial_write_single_blob_of_two_with_write_type(self):
+        """with_write_type lists only one blob column: only that column gets 
.blob files."""
+        from pypaimon import Schema
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('blob1', pa.large_binary()),
+            ('blob2', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_partial_one_of_two', schema, 
False)
+        table = self.catalog.get_table('test_db.blob_partial_one_of_two')
+
+        partial_schema = pa.schema([('id', pa.int32()), ('blob1', 
pa.large_binary())])
+        test_data = pa.Table.from_pydict({
+            'id': [1],
+            'blob1': [b'only_b1'],
+        }, schema=partial_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write().with_write_type(['id', 'blob1'])
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        all_files = [f for msg in commit_messages for f in msg.new_files]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+        self.assertEqual(len(blob_files), 1)
+        self.assertEqual(blob_files[0].write_cols, ['blob1'])
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
     def test_data_blob_writer_write_operations(self):
         """Test DataBlobWriter write operations with real data."""
         from pypaimon import Schema
diff --git a/paimon-python/pypaimon/write/file_store_write.py 
b/paimon-python/pypaimon/write/file_store_write.py
index a98383d9c9..c33fca3792 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -69,7 +69,8 @@ class FileStoreWrite:
                 partition=partition,
                 bucket=bucket,
                 max_seq_number=0,
-                options=options
+                options=options,
+                write_cols=self.write_cols,
             )
         elif self.table.is_primary_key_table:
             return KeyValueDataWriter(
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py 
b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 1006ec8289..e7a28cddb7 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -48,6 +48,8 @@ class DataBlobWriter(DataWriter):
     - One normal data file may correspond to multiple blob data files
     - Blob data is written immediately to disk to prevent memory corruption
     - Blob file metadata is stored as separate DataFileMeta objects after 
normal file metadata
+    - When TableWrite.with_write_type narrows columns, incoming batches only 
carry that subset;
+      column lists are narrowed accordingly so splitting never selects missing 
columns.
 
     Rolling behavior:
     - Normal data rolls: Both normal and blob writers are closed together, 
blob metadata added after normal metadata
@@ -76,8 +78,9 @@ class DataBlobWriter(DataWriter):
     # Constant for checking rolling condition periodically
     CHECK_ROLLING_RECORD_CNT = 1000
 
-    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: 
int, options: CoreOptions = None):
-        super().__init__(table, partition, bucket, max_seq_number, options)
+    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: 
int, options: CoreOptions = None,
+                 write_cols: Optional[List[str]] = None):
+        super().__init__(table, partition, bucket, max_seq_number, options, 
write_cols=write_cols)
 
         # Determine blob columns from table schema
         self.blob_column_names = self._get_blob_columns_from_schema()
@@ -93,16 +96,31 @@ class DataBlobWriter(DataWriter):
             )
 
         # Blob fields that should still be written to `.blob` files.
-        self.blob_file_column_names = [
+        full_blob_file_column_names = [
             col for col in self.blob_column_names if col not in 
self.blob_descriptor_fields
         ]
-
+        full_blob_file_set = set(full_blob_file_column_names)
         all_column_names = self.table.field_names
-        self.normal_column_names = [
-            col for col in all_column_names if col not in 
self.blob_file_column_names
-        ]
+
+        # Narrow columns when TableWrite.with_write_type(...) supplies a 
partial column list.
+        # Incoming RecordBatches only contain those columns; selecting full 
normal/blob lists
+        # would raise KeyError.
+        if write_cols is not None:
+            write_col_set = set(write_cols)
+            self.blob_file_column_names = [
+                col for col in full_blob_file_column_names if col in 
write_col_set
+            ]
+            self.normal_column_names = [
+                col for col in write_cols if col not in full_blob_file_set
+            ]
+        else:
+            self.blob_file_column_names = list(full_blob_file_column_names)
+            self.normal_column_names = [
+                col for col in all_column_names if col not in 
full_blob_file_set
+            ]
+        normal_name_set = set(self.normal_column_names)
         self.normal_columns = [
-            field for field in self.table.table_schema.fields if field.name in 
self.normal_column_names
+            field for field in self.table.table_schema.fields if field.name in 
normal_name_set
         ]
         self.write_cols = self.normal_column_names
 

Reply via email to