TheR1sing3un opened a new pull request, #7870:
URL: https://github.com/apache/paimon/pull/7870

   ## Purpose
   
   Today `TableRead.to_pandas` / `to_arrow` iterate splits serially in 
`_arrow_batch_generator`, so wall time scales linearly with the number of 
splits even though PyArrow's parquet/orc readers release the GIL during decode. 
Unlike Java, where Flink/Spark fan splits out across TaskManagers/Executors, 
PyPaimon has no external framework above the SDK; split-level parallelism 
therefore has to live inside the SDK.
   
   This PR adds an opt-in `max_workers` parameter to `to_pandas` / `to_arrow`. 
Default behavior is unchanged.
   
   ## Linked issue
   
   N/A — direct contribution.
   
   ## API
   
   ```python
   read.to_arrow(splits, max_workers=4)
   read.to_pandas(splits, max_workers=4)
   ```
   
   - `max_workers=None` (default) or `1` → original serial path, no thread pool 
created.
   - `>= 2` with at least 2 splits → `ThreadPoolExecutor` runs splits 
concurrently; the final `Table` is assembled in the input splits' order 
(results collected by submission index).
   - `< 1` → `ValueError`.
   
   Other `to_*` methods (`to_arrow_batch_reader`, `to_iterator`, `to_duckdb`, 
`to_ray`, `to_torch`) are intentionally untouched — their order-preserving / 
streaming semantics deserve a separate look.
   
   ## Correctness under `limit`
   
   `_RemainingRows` is a thread-safe row-quota counter shared by all workers. 
Quota is *pre-debited* under a single lock so the combined output never exceeds 
`self.limit`, even if individual readers decode one extra batch after the quota 
is gone (the surplus batch is simply dropped, never emitted).
   
   ## Resource handling
   
   Each worker uses `try/finally: reader.close()`. `ThreadPoolExecutor`'s 
wait-on-exit guarantees every started reader is closed before `to_arrow` 
returns, even when one worker raises and propagates its exception.
   
   ## Tests
   
   Added `paimon-python/pypaimon/tests/reader_parallel_test.py` (16 tests):
   
   - `_RemainingRows`: unbounded, bounded pre-debit, zero-request, 8-thread 
contention.
   - Append-only multi-partition: parallel result is byte-equal to serial.
   - PK merge-on-read multi-bucket: parallel + serial produce the same merged 
rows.
   - `limit` + parallel: 10 repeated runs return exactly the configured row 
count.
   - Edge cases: empty splits with `max_workers=4`, `max_workers` exceeding 
split count, `max_workers=0/-1` rejected, `max_workers=1` matches serial, 
`include_row_kind=True` parity.
   - Reader error propagation: when one split's `create_reader` raises, the 
exception surfaces from `to_pandas` and sibling readers are cleaned up.
   
   ## API / format impact
   
   - API: additive only (one new optional parameter; default preserves existing 
behavior).
   - Storage / on-disk format: no change.
   - `CoreOptions`: no new option introduced in this round.
   
   ## Documentation impact
   
   Docstrings on `to_arrow` / `to_pandas` updated. Design doc added at 
`paimon-python/docs/design/2026-05-15-pypaimon-parallel-to-pandas.md`. README 
untouched.
   
   ## Generative AI disclosure
   
   Yes — the implementation, tests, and design doc were drafted with Claude 
Code assistance under my direction and review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to