This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a4bf8884c [python] Fix rolling stack overflow by replacing recursion with iteration (#7578)
0a4bf8884c is described below

commit 0a4bf8884ca1d52ef5c679f08ec3848ef2467ca5
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Apr 2 14:28:44 2026 +0800

    [python] Fix rolling stack overflow by replacing recursion with iteration (#7578)
    
    Fix potential stack overflow in `_check_and_roll_if_needed` by replacing
    recursion with a while loop. This can occur when `target_file_size` is
    set to a small value, causing excessive recursive splits.
---
 .../pypaimon/tests/write/table_write_test.py       | 35 ++++++++++++++++++++++
 paimon-python/pypaimon/write/writer/data_writer.py | 22 +++++++-------
 2 files changed, 45 insertions(+), 12 deletions(-)

diff --git a/paimon-python/pypaimon/tests/write/table_write_test.py b/paimon-python/pypaimon/tests/write/table_write_test.py
index 60ddcdc0bd..2c685f5830 100644
--- a/paimon-python/pypaimon/tests/write/table_write_test.py
+++ b/paimon-python/pypaimon/tests/write/table_write_test.py
@@ -28,6 +28,8 @@ import pyarrow as pa
 from parameterized import parameterized
 
 from pypaimon.common.json_util import JSON
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
 
 
 class TableWriteTest(unittest.TestCase):
@@ -436,3 +438,36 @@ class TableWriteTest(unittest.TestCase):
         splits = read_builder.new_scan().plan().splits()
         actual = table_read.to_arrow(splits)
         self.assertEqual(expected, actual)
+
+    def test_rolling(self):
+        pa_schema = pa.schema([('name', pa.string())])
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=[])
+        self.catalog.create_table('default.test_rolling_recursion', schema, True)
+        table = self.catalog.get_table('default.test_rolling_recursion')
+
+        row_value = 'x' * 100
+        sample = pa.Table.from_batches([
+            pa.RecordBatch.from_pydict({'name': pa.array([row_value], type=pa.string())})
+        ])
+        # Set target just above single chunk nbytes so best_split=1 every time
+        target = sample.nbytes + 1
+
+        options = CoreOptions.copy(table.options)
+        options.set(CoreOptions.TARGET_FILE_SIZE, str(target))
+        writer = AppendOnlyDataWriter(
+            table=table, partition=(), bucket=0,
+            max_seq_number=0, options=options,
+        )
+
+        num_rows = 1500
+        big_batch = pa.RecordBatch.from_pydict(
+            {'name': pa.array([row_value] * num_rows, type=pa.string())}
+        )
+        writer.write(big_batch)
+
+        pending_rows = writer.pending_data.num_rows if writer.pending_data is not None else 0
+        committed_rows = sum(f.row_count for f in writer.committed_files)
+        self.assertEqual(committed_rows + pending_rows, num_rows)
+        self.assertGreater(len(writer.committed_files), 0)
+        if writer.pending_data is not None:
+            self.assertLessEqual(writer.pending_data.nbytes, target)
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index a58e7b36af..725a1fb230 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -142,19 +142,17 @@ class DataWriter(ABC):
         """Merge existing data with new data. Must be implemented by 
subclasses."""
 
     def _check_and_roll_if_needed(self):
-        if self.pending_data is None:
-            return
-
-        current_size = self.pending_data.nbytes
-        if current_size > self.target_file_size:
+        while self.pending_data is not None:
+            current_size = self.pending_data.nbytes
+            if current_size <= self.target_file_size:
+                break
             split_row = self._find_optimal_split_point(self.pending_data, self.target_file_size)
-            if split_row > 0:
-                data_to_write = self.pending_data.slice(0, split_row)
-                remaining_data = self.pending_data.slice(split_row)
-
-                self._write_data_to_file(data_to_write)
-                self.pending_data = remaining_data
-                self._check_and_roll_if_needed()
+            if split_row <= 0:
+                break
+            data_to_write = self.pending_data.slice(0, split_row)
+            remaining_data = self.pending_data.slice(split_row)
+            self._write_data_to_file(data_to_write)
+            self.pending_data = remaining_data
 
     def _write_data_to_file(self, data: pa.Table):
         if data.num_rows == 0:

Reply via email to