This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 384c430bbc [python] Fix failing to read 1000cols (#6244)
384c430bbc is described below

commit 384c430bbc8adf2e0aa7327eac0e854bed308e98
Author: umi <55790489+discivig...@users.noreply.github.com>
AuthorDate: Fri Sep 12 14:40:49 2025 +0800

    [python] Fix failing to read 1000cols (#6244)
---
 paimon-python/pypaimon/table/row/generic_row.py    |  4 +-
 .../pypaimon/tests/py36/ao_read_write_test.py      | 76 ++++++++++++++++++++++
 .../pypaimon/tests/reader_append_only_test.py      | 76 ++++++++++++++++++++++
 3 files changed, 153 insertions(+), 3 deletions(-)

diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py
index 2f6d3d86ae..14f42e806c 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -54,9 +54,7 @@ class GenericRowDeserializer:
         arity = len(data_fields)
         actual_data = bytes_data
         if len(bytes_data) >= 4:
-            arity_from_bytes = struct.unpack('>i', bytes_data[:4])[0]
-            if 0 < arity_from_bytes < 1000:
-                actual_data = bytes_data[4:]
+            actual_data = bytes_data[4:]
 
         fields = []
         null_bits_size_in_bytes = cls._calculate_bit_set_width_in_bytes(arity)
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
index 3247ea691f..0e8d97d47b 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -20,7 +20,9 @@ from datetime import datetime
 from unittest.mock import Mock
 
 import pandas as pd
+import numpy as np
 import pyarrow as pa
+
 from pypaimon.api.options import Options
 from pypaimon.catalog.catalog_context import CatalogContext
 from pypaimon import CatalogFactory
@@ -307,6 +309,80 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
         actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_over1000cols_read(self):
+        num_rows = 1
+        num_cols = 10
+        table_name = "default.testBug"
+        # Generate dynamic schema based on column count
+        schema_fields = []
+        for i in range(1, num_cols + 1):
+            col_name = f'c{i:03d}'
+            if i == 1:
+                schema_fields.append((col_name, pa.string()))  # ID column
+            elif i == 2:
+                schema_fields.append((col_name, pa.string()))  # Name column
+            elif i == 3:
+                schema_fields.append((col_name, pa.string()))  # Category column (partition key)
+            elif i % 4 == 0:
+                schema_fields.append((col_name, pa.float64()))  # Float columns
+            elif i % 4 == 1:
+                schema_fields.append((col_name, pa.int32()))  # Int columns
+            elif i % 4 == 2:
+                schema_fields.append((col_name, pa.string()))  # String columns
+            else:
+                schema_fields.append((col_name, pa.int64()))  # Long columns
+
+        pa_schema = pa.schema(schema_fields)
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=['c003'],  # Use c003 as partition key
+        )
+
+        # Create table
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        # Generate test data
+        np.random.seed(42)  # For reproducible results
+        categories = ['Electronics', 'Clothing', 'Books', 'Home', 'Sports', 'Food', 'Toys', 'Beauty', 'Health', 'Auto']
+        statuses = ['Active', 'Inactive', 'Pending', 'Completed']
+
+        # Generate data dictionary
+        test_data = {}
+        for i in range(1, num_cols + 1):
+            col_name = f'c{i:03d}'
+            if i == 1:
+                test_data[col_name] = [f'Product_{j}' for j in range(1, num_rows + 1)]
+            elif i == 2:
+                test_data[col_name] = [f'Product_{j}' for j in range(1, num_rows + 1)]
+            elif i == 3:
+                test_data[col_name] = np.random.choice(categories, num_rows)
+            elif i % 4 == 0:
+                test_data[col_name] = np.random.uniform(1.0, 1000.0, num_rows).round(2)
+            elif i % 4 == 1:
+                test_data[col_name] = np.random.randint(1, 100, num_rows)
+            elif i % 4 == 2:
+                test_data[col_name] = np.random.choice(statuses, num_rows)
+            else:
+                test_data[col_name] = np.random.randint(1640995200, 1672531200, num_rows)
+
+        test_df = pd.DataFrame(test_data)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        table_write.write_pandas(test_df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_pandas(table_scan.plan().splits())
+        self.assertEqual(result.to_dict(), test_df.to_dict())
+
     def testAppendOnlyReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.rest_catalog.create_table('default.test_append_only_filter', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 5a6c291f2e..795365e48e 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -21,6 +21,8 @@ import tempfile
 import unittest
 
 import pyarrow as pa
+import numpy as np
+import pandas as pd
 
 from pypaimon import CatalogFactory
 from pypaimon import Schema
@@ -113,6 +115,80 @@ class AoReaderTest(unittest.TestCase):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_over1000cols_read(self):
+        num_rows = 1
+        num_cols = 10
+        table_name = "default.testBug"
+        # Generate dynamic schema based on column count
+        schema_fields = []
+        for i in range(1, num_cols + 1):
+            col_name = f'c{i:03d}'
+            if i == 1:
+                schema_fields.append((col_name, pa.string()))  # ID column
+            elif i == 2:
+                schema_fields.append((col_name, pa.string()))  # Name column
+            elif i == 3:
+                schema_fields.append((col_name, pa.string()))  # Category column (partition key)
+            elif i % 4 == 0:
+                schema_fields.append((col_name, pa.float64()))  # Float columns
+            elif i % 4 == 1:
+                schema_fields.append((col_name, pa.int32()))  # Int columns
+            elif i % 4 == 2:
+                schema_fields.append((col_name, pa.string()))  # String columns
+            else:
+                schema_fields.append((col_name, pa.int64()))  # Long columns
+
+        pa_schema = pa.schema(schema_fields)
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=['c003'],  # Use c003 as partition key
+        )
+
+        # Create table
+        self.catalog.create_table(table_name, schema, False)
+        table = self.catalog.get_table(table_name)
+
+        # Generate test data
+        np.random.seed(42)  # For reproducible results
+        categories = ['Electronics', 'Clothing', 'Books', 'Home', 'Sports', 'Food', 'Toys', 'Beauty', 'Health', 'Auto']
+        statuses = ['Active', 'Inactive', 'Pending', 'Completed']
+
+        # Generate data dictionary
+        test_data = {}
+        for i in range(1, num_cols + 1):
+            col_name = f'c{i:03d}'
+            if i == 1:
+                test_data[col_name] = [f'Product_{j}' for j in range(1, num_rows + 1)]
+            elif i == 2:
+                test_data[col_name] = [f'Product_{j}' for j in range(1, num_rows + 1)]
+            elif i == 3:
+                test_data[col_name] = np.random.choice(categories, num_rows)
+            elif i % 4 == 0:
+                test_data[col_name] = np.random.uniform(1.0, 1000.0, num_rows).round(2)
+            elif i % 4 == 1:
+                test_data[col_name] = np.random.randint(1, 100, num_rows)
+            elif i % 4 == 2:
+                test_data[col_name] = np.random.choice(statuses, num_rows)
+            else:
+                test_data[col_name] = np.random.randint(1640995200, 1672531200, num_rows)
+
+        test_df = pd.DataFrame(test_data)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        table_write.write_pandas(test_df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_pandas(table_scan.plan().splits())
+        self.assertEqual(result.to_dict(), test_df.to_dict())
+
     def testAppendOnlyReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.catalog.create_table('default.test_append_only_filter', schema, False)
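
For context on the generic_row.py change: the removed lines guessed whether a
serialized row begins with a 4-byte big-endian arity prefix by checking that
the decoded value falls in the open interval (0, 1000). For a table with 1000
or more columns the real prefix fails that upper bound, the four header bytes
are kept as row data, and every field decodes shifted by four bytes. The sketch
below reproduces the failure mode in isolation; it is a minimal illustration,
not Paimon's actual serializer, and the helper names and stand-in payload are
hypothetical.

    import struct

    def skip_arity_prefix_old(bytes_data: bytes) -> bytes:
        # Removed heuristic: only strip the prefix when the decoded
        # value "looks like" a plausible column count (< 1000).
        if len(bytes_data) >= 4:
            arity_from_bytes = struct.unpack('>i', bytes_data[:4])[0]
            if 0 < arity_from_bytes < 1000:
                return bytes_data[4:]
        return bytes_data

    def skip_arity_prefix_new(bytes_data: bytes) -> bytes:
        # Committed behavior: strip the prefix unconditionally.
        return bytes_data[4:] if len(bytes_data) >= 4 else bytes_data

    # A row from a 1000-column table: the prefix decodes to exactly 1000,
    # which the old upper bound rejects.
    payload = b'\x00' * 8  # hypothetical stand-in for null bits + field data
    row_bytes = struct.pack('>i', 1000) + payload
    assert skip_arity_prefix_old(row_bytes) == row_bytes  # prefix kept: fields shift by 4
    assert skip_arity_prefix_new(row_bytes) == payload    # prefix stripped correctly

The unconditional strip is safe only if, as the commit implies, serialized rows
always carry the 4-byte prefix; under that invariant the heuristic was never
needed and only introduced the 1000-column ceiling.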