This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0a4bf8884c [python] Fix rolling stack overflow by replacing recursion
with iteration (#7578)
0a4bf8884c is described below
commit 0a4bf8884ca1d52ef5c679f08ec3848ef2467ca5
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Apr 2 14:28:44 2026 +0800
[python] Fix rolling stack overflow by replacing recursion with iteration
(#7578)
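Fix a potential stack overflow in `_check_and_roll_if_needed` by replacing
recursion with a while loop. The previous implementation recursed once for
every file it rolled, so the recursion depth equaled the number of splits.
When `target_file_size` is small relative to the pending data (e.g. only one
row fits per file), that depth could exceed Python's recursion limit.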
---
.../pypaimon/tests/write/table_write_test.py | 35 ++++++++++++++++++++++
paimon-python/pypaimon/write/writer/data_writer.py | 22 +++++++-------
2 files changed, 45 insertions(+), 12 deletions(-)
diff --git a/paimon-python/pypaimon/tests/write/table_write_test.py
b/paimon-python/pypaimon/tests/write/table_write_test.py
index 60ddcdc0bd..2c685f5830 100644
--- a/paimon-python/pypaimon/tests/write/table_write_test.py
+++ b/paimon-python/pypaimon/tests/write/table_write_test.py
@@ -28,6 +28,8 @@ import pyarrow as pa
from parameterized import parameterized
from pypaimon.common.json_util import JSON
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
class TableWriteTest(unittest.TestCase):
@@ -436,3 +438,36 @@ class TableWriteTest(unittest.TestCase):
splits = read_builder.new_scan().plan().splits()
actual = table_read.to_arrow(splits)
self.assertEqual(expected, actual)
+
+ def test_rolling(self):
+ pa_schema = pa.schema([('name', pa.string())])
+ schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=[])
+ self.catalog.create_table('default.test_rolling_recursion', schema,
True)
+ table = self.catalog.get_table('default.test_rolling_recursion')
+
+ row_value = 'x' * 100
+ sample = pa.Table.from_batches([
+ pa.RecordBatch.from_pydict({'name': pa.array([row_value],
type=pa.string())})
+ ])
+ # Set target just above single chunk nbytes so best_split=1 every time
+ target = sample.nbytes + 1
+
+ options = CoreOptions.copy(table.options)
+ options.set(CoreOptions.TARGET_FILE_SIZE, str(target))
+ writer = AppendOnlyDataWriter(
+ table=table, partition=(), bucket=0,
+ max_seq_number=0, options=options,
+ )
+
+ num_rows = 1500
+ big_batch = pa.RecordBatch.from_pydict(
+ {'name': pa.array([row_value] * num_rows, type=pa.string())}
+ )
+ writer.write(big_batch)
+
+ pending_rows = writer.pending_data.num_rows if writer.pending_data is
not None else 0
+ committed_rows = sum(f.row_count for f in writer.committed_files)
+ self.assertEqual(committed_rows + pending_rows, num_rows)
+ self.assertGreater(len(writer.committed_files), 0)
+ if writer.pending_data is not None:
+ self.assertLessEqual(writer.pending_data.nbytes, target)
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py
b/paimon-python/pypaimon/write/writer/data_writer.py
index a58e7b36af..725a1fb230 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -142,19 +142,17 @@ class DataWriter(ABC):
"""Merge existing data with new data. Must be implemented by
subclasses."""
def _check_and_roll_if_needed(self):
- if self.pending_data is None:
- return
-
- current_size = self.pending_data.nbytes
- if current_size > self.target_file_size:
+ while self.pending_data is not None:
+ current_size = self.pending_data.nbytes
+ if current_size <= self.target_file_size:
+ break
split_row = self._find_optimal_split_point(self.pending_data,
self.target_file_size)
- if split_row > 0:
- data_to_write = self.pending_data.slice(0, split_row)
- remaining_data = self.pending_data.slice(split_row)
-
- self._write_data_to_file(data_to_write)
- self.pending_data = remaining_data
- self._check_and_roll_if_needed()
+ if split_row <= 0:
+ break
+ data_to_write = self.pending_data.slice(0, split_row)
+ remaining_data = self.pending_data.slice(split_row)
+ self._write_data_to_file(data_to_write)
+ self.pending_data = remaining_data
def _write_data_to_file(self, data: pa.Table):
if data.num_rows == 0: