This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

The following commit(s) were added to refs/heads/master by this push:
     new 51bfa9a10b [python] Fix bin packing bug when writing large data (#6189)
51bfa9a10b is described below

commit 51bfa9a10b224fa52b245e65632a79cd65d10dbe
Author: umi <55790489+discivig...@users.noreply.github.com>
AuthorDate: Wed Sep 3 15:13:24 2025 +0800

    [python] Fix bin packing bug when writing large data (#6189)
---
 paimon-python/pypaimon/read/table_scan.py          |   2 +-
 .../pypaimon/tests/rest_table_read_write_test.py   | 122 +++++++++++++++++++++
 2 files changed, 123 insertions(+), 1 deletion(-)

diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 1c2c4f33dc..2a927c0055 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -249,7 +249,7 @@ class TableScan:
         for item in items:
             weight = weight_func(item)
             if bin_weight + weight > target_weight and len(bin_items) > 0:
-                packed.append(bin_items)
+                packed.append(list(bin_items))
                 bin_items.clear()
                 bin_weight = 0

diff --git a/paimon-python/pypaimon/tests/rest_table_read_write_test.py b/paimon-python/pypaimon/tests/rest_table_read_write_test.py
index fd1cd65b07..0608c4ef58 100644
--- a/paimon-python/pypaimon/tests/rest_table_read_write_test.py
+++ b/paimon-python/pypaimon/tests/rest_table_read_write_test.py
@@ -16,9 +16,16 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
+import logging
+
 import pandas as pd
 import pyarrow as pa
 
+from pypaimon.api.options import Options
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+from pypaimon.common.identifier import Identifier
 from pypaimon.schema.schema import Schema
 from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
 
@@ -273,3 +280,118 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
         actual = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf()
         expect = pd.DataFrame(self.raw_data)
         pd.testing.assert_frame_equal(actual.reset_index(drop=True), expect.reset_index(drop=True))
+
+    def testWriteWideTableLargeData(self):
+        logging.basicConfig(level=logging.INFO)
+        catalog = CatalogFactory.create(self.options)
+
+        # Build table structure: 200 data columns + 1 partition column
+        # Create PyArrow schema
+        pa_fields = []
+
+        # Create 200 data columns f0 to f199
+        for i in range(200):
+            pa_fields.append(pa.field(f"f{i}", pa.string(), metadata={"description": f"Column f{i}"}))
+
+        # Add partition column dt
+        pa_fields.append(pa.field("dt", pa.string(), metadata={"description": "Partition column dt"}))
+
+        # Create PyArrow schema
+        pa_schema = pa.schema(pa_fields)
+
+        # Convert to Paimon Schema and specify partition key
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=["dt"])
+
+        # Create table
+        table_identifier = Identifier.create("default", "wide_table_200cols")
+        try:
+            # If table already exists, drop it first
+            try:
+                catalog.get_table(table_identifier)
+                catalog.drop_table(table_identifier)
+                print(f"Dropped existing table {table_identifier}")
+            except Exception:
+                # Table does not exist, continue creating
+                pass
+
+            # Create new table
+            catalog.create_table(
+                identifier=table_identifier,
+                schema=schema,
+                ignore_if_exists=False
+            )
+
+            print(
+                f"Successfully created table {table_identifier} with {len(pa_fields) - 1} "
+                f"data columns and 1 partition column")
+            print(
+                f"Table schema: {len([f for f in pa_fields if f.name != 'dt'])} data columns (f0-f199) + dt partition")
+
+        except Exception as e:
+            print(f"Error creating table: {e}")
+            raise e
+        import random
+
+        table_identifier = Identifier.create("default", "wide_table_200cols")
+        table = catalog.get_table(table_identifier)
+
+        total_rows = 500000  # rows of data
+        batch_size = 100000  # 100,000 rows per batch
+        commit_batches = total_rows // batch_size
+
+        for commit_batch in range(commit_batches):
+            start_idx = commit_batch * batch_size
+            end_idx = start_idx + batch_size
+
+            print(f"Processing batch {commit_batch + 1}/{commit_batches} ({start_idx:,} - {end_idx:,})...")
+            # Generate data for current batch - generate data for all 200 columns
+            data = {}
+            # Generate data for f0-f199
+            for i in range(200):
+                if i == 0:
+                    data[f"f{i}"] = [f'value_{j}' for j in range(start_idx, end_idx)]
+                elif i == 1:
+                    data[f"f{i}"] = [random.choice(['A', 'B', 'C', 'D', 'E']) for _ in range(batch_size)]
+                elif i == 2:
+                    data[f"f{i}"] = [f'detail_{random.randint(1, 1000)}' for _ in range(batch_size)]
+                elif i == 3:
+                    data[f"f{i}"] = [f'id_{j:06d}' for j in range(start_idx, end_idx)]
+                else:
+                    # Generate random string data for other columns
+                    data[f"f{i}"] = [f'col{i}_val_{random.randint(1, 10000)}' for _ in range(batch_size)]
+
+            # Add partition column data
+            data['dt'] = ['2025-09-01' for _ in range(batch_size)]
+            # Convert dictionary to PyArrow RecordBatch
+            arrow_batch = pa.RecordBatch.from_pydict(data)
+            # Create new write and commit objects for each commit batch
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+
+            try:
+                # Write current batch data
+                table_write.write_arrow_batch(arrow_batch)
+                print("Batch data write completed, committing...")
+                # Commit current batch
+                commit_messages = table_write.prepare_commit()
+                table_commit.commit(commit_messages)
+                print(f"Batch {commit_batch + 1} committed successfully! Written {end_idx:,} rows of data")
+
+            finally:
+                # Ensure resource cleanup
+                table_write.close()
+                table_commit.close()
+
+        print(
+            f"All data writing completed! "
+            f"Total written {total_rows:,} rows of data to 200-column wide table in {commit_batches} commits")
+        rest_catalog = RESTCatalog(CatalogContext.create_from_options(Options(self.options)))
+        table = rest_catalog.get_table('default.wide_table_200cols')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        read_builder = (table.new_read_builder()
+                        .with_projection(['f0', 'f1'])
+                        .with_filter(predicate=predicate_builder.equal("dt", "2025-09-01")))
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        self.assertEqual(table_read.to_arrow(splits).num_rows, total_rows)
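
Note on the one-line fix in table_scan.py: the flush appended the mutable bin_items list itself and then called bin_items.clear(), so every bin already stored in packed was emptied in place and ended up aliasing the same object as the current bin; copying with list(bin_items) preserves each packed bin's contents. Below is a minimal standalone sketch of that aliasing issue, assuming a simplified pack() helper; it is illustrative only and not the actual TableScan method.

# Illustrative sketch (not the real TableScan code): bins are flushed by
# copying bin_items. Appending bin_items directly and then clearing it would
# leave every entry of `packed` aliasing one list that only holds the last bin.
from typing import Callable, List, TypeVar

T = TypeVar("T")


def pack(items: List[T], weight_func: Callable[[T], int], target_weight: int) -> List[List[T]]:
    packed: List[List[T]] = []
    bin_items: List[T] = []
    bin_weight = 0
    for item in items:
        weight = weight_func(item)
        if bin_weight + weight > target_weight and len(bin_items) > 0:
            packed.append(list(bin_items))  # copy; append(bin_items) would be cleared next
            bin_items.clear()
            bin_weight = 0
        bin_weight += weight
        bin_items.append(item)
    if bin_items:
        packed.append(list(bin_items))
    return packed


if __name__ == "__main__":
    # With the copy, bins keep their items: [[3], [4, 2], [5]].
    # With the buggy reference append, earlier bins are cleared in place.
    print(pack([3, 4, 2, 5], lambda x: x, 6))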