Fokko commented on code in PR #8104:
URL: https://github.com/apache/iceberg/pull/8104#discussion_r1269096033


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -870,35 +868,54 @@ def project_table(
         id for id in projected_schema.field_ids if not 
isinstance(projected_schema.find_type(id), (MapType, ListType))
     }.union(extract_field_ids(bound_row_filter))
 
-    with ManagedThreadPoolExecutor() as executor:
-        rows_counter = executor.synchronized(0)
-        deletes_per_file = _read_all_delete_files(fs, executor, tasks)
-        tables = [
-            table
-            for table in executor.map(
-                lambda args: _task_to_table(*args),
-                [
-                    (
-                        fs,
-                        task,
-                        bound_row_filter,
-                        projected_schema,
-                        projected_field_ids,
-                        deletes_per_file.get(task.file.file_path),
-                        case_sensitive,
-                        rows_counter,
-                        limit,
-                    )
-                    for task in tasks
-                ],
-            )
-            if table is not None
-        ]
-
-    if len(tables) > 1:
-        final_table = pa.concat_tables(tables)
-    elif len(tables) == 1:
-        final_table = tables[0]
+    executor = ThreadPoolExecutor()
+    row_count_log: List[int] = []
+    deletes_per_file = _read_all_delete_files(fs, executor, tasks)
+    futures = [
+        executor.submit(
+            _task_to_table,
+            fs,
+            task,
+            bound_row_filter,
+            projected_schema,
+            projected_field_ids,
+            deletes_per_file.get(task.file.file_path),
+            case_sensitive,
+            row_count_log,
+            limit,
+        )
+        for task in tasks
+    ]
+
+    # for consistent ordering, we need to maintain result order

Review Comment:
   I'm not sure consistent results can be guaranteed at all. Also, the order 
in which futures are returned might impact the result.



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -870,35 +868,54 @@ def project_table(
         id for id in projected_schema.field_ids if not 
isinstance(projected_schema.find_type(id), (MapType, ListType))
     }.union(extract_field_ids(bound_row_filter))
 
-    with ManagedThreadPoolExecutor() as executor:
-        rows_counter = executor.synchronized(0)
-        deletes_per_file = _read_all_delete_files(fs, executor, tasks)
-        tables = [
-            table
-            for table in executor.map(
-                lambda args: _task_to_table(*args),
-                [
-                    (
-                        fs,
-                        task,
-                        bound_row_filter,
-                        projected_schema,
-                        projected_field_ids,
-                        deletes_per_file.get(task.file.file_path),
-                        case_sensitive,
-                        rows_counter,
-                        limit,
-                    )
-                    for task in tasks
-                ],
-            )
-            if table is not None
-        ]
-
-    if len(tables) > 1:
-        final_table = pa.concat_tables(tables)
-    elif len(tables) == 1:
-        final_table = tables[0]
+    executor = ThreadPoolExecutor()
+    row_count_log: List[int] = []
+    deletes_per_file = _read_all_delete_files(fs, executor, tasks)
+    futures = [
+        executor.submit(
+            _task_to_table,
+            fs,
+            task,
+            bound_row_filter,
+            projected_schema,
+            projected_field_ids,
+            deletes_per_file.get(task.file.file_path),
+            case_sensitive,
+            row_count_log,
+            limit,
+        )
+        for task in tasks
+    ]
+
+    # for consistent ordering, we need to maintain result order
+    futures_index = {f: i for i, f in enumerate(futures)}
+    tables: List[Tuple[int, pa.Table]] = []
+    row_count = 0
+    for future in concurrent.futures.as_completed(futures):
+        if result := future.result():
+            ix = futures_index[future]
+            tables.append((ix, result))
+            row_count += len(result)
+
+        # cancel remaining futures if limit satisfied
+        if limit and row_count >= limit:
+            pending_futures = (f for f in futures if not f.done())
+            for pf in pending_futures:
+                pf.cancel()
+            break
+
+    # by now, we've either gathered enough rows or completed all tasks
+    # when min python version is 3.9, we can cancel pending futures using
+    # `Executor.shutdown` via `cancel_futures=True`
+    executor.shutdown(wait=False)

Review Comment:
   It would be cool if we could re-use the executor.



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -870,35 +868,54 @@ def project_table(
         id for id in projected_schema.field_ids if not 
isinstance(projected_schema.find_type(id), (MapType, ListType))
     }.union(extract_field_ids(bound_row_filter))
 
-    with ManagedThreadPoolExecutor() as executor:
-        rows_counter = executor.synchronized(0)
-        deletes_per_file = _read_all_delete_files(fs, executor, tasks)
-        tables = [
-            table
-            for table in executor.map(
-                lambda args: _task_to_table(*args),
-                [
-                    (
-                        fs,
-                        task,
-                        bound_row_filter,
-                        projected_schema,
-                        projected_field_ids,
-                        deletes_per_file.get(task.file.file_path),
-                        case_sensitive,
-                        rows_counter,
-                        limit,
-                    )
-                    for task in tasks
-                ],
-            )
-            if table is not None
-        ]
-
-    if len(tables) > 1:
-        final_table = pa.concat_tables(tables)
-    elif len(tables) == 1:
-        final_table = tables[0]
+    executor = ThreadPoolExecutor()
+    row_count_log: List[int] = []
+    deletes_per_file = _read_all_delete_files(fs, executor, tasks)
+    futures = [
+        executor.submit(
+            _task_to_table,
+            fs,
+            task,
+            bound_row_filter,
+            projected_schema,
+            projected_field_ids,
+            deletes_per_file.get(task.file.file_path),
+            case_sensitive,
+            row_count_log,
+            limit,
+        )
+        for task in tasks
+    ]
+
+    # for consistent ordering, we need to maintain result order
+    futures_index = {f: i for i, f in enumerate(futures)}
+    tables: List[Tuple[int, pa.Table]] = []
+    row_count = 0
+    for future in concurrent.futures.as_completed(futures):
+        if result := future.result():
+            ix = futures_index[future]
+            tables.append((ix, result))
+            row_count += len(result)
+
+        # cancel remaining futures if limit satisfied
+        if limit and row_count >= limit:
+            pending_futures = (f for f in futures if not f.done())
+            for pf in pending_futures:
+                pf.cancel()
+            break
+

Review Comment:
   ```suggestion
           # cancel remaining futures if limit satisfied
           if limit and row_count >= limit:
               break
   
       # We're looping twice over the futures, probably we can also inline these
       # _ = (f.cancel() for f in futures if not f.done())
       pending_futures = (f.cancel() for f in futures if not f.done())
       for pf in pending_futures:
           pf.cancel()
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to