This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9636052e3d [python] Fix update-by-row-id for list and map column types (#8162)
9636052e3d is described below

commit 9636052e3d424a976bd5a0a7ccd8e176da966671
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Jun 8 20:29:53 2026 +0800

    [python] Fix update-by-row-id for list and map column types (#8162)
---
 paimon-python/pypaimon/tests/table_update_test.py  | 80 ++++++++++++++++++++++
 .../pypaimon/write/table_update_by_row_id.py       | 51 ++++++++++++--
 2 files changed, 127 insertions(+), 4 deletions(-)

diff --git a/paimon-python/pypaimon/tests/table_update_test.py b/paimon-python/pypaimon/tests/table_update_test.py
index 57ae605703..97f8a4f676 100644
--- a/paimon-python/pypaimon/tests/table_update_test.py
+++ b/paimon-python/pypaimon/tests/table_update_test.py
@@ -570,6 +570,86 @@ class _TableUpdateTestBase(DataEvolutionTestBase):
         # Rows 3 & 4 must remain at seed values
         self.assertEqual([40, 45], ages[3:])
 
+    def test_update_list_and_map_columns(self):
+        list_map_schema = pa.schema([
+            ('id', pa.int32()),
+            ('tags', pa.list_(pa.string())),
+            ('meta', pa.map_(pa.string(), pa.string())),
+        ])
+        table = self._create_table(pa_schema=list_map_schema)
+        self._write_arrow(table, pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'tags': [['a', 'b'], ['c'], ['d', 'e']],
+            'meta': [[('k1', 'v1')], [('k2', 'v2')], [('k3', 'v3')]],
+        }, schema=list_map_schema))
+
+        rb = table.new_read_builder().with_projection(
+            ['id', '_ROW_ID'])
+        rid_result = rb.new_read().to_arrow(
+            rb.new_scan().plan().splits()).sort_by('id')
+        row_ids = rid_result['_ROW_ID'].to_pylist()
+
+        self._do_update(table, pa.Table.from_pydict({
+            '_ROW_ID': pa.array([row_ids[0], row_ids[2]],
+                                type=pa.int64()),
+            'tags': [['x', 'y'], ['z']],
+            'meta': [[('k1', 'new1')], [('k3', 'new3')]],
+        }), ['tags', 'meta'])
+
+        result = self._read_all(table).sort_by('id')
+        self.assertEqual(
+            [['x', 'y'], ['c'], ['z']],
+            result['tags'].to_pylist())
+        self.assertEqual(
+            [[('k1', 'new1')], [('k2', 'v2')], [('k3', 'new3')]],
+            result['meta'].to_pylist())
+
+        self._do_update(table, pa.Table.from_pydict({
+            '_ROW_ID': pa.array([row_ids[1]], type=pa.int64()),
+            'meta': [{'k2': 'dict_val'}],
+        }), ['meta'])
+        result2 = self._read_all(table).sort_by('id')
+        self.assertEqual(
+            [[('k1', 'new1')], [('k2', 'dict_val')], [('k3', 'new3')]],
+            result2['meta'].to_pylist())
+
+        with self.assertRaisesRegex(ValueError, "schema-less dict"):
+            self._do_update(table, pa.Table.from_pydict({
+                '_ROW_ID': pa.array([row_ids[0], row_ids[2]],
+                                    type=pa.int64()),
+                'meta': [{'a': '1'}, {'b': '2'}],
+            }), ['meta'])
+
+        self._do_update(table, pa.Table.from_pydict({
+            '_ROW_ID': pa.array([row_ids[0], row_ids[2]],
+                                type=pa.int64()),
+            'meta': pa.array(
+                [[('a', '1')], [('b', '2')]],
+                type=pa.map_(pa.string(), pa.string())),
+        }), ['meta'])
+        result3 = self._read_all(table).sort_by('id')
+        self.assertEqual(
+            [[('a', '1')], [('k2', 'dict_val')], [('b', '2')]],
+            result3['meta'].to_pylist())
+
+        with self.assertRaisesRegex(ValueError, "schema-less dict"):
+            self._do_update(table, pa.Table.from_pydict({
+                '_ROW_ID': pa.array([row_ids[0]], type=pa.int64()),
+                'meta': [{'a': None}],
+            }), ['meta'])
+
+        self._do_update(table, pa.Table.from_pydict({
+            '_ROW_ID': pa.array([row_ids[0]], type=pa.int64()),
+            'meta': [[('a', None)]],
+        }, schema=pa.schema([
+            ('_ROW_ID', pa.int64()),
+            ('meta', pa.map_(pa.string(), pa.string())),
+        ])), ['meta'])
+        result4 = self._read_all(table).sort_by('id')
+        self.assertEqual(
+            [[('a', None)], [('k2', 'dict_val')], [('b', '2')]],
+            result4['meta'].to_pylist())
+
 
 # ======================================================================
 # Mode-specific mixins (add the ``update_by_arrow_with_row_id`` primitive)
diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py b/paimon-python/pypaimon/write/table_update_by_row_id.py
index c45a705d00..130d2763a2 100644
--- a/paimon-python/pypaimon/write/table_update_by_row_id.py
+++ b/paimon-python/pypaimon/write/table_update_by_row_id.py
@@ -19,6 +19,7 @@ import bisect
 from dataclasses import dataclass, field
 from typing import Dict, List, Optional, Tuple
 
+import numpy as np
 import pyarrow as pa
 import pyarrow.compute as pc
 
@@ -309,6 +310,9 @@ class TableUpdateByRowId:
         for col_name in column_names:
             update_col = update_by_col[col_name]
             original_col = original_data[col_name].combine_chunks()
+            if update_col.type != original_col.type:
+                update_col = self._coerce_column(
+                    update_col, original_col.type)
             if self._is_blob_column(col_name):
                 blob_columns[col_name] = [
                     update_col[update_positions[i]].as_py()
@@ -317,15 +321,54 @@ class TableUpdateByRowId:
                     for i in range(original_data.num_rows)
                 ]
             else:
-                # replace_with_mask fills mask=True positions with update values in order
-                merged_columns[col_name] = pc.replace_with_mask(
-                    original_col, mask, update_col.cast(original_col.type)
-                )
+                try:
+                    merged_columns[col_name] = pc.replace_with_mask(
+                        original_col, mask, update_col)
+                except pa.lib.ArrowNotImplementedError:
+                    n = original_data.num_rows
+                    combined = pa.concat_arrays(
+                        [original_col, update_col])
+                    offset = len(original_col)
+                    indices = np.arange(n, dtype=np.int64)
+                    for orig_pos, upd_idx in update_positions.items():
+                        indices[orig_pos] = offset + upd_idx
+                    merged_columns[col_name] = combined.take(
+                        pa.array(indices))
 
         merged_table = pa.table(merged_columns) if merged_columns else None
 
         return merged_table, blob_columns
 
+    @staticmethod
+    def _coerce_column(col: pa.Array, target_type: pa.DataType) -> pa.Array:
+        try:
+            return col.cast(target_type)
+        except (pa.lib.ArrowNotImplementedError,
+                pa.lib.ArrowInvalid,
+                pa.lib.ArrowTypeError):
+            pass
+        pylist = col.to_pylist()
+        if pa.types.is_map(target_type):
+            converted = []
+            for row in pylist:
+                if row is None:
+                    converted.append(None)
+                elif isinstance(row, dict):
+                    if pa.types.is_struct(col.type) and any(
+                            v is None for v in row.values()):
+                        raise ValueError(
+                            "Cannot coerce schema-less dict input with null "
+                            "values to map type. PyArrow represents both "
+                            "missing dict keys and explicit null map values "
+                            "as None; pass an explicit map-typed array or "
+                            "list-of-pairs instead.")
+                    converted.append(list(row.items()))
+                else:
+                    converted.append(
+                        [tuple(pair) for pair in row])
+            pylist = converted
+        return pa.array(pylist, type=target_type)
+
     def _is_blob_column(self, column_name: str) -> bool:
         for table_field in self.table.fields:
             if table_field.name == column_name:

Reply via email to