This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9636052e3d [python] Fix update-by-row-id for list and map column types (#8162)
9636052e3d is described below
commit 9636052e3d424a976bd5a0a7ccd8e176da966671
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Jun 8 20:29:53 2026 +0800
[python] Fix update-by-row-id for list and map column types (#8162)
---
paimon-python/pypaimon/tests/table_update_test.py | 80 ++++++++++++++++++++++
.../pypaimon/write/table_update_by_row_id.py | 51 ++++++++++++--
2 files changed, 127 insertions(+), 4 deletions(-)
diff --git a/paimon-python/pypaimon/tests/table_update_test.py b/paimon-python/pypaimon/tests/table_update_test.py
index 57ae605703..97f8a4f676 100644
--- a/paimon-python/pypaimon/tests/table_update_test.py
+++ b/paimon-python/pypaimon/tests/table_update_test.py
@@ -570,6 +570,86 @@ class _TableUpdateTestBase(DataEvolutionTestBase):
# Rows 3 & 4 must remain at seed values
self.assertEqual([40, 45], ages[3:])
+ def test_update_list_and_map_columns(self):
+ """Update-by-row-id must handle list and map columns.
+
+ Covers list-of-pairs map input, plain-dict map input, rejection of
+ plain dicts whose rows contain None values, an explicitly map-typed
+ array, and a None map value under an explicit map schema. Each step
+ builds on the table state left by the previous one.
+ """
+ list_map_schema = pa.schema([
+ ('id', pa.int32()),
+ ('tags', pa.list_(pa.string())),
+ ('meta', pa.map_(pa.string(), pa.string())),
+ ])
+ table = self._create_table(pa_schema=list_map_schema)
+ self._write_arrow(table, pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'tags': [['a', 'b'], ['c'], ['d', 'e']],
+ 'meta': [[('k1', 'v1')], [('k2', 'v2')], [('k3', 'v3')]],
+ }, schema=list_map_schema))
+
+ # Fetch the _ROW_ID of each seeded row, ordered by id.
+ rb = table.new_read_builder().with_projection(
+ ['id', '_ROW_ID'])
+ rid_result = rb.new_read().to_arrow(
+ rb.new_scan().plan().splits()).sort_by('id')
+ row_ids = rid_result['_ROW_ID'].to_pylist()
+
+ # Update ids 1 and 3; map values are given as lists of (key, value)
+ # tuples without an explicit schema.
+ self._do_update(table, pa.Table.from_pydict({
+ '_ROW_ID': pa.array([row_ids[0], row_ids[2]],
+ type=pa.int64()),
+ 'tags': [['x', 'y'], ['z']],
+ 'meta': [[('k1', 'new1')], [('k3', 'new3')]],
+ }), ['tags', 'meta'])
+
+ # Untouched id 2 keeps its seed values.
+ result = self._read_all(table).sort_by('id')
+ self.assertEqual(
+ [['x', 'y'], ['c'], ['z']],
+ result['tags'].to_pylist())
+ self.assertEqual(
+ [[('k1', 'new1')], [('k2', 'v2')], [('k3', 'new3')]],
+ result['meta'].to_pylist())
+
+ # A plain dict with no None values is accepted as map input.
+ self._do_update(table, pa.Table.from_pydict({
+ '_ROW_ID': pa.array([row_ids[1]], type=pa.int64()),
+ 'meta': [{'k2': 'dict_val'}],
+ }), ['meta'])
+ result2 = self._read_all(table).sort_by('id')
+ self.assertEqual(
+ [[('k1', 'new1')], [('k2', 'dict_val')], [('k3', 'new3')]],
+ result2['meta'].to_pylist())
+
+ # Dicts with differing keys leave None in the rows (missing keys),
+ # which is ambiguous, so the update must be rejected.
+ with self.assertRaisesRegex(ValueError, "schema-less dict"):
+ self._do_update(table, pa.Table.from_pydict({
+ '_ROW_ID': pa.array([row_ids[0], row_ids[2]],
+ type=pa.int64()),
+ 'meta': [{'a': '1'}, {'b': '2'}],
+ }), ['meta'])
+
+ # The same keys are accepted when passed as an explicitly map-typed
+ # array; the rejected update above must not have changed anything.
+ self._do_update(table, pa.Table.from_pydict({
+ '_ROW_ID': pa.array([row_ids[0], row_ids[2]],
+ type=pa.int64()),
+ 'meta': pa.array(
+ [[('a', '1')], [('b', '2')]],
+ type=pa.map_(pa.string(), pa.string())),
+ }), ['meta'])
+ result3 = self._read_all(table).sort_by('id')
+ self.assertEqual(
+ [[('a', '1')], [('k2', 'dict_val')], [('b', '2')]],
+ result3['meta'].to_pylist())
+
+ # An explicit None value in a plain dict is also rejected.
+ with self.assertRaisesRegex(ValueError, "schema-less dict"):
+ self._do_update(table, pa.Table.from_pydict({
+ '_ROW_ID': pa.array([row_ids[0]], type=pa.int64()),
+ 'meta': [{'a': None}],
+ }), ['meta'])
+
+ # A None map value is accepted when the input schema declares the
+ # map type explicitly.
+ self._do_update(table, pa.Table.from_pydict({
+ '_ROW_ID': pa.array([row_ids[0]], type=pa.int64()),
+ 'meta': [[('a', None)]],
+ }, schema=pa.schema([
+ ('_ROW_ID', pa.int64()),
+ ('meta', pa.map_(pa.string(), pa.string())),
+ ])), ['meta'])
+ result4 = self._read_all(table).sort_by('id')
+ self.assertEqual(
+ [[('a', None)], [('k2', 'dict_val')], [('b', '2')]],
+ result4['meta'].to_pylist())
+
# ======================================================================
# Mode-specific mixins (add the ``update_by_arrow_with_row_id`` primitive)
diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py b/paimon-python/pypaimon/write/table_update_by_row_id.py
index c45a705d00..130d2763a2 100644
--- a/paimon-python/pypaimon/write/table_update_by_row_id.py
+++ b/paimon-python/pypaimon/write/table_update_by_row_id.py
@@ -19,6 +19,7 @@ import bisect
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
+import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
@@ -309,6 +310,9 @@ class TableUpdateByRowId:
for col_name in column_names:
update_col = update_by_col[col_name]
original_col = original_data[col_name].combine_chunks()
+ if update_col.type != original_col.type:
+ update_col = self._coerce_column(
+ update_col, original_col.type)
if self._is_blob_column(col_name):
blob_columns[col_name] = [
update_col[update_positions[i]].as_py()
@@ -317,15 +321,54 @@ class TableUpdateByRowId:
for i in range(original_data.num_rows)
]
else:
- # replace_with_mask fills mask=True positions with update
values in order
- merged_columns[col_name] = pc.replace_with_mask(
- original_col, mask, update_col.cast(original_col.type)
- )
+ try:
+ merged_columns[col_name] = pc.replace_with_mask(
+ original_col, mask, update_col)
+ except pa.lib.ArrowNotImplementedError:
+ n = original_data.num_rows
+ combined = pa.concat_arrays(
+ [original_col, update_col])
+ offset = len(original_col)
+ indices = np.arange(n, dtype=np.int64)
+ for orig_pos, upd_idx in update_positions.items():
+ indices[orig_pos] = offset + upd_idx
+ merged_columns[col_name] = combined.take(
+ pa.array(indices))
merged_table = pa.table(merged_columns) if merged_columns else None
return merged_table, blob_columns
+ @staticmethod
+ def _coerce_column(col: pa.Array, target_type: pa.DataType) -> pa.Array:
+ try:
+ return col.cast(target_type)
+ except (pa.lib.ArrowNotImplementedError,
+ pa.lib.ArrowInvalid,
+ pa.lib.ArrowTypeError):
+ pass
+ pylist = col.to_pylist()
+ if pa.types.is_map(target_type):
+ converted = []
+ for row in pylist:
+ if row is None:
+ converted.append(None)
+ elif isinstance(row, dict):
+ if pa.types.is_struct(col.type) and any(
+ v is None for v in row.values()):
+ raise ValueError(
+ "Cannot coerce schema-less dict input with null "
+ "values to map type. PyArrow represents both "
+ "missing dict keys and explicit null map values "
+ "as None; pass an explicit map-typed array or "
+ "list-of-pairs instead.")
+ converted.append(list(row.items()))
+ else:
+ converted.append(
+ [tuple(pair) for pair in row])
+ pylist = converted
+ return pa.array(pylist, type=target_type)
+
def _is_blob_column(self, column_name: str) -> bool:
for table_field in self.table.fields:
if table_field.name == column_name: