sumedhsakdeo opened a new pull request, #3046:
URL: https://github.com/apache/iceberg-python/pull/3046
# Rationale for this change
## Summary
Addresses #3036 — `ArrowScan.to_record_batches()` uses `executor.map` + `list()`,
which eagerly materializes all record batches per file into memory, causing OOM
on large tables.

This PR adds three parameters to `to_arrow_batch_reader()` that give users
control over memory usage and parallelism:

- `batch_size` — controls the number of rows per batch passed to PyArrow's
  `ds.Scanner`. Default is PyArrow's built-in 131,072 rows.
- `streaming` — when `True`, batches are yielded as they are produced, without
  materializing entire files into memory. Uses a bounded queue with backpressure
  instead of `executor.map` + `list()`.
- `concurrent_files` — number of files to read concurrently when
  `streaming=True`. A semaphore limits active file readers, and a bounded queue
  (max 16 batches) provides backpressure to cap memory usage.
## Problem
The current implementation materializes all batches from each file via `list()`
inside `executor.map`, which runs up to `min(32, cpu_count + 4)` files in
parallel. For large files this means all batches from ~20 files are held in
memory simultaneously before any are yielded to the consumer.
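For illustration, the eager pattern roughly amounts to the following (a simplified sketch, not the actual pyiceberg internals; `read_file_as_batches` and `file_tasks` are hypothetical stand-ins):

```python
from concurrent.futures import ThreadPoolExecutor

# Simplified sketch of the eager path (illustration only, not the real code).
# read_file_as_batches and file_tasks are hypothetical stand-ins.
def read_all_files_eagerly(read_file_as_batches, file_tasks):
    with ThreadPoolExecutor() as executor:  # default: min(32, cpu_count + 4) workers
        # list() forces every batch of a file into memory before anything is yielded
        for batches in executor.map(lambda task: list(read_file_as_batches(task)), file_tasks):
            yield from batches
```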
## Solution
### Before: OOM on large tables
```python
batches = table.scan().to_arrow_batch_reader()
```
### After: bounded memory, tunable parallelism
```python
batches = table.scan().to_arrow_batch_reader(
    streaming=True,
    concurrent_files=4,
    batch_size=10000,
)
```
Default behavior is unchanged — `streaming=False` preserves the existing
executor.map + list() path for backwards compatibility.
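As a small usage sketch (assuming `table` is an already-loaded Iceberg table), the returned reader is consumed incrementally:

```python
# Usage sketch: iterate the reader batch by batch; with streaming=True only a
# bounded number of batches is buffered at any point.
reader = table.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4)

total_rows = 0
for batch in reader:          # each item is a pyarrow.RecordBatch
    total_rows += batch.num_rows
print(f"read {total_rows} rows")
```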
## Architecture
When `streaming=True`, batches flow through `_bounded_concurrent_batches` (a condensed sketch follows this list):
1. All file tasks are submitted to the shared thread pool
2. A `Semaphore(concurrent_files)` limits how many files are read simultaneously
3. Workers push batches into a bounded `Queue(maxsize=16)` — when full, workers block (backpressure)
4. The consumer yields batches from the queue via blocking `queue.get()`
5. A sentinel value signals completion — no timeout-based polling
6. On early termination (consumer stops), extra semaphore permits are released to unblock waiting workers, and the queue is drained
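Condensed, the flow looks roughly like this (a happy-path sketch with illustrative names, not the exact `_bounded_concurrent_batches` implementation; the early-termination cleanup from step 6 is omitted here):

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

# Happy-path sketch of the streaming flow (illustrative names, not the exact
# _bounded_concurrent_batches code; early-termination cleanup is omitted).
def bounded_concurrent_batches(read_file_as_batches, file_tasks, concurrent_files=1):
    out: queue.Queue = queue.Queue(maxsize=16)       # bounded queue -> backpressure
    permits = threading.Semaphore(concurrent_files)  # caps active file readers
    sentinel = object()                              # one completion marker per worker

    def worker(task):
        try:
            with permits:                            # at most concurrent_files readers at once
                for batch in read_file_as_batches(task):
                    out.put(batch)                   # blocks while the queue is full
        finally:
            out.put(sentinel)                        # always signal this worker's completion

    with ThreadPoolExecutor() as executor:
        for task in file_tasks:                      # all tasks submitted up front
            executor.submit(worker, task)
        remaining = len(file_tasks)
        while remaining:                             # blocking get, no timeout polling
            item = out.get()
            if item is sentinel:
                remaining -= 1
            else:
                yield item
```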
## Ordering semantics

| Configuration | File ordering | Within-file ordering |
|---|---|---|
| Default (`streaming=False`) | Batches grouped by file, in task submission order | Row order |
| `streaming=True` | Interleaved across files (no grouping guarantee) | Row order within each file |
## PR Stack
Breakdown of this large PR into smaller PRs:
1. **[PR 0](https://github.com/sumedhsakdeo/iceberg-python/pull/3)**:
`batch_size` forwarding
2. **[PR 1](https://github.com/sumedhsakdeo/iceberg-python/pull/1)**:
`streaming` flag — stop materializing entire files
3. **[PR 2](https://github.com/sumedhsakdeo/iceberg-python/pull/2)**:
`concurrent_files` — bounded concurrent streaming
4. **[PR 3](https://github.com/sumedhsakdeo/iceberg-python/pull/4)**:
`benchmark`
## Benchmark results
32 files x 500K rows, 5 columns (int64, float64, string, bool, timestamp),
batch_size=131,072 (PyArrow default):
| Config | Throughput | Peak Arrow memory |
|---|---|---|
| Default (`executor.map` + `list()`) | 212M rows/s | 635 MB |
| `streaming=True, concurrent_files=1` | 61M rows/s | 10 MB |
| `streaming=True, concurrent_files=2` | 111M rows/s | 42 MB |
| `streaming=True, concurrent_files=4` | 182M rows/s | 111 MB |
| `streaming=True, concurrent_files=8` | 227M rows/s | 251 MB |
| `streaming=True, concurrent_files=16` | 218M rows/s | 457 MB |
Positional deletes, row filters, and limit are handled correctly in all
modes.
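For example (a usage sketch; the filter expression and limit are illustrative values):

```python
# Sketch: row filter and limit combine with streaming just as on the default path.
reader = table.scan(row_filter="id > 100", limit=1_000).to_arrow_batch_reader(
    streaming=True,
    concurrent_files=4,
)
```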
## Are these changes tested?
Yes. 23 new unit tests across two test files, plus a micro-benchmark:
- `tests/io/test_pyarrow.py` (14 tests): `batch_size` controls rows per batch,
  streaming yields all rows correctly, streaming respects `limit`, within-file
  ordering preserved, positional deletes applied correctly in all three modes
  (default, streaming, concurrent), positional deletes with `limit`,
  `concurrent_files < 1` raises `ValueError`
- `tests/io/test_bounded_concurrent_batches.py` (9 tests): single/multi-file
  correctness, incremental streaming, backpressure blocks producers when the
  queue is full, error propagation from workers to consumer, early termination
  cancels workers cleanly, concurrency limit enforced, empty task list,
  `ArrowScan` integration with `limit`
- `tests/benchmark/test_read_benchmark.py`: read throughput micro-benchmark
  across 6 configurations measuring rows/sec and peak Arrow memory
## Are there any user-facing changes?
Yes. Three new optional parameters on `DataScan.to_arrow_batch_reader()`:

- `batch_size: int | None` — number of rows per batch (default: PyArrow's 131,072)
- `streaming: bool` — yield batches without materializing entire files (default: `False`)
- `concurrent_files: int` — number of files to read concurrently when streaming (default: 1)

All parameters are optional with backwards-compatible defaults. Existing code is unaffected.
Documentation updated in mkdocs/docs/api.md with usage examples and
ordering semantics.