Fokko commented on code in PR #8104:
URL: https://github.com/apache/iceberg/pull/8104#discussion_r1272648720
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -798,20 +800,16 @@ def _task_to_table(
else:
arrow_table = fragment_scanner.to_table()
- if limit:
- with rows_counter:
- if rows_counter.value >= limit:
- return None
- rows_counter.value += len(arrow_table)
+ if len(arrow_table) < 1:
+ return None
- # If there is no data, we don't have to go through the schema
- if len(arrow_table) > 0:
- return to_requested_schema(projected_schema, file_project_schema, arrow_table)
- else:
+ if limit and row_count >= limit:
Review Comment:
If the limit is `0`, the check is skipped entirely, since `0` is falsy:
```python
Python 3.9.6 (default, May 7 2023, 23:32:44)
[Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> if 0:
... print('vo')
...
>>>
```
```suggestion
if limit is not None and row_count >= limit:
```
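To make the consequence concrete, here is a small sketch (a hypothetical helper, not the actual PyIceberg code path): with a truthiness check, `limit=0` behaves like `limit=None` and returns everything, whereas an explicit `is not None` check honors the zero limit:
```python
from typing import List, Optional


def take_buggy(rows: List[int], limit: Optional[int]) -> List[int]:
    # `if limit:` is False for limit=0, so the cap is never applied.
    if limit and len(rows) > limit:
        return rows[:limit]
    return rows


def take_fixed(rows: List[int], limit: Optional[int]) -> List[int]:
    if limit is not None and len(rows) > limit:
        return rows[:limit]
    return rows


print(take_buggy([1, 2, 3], limit=0))  # [1, 2, 3] -- limit silently ignored
print(take_fixed([1, 2, 3], limit=0))  # []
```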
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -870,30 +868,41 @@ def project_table(
id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
}.union(extract_field_ids(bound_row_filter))
- with ManagedThreadPoolExecutor() as executor:
- rows_counter = executor.synchronized(0)
- deletes_per_file = _read_all_delete_files(fs, executor, tasks)
- tables = [
- table
- for table in executor.map(
- lambda args: _task_to_table(*args),
- [
- (
- fs,
- task,
- bound_row_filter,
- projected_schema,
- projected_field_ids,
- deletes_per_file.get(task.file.file_path),
- case_sensitive,
- rows_counter,
- limit,
- )
- for task in tasks
- ],
- )
- if table is not None
- ]
+ row_count = 0
+ deletes_per_file = _read_all_delete_files(fs, tasks)
+ futures = [
+ executor.submit(
+ _task_to_table,
+ fs,
+ task,
+ bound_row_filter,
+ projected_schema,
+ projected_field_ids,
+ deletes_per_file.get(task.file.file_path),
+ case_sensitive,
+ row_count,
+ limit,
+ )
+ for task in tasks
+ ]
+
+ # for consistent ordering, we need to maintain future order
+ futures_index = {f: i for i, f in enumerate(futures)}
+ completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
+ for future in concurrent.futures.as_completed(futures):
+ if result := future.result():
+ completed_futures.add(future)
+ row_count += len(result)
+
+ # stop early if limit is satisfied
+ if limit and row_count >= limit:
+ break
+
+ # by now, we've either completed all tasks or satisfied the limit
+ if limit:
Review Comment:
```suggestion
if limit is not None:
```
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -870,30 +868,41 @@ def project_table(
id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
}.union(extract_field_ids(bound_row_filter))
- with ManagedThreadPoolExecutor() as executor:
- rows_counter = executor.synchronized(0)
- deletes_per_file = _read_all_delete_files(fs, executor, tasks)
- tables = [
- table
- for table in executor.map(
- lambda args: _task_to_table(*args),
- [
- (
- fs,
- task,
- bound_row_filter,
- projected_schema,
- projected_field_ids,
- deletes_per_file.get(task.file.file_path),
- case_sensitive,
- rows_counter,
- limit,
- )
- for task in tasks
- ],
- )
- if table is not None
- ]
+ row_count = 0
+ deletes_per_file = _read_all_delete_files(fs, tasks)
+ futures = [
+ executor.submit(
+ _task_to_table,
+ fs,
+ task,
+ bound_row_filter,
+ projected_schema,
+ projected_field_ids,
+ deletes_per_file.get(task.file.file_path),
+ case_sensitive,
+ row_count,
+ limit,
+ )
+ for task in tasks
+ ]
+
+ # for consistent ordering, we need to maintain future order
+ futures_index = {f: i for i, f in enumerate(futures)}
+ completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
+ for future in concurrent.futures.as_completed(futures):
+ if result := future.result():
+ completed_futures.add(future)
+ row_count += len(result)
+
+ # stop early if limit is satisfied
+ if limit and row_count >= limit:
Review Comment:
```suggestion
if limit is not None and row_count >= limit:
```
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -870,30 +868,41 @@ def project_table(
id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
}.union(extract_field_ids(bound_row_filter))
- with ManagedThreadPoolExecutor() as executor:
- rows_counter = executor.synchronized(0)
- deletes_per_file = _read_all_delete_files(fs, executor, tasks)
- tables = [
- table
- for table in executor.map(
- lambda args: _task_to_table(*args),
- [
- (
- fs,
- task,
- bound_row_filter,
- projected_schema,
- projected_field_ids,
- deletes_per_file.get(task.file.file_path),
- case_sensitive,
- rows_counter,
- limit,
- )
- for task in tasks
- ],
- )
- if table is not None
- ]
+ row_count = 0
+ deletes_per_file = _read_all_delete_files(fs, tasks)
+ futures = [
+ executor.submit(
+ _task_to_table,
+ fs,
+ task,
+ bound_row_filter,
+ projected_schema,
+ projected_field_ids,
+ deletes_per_file.get(task.file.file_path),
+ case_sensitive,
+ row_count,
Review Comment:
I don't see any early stopping here. When I print `row_count` in the `_task_to_table` function, it is always `0`, and I can see the futures still executing.
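For context on why this happens, a minimal sketch (plain `concurrent.futures`, not the PyIceberg code) showing that an `int` passed to `executor.submit` is captured by value at submit time, so later updates in the main thread never reach the workers:
```python
import time
from concurrent.futures import ThreadPoolExecutor


def task(row_count: int) -> int:
    # `row_count` here is the value bound at submit time (0),
    # not a live reference to the caller's variable.
    time.sleep(0.1)
    return row_count


row_count = 0
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, row_count) for _ in range(4)]
    row_count += 100  # already-submitted tasks never see this update
    print([f.result() for f in futures])  # [0, 0, 0, 0]
```
So incrementing `row_count` in `project_table` only affects the main thread's loop; the tasks keep running with the stale `0`, and `future.cancel()` would only stop tasks that haven't started yet.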
##########
python/pyiceberg/utils/concurrent.py:
##########
@@ -15,53 +15,7 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
-"""Concurrency concepts that support multi-threading."""
-import threading
+"""Concurrency concepts that support efficient multi-threading."""
from concurrent.futures import Executor, ThreadPoolExecutor
-from contextlib import AbstractContextManager
-from typing import Any, Generic, TypeVar
-from typing_extensions import Self
-
-T = TypeVar("T")
-
-
-class Synchronized(Generic[T], AbstractContextManager): # type: ignore
- """A context manager that provides concurrency-safe access to a value."""
-
- value: T
- lock: threading.Lock
-
- def __init__(self, value: T, lock: threading.Lock):
- super().__init__()
- self.value = value
- self.lock = lock
-
- def __enter__(self) -> T:
- """Acquires a lock, allowing access to the wrapped value."""
- self.lock.acquire()
- return self.value
-
- def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
- """Releases the lock, allowing other threads to access the value."""
- self.lock.release()
-
-
-class ManagedExecutor(Executor):
- """An executor that provides synchronization."""
-
- def synchronized(self, value: T) -> Synchronized[T]:
- raise NotImplementedError
-
-
-class ManagedThreadPoolExecutor(ThreadPoolExecutor, ManagedExecutor):
- """A thread pool executor that provides synchronization."""
-
- def __enter__(self) -> Self:
- """Returns the executor itself as a context manager."""
- super().__enter__()
- return self
-
- def synchronized(self, value: T) -> Synchronized[T]:
- lock = threading.Lock()
- return Synchronized(value, lock)
+executor: Executor = ThreadPoolExecutor()
Review Comment:
Should we create this one lazily, and make `max_workers` configurable through the config?
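A minimal sketch of what that could look like; `PYICEBERG_MAX_WORKERS` is a hypothetical knob here, and the real implementation would presumably read from pyiceberg's own config mechanism:
```python
import os
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Optional

_executor: Optional[Executor] = None


def get_executor() -> Executor:
    """Create the shared executor on first use instead of at import time."""
    global _executor
    if _executor is None:
        # Hypothetical env var; the real knob would live in the config.
        env_value = os.environ.get("PYICEBERG_MAX_WORKERS")
        max_workers = int(env_value) if env_value else None  # None = default
        _executor = ThreadPoolExecutor(max_workers=max_workers)
    return _executor
```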