ShreyeshArangath commented on code in PR #3046:
URL: https://github.com/apache/iceberg-python/pull/3046#discussion_r2854429087
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1710,6 +1685,86 @@ def _read_all_delete_files(io: FileIO, tasks:
Iterable[FileScanTask]) -> dict[st
return deletes_per_file
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+ tasks: list[FileScanTask],
+ batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+ concurrent_streams: int,
+ max_buffered_batches: int = 16,
+) -> Generator[pa.RecordBatch, None, None]:
+ """Read batches from multiple files concurrently with bounded memory.
+
+ Uses a per-scan ThreadPoolExecutor(max_workers=concurrent_streams) to
naturally
+ bound concurrency. Workers push batches into a bounded queue which provides
+ backpressure when the consumer is slower than the producers.
+
+ Args:
+ tasks: The file scan tasks to process.
+ batch_fn: A callable that takes a FileScanTask and returns an iterator
of RecordBatches.
+ concurrent_streams: Maximum number of concurrent read streams.
+ max_buffered_batches: Maximum number of batches to buffer in the queue.
+ """
+ if not tasks:
+ return
+
+ batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] =
queue.Queue(maxsize=max_buffered_batches)
Review Comment:
Should we add defensive guardrails here? If `max_buffered_batches` is set
to 0 by the user, the queue size becomes
[infinite](https://docs.python.org/3/library/queue.html#queue.Queue), which
seems slightly counterintuitive from an API usage standpoint.
##########
mkdocs/docs/api.md:
##########
@@ -355,6 +355,61 @@ for buf in tbl.scan().to_arrow_batch_reader():
print(f"Buffer contains {len(buf)} rows")
```
+By default, each file's batches are materialized in memory before being
yielded (`TaskOrder()`). For large files that may exceed available memory, use
`ArrivalOrder()` to yield batches as they are produced without materializing
entire files:
+
+```python
+from pyiceberg.table import ArrivalOrder
+
+for buf in tbl.scan().to_arrow_batch_reader(order=ArrivalOrder()):
+ print(f"Buffer contains {len(buf)} rows")
+```
+
+For maximum throughput, tune `concurrent_streams` to read multiple files in
parallel with arrival order. Batches are yielded as they arrive from any file —
ordering across files is not guaranteed:
+
+```python
+from pyiceberg.table import ArrivalOrder
+
+for buf in
tbl.scan().to_arrow_batch_reader(order=ArrivalOrder(concurrent_streams=4)):
+ print(f"Buffer contains {len(buf)} rows")
+```
+
+**Ordering semantics:**
+
+| Configuration | File ordering | Within-file ordering |
+|---|---|---|
+| `TaskOrder()` (default) | Batches grouped by file, in task submission order
| Row order |
+| `ArrivalOrder()` | Interleaved across files (no grouping guarantee) | Row
order within each file |
Review Comment:
Should we specify that this only applies to `ArrivalOrder(concurrent_streams > 1)`?
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1789,54 +1844,115 @@ def to_table(self, tasks: Iterable[FileScanTask]) ->
pa.Table:
return result
- def to_record_batches(self, tasks: Iterable[FileScanTask]) ->
Iterator[pa.RecordBatch]:
+ def to_record_batches(
+ self,
+ tasks: Iterable[FileScanTask],
+ order: ScanOrder = _DEFAULT_SCAN_ORDER,
+ ) -> Iterator[pa.RecordBatch]:
"""Scan the Iceberg table and return an Iterator[pa.RecordBatch].
Returns an Iterator of pa.RecordBatch with data from the Iceberg table
by resolving the right columns that match the current table schema.
Only data that matches the provided row_filter expression is returned.
+ Ordering semantics:
+ - TaskOrder() (default): Yields batches one file at a time in task
submission order.
+ - ArrivalOrder(): Batches may be interleaved across files as they
arrive.
+ Within each file, batch ordering follows row order.
+
Args:
tasks: FileScanTasks representing the data files and delete files
to read from.
+ order: Controls the order in which record batches are returned.
+ TaskOrder() (default) yields batches one file at a time in
task order.
+ ArrivalOrder(concurrent_streams=N, batch_size=B,
max_buffered_batches=M)
+ yields batches as they are produced without materializing
entire files
+ into memory. Peak memory ≈ concurrent_streams × batch_size ×
max_buffered_batches
+ × (average row size in bytes). batch_size is the number of
rows per batch.
+ For example (if average row size ≈ 32 bytes):
+ - ArrivalOrder(concurrent_streams=4, batch_size=32768,
max_buffered_batches=8)
+ - Peak memory ≈ 4 × 32768 rows × 8 × 32 bytes ≈ ~32 MB (plus
Arrow overhead)
Returns:
An Iterator of PyArrow RecordBatches.
Total number of rows will be capped if specified.
Raises:
ResolveError: When a required field cannot be found in the file
- ValueError: When a field type in the file cannot be projected to
the schema type
+ ValueError: When a field type in the file cannot be projected to
the schema type,
+ or when an invalid order value is provided, or when
concurrent_streams < 1.
"""
- deletes_per_file = _read_all_delete_files(self._io, tasks)
+ if not isinstance(order, ScanOrder):
+ raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder
instance (TaskOrder() or ArrivalOrder()).")
- total_row_count = 0
+ task_list, deletes_per_file = self._prepare_tasks_and_deletes(tasks)
+
+ if isinstance(order, ArrivalOrder):
+ if order.concurrent_streams < 1:
+ raise ValueError(f"concurrent_streams must be >= 1, got
{order.concurrent_streams}")
+ return self._apply_limit(
+ self._iter_batches_arrival(
+ task_list, deletes_per_file, order.batch_size,
order.concurrent_streams, order.max_buffered_batches
+ )
+ )
+
+ return self._apply_limit(self._iter_batches_materialized(task_list,
deletes_per_file))
+
+ def _prepare_tasks_and_deletes(
+ self, tasks: Iterable[FileScanTask]
+ ) -> tuple[list[FileScanTask], dict[str, list[ChunkedArray]]]:
+ """Resolve delete files and return tasks as a list."""
+ task_list = list(tasks)
+ deletes_per_file = _read_all_delete_files(self._io, task_list)
+ return task_list, deletes_per_file
+
+ def _iter_batches_arrival(
+ self,
+ task_list: list[FileScanTask],
+ deletes_per_file: dict[str, list[ChunkedArray]],
+ batch_size: int | None,
+ concurrent_streams: int,
+ max_buffered_batches: int = 16,
+ ) -> Iterator[pa.RecordBatch]:
+ """Yield batches using bounded concurrent streaming in arrival
order."""
+
+ def batch_fn(task: FileScanTask) -> Iterator[pa.RecordBatch]:
+ return self._record_batches_from_scan_tasks_and_deletes([task],
deletes_per_file, batch_size)
+
+ yield from _bounded_concurrent_batches(task_list, batch_fn,
concurrent_streams, max_buffered_batches)
+
+ def _iter_batches_materialized(
+ self,
+ task_list: list[FileScanTask],
+ deletes_per_file: dict[str, list[ChunkedArray]],
+ ) -> Iterator[pa.RecordBatch]:
+ """Yield batches using executor.map with full file materialization."""
executor = ExecutorFactory.get_or_create()
def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
- # Materialize the iterator here to ensure execution happens within
the executor.
- # Otherwise, the iterator would be lazily consumed later (in the
main thread),
- # defeating the purpose of using executor.map.
return
list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
- limit_reached = False
- for batches in executor.map(batches_for_task, tasks):
- for batch in batches:
- current_batch_size = len(batch)
- if self._limit is not None and total_row_count +
current_batch_size >= self._limit:
- yield batch.slice(0, self._limit - total_row_count)
+ for batches in executor.map(batches_for_task, task_list):
+ yield from batches
- limit_reached = True
- break
- else:
- yield batch
- total_row_count += current_batch_size
+ def _apply_limit(self, batches: Iterator[pa.RecordBatch]) ->
Iterator[pa.RecordBatch]:
Review Comment:
Based on my understanding, in the sequential path,
`_record_batches_from_scan_tasks_and_deletes` enforces `self._limit` using a
running `total_row_count` across tasks. In the `ArrivalOrder` concurrent path, we
call it per task, so each worker resets `total_row_count` to 0 and can
independently produce up to `limit` rows before the downstream `_apply_limit`
stops consumption. That means we can bring in more than `limit` rows (worst case
proportional to `concurrent_streams`, bounded by buffering/backpressure).
This can be a later PR, but do we want a shared counter/event so producers
can stop once the global limit is satisfied, or should we treat the per-task
limit as a best-effort optimization only?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]