This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 79c51b9216 [python] Require polars >= 1.32 on Python 3.9+ to fix reading nested map types (#8091)
79c51b9216 is described below

commit 79c51b92161cefb25572672eb82c052dd441125e
Author: chaoyang <[email protected]>
AuthorDate: Tue Jun 2 23:02:17 2026 +0800

    [python] Require polars >= 1.32 on Python 3.9+ to fix reading nested map types (#8091)
---
 paimon-python/dev/requirements.txt                 |   3 +-
 .../pypaimon/tests/nested_type_read_write_test.py  | 168 +++++++++++++++++++++
 2 files changed, 170 insertions(+), 1 deletion(-)

diff --git a/paimon-python/dev/requirements.txt b/paimon-python/dev/requirements.txt
index 828670f6dd..49cdfef034 100644
--- a/paimon-python/dev/requirements.txt
+++ b/paimon-python/dev/requirements.txt
@@ -26,7 +26,8 @@ pandas>=1.1,<2; python_version < "3.7"
 pandas>=1.3,<3; python_version >= "3.7" and python_version < "3.9"
 pandas>=1.5,<3; python_version >= "3.9"
 polars>=0.9,<1; python_version<"3.8"
-polars>=1,<2; python_version>="3.8"
+polars>=1,<2; python_version=="3.8"
+polars>=1.32,<2; python_version>="3.9"
 pyarrow>=6,<7; python_version < "3.8"
 pyarrow>=16,<20; python_version >= "3.8"
 pyroaring<=0.3.3; python_version < "3.7"
diff --git a/paimon-python/pypaimon/tests/nested_type_read_write_test.py b/paimon-python/pypaimon/tests/nested_type_read_write_test.py
new file mode 100644
index 0000000000..0b2b9da403
--- /dev/null
+++ b/paimon-python/pypaimon/tests/nested_type_read_write_test.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class NestedTypeReadWriteTest(unittest.TestCase):
+    """Read/write of nested map types on primary-key tables.
+
+    Primary-key tables read through a row-based merge path that converts each
+    arrow batch with polars before merging, so reading a map nested inside a
+    struct exercises that conversion. This guards against regressions there
+    (e.g. polars releases that cannot decode a struct-nested arrow Map).
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_pk_table(self, name, pa_schema):
+        self.catalog.create_table(
+            'default.' + name,
+            Schema.from_pyarrow_schema(
+                pa_schema, primary_keys=['id'],
+                options={'bucket': '1', 'file.format': 'parquet'}),
+            False)
+        return self.catalog.get_table('default.' + name)
+
+    @staticmethod
+    def _write(table, pa_schema, ids, values):
+        data = pa.table(
+            {'id': pa.array(ids, pa_schema.field('id').type),
+             pa_schema.field(1).name: pa.array(
+                 values, type=pa_schema.field(1).type)},
+            schema=pa_schema)
+        write_builder = table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+        write.write_arrow(data)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+    @staticmethod
+    def _read_sorted(table):
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        rows = read_builder.new_read().to_arrow(splits).to_pylist()
+        rows.sort(key=lambda r: r['id'])
+        return rows
+
+    def test_pk_merge_read_map_nested_in_struct(self):
+        # row<latest_version str, latest_value row<...>,
+        #     all_versioned_values map<string, row<...>>>
+        inner = pa.struct([
+            pa.field('audio_vae_version', pa.string()),
+            pa.field('audio_vae_result_path', pa.string()),
+            pa.field('audio_vae_latent_shape', pa.string()),
+        ])
+        top = pa.struct([
+            pa.field('latest_version', pa.string()),
+            pa.field('latest_value', inner),
+            pa.field('all_versioned_values', pa.map_(pa.string(), inner)),
+        ])
+        pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            pa.field('info', top),
+        ])
+
+        def value(ver, path, shape):
+            return {'audio_vae_version': ver,
+                    'audio_vae_result_path': path,
+                    'audio_vae_latent_shape': shape}
+
+        table = self._create_pk_table('nested_map_in_struct', pa_schema)
+
+        # first version
+        self._write(table, pa_schema, [1], [{
+            'latest_version': 'v2',
+            'latest_value': value('v2', '/p/v2', '[1,2,3]'),
+            'all_versioned_values': [
+                ('v1', value('v1', '/p/v1', '[1,2]')),
+                ('v2', value('v2', '/p/v2', '[1,2,3]')),
+            ],
+        }])
+        # same pk again -> forces a merge read (the polars conversion path)
+        self._write(table, pa_schema, [1], [{
+            'latest_version': 'v3',
+            'latest_value': value('v3', '/p/v3', '[1,2,3,4]'),
+            'all_versioned_values': [
+                ('v1', value('v1', '/p/v1', '[1,2]')),
+                ('v2', value('v2', '/p/v2', '[1,2,3]')),
+                ('v3', value('v3', '/p/v3', '[1,2,3,4]')),
+            ],
+        }])
+
+        rows = self._read_sorted(table)
+        self.assertEqual(1, len(rows))
+        info = rows[0]['info']
+        self.assertEqual('v3', info['latest_version'])
+        self.assertEqual(value('v3', '/p/v3', '[1,2,3,4]'),
+                         info['latest_value'])
+        self.assertEqual(
+            [('v1', value('v1', '/p/v1', '[1,2]')),
+             ('v2', value('v2', '/p/v2', '[1,2,3]')),
+             ('v3', value('v3', '/p/v3', '[1,2,3,4]'))],
+            info['all_versioned_values'])
+
+    def test_pk_merge_read_top_level_map(self):
+        # map<string, row<a string, b int>> as a top-level value column
+        row_ab = pa.struct([
+            pa.field('a', pa.string()),
+            pa.field('b', pa.int32()),
+        ])
+        pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            pa.field('v', pa.map_(pa.string(), row_ab)),
+        ])
+
+        table = self._create_pk_table('top_level_map', pa_schema)
+
+        self._write(table, pa_schema, [1, 2], [
+            [('k1', {'a': 'OLD', 'b': 1})],
+            [('k2', {'a': 'keep', 'b': 2})],
+        ])
+        # overwrite id=1, leave id=2 untouched
+        self._write(table, pa_schema, [1], [
+            [('k1', {'a': 'NEW', 'b': 100}), ('k1b', {'a': 'extra', 'b': 101})],
+        ])
+
+        rows = self._read_sorted(table)
+        self.assertEqual(2, len(rows))
+        self.assertEqual(
+            [('k1', {'a': 'NEW', 'b': 100}), ('k1b', {'a': 'extra', 'b': 101})],
+            rows[0]['v'])
+        self.assertEqual([('k2', {'a': 'keep', 'b': 2})], rows[1]['v'])
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to