This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9eadb2cbcc90 [SPARK-48634][PYTHON][CONNECT][FOLLOW-UP] Do not make a
request if threadpool is not initialized
9eadb2cbcc90 is described below
commit 9eadb2cbcc90f195809214dea062f93a4e9bcd43
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Thu Jun 20 10:32:18 2024 +0900
[SPARK-48634][PYTHON][CONNECT][FOLLOW-UP] Do not make a request if
threadpool is not initialized
### What changes were proposed in this pull request?
This PR proposes not making a request if the thread pool is not initialized, so that
the behaviour stays the same as before https://github.com/apache/spark/pull/46993.
### Why are the changes needed?
To make Python exit silently.
### Does this PR introduce _any_ user-facing change?
Virtually no.
### How was this patch tested?
Manually tested by running a long-running Python job and then exiting it.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47034 from HyukjinKwon/SPARK-48634-followup.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/connect/client/core.py | 2 +-
python/pyspark/sql/connect/client/reattach.py | 23 ++++++++---------------
2 files changed, 9 insertions(+), 16 deletions(-)
diff --git a/python/pyspark/sql/connect/client/core.py
b/python/pyspark/sql/connect/client/core.py
index efc76bb99f56..f3bbab69f271 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -1108,7 +1108,7 @@ class SparkConnectClient(object):
"""
Close the channel.
"""
- ExecutePlanResponseReattachableIterator.shutdown_threadpool()
+ ExecutePlanResponseReattachableIterator.shutdown()
self._channel.close()
self._closed = True
diff --git a/python/pyspark/sql/connect/client/reattach.py
b/python/pyspark/sql/connect/client/reattach.py
index c20d2b6e2e83..82c7ae977218 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -74,7 +74,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
return cls._release_thread_pool_instance
@classmethod
- def shutdown_threadpool(cls:
Type["ExecutePlanResponseReattachableIterator"]) -> None:
+ def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
"""
When the channel is closed, this method will be called before, to make
sure all
outstanding calls are closed.
@@ -85,15 +85,6 @@ class ExecutePlanResponseReattachableIterator(Generator):
cls._release_thread_pool.join() # type: ignore[attr-defined]
cls._release_thread_pool_instance = None
- def shutdown(self: "ExecutePlanResponseReattachableIterator") -> None:
- """
- When the channel is closed, this method will be called before, to make
sure all
- outstanding calls are closed, and mark this iterator is shutdown.
- """
- with self._lock:
- self.shutdown_threadpool()
- self._is_shutdown = True
-
def __init__(
self,
request: pb2.ExecutePlanRequest,
@@ -101,7 +92,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
retrying: Callable[[], Retrying],
metadata: Iterable[Tuple[str, str]],
):
- self._is_shutdown = False
+ self._release_thread_pool # Trigger initialization
self._request = request
self._retrying = retrying
if request.operation_id:
@@ -219,8 +210,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
except Exception as e:
warnings.warn(f"ReleaseExecute failed with exception: {e}.")
- if not self._is_shutdown:
- self._release_thread_pool.apply_async(target)
+ with self._lock:
+ if self._release_thread_pool_instance is not None:
+ self._release_thread_pool.apply_async(target)
def _release_all(self) -> None:
"""
@@ -243,8 +235,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
except Exception as e:
warnings.warn(f"ReleaseExecute failed with exception: {e}.")
- if not self._is_shutdown:
- self._release_thread_pool.apply_async(target)
+ with self._lock:
+ if self._release_thread_pool_instance is not None:
+ self._release_thread_pool.apply_async(target)
self._result_complete = True
def _call_iter(self, iter_fun: Callable) -> Any:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]