This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 51bfa9a10b [python] Fix bin packing bug when writing large data (#6189)
51bfa9a10b is described below

commit 51bfa9a10b224fa52b245e65632a79cd65d10dbe
Author: umi <55790489+discivig...@users.noreply.github.com>
AuthorDate: Wed Sep 3 15:13:24 2025 +0800

    [python] Fix bin packing bug when writing large data (#6189)
---
 paimon-python/pypaimon/read/table_scan.py          |   2 +-
 .../pypaimon/tests/rest_table_read_write_test.py   | 122 +++++++++++++++++++++
 2 files changed, 123 insertions(+), 1 deletion(-)
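
For context on the one-line fix in table_scan.py below: the original code appended the working list bin_items to packed and then immediately called bin_items.clear(). Because the appended entry is a reference to that same list object, clearing it also emptied the bin that had just been packed, which surfaces once enough data is written to produce multiple bins. Appending a copy with list(bin_items) keeps each finished bin intact. A minimal standalone sketch of the corrected packing loop (the helper name, signature, and final-bin flush here are illustrative assumptions, not the exact Paimon code):

    def pack(items, weight_func, target_weight):
        packed = []      # finished bins
        bin_items = []   # bin currently being filled
        bin_weight = 0
        for item in items:
            weight = weight_func(item)
            if bin_weight + weight > target_weight and len(bin_items) > 0:
                # Append a copy: appending bin_items itself would alias the list
                # that is cleared on the next line, leaving previously packed
                # bins empty.
                packed.append(list(bin_items))
                bin_items.clear()
                bin_weight = 0
            bin_items.append(item)
            bin_weight += weight
        if bin_items:
            packed.append(list(bin_items))
        return packed

    # Example: pack(range(10), lambda _: 1, target_weight=3)
    # -> [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]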

diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 1c2c4f33dc..2a927c0055 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -249,7 +249,7 @@ class TableScan:
         for item in items:
             weight = weight_func(item)
             if bin_weight + weight > target_weight and len(bin_items) > 0:
-                packed.append(bin_items)
+                packed.append(list(bin_items))
                 bin_items.clear()
                 bin_weight = 0
 
diff --git a/paimon-python/pypaimon/tests/rest_table_read_write_test.py b/paimon-python/pypaimon/tests/rest_table_read_write_test.py
index fd1cd65b07..0608c4ef58 100644
--- a/paimon-python/pypaimon/tests/rest_table_read_write_test.py
+++ b/paimon-python/pypaimon/tests/rest_table_read_write_test.py
@@ -16,9 +16,16 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
 
+import logging
+
 import pandas as pd
 import pyarrow as pa
 
+from pypaimon.api.options import Options
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+from pypaimon.common.identifier import Identifier
 from pypaimon.schema.schema import Schema
 from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
 
@@ -273,3 +280,118 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
         actual = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf()
         expect = pd.DataFrame(self.raw_data)
         pd.testing.assert_frame_equal(actual.reset_index(drop=True), expect.reset_index(drop=True))
+
+    def testWriteWideTableLargeData(self):
+        logging.basicConfig(level=logging.INFO)
+        catalog = CatalogFactory.create(self.options)
+
+        # Build table structure: 200 data columns + 1 partition column
+        # Create PyArrow schema
+        pa_fields = []
+
+        # Create 200 data columns f0 to f199
+        for i in range(200):
+            pa_fields.append(pa.field(f"f{i}", pa.string(), metadata={"description": f"Column f{i}"}))
+
+        # Add partition column dt
+        pa_fields.append(pa.field("dt", pa.string(), metadata={"description": "Partition column dt"}))
+
+        # Create PyArrow schema
+        pa_schema = pa.schema(pa_fields)
+
+        # Convert to Paimon Schema and specify partition key
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=["dt"])
+
+        # Create table
+        table_identifier = Identifier.create("default", "wide_table_200cols")
+        try:
+            # If table already exists, drop it first
+            try:
+                catalog.get_table(table_identifier)
+                catalog.drop_table(table_identifier)
+                print(f"Dropped existing table {table_identifier}")
+            except Exception:
+                # Table does not exist, continue creating
+                pass
+
+            # Create new table
+            catalog.create_table(
+                identifier=table_identifier,
+                schema=schema,
+                ignore_if_exists=False
+            )
+
+            print(
+                f"Successfully created table {table_identifier} with {len(pa_fields) - 1} "
+                f"data columns and 1 partition column")
+            print(
+                f"Table schema: {len([f for f in pa_fields if f.name != 'dt'])} data columns (f0-f199) + dt partition")
+
+        except Exception as e:
+            print(f"Error creating table: {e}")
+            raise e
+        import random
+
+        table_identifier = Identifier.create("default", "wide_table_200cols")
+        table = catalog.get_table(table_identifier)
+
+        total_rows = 500000  # rows of data
+        batch_size = 100000  # 100,000 rows per batch
+        commit_batches = total_rows // batch_size
+
+        for commit_batch in range(commit_batches):
+            start_idx = commit_batch * batch_size
+            end_idx = start_idx + batch_size
+
+            print(f"Processing batch {commit_batch + 1}/{commit_batches} ({start_idx:,} - {end_idx:,})...")
+            # Generate data for current batch - generate data for all 200 columns
+            data = {}
+            # Generate data for f0-f199
+            for i in range(200):
+                if i == 0:
+                    data[f"f{i}"] = [f'value_{j}' for j in range(start_idx, end_idx)]
+                elif i == 1:
+                    data[f"f{i}"] = [random.choice(['A', 'B', 'C', 'D', 'E']) for _ in range(batch_size)]
+                elif i == 2:
+                    data[f"f{i}"] = [f'detail_{random.randint(1, 1000)}' for _ in range(batch_size)]
+                elif i == 3:
+                    data[f"f{i}"] = [f'id_{j:06d}' for j in range(start_idx, end_idx)]
+                else:
+                    # Generate random string data for other columns
+                    data[f"f{i}"] = [f'col{i}_val_{random.randint(1, 10000)}' for _ in range(batch_size)]
+
+            # Add partition column data
+            data['dt'] = ['2025-09-01' for _ in range(batch_size)]
+            # Convert dictionary to PyArrow RecordBatch
+            arrow_batch = pa.RecordBatch.from_pydict(data)
+            # Create new write and commit objects for each commit batch
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+
+            try:
+                # Write current batch data
+                table_write.write_arrow_batch(arrow_batch)
+                print("Batch data write completed, committing...")
+                # Commit current batch
+                commit_messages = table_write.prepare_commit()
+                table_commit.commit(commit_messages)
+                print(f"Batch {commit_batch + 1} committed successfully! Written {end_idx:,} rows of data")
+
+            finally:
+                # Ensure resource cleanup
+                table_write.close()
+                table_commit.close()
+
+        print(
+            f"All data writing completed! "
+            f"Total written {total_rows:,} rows of data to 200-column wide table in {commit_batches} commits")
+        rest_catalog = RESTCatalog(CatalogContext.create_from_options(Options(self.options)))
+        table = rest_catalog.get_table('default.wide_table_200cols')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        read_builder = (table.new_read_builder()
+                        .with_projection(['f0', 'f1'])
+                        .with_filter(predicate=predicate_builder.equal("dt", "2025-09-01")))
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        self.assertEqual(table_read.to_arrow(splits).num_rows, total_rows)
