cbb330 commented on code in PR #3046:
URL: https://github.com/apache/iceberg-python/pull/3046#discussion_r2814574602
##########
mkdocs/docs/api.md:
##########
@@ -355,6 +355,36 @@ for buf in tbl.scan().to_arrow_batch_reader():
print(f"Buffer contains {len(buf)} rows")
```
+You can control the number of rows per batch using the `batch_size` parameter:
+
+```python
Review Comment:
The API feels mechanism-oriented (streaming, concurrent_files, batch_size)
and it’s hard to know what to choose. Could we add a short “which config should
I use?” table with recommended starting points (ordered
default, memory-safe streaming, max-throughput streaming+concurrent_files)
and clarify that batch_size is usually an advanced tuning knob?
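For concreteness, a sketch of the three starting points as they would look with the
parameters documented in this section (a sketch only; the `concurrent_files=4` value
is an assumption, not a benchmarked recommendation):
```python
# Hypothetical quick-reference, assuming the to_arrow_batch_reader() parameters
# documented above (batch_size, streaming, concurrent_files).

# 1. Default: simplest, deterministic task order, materializes each file in memory.
reader = tbl.scan().to_arrow_batch_reader()

# 2. Memory-safe streaming: yields batches as they are produced instead of
#    materializing whole files; good when files may exceed available memory.
reader = tbl.scan().to_arrow_batch_reader(streaming=True)

# 3. Max-throughput streaming: read several files in parallel; batch ordering
#    across files is not guaranteed.
reader = tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4)

# batch_size is an advanced tuning knob; only adjust it when the default rows
# per batch does not suit the downstream consumer.
reader = tbl.scan().to_arrow_batch_reader(streaming=True, batch_size=1000)
```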
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1677,6 +1684,87 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
return deletes_per_file
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+ tasks: list[FileScanTask],
+ batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+ concurrent_files: int,
+ max_buffered_batches: int = 16,
+) -> Generator[pa.RecordBatch, None, None]:
+ """Read batches from multiple files concurrently with bounded memory.
+
+    Workers read from files in parallel (up to concurrent_files at a time) and push
+ batches into a shared queue. The consumer yields batches from the queue.
+ A sentinel value signals completion, avoiding timeout-based polling.
+
+ Args:
+ tasks: The file scan tasks to process.
+        batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
+ concurrent_files: Maximum number of files to read concurrently.
+ max_buffered_batches: Maximum number of batches to buffer in the queue.
+ """
+ if not tasks:
+ return
+
+    batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
+ cancel_event = threading.Event()
+ pending_count = len(tasks)
+ pending_lock = threading.Lock()
+ file_semaphore = threading.Semaphore(concurrent_files)
+
+ def worker(task: FileScanTask) -> None:
+ nonlocal pending_count
+ try:
+            # Blocking acquire — on cancellation, extra permits are released to unblock.
+ file_semaphore.acquire()
+ if cancel_event.is_set():
+ return
+
+ for batch in batch_fn(task):
+ if cancel_event.is_set():
+ return
+ batch_queue.put(batch)
+ except BaseException as e:
+ if not cancel_event.is_set():
+ batch_queue.put(e)
+ finally:
+ file_semaphore.release()
+ with pending_lock:
+ pending_count -= 1
+ if pending_count == 0:
+ batch_queue.put(_QUEUE_SENTINEL)
+
+ executor = ExecutorFactory.get_or_create()
+ futures = [executor.submit(worker, task) for task in tasks]
Review Comment:
The current shape submits one future per file and then gates with a semaphore.
Could we simplify by keeping only `concurrent_files` active iterators at a time,
e.g. `ThreadPoolExecutor(max_workers=concurrent_files)` plus
`wait(..., return_when=FIRST_COMPLETED)` over futures that advance each file's
batch iterator with `next()`?
That would naturally bound in-flight work and remove the extra
semaphore/lock/cancellation bookkeeping.
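For illustration, a rough sketch of that shape (a sketch only, keeping the
`_bounded_concurrent_batches` signature from the diff; integration with
`ExecutorFactory` and delete files is left out):
```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Callable, Generator, Iterator, List

import pyarrow as pa


def _bounded_concurrent_batches(
    tasks: List["FileScanTask"],  # FileScanTask as in the surrounding module
    batch_fn: Callable[["FileScanTask"], Iterator[pa.RecordBatch]],
    concurrent_files: int,
) -> Generator[pa.RecordBatch, None, None]:
    """Keep at most `concurrent_files` file iterators in flight; no semaphore,
    sentinel, or manual cancellation bookkeeping."""
    task_iter = iter(tasks)
    with ThreadPoolExecutor(max_workers=concurrent_files) as executor:
        in_flight: dict = {}  # future -> the batch iterator it is advancing

        def start_next_file() -> None:
            task = next(task_iter, None)
            if task is not None:
                batches = iter(batch_fn(task))
                in_flight[executor.submit(next, batches, None)] = batches

        for _ in range(concurrent_files):
            start_next_file()

        while in_flight:
            done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
            for future in done:
                batches = in_flight.pop(future)
                batch = future.result()  # re-raises any read error here
                if batch is None:
                    start_next_file()  # this file is exhausted
                else:
                    yield batch
                    # Fetch the next batch from the same file.
                    in_flight[executor.submit(next, batches, None)] = batches
```
Because the generator sits inside the `with` block, closing it early also shuts the
executor down after at most one pending `next()` per file, so nothing keeps reading
in the background.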
##########
mkdocs/docs/api.md:
##########
@@ -355,6 +355,36 @@ for buf in tbl.scan().to_arrow_batch_reader():
print(f"Buffer contains {len(buf)} rows")
```
+You can control the number of rows per batch using the `batch_size` parameter:
+
+```python
+for buf in tbl.scan().to_arrow_batch_reader(batch_size=1000):
+ print(f"Buffer contains {len(buf)} rows")
+```
+
+By default, each file's batches are materialized in memory before being yielded. For large files that may exceed available memory, use `streaming=True` to yield batches as they are produced without materializing entire files:
+
+```python
+for buf in tbl.scan().to_arrow_batch_reader(streaming=True, batch_size=1000):
+ print(f"Buffer contains {len(buf)} rows")
+```
+
+For maximum throughput, use `concurrent_files` to read multiple files in parallel while streaming. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:
Review Comment:
For DDP, deterministic input order (or deterministic per-rank sharding
independent of arrival order) is usually required, so this API tradeoff
matters directly for training correctness and reproducibility, not just
ergonomics.
I think it would be best to default to a deterministic ordering and offer the
arrival-first behavior as an explicit override, for example by doing an
ordered merge. There is still a fundamental cost: if an early file is slow,
later files wait (head-of-line blocking), so peak throughput drops.
A good API shape might be to make the ordering explicit, e.g. `order="task"`
(deterministic) vs `order="arrival"` (fastest).
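To make that concrete, a minimal sketch of the ordering switch (the `order`
parameter and `_iter_batches_ordered` name are hypothetical; the deterministic
branch shown here is the simplest sequential form, with per-file prefetching left
as a follow-up):
```python
from typing import Callable, Iterator, List, Literal

import pyarrow as pa


def _iter_batches_ordered(
    tasks: List["FileScanTask"],
    batch_fn: Callable[["FileScanTask"], Iterator[pa.RecordBatch]],
    concurrent_files: int,
    order: Literal["task", "arrival"] = "task",
) -> Iterator[pa.RecordBatch]:
    if order == "arrival":
        # Fastest: yield batches as any file produces them (current PR behavior).
        yield from _bounded_concurrent_batches(tasks, batch_fn, concurrent_files)
        return

    # Deterministic: drain task i completely before task i + 1. Readers could
    # still prefetch upcoming files in the background, but an early slow file
    # delays everything behind it (head-of-line blocking).
    for task in tasks:
        yield from batch_fn(task)
```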
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1677,6 +1684,87 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
return deletes_per_file
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+ tasks: list[FileScanTask],
+ batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+ concurrent_files: int,
+ max_buffered_batches: int = 16,
+) -> Generator[pa.RecordBatch, None, None]:
+ """Read batches from multiple files concurrently with bounded memory.
+
+    Workers read from files in parallel (up to concurrent_files at a time) and push
+ batches into a shared queue. The consumer yields batches from the queue.
+ A sentinel value signals completion, avoiding timeout-based polling.
+
+ Args:
+ tasks: The file scan tasks to process.
+        batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
+ concurrent_files: Maximum number of files to read concurrently.
+ max_buffered_batches: Maximum number of batches to buffer in the queue.
+ """
+ if not tasks:
+ return
+
+    batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
+ cancel_event = threading.Event()
+ pending_count = len(tasks)
+ pending_lock = threading.Lock()
+ file_semaphore = threading.Semaphore(concurrent_files)
+
+ def worker(task: FileScanTask) -> None:
+ nonlocal pending_count
+ try:
+            # Blocking acquire — on cancellation, extra permits are released to unblock.
+ file_semaphore.acquire()
+ if cancel_event.is_set():
+ return
+
+ for batch in batch_fn(task):
+ if cancel_event.is_set():
+ return
+ batch_queue.put(batch)
+ except BaseException as e:
+ if not cancel_event.is_set():
+ batch_queue.put(e)
+ finally:
+ file_semaphore.release()
+ with pending_lock:
+ pending_count -= 1
+ if pending_count == 0:
+ batch_queue.put(_QUEUE_SENTINEL)
+
+ executor = ExecutorFactory.get_or_create()
+ futures = [executor.submit(worker, task) for task in tasks]
+
+ try:
+ while True:
+ item = batch_queue.get()
+
+ if item is _QUEUE_SENTINEL:
+ break
+
+ if isinstance(item, BaseException):
+ raise item
+
+ yield item
+ finally:
+ cancel_event.set()
+ # Release semaphore permits to unblock any workers waiting on acquire()
+ for _ in range(len(tasks)):
+ file_semaphore.release()
+ # Drain the queue to unblock any workers stuck on put()
+ while not batch_queue.empty():
+ try:
+ batch_queue.get_nowait()
+ except queue.Empty:
+ break
+ for future in futures:
+ future.cancel()
Review Comment:
future.cancel() only stops tasks that have not started yet. If the reader
stops early, already-running workers may keep running in the background. Can we
shut down worker threads deterministically (for example with a per-scan
executor) so no background work is left after the iterator is closed?
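A sketch of the per-scan executor variant, keeping the queue-based shape from the
diff; the point is that the scan owns the executor, so closing the iterator waits
for every worker thread before returning (`worker`, `cancel_event`,
`file_semaphore`, and `batch_queue` refer to the code above):
```python
from concurrent.futures import ThreadPoolExecutor
import queue

# Replace ExecutorFactory.get_or_create() with an executor scoped to this scan.
executor = ThreadPoolExecutor(max_workers=concurrent_files, thread_name_prefix="pyiceberg-scan")
futures = [executor.submit(worker, task) for task in tasks]
try:
    ...  # consume batch_queue exactly as before
finally:
    cancel_event.set()
    # Unblock workers waiting on the semaphore.
    for _ in range(len(tasks)):
        file_semaphore.release()
    # Keep draining until every worker has exited, so a worker blocked on a
    # full queue can always complete its last put() and observe cancellation.
    while not all(f.done() for f in futures):
        try:
            batch_queue.get(timeout=0.05)
        except queue.Empty:
            pass
    # Every thread is idle now; shutting the per-scan executor down guarantees
    # no background reads outlive the closed iterator.
    executor.shutdown(wait=True)
```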
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1773,17 +1879,40 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record
Raises:
ResolveError: When a required field cannot be found in the file
            ValueError: When a field type in the file cannot be projected to the schema type
+ or when concurrent_files < 1
"""
+ if concurrent_files < 1:
Review Comment:
This important function now does a few different things.
I'd split `to_record_batches` into four helpers and keep it as a dispatcher:
1. `_prepare_tasks_and_deletes(tasks)` -> `tuple[list[FileScanTask], dict[...]]`
2. `_iter_batches_streaming(task_list, deletes_per_file, batch_size, concurrent_files)`
3. `_iter_batches_materialized(task_list, deletes_per_file, batch_size)`
4. `_apply_global_limit(batches)`
Then `to_record_batches` becomes:
```python
task_list, deletes_per_file = self._prepare_tasks_and_deletes(tasks)
batches = (
    self._iter_batches_streaming(...) if streaming
    else self._iter_batches_materialized(...)
)
yield from self._apply_global_limit(batches)
```
Ultimately this:
- keeps one place for limit semantics,
- makes streaming/non-streaming behavior independently testable,
- reduces cognitive load in the public method, and
- makes future ordering modes easier to add.
##########
tests/benchmark/test_read_benchmark.py:
##########
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
Review Comment:
Must this run in CI?
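If it does not need to, one lightweight option is a dedicated pytest marker so the
benchmark is skipped unless requested explicitly (the `benchmark` marker name and
the `-m` filter shown here are assumptions about how this repo's test config could
be wired up):
```python
import pytest

# Module-level mark: deselect in CI with `pytest -m "not benchmark"`,
# run locally on demand with `pytest -m benchmark tests/benchmark/`.
pytestmark = pytest.mark.benchmark
```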