sumedhsakdeo commented on code in PR #3046:
URL: https://github.com/apache/iceberg-python/pull/3046#discussion_r2831108371


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1710,6 +1685,76 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
     return deletes_per_file
 
 
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+    tasks: list[FileScanTask],
+    batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+    concurrent_files: int,
+    max_buffered_batches: int = 16,
+) -> Generator[pa.RecordBatch, None, None]:
+    """Read batches from multiple files concurrently with bounded memory.
+
+    Uses a per-scan ThreadPoolExecutor(max_workers=concurrent_files) to naturally
+    bound concurrency. Workers push batches into a bounded queue which provides
+    backpressure when the consumer is slower than the producers.
+
+    Args:
+        tasks: The file scan tasks to process.
+        batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
+        concurrent_files: Maximum number of files to read concurrently.
+        max_buffered_batches: Maximum number of batches to buffer in the queue.
+    """
+    if not tasks:
+        return
+
+    batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
+    cancel = threading.Event()
+    remaining = len(tasks)
+    remaining_lock = threading.Lock()
+
+    def worker(task: FileScanTask) -> None:
+        nonlocal remaining
+        try:
+            for batch in batch_fn(task):
+                if cancel.is_set():
+                    return
+                batch_queue.put(batch)
+        except BaseException as e:
+            if not cancel.is_set():
+                batch_queue.put(e)
+        finally:
+            with remaining_lock:
+                remaining -= 1
+                if remaining == 0:
+                    batch_queue.put(_QUEUE_SENTINEL)
+
+    with ThreadPoolExecutor(max_workers=concurrent_files) as executor:
+        for task in tasks:
+            executor.submit(worker, task)
+
+        try:
+            while True:
+                item = batch_queue.get()
+
+                if item is _QUEUE_SENTINEL:
+                    break
+
+                if isinstance(item, BaseException):
+                    raise item
+
+                yield item
+        finally:
+            cancel.set()
+            # Drain the queue to unblock any workers stuck on put()
+            while not batch_queue.empty():

Review Comment:
   We have a 15% regression if a timeout is added. We'll need to find another way.
   ```
   ┌──────────────────────────────┬─────────────────────┬───────┬────────┬─────────────┐
   │            Config            │ Throughput (rows/s) │ Time  │  TTFR  │ Peak Memory │
   ├──────────────────────────────┼─────────────────────┼───────┼────────┼─────────────┤
   │ default (TASK, all-parallel) │ 199M                │ 0.08s │ 60.9ms │ 609.5 MB    │
   ├──────────────────────────────┼─────────────────────┼───────┼────────┼─────────────┤
   │ arrival-cf1                  │ 56.6M               │ 0.28s │ 29.9ms │ 10.3 MB     │
   ├──────────────────────────────┼─────────────────────┼───────┼────────┼─────────────┤
   │ arrival-cf2                  │ 93.3M               │ 0.17s │ 32.6ms │ 42.6 MB     │
   ├──────────────────────────────┼─────────────────────┼───────┼────────┼─────────────┤
   │ arrival-cf4                  │ 159.7M              │ 0.10s │ 32.9ms │ 114.7 MB    │
   ├──────────────────────────────┼─────────────────────┼───────┼────────┼─────────────┤
   │ arrival-cf8                  │ 186.4M              │ 0.09s │ 37.8ms │ 276.2 MB    │
   ├──────────────────────────────┼─────────────────────┼───────┼────────┼─────────────┤
   │ arrival-cf16                 │ 188.9M              │ 0.09s │ 53.4ms │ 453.2 MB    │
   └──────────────────────────────┴─────────────────────┴───────┴────────┴─────────────┘
   ```
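
   For reference, a minimal sketch of one way such a timeout could be wired in, i.e. a cancellable `put()` that wakes up periodically to re-check the cancel flag (the helper name `_put_with_cancel` and the 0.1s interval are illustrative, not the exact change that was benchmarked):

   ```python
   import queue
   import threading


   def _put_with_cancel(
       q: "queue.Queue[object]",
       item: object,
       cancel: threading.Event,
       timeout: float = 0.1,
   ) -> bool:
       """Block on q.put(), but wake up every `timeout` seconds to check `cancel`.

       Returns True once the item is enqueued, or False if cancellation is
       observed first. The periodic wake-ups are the likely source of the
       per-batch overhead compared to a plain blocking put().
       """
       while not cancel.is_set():
           try:
               q.put(item, timeout=timeout)
               return True
           except queue.Full:
               continue  # queue still full; re-check the cancel flag and retry
       return False
   ```

   In the worker above, this kind of helper would replace the plain `batch_queue.put(batch)` call so workers can stop blocking once cancellation is requested; the numbers suggest the extra wake-ups are too costly at this batch rate.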




