mr-brobot commented on code in PR #8015: URL: https://github.com/apache/iceberg/pull/8015#discussion_r1258373103
########## python/pyiceberg/utils/concurrent.py: ########## @@ -0,0 +1,184 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint: disable=redefined-outer-name,arguments-renamed,fixme +"""Concurrency concepts that adapt to the shared memory support in the current runtime. + +Performance-optimized concurrency in Python prefers `multiprocessing` to avoid the global +interpreter lock. However, this requires shared memory provided via mount at `/dev/shm`. This +is not provided in serverless runtimes. In environments where multiprocessing is not supported, +we fall back to multithreading. 
+""" +import logging +import multiprocessing +import multiprocessing.managers +import multiprocessing.synchronize +import threading +from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor +from contextlib import AbstractContextManager +from multiprocessing import get_start_method +from typing import ( + Any, + Generic, + Optional, + Type, + TypeVar, +) + +from typing_extensions import Self + +from pyiceberg.utils.config import Config + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + + +class Synchronized(Generic[T], AbstractContextManager): # type: ignore + """A context manager that provides concurrency-safe access to a value.""" + + value: T + lock: threading.Lock + + def __init__(self, value: T, lock: threading.Lock): + super().__init__() + self.value = value + self.lock = lock + + def __enter__(self) -> T: + """Acquires a lock, allowing access to the wrapped value.""" + self.lock.acquire() + return self.value + + def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + """Releases the lock, allowing other threads to access the value.""" + self.lock.release() + + +class ManagedExecutor(Executor): + """An executor that provides synchronization.""" + + def synchronized(self, value: T) -> Synchronized[T]: + raise NotImplementedError + + +class ManagedThreadPoolExecutor(ThreadPoolExecutor, ManagedExecutor): + """A thread pool executor that provides synchronization.""" + + def __enter__(self) -> Self: + """Returns the executor itself as a context manager.""" + super().__enter__() + return self + + def synchronized(self, value: T) -> Synchronized[T]: + lock = threading.Lock() + return Synchronized(value, lock) + + +class ManagedProcessPoolExecutor(ProcessPoolExecutor, ManagedExecutor): + """A process pool executor provides synchronization.""" + + manager: multiprocessing.managers.SyncManager + + def __init__(self) -> None: + super().__init__() + self.manager = multiprocessing.Manager() + + def __enter__(self) -> 
Self: + """Returns the executor itself as a context manager.""" + self.manager.__enter__() + super().__enter__() + return self + + def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + """Exits the executor and the manager.""" + super().__exit__(exc_type, exc_value, traceback) + self.manager.__exit__(exc_type, exc_value, traceback) + + def synchronized(self, value: T) -> Synchronized[T]: + lock = self.manager.Lock() + return Synchronized(value, lock) + + +def _get_executor_class(mode: Optional[str], mp_avail: bool, mp_pref: bool) -> Type[Executor]: + """Returns the executor class for the given concurrency mode.""" + if mode == "process": + return ProcessPoolExecutor + if mode == "thread": + return ThreadPoolExecutor + if mode is None and mp_avail and mp_pref: + return ProcessPoolExecutor + if mode is None and (not mp_avail or not mp_pref): + logger.debug("Falling back to thread pool executor") + return ThreadPoolExecutor + + raise ValueError(f"Invalid concurrency mode: {mode}") + + +def _get_managed_executor_class(mode: Optional[str], mp_avail: bool, mp_pref: bool) -> Type[ManagedExecutor]: + """Returns the managed executor class for the given concurrency mode.""" + if mode == "process": + return ManagedProcessPoolExecutor + if mode == "thread": + return ManagedThreadPoolExecutor + if mode is None and mp_avail and mp_pref: + return ManagedProcessPoolExecutor + if mode is None and (not mp_avail or not mp_pref): + logger.debug("Falling back to managed thread pool executor") + return ManagedThreadPoolExecutor + + raise ValueError(f"Invalid concurrency mode: {mode}") + + +def _concurrency_mode() -> Optional[str]: + mode = Config().config.get("concurrency-mode") + + if mode not in ("thread", "process", None): + raise ValueError(f"Invalid concurrency mode: {mode}") + + return mode # type: ignore + + +def _mp_avail() -> bool: + """Returns whether multiprocessing is available.""" + try: + with ProcessPoolExecutor() as executor: Review Comment: Yeah, 
it was the only reliable way I could think of to check for multiprocessing support across platforms. However, now that multithreading is the default and multiprocessing is only enabled via an explicit `PYICEBERG_CONCURRENCY_MODE=process`, it should probably be the user's responsibility to decide whether multiprocessing is appropriate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
