Fokko commented on code in PR #8104:
URL: https://github.com/apache/iceberg/pull/8104#discussion_r1294277780
##########
python/pyiceberg/exceptions.py:
##########
@@ -110,3 +110,7 @@ class CommitFailedException(RESTError):
class CommitStateUnknownException(RESTError):
"""Commit failed due to unknown reason."""
+
+
+class InvalidConfigurationError(Exception):
Review Comment:
Let's remove this one and just throw a `ValueError`. We also throw a
ValueError when the catalog configuration is not set properly. We try to avoid
introducing a lot of custom exceptions.
##########
python/pyiceberg/utils/concurrent.py:
##########
@@ -14,54 +14,36 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-# pylint: disable=redefined-outer-name,arguments-renamed,fixme
-"""Concurrency concepts that support multi-threading."""
-import threading
+"""Concurrency concepts that support efficient multi-threading."""
from concurrent.futures import Executor, ThreadPoolExecutor
-from contextlib import AbstractContextManager
-from typing import Any, Generic, TypeVar
+from typing import Optional
-from typing_extensions import Self
+from pyiceberg.exceptions import InvalidConfigurationError
+from pyiceberg.utils.config import Config
-T = TypeVar("T")
+class ExecutorFactory:
+ _instance: Optional[Executor] = None
-class Synchronized(Generic[T], AbstractContextManager): # type: ignore
- """A context manager that provides concurrency-safe access to a value."""
+ @staticmethod
+ def create() -> Executor:
Review Comment:
```suggestion
def get_or_create() -> Executor:
```
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -902,39 +903,47 @@ def project_table(
id for id in projected_schema.field_ids if not
isinstance(projected_schema.find_type(id), (MapType, ListType))
}.union(extract_field_ids(bound_row_filter))
- with ManagedThreadPoolExecutor() as executor:
- rows_counter = executor.synchronized(0)
- deletes_per_file = _read_all_delete_files(fs, executor, tasks)
- tables = [
- table
- for table in executor.map(
- lambda args: _task_to_table(*args),
- [
- (
- fs,
- task,
- bound_row_filter,
- projected_schema,
- projected_field_ids,
- deletes_per_file.get(task.file.file_path),
- case_sensitive,
- rows_counter,
- limit,
- )
- for task in tasks
- ],
- )
- if table is not None
- ]
+ row_counts: List[int] = []
+ deletes_per_file = _read_all_delete_files(fs, tasks)
+ executor = ExecutorFactory.create()
+ futures = [
+ executor.submit(
+ _task_to_table,
+ fs,
+ task,
+ bound_row_filter,
+ projected_schema,
+ projected_field_ids,
+ deletes_per_file.get(task.file.file_path),
+ case_sensitive,
+ row_counts,
+ limit,
+ )
+ for task in tasks
+ ]
- if len(tables) > 1:
- final_table = pa.concat_tables(tables)
- elif len(tables) == 1:
- final_table = tables[0]
- else:
- final_table = pa.Table.from_batches([],
schema=schema_to_pyarrow(projected_schema))
+ # for consistent ordering, we need to maintain future order
+ futures_index = {f: i for i, f in enumerate(futures)}
+ completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[],
key=lambda f: futures_index[f])
+ for future in concurrent.futures.as_completed(futures):
+ completed_futures.add(future)
+
+ # stop early if limit is satisfied
+ if limit is not None and sum(row_counts) >= limit:
+ break
+
+ # by now, we've either completed all tasks or satisfied the limit
+ if limit is not None:
+ _ = [f.cancel() for f in futures if not f.done()]
+
+ tables = [f.result() for f in completed_futures if f.result()]
+
+ if len(tables) < 1:
+ return pa.Table.from_batches([],
schema=schema_to_pyarrow(projected_schema))
+
+ result = pa.concat_tables(tables)
- return final_table.slice(0, limit)
+ return result.slice(0, limit)
Review Comment:
Do we also slice when there is no limit? 🤔
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -902,39 +903,47 @@ def project_table(
id for id in projected_schema.field_ids if not
isinstance(projected_schema.find_type(id), (MapType, ListType))
}.union(extract_field_ids(bound_row_filter))
- with ManagedThreadPoolExecutor() as executor:
- rows_counter = executor.synchronized(0)
- deletes_per_file = _read_all_delete_files(fs, executor, tasks)
- tables = [
- table
- for table in executor.map(
- lambda args: _task_to_table(*args),
- [
- (
- fs,
- task,
- bound_row_filter,
- projected_schema,
- projected_field_ids,
- deletes_per_file.get(task.file.file_path),
- case_sensitive,
- rows_counter,
- limit,
- )
- for task in tasks
- ],
- )
- if table is not None
- ]
+ row_counts: List[int] = []
+ deletes_per_file = _read_all_delete_files(fs, tasks)
+ executor = ExecutorFactory.create()
+ futures = [
+ executor.submit(
+ _task_to_table,
+ fs,
+ task,
+ bound_row_filter,
+ projected_schema,
+ projected_field_ids,
+ deletes_per_file.get(task.file.file_path),
+ case_sensitive,
+ row_counts,
+ limit,
+ )
+ for task in tasks
+ ]
- if len(tables) > 1:
- final_table = pa.concat_tables(tables)
- elif len(tables) == 1:
- final_table = tables[0]
- else:
- final_table = pa.Table.from_batches([],
schema=schema_to_pyarrow(projected_schema))
+ # for consistent ordering, we need to maintain future order
+ futures_index = {f: i for i, f in enumerate(futures)}
+ completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[],
key=lambda f: futures_index[f])
+ for future in concurrent.futures.as_completed(futures):
+ completed_futures.add(future)
+
+ # stop early if limit is satisfied
+ if limit is not None and sum(row_counts) >= limit:
+ break
+
+ # by now, we've either completed all tasks or satisfied the limit
+ if limit is not None:
+ _ = [f.cancel() for f in futures if not f.done()]
+
+ tables = [f.result() for f in completed_futures if f.result()]
+
+ if len(tables) < 1:
+ return pa.Table.from_batches([],
schema=schema_to_pyarrow(projected_schema))
+
+ result = pa.concat_tables(tables)
- return final_table.slice(0, limit)
+ return result.slice(0, limit)
Review Comment:
Do we also slice when there is no limit? 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]