This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8cd466136528 [SPARK-46308] Forbid recursive error handling by adding recursion guards
8cd466136528 is described below
commit 8cd466136528e479514a1d8dfe8ecaba9d0f8cce
Author: Alice Sayutina <[email protected]>
AuthorDate: Tue Dec 19 10:24:58 2023 +0900
[SPARK-46308] Forbid recursive error handling by adding recursion guards
### What changes were proposed in this pull request?
Revert https://github.com/apache/spark/pull/44144 and instead introduce a
recursion guard, as previously proposed. This way infinite error-handling
recursion is still prevented, while the client-side knob remains available.
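For illustration, here is a minimal, standalone sketch of the guard pattern
(hypothetical names such as `handle_error` and `_local`; the actual change is
in the diff below). A per-thread flag is set while the handler runs, so a
re-entrant call re-raises immediately instead of recursing:

    import threading

    _local = threading.local()

    def handle_error(error: Exception) -> None:
        # Already handling an error on this thread: re-raise at once
        # instead of recursing (potentially without bound).
        if getattr(_local, "inside_error_handling", False):
            raise error
        try:
            _local.inside_error_handling = True
            # ... error classification/enrichment that may itself fail and
            # re-enter handle_error() would go here ...
            raise error
        finally:
            _local.inside_error_handling = False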
### Why are the changes needed?
This was previously proposed as part of
https://github.com/apache/spark/pull/44144, but the discussion there settled
on a different approach. It now appears (per a suggestion by grundprinzip)
that the original proposal was more correct, since the driver stacktrace
display is decided on the client, not the server (see
https://github.com/apache/spark/pull/43667).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual testing.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44210 from cdkrot/forbid_recursive_error_handling_2.
Authored-by: Alice Sayutina <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/connect/client/core.py | 49 ++++++++++++++++++++++++-------
1 file changed, 39 insertions(+), 10 deletions(-)
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 85b2b98fef13..10235fd7d6c4 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -587,7 +587,12 @@ class SparkConnectClient(object):
         use_reattachable_execute: bool
             Enable reattachable execution.
         """
-        self.thread_local = threading.local()
+
+        class ClientThreadLocals(threading.local):
+            tags: set = set()
+            inside_error_handling: bool = False
+
+        self.thread_local = ClientThreadLocals()
 
         # Parse the connection string.
         self._builder = (
@@ -1497,14 +1502,24 @@ class SparkConnectClient(object):
         -------
         Throws the appropriate internal Python exception.
         """
-        if isinstance(error, grpc.RpcError):
-            self._handle_rpc_error(error)
-        elif isinstance(error, ValueError):
-            if "Cannot invoke RPC" in str(error) and "closed" in str(error):
-                raise SparkConnectException(
-                    error_class="NO_ACTIVE_SESSION", message_parameters=dict()
-                ) from None
-            raise error
+
+        if self.thread_local.inside_error_handling:
+            # We are already inside error handling routine,
+            # avoid recursive error processing (with potentially infinite recursion)
+            raise error
+
+        try:
+            self.thread_local.inside_error_handling = True
+            if isinstance(error, grpc.RpcError):
+                self._handle_rpc_error(error)
+            elif isinstance(error, ValueError):
+                if "Cannot invoke RPC" in str(error) and "closed" in str(error):
+                    raise SparkConnectException(
+                        error_class="NO_ACTIVE_SESSION", message_parameters=dict()
+                    ) from None
+                raise error
+        finally:
+            self.thread_local.inside_error_handling = False
 
     def _fetch_enriched_error(self, info: "ErrorInfo") -> Optional[pb2.FetchErrorDetailsResponse]:
         if "errorId" not in info.metadata:
@@ -1523,6 +1538,20 @@ class SparkConnectClient(object):
         except grpc.RpcError:
             return None
 
+    def _display_server_stack_trace(self) -> bool:
+        from pyspark.sql.connect.conf import RuntimeConf
+
+        conf = RuntimeConf(self)
+        try:
+            if conf.get("spark.sql.connect.serverStacktrace.enabled") == "true":
+                return True
+            return conf.get("spark.sql.pyspark.jvmStacktrace.enabled") == "true"
+        except Exception as e:  # noqa: F841
+            # Falls back to true if an exception occurs during reading the config.
+            # Otherwise, it will recursively try to get the conf when it consistently
+            # fails, ending up with `RecursionError`.
+            return True
+
     def _handle_rpc_error(self, rpc_error: grpc.RpcError) -> NoReturn:
         """
         Error handling helper for dealing with GRPC Errors. On the server side, certain
@@ -1556,7 +1585,7 @@ class SparkConnectClient(object):
                         info,
                         status.message,
                         self._fetch_enriched_error(info),
-                        True,
+                        self._display_server_stack_trace(),
                     ) from None
 
             raise SparkConnectGrpcException(status.message) from None
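A side note on the first hunk above: subclassing threading.local makes
instance attributes per-thread, while class attributes serve as the defaults
each thread starts from; this is what lets inside_error_handling begin as
False on every thread without explicit initialization. A rough,
self-contained sketch of that behaviour (not the Spark code; note that a
mutable class-level default such as `set()` is shared across threads until a
thread assigns its own value):

    import threading

    class ClientThreadLocals(threading.local):
        inside_error_handling: bool = False

    tl = ClientThreadLocals()

    def worker() -> None:
        tl.inside_error_handling = True          # visible only in this thread
        assert tl.inside_error_handling is True

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    assert tl.inside_error_handling is False     # main thread still sees the default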