This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ede68bf2860 [SPARK-53307][CONNECT][CLIENT][PYTHON][SCALA] Remove RetriesExceeded error from Spark Connect Python and Scala clients
8ede68bf2860 is described below

commit 8ede68bf2860e9e4a05d4e75bb716347793aa791
Author: Alex Khakhlyuk <alex.khakhl...@gmail.com>
AuthorDate: Mon Aug 18 18:42:22 2025 +0900

    [SPARK-53307][CONNECT][CLIENT][PYTHON][SCALA] Remove RetriesExceeded error from Spark Connect Python and Scala clients
    
    ### What changes were proposed in this pull request?
    
    This PR removes RetriesExceeded from the Spark Connect Scala and Python clients.
    
    ### Why are the changes needed?
    
    RetriesExceeded was introduced in https://github.com/apache/spark/pull/43591, two years ago.
    This exception turned out to be more harmful than useful.
    1. The user does not see the underlying issue, only `[RETRIES_EXCEEDED] The maximum number of retries has been exceeded.` To get the underlying root exception, they need to inspect `RetriesExceeded.__cause__`, which is a bad interface (see the sketch below).
    2. `RetriesExceeded` is a `PySparkException`, which means that neither it nor the root cause is handled by the error handling logic for Spark Connect gRPC exceptions in [_handle_rpc_error](https://github.com/apache/spark/blob/337a67ff58ad91f2f86751bcb9a5e50e1de5cef2/python/pyspark/sql/connect/client/core.py#L1833).
    3. `RetriesExceeded` is not used anywhere in the code base except being thrown in retries.py.
    4. To simplify error handling, we replace throwing `RetriesExceeded` with throwing the original error, and we remove all internal references to `RetriesExceeded` from the code base.
    
    The description above discusses the Python code; the same applies to Scala.
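
    To make point 1 concrete, here is a minimal, self-contained Python sketch of the old interface (illustrative names only; `RetriesExceededDemo` stands in for the removed class and is not the real client code):

    ```python
    class RetriesExceededDemo(Exception):
        """Stand-in for the removed RetriesExceeded wrapper."""

    def exhaust_retries_old_style():
        try:
            raise ConnectionError("server unavailable")  # the actual problem
        except ConnectionError as root:
            # Old behaviour: hide the root cause behind a generic wrapper.
            raise RetriesExceededDemo(
                "[RETRIES_EXCEEDED] The maximum number of retries has been exceeded."
            ) from root

    try:
        exhaust_retries_old_style()
    except RetriesExceededDemo as e:
        # The user only sees the wrapper and must unwrap __cause__ manually.
        print(type(e.__cause__).__name__, e.__cause__)
    ```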
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes
    1. RetriesExceeded will no longer be available in the Spark Connect Python and Scala clients.
    2. The root cause error will be thrown instead. The user will still see a warning: `[RETRIES_EXCEEDED] The maximum number of retries has been exceeded.`
    3. Error handling logic that catches RetriesExceeded will no longer work; users will have to catch the root cause exception instead (see the migration sketch below).
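
    A rough migration sketch (the concrete exception type to catch depends on the failing RPC; `ConnectionError` below is only a stand-in for the real root cause):

    ```python
    import warnings

    def exhaust_retries_new_style():
        try:
            raise ConnectionError("server unavailable")
        except ConnectionError:
            # New behaviour: warn, then re-raise the root cause itself.
            warnings.warn("[RETRIES_EXCEEDED] The maximum number of retries has been exceeded.")
            raise

    # Before: `except RetriesExceeded as e: root = e.__cause__`
    # After: catch the root cause directly.
    try:
        exhaust_retries_new_style()
    except ConnectionError as e:
        print(f"request failed after retries: {e}")
    ```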
    
    ### How was this patch tested?
    
    Updated existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52042 from khakhlyuk/remove-retries-exceeded.
    
    Authored-by: Alex Khakhlyuk <alex.khakhl...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 dev/check_pyspark_custom_errors.py                 |  1 -
 python/docs/source/reference/pyspark.errors.rst    |  1 -
 python/pyspark/errors/__init__.py                  |  2 --
 python/pyspark/errors/error-conditions.json        |  5 ----
 python/pyspark/errors/exceptions/base.py           |  7 -----
 python/pyspark/sql/connect/client/retries.py       |  8 +++--
 .../tests/connect/client/test_client_retries.py    | 35 ++++++++++++++++++----
 .../sql/tests/connect/test_connect_retry.py        |  7 ++---
 .../client/SparkConnectClientRetriesSuite.scala    | 12 ++++----
 .../sql/connect/client/GrpcRetryHandler.scala      |  5 ++--
 .../spark/sql/connect/client/RetriesExceeded.scala | 25 ----------------
 11 files changed, 45 insertions(+), 63 deletions(-)

diff --git a/dev/check_pyspark_custom_errors.py b/dev/check_pyspark_custom_errors.py
index bce73c84028a..db152c77d1b8 100644
--- a/dev/check_pyspark_custom_errors.py
+++ b/dev/check_pyspark_custom_errors.py
@@ -124,7 +124,6 @@ if __name__ == "__main__":
         "PySparkValueError",
         "PythonException",
         "QueryExecutionException",
-        "RetriesExceeded",
         "SessionNotSameException",
         "SparkNoSuchElementException",
         "SparkRuntimeException",
diff --git a/python/docs/source/reference/pyspark.errors.rst b/python/docs/source/reference/pyspark.errors.rst
index 4d2bf7612779..1204cccc8df5 100644
--- a/python/docs/source/reference/pyspark.errors.rst
+++ b/python/docs/source/reference/pyspark.errors.rst
@@ -50,7 +50,6 @@ Classes
     QueryContext
     QueryContextType
     QueryExecutionException
-    RetriesExceeded
     SessionNotSameException
     SparkRuntimeException
     SparkUpgradeException
diff --git a/python/pyspark/errors/__init__.py b/python/pyspark/errors/__init__.py
index 98f8aa593371..c20b057b74e6 100644
--- a/python/pyspark/errors/__init__.py
+++ b/python/pyspark/errors/__init__.py
@@ -46,7 +46,6 @@ from pyspark.errors.exceptions.base import (  # noqa: F401
     PySparkAssertionError,
     PySparkNotImplementedError,
     PySparkPicklingError,
-    RetriesExceeded,
     PySparkKeyError,
     QueryContext,
     QueryContextType,
@@ -83,7 +82,6 @@ __all__ = [
     "PySparkAssertionError",
     "PySparkNotImplementedError",
     "PySparkPicklingError",
-    "RetriesExceeded",
     "PySparkKeyError",
     "QueryContext",
     "QueryContextType",
diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json
index 154110717785..ba682a65a9f4 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -987,11 +987,6 @@
       "Columns do not match in their data type: <mismatch>."
     ]
   },
-  "RETRIES_EXCEEDED": {
-    "message": [
-      "The maximum number of retries has been exceeded."
-    ]
-  },
   "REUSE_OBSERVATION": {
     "message": [
       "An Observation can be used with a DataFrame only once."
diff --git a/python/pyspark/errors/exceptions/base.py b/python/pyspark/errors/exceptions/base.py
index 4fa7ea92e34c..b0455b64dabf 100644
--- a/python/pyspark/errors/exceptions/base.py
+++ b/python/pyspark/errors/exceptions/base.py
@@ -350,13 +350,6 @@ class PySparkPicklingError(PySparkException, PicklingError):
     """
 
 
-class RetriesExceeded(PySparkException):
-    """
-    Represents an exception which is considered retriable, but retry limits
-    were exceeded
-    """
-
-
 class PySparkKeyError(PySparkException, KeyError):
     """
     Wrapper class for KeyError to support error classes.
diff --git a/python/pyspark/sql/connect/client/retries.py b/python/pyspark/sql/connect/client/retries.py
index 436da250d791..898d976f2628 100644
--- a/python/pyspark/sql/connect/client/retries.py
+++ b/python/pyspark/sql/connect/client/retries.py
@@ -19,12 +19,13 @@ import grpc
 import random
 import time
 import typing
+import warnings
 from google.rpc import error_details_pb2
 from grpc_status import rpc_status
 from typing import Optional, Callable, Generator, List, Type, cast
 from types import TracebackType
 from pyspark.sql.connect.logging import logger
-from pyspark.errors import PySparkRuntimeError, RetriesExceeded
+from pyspark.errors import PySparkRuntimeError
 
 """
 This module contains retry system. The system is designed to be
@@ -210,7 +211,7 @@ class Retrying:
             Do something that can throw exception
 
     In case error is considered retriable, it would be retried based on policies, and
-    RetriesExceeded will be raised if the retries limit would exceed.
+    the original error will be raised if the retry limit is exceeded.
 
     Exceptions not considered retriable will be passed through transparently.
     """
@@ -277,7 +278,8 @@ class Retrying:
 
         # Exceeded retries
         logger.debug(f"Given up on retrying. error: {repr(exception)}")
-        raise RetriesExceeded(errorClass="RETRIES_EXCEEDED", messageParameters={}) from exception
+        warnings.warn("[RETRIES_EXCEEDED] The maximum number of retries has been exceeded.")
+        raise exception
 
     def __iter__(self) -> Generator[AttemptManager, None, None]:
         """
diff --git a/python/pyspark/sql/tests/connect/client/test_client_retries.py b/python/pyspark/sql/tests/connect/client/test_client_retries.py
index 400442363b47..fb3263e2824f 100644
--- a/python/pyspark/sql/tests/connect/client/test_client_retries.py
+++ b/python/pyspark/sql/tests/connect/client/test_client_retries.py
@@ -16,6 +16,7 @@
 #
 
 import unittest
+import warnings
 
 from pyspark.testing.connectutils import should_test_connect, connect_requirement_message
 
@@ -30,7 +31,6 @@ if should_test_connect:
         Retrying,
         DefaultPolicy,
     )
-    from pyspark.errors import RetriesExceeded
     from pyspark.sql.tests.connect.client.test_client import (
         TestPolicy,
         TestException,
@@ -91,7 +91,7 @@ class SparkConnectClientRetriesTestCase(unittest.TestCase):
             for attempt in Retrying(client._retry_policies, sleep=sleep_tracker.sleep):
                 with attempt:
                     raise TestException("Retryable error", grpc.StatusCode.UNAVAILABLE)
-        except RetriesExceeded:
+        except TestException:
             pass
 
         # tolerated at least 10 mins of fails
@@ -107,6 +107,29 @@ class SparkConnectClientRetriesTestCase(unittest.TestCase):
 
         self.assertEqual(client.get_retry_policies(), [policyA, policyB])
 
+    def test_warning_works(self):
+        client = SparkConnectClient("sc://foo/;token=bar")
+        policy = get_client_policies_map(client).get(DefaultPolicy)
+        self.assertIsNotNone(policy)
+
+        sleep_tracker = SleepTimeTracker()
+        with warnings.catch_warnings(record=True) as warning_list:
+            warnings.simplefilter("always")
+            try:
+                for attempt in Retrying(client._retry_policies, sleep=sleep_tracker.sleep):
+                    with attempt:
+                        raise TestException(
+                            msg="Some error message", code=grpc.StatusCode.UNAVAILABLE
+                        )
+            except TestException:
+                pass
+            self.assertEqual(len(sleep_tracker.times), policy.max_retries)
+            self.assertEqual(len(warning_list), 1)
+            self.assertEqual(
+                str(warning_list[0].message),
+                "[RETRIES_EXCEEDED] The maximum number of retries has been 
exceeded.",
+            )
+
     def test_default_policy_retries_retry_info(self):
         client = SparkConnectClient("sc://foo/;token=bar")
         policy = get_client_policies_map(client).get(DefaultPolicy)
@@ -124,7 +147,7 @@ class SparkConnectClientRetriesTestCase(unittest.TestCase):
                         code=grpc.StatusCode.UNIMPLEMENTED,
                         retry_delay=retry_delay,
                     )
-        except RetriesExceeded:
+        except TestException:
             pass
         expected_times = [
             min(policy.max_backoff, policy.initial_backoff * policy.backoff_multiplier**i)
@@ -151,7 +174,7 @@ class SparkConnectClientRetriesTestCase(unittest.TestCase):
                         grpc.StatusCode.UNAVAILABLE,
                         retry_delay,
                     )
-        except RetriesExceeded:
+        except TestException:
             pass
         expected_times = [retry_delay] * policy.max_retries
         self.assertListsAlmostEqual(sleep_tracker.times, expected_times, delta=policy.jitter)
@@ -173,7 +196,7 @@ class SparkConnectClientRetriesTestCase(unittest.TestCase):
                         grpc.StatusCode.UNAVAILABLE,
                         retry_delay,
                     )
-        except RetriesExceeded:
+        except TestException:
             pass
 
         expected_times = [policy.max_server_retry_delay] * policy.max_retries
@@ -204,7 +227,7 @@ class SparkConnectClientRetriesTestCase(unittest.TestCase):
                         grpc.StatusCode.UNAVAILABLE,
                         retry_delay,
                     )
-        except RetriesExceeded:
+        except TestException:
             pass
 
         expected_times = [initial_retry_delay] * 2 + [
diff --git a/python/pyspark/sql/tests/connect/test_connect_retry.py b/python/pyspark/sql/tests/connect/test_connect_retry.py
index 61ab0dcea862..21796869e385 100644
--- a/python/pyspark/sql/tests/connect/test_connect_retry.py
+++ b/python/pyspark/sql/tests/connect/test_connect_retry.py
@@ -18,7 +18,6 @@
 import unittest
 from collections import defaultdict
 
-from pyspark.errors import RetriesExceeded
 from pyspark.testing.connectutils import (
     should_test_connect,
     connect_requirement_message,
@@ -88,7 +87,7 @@ class RetryTests(unittest.TestCase):
 
     def test_exceed_retries(self):
         # Exceed the retries.
-        with self.assertRaises(RetriesExceeded):
+        with self.assertRaises(TestError):
             for attempt in Retrying(TestPolicy(max_retries=2)):
                 with attempt:
                     self.stub(5, grpc.StatusCode.INTERNAL)
@@ -117,7 +116,7 @@ class RetryTests(unittest.TestCase):
     def test_specific_exception_exceed_retries(self):
         # Exceed the retries.
         policy = TestPolicySpecificError(max_retries=2, specific_code=grpc.StatusCode.UNAVAILABLE)
-        with self.assertRaises(RetriesExceeded):
+        with self.assertRaises(TestError):
             for attempt in Retrying(policy):
                 with attempt:
                     self.stub(5, grpc.StatusCode.UNAVAILABLE)
@@ -157,7 +156,7 @@ class RetryTests(unittest.TestCase):
         policy1 = TestPolicySpecificError(max_retries=2, specific_code=grpc.StatusCode.INTERNAL)
         policy2 = TestPolicySpecificError(max_retries=4, specific_code=grpc.StatusCode.INTERNAL)
 
-        with self.assertRaises(RetriesExceeded):
+        with self.assertRaises(TestError):
             for attempt in Retrying([policy1, policy2]):
                 with attempt:
                     self.stub(10, grpc.StatusCode.INTERNAL)
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientRetriesSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientRetriesSuite.scala
index 3408c15b73f0..c0738d7de325 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientRetriesSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientRetriesSuite.scala
@@ -96,7 +96,7 @@ class SparkConnectClientRetriesSuite
       val dummyFn = new DummyFn(new StatusRuntimeException(Status.UNAVAILABLE), numFails = 100)
       val retryHandler = new GrpcRetryHandler(RetryPolicy.defaultPolicies(), st.sleep)
 
-      assertThrows[RetriesExceeded] {
+      assertThrows[StatusRuntimeException] {
         retryHandler.retry {
           dummyFn.fn()
         }
@@ -143,7 +143,7 @@ class SparkConnectClientRetriesSuite
     val retryPolicy = RetryPolicy(canRetry = _ => true, maxRetries = Some(1), name = "TestPolicy")
     val retryHandler = new GrpcRetryHandler(retryPolicy, sleep = _ => {})
 
-    assertThrows[RetriesExceeded] {
+    assertThrows[StatusRuntimeException] {
       retryHandler.retry { dummyFn.fn() }
     }
     assert(dummyFn.counter == 2)
@@ -184,7 +184,7 @@ class SparkConnectClientRetriesSuite
     val errors = List.fill(10)(Status.INTERNAL).iterator
     var countAttempted = 0
 
-    assertThrows[RetriesExceeded](
+    assertThrows[StatusRuntimeException](
       new GrpcRetryHandler(List(policy1, policy2), sleep = _ => {}).retry({
         countAttempted += 1
         val e = errors.nextOption()
@@ -202,7 +202,7 @@ class SparkConnectClientRetriesSuite
       new DummyFn(createTestExceptionWithDetails(msg = "Some error message"), numFails = 100)
     val retryPolicies = RetryPolicy.defaultPolicies()
     val retryHandler = new GrpcRetryHandler(retryPolicies, sleep = _ => {})
-    assertThrows[RetriesExceeded] {
+    assertThrows[StatusRuntimeException] {
       retryHandler.retry { dummyFn.fn() }
     }
 
@@ -220,7 +220,7 @@ class SparkConnectClientRetriesSuite
     val retryPolicies = RetryPolicy.defaultPolicies()
     val retryHandler = new GrpcRetryHandler(retryPolicies, sleep = st.sleep)
 
-    assertThrows[RetriesExceeded] {
+    assertThrows[StatusRuntimeException] {
       retryHandler.retry { dummyFn.fn() }
     }
 
@@ -241,7 +241,7 @@ class SparkConnectClientRetriesSuite
     val retryPolicies = RetryPolicy.defaultPolicies()
     val retryHandler = new GrpcRetryHandler(retryPolicies, sleep = st.sleep)
 
-    assertThrows[RetriesExceeded] {
+    assertThrows[StatusRuntimeException] {
       retryHandler.retry { dummyFn.fn() }
     }
 
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
index 855d5651e3f7..3f4558ee97da 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
@@ -213,9 +213,8 @@ private[sql] object GrpcRetryHandler extends Logging {
         log"Non-Fatal error during RPC execution: ${MDC(ERROR, 
lastException)}, " +
           log"exceeded retries (currentRetryNum=${MDC(NUM_RETRY, 
currentRetryNum)})")
 
-      val error = new RetriesExceeded()
-      exceptionList.foreach(error.addSuppressed)
-      throw error
+      logWarning(log"[RETRIES_EXCEEDED] The maximum number of retries has been 
exceeded.")
+      throw lastException
     }
 
     def retry(): T = {
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetriesExceeded.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetriesExceeded.scala
deleted file mode 100644
index 77e1c0deab24..000000000000
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetriesExceeded.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connect.client
-
-/**
- * Represents an exception which was considered retriable but has exceeded retry limits.
- *
- * The actual exceptions incurred can be retrieved with getSuppressed()
- */
-class RetriesExceeded extends Throwable


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
