Fokko commented on code in PR #8015: URL: https://github.com/apache/iceberg/pull/8015#discussion_r1258249543
########## python/pyiceberg/utils/concurrent.py: ########## @@ -0,0 +1,184 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint: disable=redefined-outer-name,arguments-renamed,fixme +"""Concurrency concepts that adapt to the shared memory support in the current runtime. + +Performance-optimized concurrency in Python prefers `multiprocessing` to avoid the global +interpreter lock. However, this requires shared memory provided via mount at `/dev/shm`. This +is not provided in serverless runtimes. In environments where multiprocessing is not supported, +we fall back to multithreading. Review Comment: Just to double-check if this is true. We're currently using a `ThreadPool`: https://github.com/apache/iceberg/blob/e389e4d139624a49729379acd330dd9c96187b04/python/pyiceberg/io/pyarrow.py#L875 And: https://github.com/apache/iceberg/blob/e389e4d139624a49729379acd330dd9c96187b04/python/pyiceberg/table/__init__.py#L776 ########## python/pyiceberg/utils/concurrent.py: ########## @@ -0,0 +1,184 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint: disable=redefined-outer-name,arguments-renamed,fixme +"""Concurrency concepts that adapt to the shared memory support in the current runtime. + +Performance-optimized concurrency in Python prefers `multiprocessing` to avoid the global +interpreter lock. However, this requires shared memory provided via mount at `/dev/shm`. This +is not provided in serverless runtimes. In environments where multiprocessing is not supported, +we fall back to multithreading. 
+""" +import logging +import multiprocessing +import multiprocessing.managers +import multiprocessing.synchronize +import threading +from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor +from contextlib import AbstractContextManager +from multiprocessing import get_start_method +from typing import ( + Any, + Generic, + Optional, + Type, + TypeVar, +) + +from typing_extensions import Self + +from pyiceberg.utils.config import Config + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + + +class Synchronized(Generic[T], AbstractContextManager): # type: ignore + """A context manager that provides concurrency-safe access to a value.""" + + value: T + lock: threading.Lock + + def __init__(self, value: T, lock: threading.Lock): + super().__init__() + self.value = value + self.lock = lock + + def __enter__(self) -> T: + """Acquires a lock, allowing access to the wrapped value.""" + self.lock.acquire() + return self.value + + def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + """Releases the lock, allowing other threads to access the value.""" + self.lock.release() + + +class ManagedExecutor(Executor): + """An executor that provides synchronization.""" + + def synchronized(self, value: T) -> Synchronized[T]: + raise NotImplementedError + + +class ManagedThreadPoolExecutor(ThreadPoolExecutor, ManagedExecutor): + """A thread pool executor that provides synchronization.""" + + def __enter__(self) -> Self: + """Returns the executor itself as a context manager.""" + super().__enter__() + return self + + def synchronized(self, value: T) -> Synchronized[T]: + lock = threading.Lock() + return Synchronized(value, lock) + + +class ManagedProcessPoolExecutor(ProcessPoolExecutor, ManagedExecutor): + """A process pool executor provides synchronization.""" + + manager: multiprocessing.managers.SyncManager + + def __init__(self) -> None: + super().__init__() + self.manager = multiprocessing.Manager() + + def __enter__(self) -> 
Self: + """Returns the executor itself as a context manager.""" + self.manager.__enter__() + super().__enter__() + return self + + def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + """Exits the executor and the manager.""" + super().__exit__(exc_type, exc_value, traceback) + self.manager.__exit__(exc_type, exc_value, traceback) + + def synchronized(self, value: T) -> Synchronized[T]: + lock = self.manager.Lock() + return Synchronized(value, lock) + + +def _get_executor_class(mode: Optional[str], mp_avail: bool, mp_pref: bool) -> Type[Executor]: + """Returns the executor class for the given concurrency mode.""" + if mode == "process": + return ProcessPoolExecutor + if mode == "thread": + return ThreadPoolExecutor + if mode is None and mp_avail and mp_pref: + return ProcessPoolExecutor + if mode is None and (not mp_avail or not mp_pref): + logger.debug("Falling back to thread pool executor") + return ThreadPoolExecutor + + raise ValueError(f"Invalid concurrency mode: {mode}") + + +def _get_managed_executor_class(mode: Optional[str], mp_avail: bool, mp_pref: bool) -> Type[ManagedExecutor]: Review Comment: ```suggestion def _get_managed_executor_class(mode: Optional[Literal["thread", "process"]], mp_avail: bool, mp_pref: bool) -> Type[ManagedExecutor]: ``` ########## python/pyiceberg/utils/concurrent.py: ########## @@ -0,0 +1,184 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint: disable=redefined-outer-name,arguments-renamed,fixme +"""Concurrency concepts that adapt to the shared memory support in the current runtime. + +Performance-optimized concurrency in Python prefers `multiprocessing` to avoid the global +interpreter lock. However, this requires shared memory provided via mount at `/dev/shm`. This +is not provided in serverless runtimes. In environments where multiprocessing is not supported, +we fall back to multithreading. +""" +import logging +import multiprocessing +import multiprocessing.managers +import multiprocessing.synchronize +import threading +from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor +from contextlib import AbstractContextManager +from multiprocessing import get_start_method +from typing import ( + Any, + Generic, + Optional, + Type, + TypeVar, +) + +from typing_extensions import Self + +from pyiceberg.utils.config import Config + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + + +class Synchronized(Generic[T], AbstractContextManager): # type: ignore + """A context manager that provides concurrency-safe access to a value.""" + + value: T + lock: threading.Lock + + def __init__(self, value: T, lock: threading.Lock): + super().__init__() + self.value = value + self.lock = lock + + def __enter__(self) -> T: + """Acquires a lock, allowing access to the wrapped value.""" + self.lock.acquire() + return self.value + + def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + """Releases the lock, allowing other threads to access the 
value.""" + self.lock.release() + + +class ManagedExecutor(Executor): + """An executor that provides synchronization.""" + + def synchronized(self, value: T) -> Synchronized[T]: + raise NotImplementedError + + +class ManagedThreadPoolExecutor(ThreadPoolExecutor, ManagedExecutor): + """A thread pool executor that provides synchronization.""" + + def __enter__(self) -> Self: + """Returns the executor itself as a context manager.""" + super().__enter__() + return self + + def synchronized(self, value: T) -> Synchronized[T]: + lock = threading.Lock() + return Synchronized(value, lock) + + +class ManagedProcessPoolExecutor(ProcessPoolExecutor, ManagedExecutor): + """A process pool executor provides synchronization.""" + + manager: multiprocessing.managers.SyncManager + + def __init__(self) -> None: + super().__init__() + self.manager = multiprocessing.Manager() + + def __enter__(self) -> Self: + """Returns the executor itself as a context manager.""" + self.manager.__enter__() + super().__enter__() + return self + + def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + """Exits the executor and the manager.""" + super().__exit__(exc_type, exc_value, traceback) + self.manager.__exit__(exc_type, exc_value, traceback) + + def synchronized(self, value: T) -> Synchronized[T]: + lock = self.manager.Lock() + return Synchronized(value, lock) + + +def _get_executor_class(mode: Optional[str], mp_avail: bool, mp_pref: bool) -> Type[Executor]: + """Returns the executor class for the given concurrency mode.""" + if mode == "process": + return ProcessPoolExecutor + if mode == "thread": + return ThreadPoolExecutor + if mode is None and mp_avail and mp_pref: + return ProcessPoolExecutor + if mode is None and (not mp_avail or not mp_pref): + logger.debug("Falling back to thread pool executor") + return ThreadPoolExecutor + + raise ValueError(f"Invalid concurrency mode: {mode}") + + +def _get_managed_executor_class(mode: Optional[str], mp_avail: bool, mp_pref: 
bool) -> Type[ManagedExecutor]: + """Returns the managed executor class for the given concurrency mode.""" + if mode == "process": + return ManagedProcessPoolExecutor + if mode == "thread": + return ManagedThreadPoolExecutor + if mode is None and mp_avail and mp_pref: + return ManagedProcessPoolExecutor + if mode is None and (not mp_avail or not mp_pref): + logger.debug("Falling back to managed thread pool executor") + return ManagedThreadPoolExecutor + + raise ValueError(f"Invalid concurrency mode: {mode}") + + +def _concurrency_mode() -> Optional[str]: + mode = Config().config.get("concurrency-mode") + + if mode not in ("thread", "process", None): + raise ValueError(f"Invalid concurrency mode: {mode}") + + return mode # type: ignore + + +def _mp_avail() -> bool: + """Returns whether multiprocessing is available.""" + try: + with ProcessPoolExecutor() as executor: Review Comment: I think this will already spin up new processes? ########## python/pyiceberg/utils/concurrent.py: ########## @@ -0,0 +1,184 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint: disable=redefined-outer-name,arguments-renamed,fixme +"""Concurrency concepts that adapt to the shared memory support in the current runtime. 
+ +Performance-optimized concurrency in Python prefers `multiprocessing` to avoid the global +interpreter lock. However, this requires shared memory provided via mount at `/dev/shm`. This +is not provided in serverless runtimes. In environments where multiprocessing is not supported, +we fall back to multithreading. +""" +import logging +import multiprocessing +import multiprocessing.managers +import multiprocessing.synchronize +import threading +from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor +from contextlib import AbstractContextManager +from multiprocessing import get_start_method +from typing import ( + Any, + Generic, + Optional, + Type, + TypeVar, +) + +from typing_extensions import Self + +from pyiceberg.utils.config import Config + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + + +class Synchronized(Generic[T], AbstractContextManager): # type: ignore + """A context manager that provides concurrency-safe access to a value.""" + + value: T + lock: threading.Lock + + def __init__(self, value: T, lock: threading.Lock): + super().__init__() + self.value = value + self.lock = lock + + def __enter__(self) -> T: + """Acquires a lock, allowing access to the wrapped value.""" + self.lock.acquire() + return self.value + + def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + """Releases the lock, allowing other threads to access the value.""" + self.lock.release() + + +class ManagedExecutor(Executor): + """An executor that provides synchronization.""" + + def synchronized(self, value: T) -> Synchronized[T]: + raise NotImplementedError + + +class ManagedThreadPoolExecutor(ThreadPoolExecutor, ManagedExecutor): + """A thread pool executor that provides synchronization.""" + + def __enter__(self) -> Self: + """Returns the executor itself as a context manager.""" + super().__enter__() + return self + + def synchronized(self, value: T) -> Synchronized[T]: + lock = threading.Lock() + return 
Synchronized(value, lock) + + +class ManagedProcessPoolExecutor(ProcessPoolExecutor, ManagedExecutor): + """A process pool executor provides synchronization.""" + + manager: multiprocessing.managers.SyncManager + + def __init__(self) -> None: + super().__init__() + self.manager = multiprocessing.Manager() + + def __enter__(self) -> Self: + """Returns the executor itself as a context manager.""" + self.manager.__enter__() + super().__enter__() + return self + + def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + """Exits the executor and the manager.""" + super().__exit__(exc_type, exc_value, traceback) + self.manager.__exit__(exc_type, exc_value, traceback) + + def synchronized(self, value: T) -> Synchronized[T]: + lock = self.manager.Lock() + return Synchronized(value, lock) + + +def _get_executor_class(mode: Optional[str], mp_avail: bool, mp_pref: bool) -> Type[Executor]: + """Returns the executor class for the given concurrency mode.""" + if mode == "process": + return ProcessPoolExecutor + if mode == "thread": + return ThreadPoolExecutor + if mode is None and mp_avail and mp_pref: + return ProcessPoolExecutor + if mode is None and (not mp_avail or not mp_pref): Review Comment: We currently use the `ThreadPool`; should we also use that as the default? ########## python/pyiceberg/table/__init__.py: ########## @@ -773,11 +779,11 @@ def plan_files(self) -> Iterable[FileScanTask]: data_entries: List[ManifestEntry] = [] positional_delete_entries = SortedList(key=lambda entry: entry.data_sequence_number or INITIAL_SEQUENCE_NUMBER) - with ThreadPool() as pool: + with DynamicExecutor() as executor: Review Comment: I think this is awesome. I have one more suggestion: what do you think of making the executor configurable through the config? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
