This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d12cf4d Fix race condition on `Table.scan` with `limit` (#545)
0d12cf4d is described below

commit 0d12cf4db1667dca137d6b3179546b178550e747
Author: Kevin Liu <[email protected]>
AuthorDate: Mon Mar 25 03:02:21 2024 -0700

    Fix race condition on `Table.scan` with `limit` (#545)
---
 pyiceberg/io/pyarrow.py | 18 ++++--------------
 1 file changed, 4 insertions(+), 14 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 72de1488..4bfd1fd5 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -946,13 +946,9 @@ def _task_to_table(
     projected_field_ids: Set[int],
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
-    row_counts: List[int],
     limit: Optional[int] = None,
     name_mapping: Optional[NameMapping] = None,
 ) -> Optional[pa.Table]:
-    if limit and sum(row_counts) >= limit:
-        return None
-
     _, _, path = PyArrowFileIO.parse_location(task.file.file_path)
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     with fs.open_input_file(path) as fin:
@@ -1015,11 +1011,6 @@ def _task_to_table(
         if len(arrow_table) < 1:
             return None
 
-        if limit is not None and sum(row_counts) >= limit:
-            return None
-
-        row_counts.append(len(arrow_table))
-
         return to_requested_schema(projected_schema, file_project_schema, arrow_table)
 
 
@@ -1085,7 +1076,6 @@ def project_table(
         id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
     }.union(extract_field_ids(bound_row_filter))
 
-    row_counts: List[int] = []
     deletes_per_file = _read_all_delete_files(fs, tasks)
     executor = ExecutorFactory.get_or_create()
     futures = [
@@ -1098,21 +1088,21 @@ def project_table(
             projected_field_ids,
             deletes_per_file.get(task.file.file_path),
             case_sensitive,
-            row_counts,
             limit,
             table.name_mapping(),
         )
         for task in tasks
     ]
-
+    total_row_count = 0
     # for consistent ordering, we need to maintain future order
     futures_index = {f: i for i, f in enumerate(futures)}
     completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
     for future in concurrent.futures.as_completed(futures):
         completed_futures.add(future)
-
+        if table_result := future.result():
+            total_row_count += len(table_result)
         # stop early if limit is satisfied
-        if limit is not None and sum(row_counts) >= limit:
+        if limit is not None and total_row_count >= limit:
             break
 
     # by now, we've either completed all tasks or satisfied the limit

Reply via email to