mr-brobot commented on code in PR #8104:
URL: https://github.com/apache/iceberg/pull/8104#discussion_r1276540722


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -870,30 +868,41 @@ def project_table(
         id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
     }.union(extract_field_ids(bound_row_filter))
 
-    with ManagedThreadPoolExecutor() as executor:
-        rows_counter = executor.synchronized(0)
-        deletes_per_file = _read_all_delete_files(fs, executor, tasks)
-        tables = [
-            table
-            for table in executor.map(
-                lambda args: _task_to_table(*args),
-                [
-                    (
-                        fs,
-                        task,
-                        bound_row_filter,
-                        projected_schema,
-                        projected_field_ids,
-                        deletes_per_file.get(task.file.file_path),
-                        case_sensitive,
-                        rows_counter,
-                        limit,
-                    )
-                    for task in tasks
-                ],
-            )
-            if table is not None
-        ]
+    row_count = 0
+    deletes_per_file = _read_all_delete_files(fs, tasks)
+    futures = [
+        executor.submit(
+            _task_to_table,
+            fs,
+            task,
+            bound_row_filter,
+            projected_schema,
+            projected_field_ids,
+            deletes_per_file.get(task.file.file_path),
+            case_sensitive,
+            row_count,

Review Comment:
   Argh, you're right. There were two subtle issues:
   
   1. `row_count` was an immutable type, so each worker only ever saw the value 
it was passed and never the updates made by other workers. I switched back to 
the `row_counts` approach since containers are mutable, and confirmed that all 
threads see updates to it.
   2. `(f.cancel() for f in futures if not f.done())` is a generator expression 
that was never iterated, so nothing was actually getting canceled. 
`[f.cancel() for f in futures if not f.done()]` is a list comprehension, which 
is evaluated eagerly, so `cancel()` is called on every unfinished future 
immediately.
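
   Both issues can be reproduced outside the PR. The following is a minimal 
sketch, not the code from this change: the helper names (`add_to_int`, 
`add_to_list`, `block`) and the single-worker pool are assumptions made for 
illustration. It assumes a list as the mutable container, and it relies on the 
fact that `Future.cancel()` only succeeds for futures that have not started 
running yet.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def add_to_int(count: int, n: int) -> None:
    # Hypothetical worker: rebinds the local name only, so the
    # caller's int is never changed.
    count += n


def add_to_list(counts: list, n: int) -> None:
    # Hypothetical worker: mutates the shared list object, so the
    # caller (and every other thread) sees the update.
    counts.append(n)


# Issue 1: immutable int vs. mutable container.
row_count = 0
row_counts: list = []
with ThreadPoolExecutor(max_workers=4) as pool:
    for n in (10, 20, 30):
        pool.submit(add_to_int, row_count, n)
        pool.submit(add_to_list, row_counts, n)

int_total = row_count         # unchanged by the workers
list_total = sum(row_counts)  # reflects every worker's update

# Issue 2: generator expression vs. list comprehension.
started = threading.Event()
release = threading.Event()


def block() -> None:
    # Occupies the only worker thread until the main thread releases it.
    started.set()
    release.wait()


with ThreadPoolExecutor(max_workers=1) as pool:
    pool.submit(block)
    started.wait()
    # These futures stay pending because the single worker is busy.
    queued = [pool.submit(int) for _ in range(3)]

    # Lazy: the generator is created but never iterated, so cancel()
    # is never called.
    lazy = (f.cancel() for f in queued if not f.done())
    cancelled_after_lazy = sum(f.cancelled() for f in queued)

    # Eager: the list comprehension calls cancel() on each future now.
    eager = [f.cancel() for f in queued if not f.done()]
    cancelled_after_eager = sum(f.cancelled() for f in queued)

    release.set()
```

   If the return values of `cancel()` are not needed, a plain `for` loop over 
the futures is an equivalent eager alternative to the list comprehension.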



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

