cbb330 commented on code in PR #3046:
URL: https://github.com/apache/iceberg-python/pull/3046#discussion_r2814574602


##########
mkdocs/docs/api.md:
##########
@@ -355,6 +355,36 @@ for buf in tbl.scan().to_arrow_batch_reader():
     print(f"Buffer contains {len(buf)} rows")
 ```
 
+You can control the number of rows per batch using the `batch_size` parameter:
+
+```python

Review Comment:
   The API feels mechanism-oriented (streaming, concurrent_files, batch_size), and it’s hard to know what to choose. Could we add a short “which config should I use?” table with recommended starting points (ordered default, memory-safe streaming, max-throughput streaming + concurrent_files) and clarify that batch_size is usually an advanced tuning knob?
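
   To make it concrete, the kind of starting points I have in mind (a sketch using this PR's parameters; the values are placeholders, not recommendations):

   ```python
   # 1. Ordered default: each file's batches are materialized before being yielded.
   reader = tbl.scan().to_arrow_batch_reader()

   # 2. Memory-safe streaming: batches are yielded as they are produced, one file at a time.
   reader = tbl.scan().to_arrow_batch_reader(streaming=True)

   # 3. Max throughput: stream from several files at once; cross-file ordering is not guaranteed.
   reader = tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4)

   # batch_size is an advanced knob that can be layered on top of any of the above.
   reader = tbl.scan().to_arrow_batch_reader(streaming=True, batch_size=1000)
   ```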



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1677,6 +1684,87 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
     return deletes_per_file
 
 
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+    tasks: list[FileScanTask],
+    batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+    concurrent_files: int,
+    max_buffered_batches: int = 16,
+) -> Generator[pa.RecordBatch, None, None]:
+    """Read batches from multiple files concurrently with bounded memory.
+
+    Workers read from files in parallel (up to concurrent_files at a time) and push
+    batches into a shared queue. The consumer yields batches from the queue.
+    A sentinel value signals completion, avoiding timeout-based polling.
+
+    Args:
+        tasks: The file scan tasks to process.
+        batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
+        concurrent_files: Maximum number of files to read concurrently.
+        max_buffered_batches: Maximum number of batches to buffer in the queue.
+    """
+    if not tasks:
+        return
+
+    batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
+    cancel_event = threading.Event()
+    pending_count = len(tasks)
+    pending_lock = threading.Lock()
+    file_semaphore = threading.Semaphore(concurrent_files)
+
+    def worker(task: FileScanTask) -> None:
+        nonlocal pending_count
+        try:
+            # Blocking acquire — on cancellation, extra permits are released to unblock.
+            file_semaphore.acquire()
+            if cancel_event.is_set():
+                return
+
+            for batch in batch_fn(task):
+                if cancel_event.is_set():
+                    return
+                batch_queue.put(batch)
+        except BaseException as e:
+            if not cancel_event.is_set():
+                batch_queue.put(e)
+        finally:
+            file_semaphore.release()
+            with pending_lock:
+                pending_count -= 1
+                if pending_count == 0:
+                    batch_queue.put(_QUEUE_SENTINEL)
+
+    executor = ExecutorFactory.get_or_create()
+    futures = [executor.submit(worker, task) for task in tasks]

Review Comment:
   The current shape submits one future per file and then gates with a semaphore.

   Could we simplify this to keep only `concurrent_files` active iterators at a time (e.g. `ThreadPoolExecutor(max_workers=concurrent_files)` + `wait(..., FIRST_COMPLETED)` on `next(batch_iter)` calls)?

   That would naturally bound in-flight work and remove the extra semaphore/lock/cancellation bookkeeping.
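
   Rough sketch of what I mean (untested; `tasks` and `batch_fn` as in the helper above, everything else is illustrative):

   ```python
   from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

   def _concurrent_batches_sketch(tasks, batch_fn, concurrent_files):
       """Keep at most `concurrent_files` iterators active; yield batches as they arrive."""
       task_iter = iter(tasks)

       with ThreadPoolExecutor(max_workers=concurrent_files) as pool:
           in_flight = {}  # future for next(iterator) -> that iterator

           def start_next_file():
               task = next(task_iter, None)
               if task is not None:
                   it = iter(batch_fn(task))
                   in_flight[pool.submit(next, it, None)] = it

           for _ in range(concurrent_files):
               start_next_file()

           while in_flight:
               done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
               for future in done:
                   it = in_flight.pop(future)
                   batch = future.result()  # re-raises worker errors here
                   if batch is None:        # iterator exhausted (batch_fn never yields None)
                       start_next_file()
                   else:
                       yield batch
                       in_flight[pool.submit(next, it, None)] = it
   ```

   Because the pool is scoped to the call, closing the generator tears it down after the at most `concurrent_files` in-flight `next()` calls finish, which also helps with the deterministic-shutdown concern below.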



##########
mkdocs/docs/api.md:
##########
@@ -355,6 +355,36 @@ for buf in tbl.scan().to_arrow_batch_reader():
     print(f"Buffer contains {len(buf)} rows")
 ```
 
+You can control the number of rows per batch using the `batch_size` parameter:
+
+```python
+for buf in tbl.scan().to_arrow_batch_reader(batch_size=1000):
+    print(f"Buffer contains {len(buf)} rows")
+```
+
+By default, each file's batches are materialized in memory before being yielded. For large files that may exceed available memory, use `streaming=True` to yield batches as they are produced without materializing entire files:
+
+```python
+for buf in tbl.scan().to_arrow_batch_reader(streaming=True, batch_size=1000):
+    print(f"Buffer contains {len(buf)} rows")
+```
+
+For maximum throughput, use `concurrent_files` to read multiple files in parallel while streaming. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:

Review Comment:
   For DDP, a deterministic input order (or deterministic per-rank sharding independent of arrival order) is usually required.

   So this API tradeoff matters directly for training correctness and reproducibility, not just ergonomics.

   I think it would be best to default to a deterministic ordering (for example via an ordered merge) and allow the true arrival-first behavior as an explicit override. Determinism does have a fundamental cost: if an early file is slow, later files wait on it (head-of-line blocking), so peak throughput drops.

   Perhaps a good API shape is to make the ordering explicit, e.g. `order="task"` (deterministic) vs `order="arrival"` (fastest).
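
   One simple shape for `order="task"` (an untested sketch; it trades per-file streaming for simplicity by materializing each file in a worker, with a bounded lookahead of `concurrent_files`):

   ```python
   from collections import deque
   from concurrent.futures import ThreadPoolExecutor

   def _task_ordered_batches(tasks, batch_fn, concurrent_files):
       """Deterministic output order with up to `concurrent_files` files prefetched."""
       task_iter = iter(tasks)

       with ThreadPoolExecutor(max_workers=concurrent_files) as pool:
           window = deque()  # pending futures, kept in task order

           def submit_next():
               task = next(task_iter, None)
               if task is not None:
                   window.append(pool.submit(lambda t=task: list(batch_fn(t))))

           for _ in range(concurrent_files):
               submit_next()

           while window:
               # Head-of-line blocking is the price of determinism: we always wait
               # for the oldest file, even if a later one finished first.
               for batch in window.popleft().result():
                   yield batch
               submit_next()
   ```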



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1677,6 +1684,87 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
     return deletes_per_file
 
 
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+    tasks: list[FileScanTask],
+    batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+    concurrent_files: int,
+    max_buffered_batches: int = 16,
+) -> Generator[pa.RecordBatch, None, None]:
+    """Read batches from multiple files concurrently with bounded memory.
+
+    Workers read from files in parallel (up to concurrent_files at a time) and push
+    batches into a shared queue. The consumer yields batches from the queue.
+    A sentinel value signals completion, avoiding timeout-based polling.
+
+    Args:
+        tasks: The file scan tasks to process.
+        batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
+        concurrent_files: Maximum number of files to read concurrently.
+        max_buffered_batches: Maximum number of batches to buffer in the queue.
+    """
+    if not tasks:
+        return
+
+    batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
+    cancel_event = threading.Event()
+    pending_count = len(tasks)
+    pending_lock = threading.Lock()
+    file_semaphore = threading.Semaphore(concurrent_files)
+
+    def worker(task: FileScanTask) -> None:
+        nonlocal pending_count
+        try:
+            # Blocking acquire — on cancellation, extra permits are released to unblock.
+            file_semaphore.acquire()
+            if cancel_event.is_set():
+                return
+
+            for batch in batch_fn(task):
+                if cancel_event.is_set():
+                    return
+                batch_queue.put(batch)
+        except BaseException as e:
+            if not cancel_event.is_set():
+                batch_queue.put(e)
+        finally:
+            file_semaphore.release()
+            with pending_lock:
+                pending_count -= 1
+                if pending_count == 0:
+                    batch_queue.put(_QUEUE_SENTINEL)
+
+    executor = ExecutorFactory.get_or_create()
+    futures = [executor.submit(worker, task) for task in tasks]
+
+    try:
+        while True:
+            item = batch_queue.get()
+
+            if item is _QUEUE_SENTINEL:
+                break
+
+            if isinstance(item, BaseException):
+                raise item
+
+            yield item
+    finally:
+        cancel_event.set()
+        # Release semaphore permits to unblock any workers waiting on acquire()
+        for _ in range(len(tasks)):
+            file_semaphore.release()
+        # Drain the queue to unblock any workers stuck on put()
+        while not batch_queue.empty():
+            try:
+                batch_queue.get_nowait()
+            except queue.Empty:
+                break
+        for future in futures:
+            future.cancel()

Review Comment:
   `future.cancel()` only stops tasks that have not started yet. If the reader stops early, already-running workers may keep running in the background. Can we shut down worker threads deterministically (for example with a per-scan executor) so no background work is left after the iterator is closed?
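
   Rough sketch of the per-scan shape (untested; `worker` stands in for the closure defined above and is assumed to take the cancel event and return promptly once it is set):

   ```python
   import threading
   from concurrent.futures import ThreadPoolExecutor
   from contextlib import contextmanager

   @contextmanager
   def _per_scan_workers(tasks, worker, concurrent_files):
       """Own the pool for this scan so leaving the block ends all background work."""
       cancel_event = threading.Event()
       pool = ThreadPoolExecutor(max_workers=concurrent_files)
       for task in tasks:
           pool.submit(worker, task, cancel_event)
       try:
           yield  # the enclosing generator drains the batch queue here
       finally:
           cancel_event.set()
           # cancel_futures drops tasks that never started; wait=True blocks until
           # already-running workers observe cancel_event and return, so no reads
           # continue in the background once the scan iterator is closed (Python 3.9+).
           pool.shutdown(wait=True, cancel_futures=True)
   ```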



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1773,17 +1879,40 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record
         Raises:
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type
+                or when concurrent_files < 1
         """
+        if concurrent_files < 1:

Review Comment:
   This important function now does a few different things.

   I’d split to_record_batches into four helpers and keep it as a dispatcher:

     1. `_prepare_tasks_and_deletes(tasks)` -> `tuple[list[FileScanTask], dict[...]]`
     2. `_iter_batches_streaming(task_list, deletes_per_file, batch_size, concurrent_files)`
     3. `_iter_batches_materialized(task_list, deletes_per_file, batch_size)`
     4. `_apply_global_limit(batches)`

   Then to_record_batches becomes:

   ```python
     task_list, deletes_per_file = self._prepare_tasks_and_deletes(tasks)
     batches = (
         self._iter_batches_streaming(...) if streaming
         else self._iter_batches_materialized(...)
     )
     yield from self._apply_global_limit(batches)
   ```

   Ultimately this:
   - keeps one place for limit semantics,
   - makes streaming/non-streaming behavior independently testable,
   - reduces cognitive load in the public method, and
   - makes future ordering modes easier to add.



##########
tests/benchmark/test_read_benchmark.py:
##########
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   Must this run in CI, or could it be an opt-in benchmark?
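
   If it stays in the tree, one way to keep it out of regular CI runs (the env-var name is just illustrative):

   ```python
   import os

   import pytest

   # Skip the whole module unless benchmarks are explicitly requested.
   pytestmark = pytest.mark.skipif(
       os.environ.get("RUN_BENCHMARKS") != "1",
       reason="benchmarks are opt-in; set RUN_BENCHMARKS=1 to run them locally",
   )
   ```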



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

