mr-brobot commented on code in PR #8104:
URL: https://github.com/apache/iceberg/pull/8104#discussion_r1269890629


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -870,35 +868,54 @@ def project_table(
         id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
     }.union(extract_field_ids(bound_row_filter))
 
-    with ManagedThreadPoolExecutor() as executor:
-        rows_counter = executor.synchronized(0)
-        deletes_per_file = _read_all_delete_files(fs, executor, tasks)
-        tables = [
-            table
-            for table in executor.map(
-                lambda args: _task_to_table(*args),
-                [
-                    (
-                        fs,
-                        task,
-                        bound_row_filter,
-                        projected_schema,
-                        projected_field_ids,
-                        deletes_per_file.get(task.file.file_path),
-                        case_sensitive,
-                        rows_counter,
-                        limit,
-                    )
-                    for task in tasks
-                ],
-            )
-            if table is not None
-        ]
-
-    if len(tables) > 1:
-        final_table = pa.concat_tables(tables)
-    elif len(tables) == 1:
-        final_table = tables[0]
+    executor = ThreadPoolExecutor()
+    row_count_log: List[int] = []
+    deletes_per_file = _read_all_delete_files(fs, executor, tasks)
+    futures = [
+        executor.submit(
+            _task_to_table,
+            fs,
+            task,
+            bound_row_filter,
+            projected_schema,
+            projected_field_ids,
+            deletes_per_file.get(task.file.file_path),
+            case_sensitive,
+            row_count_log,
+            limit,
+        )
+        for task in tasks
+    ]
+
+    # for consistent ordering, we need to maintain result order
+    futures_index = {f: i for i, f in enumerate(futures)}
+    tables: List[Tuple[int, pa.Table]] = []
+    row_count = 0
+    for future in concurrent.futures.as_completed(futures):
+        if result := future.result():
+            ix = futures_index[future]
+            tables.append((ix, result))
+            row_count += len(result)
+
+        # cancel remaining futures if limit satisfied
+        if limit and row_count >= limit:
+            pending_futures = (f for f in futures if not f.done())
+            for pf in pending_futures:
+                pf.cancel()
+            break
+
+    # by now, we've either gathered enough rows or completed all tasks
+    # when min python version is 3.9, we can cancel pending futures using
+    # `Executor.shutdown` via `cancel_futures=True`
+    executor.shutdown(wait=False)

Review Comment:
   I just ended up with a global executor that lives for the life of the Python 
process. Open to feedback if we need to do anything more interesting at this 
point. 😃 
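   A minimal sketch of the global-executor idea described above, assuming a
   lazily created module-level singleton (the `ExecutorFactory` name and its
   `get_or_create` method are illustrative, not necessarily what this PR
   ships):

   ```python
   from concurrent.futures import Executor, ThreadPoolExecutor
   from typing import Optional


   class ExecutorFactory:
       """Hypothetical holder for one executor that lives for the life
       of the Python process."""

       _instance: Optional[Executor] = None

       @staticmethod
       def get_or_create() -> Executor:
           # Lazily create the shared pool on first use; later callers
           # reuse the same instance, so repeated project_table() calls
           # never pay thread startup or teardown costs.
           if ExecutorFactory._instance is None:
               ExecutorFactory._instance = ThreadPoolExecutor()
           return ExecutorFactory._instance
   ```

   With something like that in place, `project_table` would call
   `ExecutorFactory.get_or_create()` instead of constructing a
   `ThreadPoolExecutor` per invocation, and the `executor.shutdown(wait=False)`
   call at the end of the hunk would no longer be needed.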
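   The hunk above ends before the indexed tables are reassembled; presumably
   the `(ix, table)` pairs are sorted back into submission order before
   concatenation, along the lines of this sketch (the `reassemble` helper is
   hypothetical, and the empty-result case is omitted for brevity):

   ```python
   from typing import List, Tuple

   import pyarrow as pa


   def reassemble(tables: List[Tuple[int, pa.Table]]) -> pa.Table:
       # `tables` holds (submission index, result) pairs gathered in
       # completion order; sorting by index restores the original task
       # order before the tables are concatenated.
       ordered = [tbl for _, tbl in sorted(tables, key=lambda pair: pair[0])]
       return pa.concat_tables(ordered) if len(ordered) > 1 else ordered[0]
   ```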
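   On the `cancel_futures` comment in the hunk: once the minimum supported
   Python version reaches 3.9, the manual cancellation loop can collapse into
   a single `shutdown` call. A standalone sketch of that variant, with a
   `work` function standing in for `_task_to_table`:

   ```python
   import concurrent.futures
   import time


   def work(i: int) -> int:
       time.sleep(0.1)  # stand-in for reading one file scan task
       return i


   executor = concurrent.futures.ThreadPoolExecutor()
   futures = [executor.submit(work, i) for i in range(100)]
   limit = 5

   completed = 0
   for future in concurrent.futures.as_completed(futures):
       future.result()
       completed += 1
       if completed >= limit:
           break

   # Python 3.9+: cancel everything still queued in a single call, instead
   # of iterating over the pending futures and cancelling them one by one.
   executor.shutdown(wait=False, cancel_futures=True)
   ```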



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

