This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6ee7c257b9fb [SPARK-48634][PYTHON][CONNECT] Avoid statically initialize threadpool at ExecutePlanResponseReattachableIterator
6ee7c257b9fb is described below
commit 6ee7c257b9fb47400cb447dca7c6cd37364476f3
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Wed Jun 19 09:13:00 2024 +0900
[SPARK-48634][PYTHON][CONNECT] Avoid statically initialize threadpool at ExecutePlanResponseReattachableIterator
### What changes were proposed in this pull request?
This PR proposes to stop initializing `ExecutePlanResponseReattachableIterator._release_thread_pool` with a `ThreadPool` statically at class definition time; the pool is instead created lazily on first use.
### Why are the changes needed?
Because the pool is statically initialized, the `ThreadPool` instance might be dragged in during pickling, and its creation can fail in restricted environments:
```
_release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 930, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 196, in __init__
    self._change_notifier = self._ctx.SimpleQueue()
  File "/usr/lib/python3.10/multiprocessing/context.py", line 113, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 341, in __init__
    self._rlock = ctx.Lock()
  File "/usr/lib/python3.10/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 57, in __init__
    sl = self._semlock = _multiprocessing.SemLock(
PermissionError: [Errno 13] Permission denied
```
which would otherwise require changes at the OS level to work around.
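For illustration, below is a minimal, self-contained sketch of the lazy, lock-guarded (double-checked locking) initialization this PR switches to. The class and attribute names follow the diff further down, but the plain classmethod accessor `_get_release_thread_pool` is only a stand-in for the `@classmethod` + `@property` combination used in the actual change:
```
import os
from multiprocessing.pool import ThreadPool
from threading import RLock
from typing import ClassVar, Optional


class ExecutePlanResponseReattachableIterator:
    # Lock guarding lazy creation of the shared pool.
    _lock: ClassVar[RLock] = RLock()
    # No ThreadPool (and hence no OS-level semaphore or queue) is created at
    # import time; the attribute stays None until the pool is first needed.
    _release_thread_pool_instance: ClassVar[Optional[ThreadPool]] = None

    @classmethod
    def _get_release_thread_pool(cls) -> ThreadPool:
        # Fast path outside the lock, then create the pool exactly once
        # under the lock (double-checked locking).
        if cls._release_thread_pool_instance is not None:
            return cls._release_thread_pool_instance
        with cls._lock:
            if cls._release_thread_pool_instance is None:
                cls._release_thread_pool_instance = ThreadPool(
                    os.cpu_count() if os.cpu_count() else 8
                )
            return cls._release_thread_pool_instance
```
Callers obtain the pool through the accessor instead of touching a statically created instance, so nothing OS-level happens at import or unpickle time.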
### Does this PR introduce _any_ user-facing change?
Yes. Without this change, the static initialization could potentially trigger random job failures in some environments such as Ubuntu.
### How was this patch tested?
Manually tested.
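As a rough illustration of what such a manual check could look like (not part of the patch): after this change, importing the module should no longer create a pool, so the class-level slot stays unset until first use.
```
from pyspark.sql.connect.client.reattach import ExecutePlanResponseReattachableIterator

# With lazy initialization in place, no ThreadPool should exist right after
# import; the class-level slot is still None.
assert ExecutePlanResponseReattachableIterator._release_thread_pool_instance is None
```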
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46993 from HyukjinKwon/make-thread.
Lead-authored-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/connect/client/core.py | 2 +-
python/pyspark/sql/connect/client/reattach.py | 47 +++++++++++++++++----------
2 files changed, 31 insertions(+), 18 deletions(-)
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index f3bbab69f271..efc76bb99f56 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -1108,7 +1108,7 @@ class SparkConnectClient(object):
         """
         Close the channel.
         """
-        ExecutePlanResponseReattachableIterator.shutdown()
+        ExecutePlanResponseReattachableIterator.shutdown_threadpool()
         self._channel.close()
         self._closed = True
 
diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py
index cc50e5892631..c20d2b6e2e83 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -58,28 +58,41 @@ class ExecutePlanResponseReattachableIterator(Generator):
 
     # Lock to manage the pool
     _lock: ClassVar[RLock] = RLock()
-    _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+    _release_thread_pool_instance: Optional[ThreadPool] = None
+
+    @classmethod  # type: ignore[misc]
+    @property
+    def _release_thread_pool(cls) -> ThreadPool:
+        # Perform a first check outside the critical path.
+        if cls._release_thread_pool_instance is not None:
+            return cls._release_thread_pool_instance
+        with cls._lock:
+            if cls._release_thread_pool_instance is None:
+                cls._release_thread_pool_instance = ThreadPool(
+                    os.cpu_count() if os.cpu_count() else 8
+                )
+            return cls._release_thread_pool_instance
 
     @classmethod
-    def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
+    def shutdown_threadpool(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
         """
         When the channel is closed, this method will be called before, to make sure all
         outstanding calls are closed.
         """
         with cls._lock:
-            if cls._release_thread_pool is not None:
-                cls._release_thread_pool.close()
-                cls._release_thread_pool.join()
-                cls._release_thread_pool = None
+            if cls._release_thread_pool_instance is not None:
+                cls._release_thread_pool.close()  # type: ignore[attr-defined]
+                cls._release_thread_pool.join()  # type: ignore[attr-defined]
+                cls._release_thread_pool_instance = None
 
-    @classmethod
-    def _initialize_pool_if_necessary(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
+    def shutdown(self: "ExecutePlanResponseReattachableIterator") -> None:
         """
-        If the processing pool for the release calls is None, initialize the pool exactly once.
+        When the channel is closed, this method will be called before, to make sure all
+        outstanding calls are closed, and mark this iterator is shutdown.
         """
-        with cls._lock:
-            if cls._release_thread_pool is None:
-                cls._release_thread_pool = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+        with self._lock:
+            self.shutdown_threadpool()
+            self._is_shutdown = True
 
     def __init__(
         self,
@@ -88,7 +101,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
         retrying: Callable[[], Retrying],
         metadata: Iterable[Tuple[str, str]],
     ):
-        ExecutePlanResponseReattachableIterator._initialize_pool_if_necessary()
+        self._is_shutdown = False
         self._request = request
         self._retrying = retrying
         if request.operation_id:
@@ -206,8 +219,8 @@ class ExecutePlanResponseReattachableIterator(Generator):
             except Exception as e:
                 warnings.warn(f"ReleaseExecute failed with exception: {e}.")
 
-        if ExecutePlanResponseReattachableIterator._release_thread_pool is not None:
-            ExecutePlanResponseReattachableIterator._release_thread_pool.apply_async(target)
+        if not self._is_shutdown:
+            self._release_thread_pool.apply_async(target)
 
     def _release_all(self) -> None:
         """
@@ -230,8 +243,8 @@ class ExecutePlanResponseReattachableIterator(Generator):
             except Exception as e:
                 warnings.warn(f"ReleaseExecute failed with exception: {e}.")
 
-        if ExecutePlanResponseReattachableIterator._release_thread_pool is not None:
-            ExecutePlanResponseReattachableIterator._release_thread_pool.apply_async(target)
+        if not self._is_shutdown:
+            self._release_thread_pool.apply_async(target)
         self._result_complete = True
 
     def _call_iter(self, iter_fun: Callable) -> Any:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]