This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cd466136528 [SPARK-46308] Forbid recursive error handling by adding recursion guards
8cd466136528 is described below

commit 8cd466136528e479514a1d8dfe8ecaba9d0f8cce
Author: Alice Sayutina <[email protected]>
AuthorDate: Tue Dec 19 10:24:58 2023 +0900

    [SPARK-46308] Forbid recursive error handling by adding recursion guards
    
    ### What changes were proposed in this pull request?
    
    Revert https://github.com/apache/spark/pull/44144 and introduce a recursion guard as previously proposed. This way infinite error-handling recursion is still prevented, while the client-side knob is preserved.
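
    For context, here is a minimal, hedged sketch of the thread-local recursion-guard pattern this change applies. The names (_Locals, _state, handle_error) are illustrative assumptions, not the actual client code:

        import threading

        class _Locals(threading.local):
            # Each thread sees its own independent copy of this flag.
            inside_error_handling: bool = False

        _state = _Locals()

        def handle_error(error: BaseException) -> None:
            if _state.inside_error_handling:
                # Already inside the handler on this thread: re-raise as-is
                # rather than re-entering and recursing indefinitely.
                raise error
            try:
                _state.inside_error_handling = True
                # ... error enrichment/translation goes here; this work may
                # itself raise, which is what the guard protects against ...
                raise error
            finally:
                _state.inside_error_handling = False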
    
    ### Why are the changes needed?
    
    This was previously proposed as part of https://github.com/apache/spark/pull/44144, but discussion there favoured a different approach. However, the original proposal (by grundprinzip) appears to be the more correct one, since the driver stacktrace is decided on the client, not the server (see https://github.com/apache/spark/pull/43667).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Manual testing.
    
    ### Was this patch authored or co-authored using generative AI tooling?

    No
    
    Closes #44210 from cdkrot/forbid_recursive_error_handling_2.
    
    Authored-by: Alice Sayutina <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/connect/client/core.py | 49 ++++++++++++++++++++++++-------
 1 file changed, 39 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 85b2b98fef13..10235fd7d6c4 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -587,7 +587,12 @@ class SparkConnectClient(object):
         use_reattachable_execute: bool
             Enable reattachable execution.
         """
-        self.thread_local = threading.local()
+
+        class ClientThreadLocals(threading.local):
+            tags: set = set()
+            inside_error_handling: bool = False
+
+        self.thread_local = ClientThreadLocals()
 
         # Parse the connection string.
         self._builder = (
@@ -1497,14 +1502,24 @@ class SparkConnectClient(object):
         -------
         Throws the appropriate internal Python exception.
         """
-        if isinstance(error, grpc.RpcError):
-            self._handle_rpc_error(error)
-        elif isinstance(error, ValueError):
-            if "Cannot invoke RPC" in str(error) and "closed" in str(error):
-                raise SparkConnectException(
-                    error_class="NO_ACTIVE_SESSION", message_parameters=dict()
-                ) from None
-        raise error
+
+        if self.thread_local.inside_error_handling:
+            # We are already inside error handling routine,
+            # avoid recursive error processing (with potentially infinite recursion)
+            raise error
+
+        try:
+            self.thread_local.inside_error_handling = True
+            if isinstance(error, grpc.RpcError):
+                self._handle_rpc_error(error)
+            elif isinstance(error, ValueError):
+                if "Cannot invoke RPC" in str(error) and "closed" in 
str(error):
+                    raise SparkConnectException(
+                        error_class="NO_ACTIVE_SESSION", 
message_parameters=dict()
+                    ) from None
+            raise error
+        finally:
+            self.thread_local.inside_error_handling = False
 
    def _fetch_enriched_error(self, info: "ErrorInfo") -> Optional[pb2.FetchErrorDetailsResponse]:
         if "errorId" not in info.metadata:
@@ -1523,6 +1538,20 @@ class SparkConnectClient(object):
         except grpc.RpcError:
             return None
 
+    def _display_server_stack_trace(self) -> bool:
+        from pyspark.sql.connect.conf import RuntimeConf
+
+        conf = RuntimeConf(self)
+        try:
+            if conf.get("spark.sql.connect.serverStacktrace.enabled") == 
"true":
+                return True
+            return conf.get("spark.sql.pyspark.jvmStacktrace.enabled") == "true"
+        except Exception as e:  # noqa: F841
+            # Falls back to true if an exception occurs while reading the config.
+            # Otherwise, it would recursively try to read the conf when it
+            # consistently fails, ending up with `RecursionError`.
+            return True
+
     def _handle_rpc_error(self, rpc_error: grpc.RpcError) -> NoReturn:
         """
        Error handling helper for dealing with GRPC Errors. On the server side, certain
@@ -1556,7 +1585,7 @@ class SparkConnectClient(object):
                         info,
                         status.message,
                         self._fetch_enriched_error(info),
-                        True,
+                        self._display_server_stack_trace(),
                     ) from None
 
             raise SparkConnectGrpcException(status.message) from None
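
As a usage note, the client now decides whether to display the server-side stack trace based on the two configs read by _display_server_stack_trace(). Below is a hedged sketch of toggling that knob from a Spark Connect client; the endpoint URL is an assumption, adjust it for your deployment:

    from pyspark.sql import SparkSession

    # Assumes a reachable Spark Connect server; the URL is illustrative.
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

    # If either flag is "true", exceptions surfaced on the client include the
    # server-side stack trace; if both are "false", it is suppressed.
    spark.conf.set("spark.sql.connect.serverStacktrace.enabled", "true")
    spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", "false")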

