mr-brobot commented on code in PR #8104:
URL: https://github.com/apache/iceberg/pull/8104#discussion_r1276540722
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -870,30 +868,41 @@ def project_table(
id for id in projected_schema.field_ids if not
isinstance(projected_schema.find_type(id), (MapType, ListType))
}.union(extract_field_ids(bound_row_filter))
- with ManagedThreadPoolExecutor() as executor:
- rows_counter = executor.synchronized(0)
- deletes_per_file = _read_all_delete_files(fs, executor, tasks)
- tables = [
- table
- for table in executor.map(
- lambda args: _task_to_table(*args),
- [
- (
- fs,
- task,
- bound_row_filter,
- projected_schema,
- projected_field_ids,
- deletes_per_file.get(task.file.file_path),
- case_sensitive,
- rows_counter,
- limit,
- )
- for task in tasks
- ],
- )
- if table is not None
- ]
+ row_count = 0
+ deletes_per_file = _read_all_delete_files(fs, tasks)
+ futures = [
+ executor.submit(
+ _task_to_table,
+ fs,
+ task,
+ bound_row_filter,
+ projected_schema,
+ projected_field_ids,
+ deletes_per_file.get(task.file.file_path),
+ case_sensitive,
+ row_count,
Review Comment:
Argh, you're right. There were two subtle issues:
1. `row_count` was an immutable type so workers were not seeing updates. I
switched back to the `row_counts` approach since containers are mutable and
confirmed all threads see updates to it.
2. `(f.cancel() for f in futures if not f.done())` creates an unused
generator so nothing was actually getting canceled. Using `[f.cancel() for f in
futures if not f.done()]` is eager, so cancels futures immediately.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]