This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9eadb2cbcc90 [SPARK-48634][PYTHON][CONNECT][FOLLOW-UP] Do not make a 
request if threadpool is not initialized
9eadb2cbcc90 is described below

commit 9eadb2cbcc90f195809214dea062f93a4e9bcd43
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Thu Jun 20 10:32:18 2024 +0900

    [SPARK-48634][PYTHON][CONNECT][FOLLOW-UP] Do not make a request if 
threadpool is not initialized
    
    ### What changes were proposed in this pull request?
    
    This PR proposes not making a ReleaseExecute request when the thread pool is not
    initialized, to keep the same behaviour as before https://github.com/apache/spark/pull/46993.
    
    ### Why are the changes needed?
    
    To make the Python process exit silently.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Virtually no.
    
    ### How was this patch tested?
    
    Manually tested by running a long-running Python job and then exiting it.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47034 from HyukjinKwon/SPARK-48634-followup.
    
    Authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/connect/client/core.py     |  2 +-
 python/pyspark/sql/connect/client/reattach.py | 23 ++++++++---------------
 2 files changed, 9 insertions(+), 16 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py 
b/python/pyspark/sql/connect/client/core.py
index efc76bb99f56..f3bbab69f271 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -1108,7 +1108,7 @@ class SparkConnectClient(object):
         """
         Close the channel.
         """
-        ExecutePlanResponseReattachableIterator.shutdown_threadpool()
+        ExecutePlanResponseReattachableIterator.shutdown()
         self._channel.close()
         self._closed = True
 
diff --git a/python/pyspark/sql/connect/client/reattach.py 
b/python/pyspark/sql/connect/client/reattach.py
index c20d2b6e2e83..82c7ae977218 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -74,7 +74,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
             return cls._release_thread_pool_instance
 
     @classmethod
-    def shutdown_threadpool(cls: 
Type["ExecutePlanResponseReattachableIterator"]) -> None:
+    def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
         """
         When the channel is closed, this method will be called before, to make 
sure all
         outstanding calls are closed.
@@ -85,15 +85,6 @@ class ExecutePlanResponseReattachableIterator(Generator):
                 cls._release_thread_pool.join()  # type: ignore[attr-defined]
                 cls._release_thread_pool_instance = None
 
-    def shutdown(self: "ExecutePlanResponseReattachableIterator") -> None:
-        """
-        When the channel is closed, this method will be called before, to make 
sure all
-        outstanding calls are closed, and mark this iterator is shutdown.
-        """
-        with self._lock:
-            self.shutdown_threadpool()
-            self._is_shutdown = True
-
     def __init__(
         self,
         request: pb2.ExecutePlanRequest,
@@ -101,7 +92,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
         retrying: Callable[[], Retrying],
         metadata: Iterable[Tuple[str, str]],
     ):
-        self._is_shutdown = False
+        self._release_thread_pool  # Trigger initialization
         self._request = request
         self._retrying = retrying
         if request.operation_id:
@@ -219,8 +210,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
             except Exception as e:
                 warnings.warn(f"ReleaseExecute failed with exception: {e}.")
 
-        if not self._is_shutdown:
-            self._release_thread_pool.apply_async(target)
+        with self._lock:
+            if self._release_thread_pool_instance is not None:
+                self._release_thread_pool.apply_async(target)
 
     def _release_all(self) -> None:
         """
@@ -243,8 +235,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
             except Exception as e:
                 warnings.warn(f"ReleaseExecute failed with exception: {e}.")
 
-        if not self._is_shutdown:
-            self._release_thread_pool.apply_async(target)
+        with self._lock:
+            if self._release_thread_pool_instance is not None:
+                self._release_thread_pool.apply_async(target)
         self._result_complete = True
 
     def _call_iter(self, iter_fun: Callable) -> Any:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to