mr-brobot commented on code in PR #8104:
URL: https://github.com/apache/iceberg/pull/8104#discussion_r1269495262
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -870,35 +868,54 @@ def project_table(
id for id in projected_schema.field_ids if not
isinstance(projected_schema.find_type(id), (MapType, ListType))
}.union(extract_field_ids(bound_row_filter))
- with ManagedThreadPoolExecutor() as executor:
- rows_counter = executor.synchronized(0)
- deletes_per_file = _read_all_delete_files(fs, executor, tasks)
- tables = [
- table
- for table in executor.map(
- lambda args: _task_to_table(*args),
- [
- (
- fs,
- task,
- bound_row_filter,
- projected_schema,
- projected_field_ids,
- deletes_per_file.get(task.file.file_path),
- case_sensitive,
- rows_counter,
- limit,
- )
- for task in tasks
- ],
- )
- if table is not None
- ]
-
- if len(tables) > 1:
- final_table = pa.concat_tables(tables)
- elif len(tables) == 1:
- final_table = tables[0]
+ executor = ThreadPoolExecutor()
+ row_count_log: List[int] = []
+ deletes_per_file = _read_all_delete_files(fs, executor, tasks)
+ futures = [
+ executor.submit(
+ _task_to_table,
+ fs,
+ task,
+ bound_row_filter,
+ projected_schema,
+ projected_field_ids,
+ deletes_per_file.get(task.file.file_path),
+ case_sensitive,
+ row_count_log,
+ limit,
+ )
+ for task in tasks
+ ]
+
+ # for consistent ordering, we need to maintain result order
+ futures_index = {f: i for i, f in enumerate(futures)}
+ tables: List[Tuple[int, pa.Table]] = []
+ row_count = 0
+ for future in concurrent.futures.as_completed(futures):
+ if result := future.result():
+ ix = futures_index[future]
+ tables.append((ix, result))
+ row_count += len(result)
+
+ # cancel remaining futures if limit satisfied
+ if limit and row_count >= limit:
+ pending_futures = (f for f in futures if not f.done())
+ for pf in pending_futures:
+ pf.cancel()
+ break
+
+ # by now, we've either gathered enough rows or completed all tasks
+ # when min python version is 3.9, we can cancel pending futures using
+ # `Executor.shutdown` via `cancel_futures=True`
+ executor.shutdown(wait=False)
Review Comment:
Agreed! This PR will now close #8034. I took an initial swing at this by
adding a default executor to each `Table` instance.
I need to think through a few things further:
1. It would be cool if we could re-use the executor across tables instances
2. We should try to clean up resources at some point (i.e., calling
`Executor.__exit__`) but knowing when it's safe to do so is a little
challenging.
Approach 1: Inspired from [`loky`](https://github.com/joblib/loky), we could
have a free function `get_reusable_executor` and shut down after an idle
timeout. Requesting an executor after shutdown would simply produce a new one.
Approach 2: Required the user to supply the executor. This adds more
ceremony compared to other Python data libraries (e.g., DuckDB and DataFusion,
which handle all concurrency in C++/Rust).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]