Fokko commented on code in PR #8104:
URL: https://github.com/apache/iceberg/pull/8104#discussion_r1294664395
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -902,39 +903,47 @@ def project_table(
id for id in projected_schema.field_ids if not
isinstance(projected_schema.find_type(id), (MapType, ListType))
}.union(extract_field_ids(bound_row_filter))
- with ManagedThreadPoolExecutor() as executor:
- rows_counter = executor.synchronized(0)
- deletes_per_file = _read_all_delete_files(fs, executor, tasks)
- tables = [
- table
- for table in executor.map(
- lambda args: _task_to_table(*args),
- [
- (
- fs,
- task,
- bound_row_filter,
- projected_schema,
- projected_field_ids,
- deletes_per_file.get(task.file.file_path),
- case_sensitive,
- rows_counter,
- limit,
- )
- for task in tasks
- ],
- )
- if table is not None
- ]
+ row_counts: List[int] = []
+ deletes_per_file = _read_all_delete_files(fs, tasks)
+ executor = ExecutorFactory.create()
+ futures = [
+ executor.submit(
+ _task_to_table,
+ fs,
+ task,
+ bound_row_filter,
+ projected_schema,
+ projected_field_ids,
+ deletes_per_file.get(task.file.file_path),
+ case_sensitive,
+ row_counts,
+ limit,
+ )
+ for task in tasks
+ ]
- if len(tables) > 1:
- final_table = pa.concat_tables(tables)
- elif len(tables) == 1:
- final_table = tables[0]
- else:
- final_table = pa.Table.from_batches([],
schema=schema_to_pyarrow(projected_schema))
+ # for consistent ordering, we need to maintain future order
+ futures_index = {f: i for i, f in enumerate(futures)}
+ completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[],
key=lambda f: futures_index[f])
+ for future in concurrent.futures.as_completed(futures):
+ completed_futures.add(future)
+
+ # stop early if limit is satisfied
+ if limit is not None and sum(row_counts) >= limit:
+ break
+
+ # by now, we've either completed all tasks or satisfied the limit
+ if limit is not None:
+ _ = [f.cancel() for f in futures if not f.done()]
+
+ tables = [f.result() for f in completed_futures if f.result()]
+
+ if len(tables) < 1:
+ return pa.Table.from_batches([],
schema=schema_to_pyarrow(projected_schema))
+
+ result = pa.concat_tables(tables)
- return final_table.slice(0, limit)
+ return result.slice(0, limit)
Review Comment:
I would appreciate an `if` here so that we only slice when a limit is set (e.g. guard the `slice` call with `if limit is not None:`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]