This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8ede68bf2860 [SPARK-53307][CONNECT][CLIENT][PYTHON][SCALA] Remove RetriesExceeded error from Spark Connect Python and Scala clients
8ede68bf2860 is described below

commit 8ede68bf2860e9e4a05d4e75bb716347793aa791
Author: Alex Khakhlyuk <alex.khakhl...@gmail.com>
AuthorDate: Mon Aug 18 18:42:22 2025 +0900

    [SPARK-53307][CONNECT][CLIENT][PYTHON][SCALA] Remove RetriesExceeded error from Spark Connect Python and Scala clients

    ### What changes were proposed in this pull request?

    This PR removes RetriesExceeded from the Spark Connect Scala and Python clients.

    ### Why are the changes needed?

    RetriesExceeded was introduced in https://github.com/apache/spark/pull/43591 two years ago. This exception turned out to be more harmful than useful.

    1. The user does not see the underlying issue, only `[RETRIES_EXCEEDED] The maximum number of retries has been exceeded.` To get the underlying root exception, they need to inspect `RetriesExceeded.__cause__`, which is a poor interface.
    2. `RetriesExceeded` is a `PySparkException`, which means that neither it nor the root cause is handled by the error-handling logic for Spark Connect gRPC exceptions in [_handle_rpc_error](https://github.com/apache/spark/blob/337a67ff58ad91f2f86751bcb9a5e50e1de5cef2/python/pyspark/sql/connect/client/core.py#L1833).
    3. `RetriesExceeded` is not used anywhere in the code base except where it is thrown in retries.py.
    4. To simplify error handling, we replace throwing RetriesExceeded with throwing the original error. We also remove all internal references to RetriesExceeded from the code base.

    The description above discusses the Python code; the same applies to Scala.

    ### Does this PR introduce _any_ user-facing change?

    Yes.

    1. RetriesExceeded will no longer be available in the Spark Connect Python and Scala clients.
    2. The root-cause error will be thrown instead. The user will still see a warning `[RETRIES_EXCEEDED] The maximum number of retries has been exceeded.`
    3. Error-handling logic that catches RetriesExceeded will no longer work; users will have to catch the root-cause exception instead.

    ### How was this patch tested?

    Updated existing unit tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #52042 from khakhlyuk/remove-retries-exceeded.
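For callers, the migration is mechanical: catch the underlying error instead of the wrapper. A minimal sketch of the new contract (not part of the patch; it reuses the dummy client URI from the unit tests below and assumes `RetryException` from retries.py is still treated as always retryable by the default policy):

```python
from pyspark.sql.connect.client import SparkConnectClient
from pyspark.sql.connect.client.retries import Retrying, RetryException

# Dummy client as in the unit tests below; constructing it opens no connection.
client = SparkConnectClient("sc://foo/;token=bar")

# Before: the retry loop ended with `raise RetriesExceeded(...) from exception`,
# so callers caught RetriesExceeded and unwrapped `e.__cause__`.
# After: the original error escapes the loop directly.
try:
    for attempt in Retrying(client._retry_policies, sleep=lambda _: None):
        with attempt:
            raise RetryException("stand-in for a retriable RPC failure")
except RetryException as e:
    print(f"gave up after retries: {e!r}")  # the root cause, no unwrapping needed
```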
Authored-by: Alex Khakhlyuk <alex.khakhl...@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 dev/check_pyspark_custom_errors.py                 |  1 -
 python/docs/source/reference/pyspark.errors.rst    |  1 -
 python/pyspark/errors/__init__.py                  |  2 --
 python/pyspark/errors/error-conditions.json        |  5 ----
 python/pyspark/errors/exceptions/base.py           |  7 -----
 python/pyspark/sql/connect/client/retries.py       |  8 +++--
 .../tests/connect/client/test_client_retries.py    | 35 ++++++++++++++++++----
 .../sql/tests/connect/test_connect_retry.py        |  7 ++---
 .../client/SparkConnectClientRetriesSuite.scala    | 12 ++++----
 .../sql/connect/client/GrpcRetryHandler.scala      |  5 ++--
 .../spark/sql/connect/client/RetriesExceeded.scala | 25 ----------------
 11 files changed, 45 insertions(+), 63 deletions(-)

diff --git a/dev/check_pyspark_custom_errors.py b/dev/check_pyspark_custom_errors.py
index bce73c84028a..db152c77d1b8 100644
--- a/dev/check_pyspark_custom_errors.py
+++ b/dev/check_pyspark_custom_errors.py
@@ -124,7 +124,6 @@ if __name__ == "__main__":
         "PySparkValueError",
         "PythonException",
         "QueryExecutionException",
-        "RetriesExceeded",
         "SessionNotSameException",
         "SparkNoSuchElementException",
         "SparkRuntimeException",
diff --git a/python/docs/source/reference/pyspark.errors.rst b/python/docs/source/reference/pyspark.errors.rst
index 4d2bf7612779..1204cccc8df5 100644
--- a/python/docs/source/reference/pyspark.errors.rst
+++ b/python/docs/source/reference/pyspark.errors.rst
@@ -50,7 +50,6 @@ Classes
     QueryContext
     QueryContextType
     QueryExecutionException
-    RetriesExceeded
     SessionNotSameException
     SparkRuntimeException
     SparkUpgradeException
diff --git a/python/pyspark/errors/__init__.py b/python/pyspark/errors/__init__.py
index 98f8aa593371..c20b057b74e6 100644
--- a/python/pyspark/errors/__init__.py
+++ b/python/pyspark/errors/__init__.py
@@ -46,7 +46,6 @@ from pyspark.errors.exceptions.base import (  # noqa: F401
     PySparkAssertionError,
     PySparkNotImplementedError,
     PySparkPicklingError,
-    RetriesExceeded,
     PySparkKeyError,
     QueryContext,
     QueryContextType,
@@ -83,7 +82,6 @@ __all__ = [
     "PySparkAssertionError",
     "PySparkNotImplementedError",
     "PySparkPicklingError",
-    "RetriesExceeded",
     "PySparkKeyError",
     "QueryContext",
     "QueryContextType",
diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json
index 154110717785..ba682a65a9f4 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -987,11 +987,6 @@
       "Columns do not match in their data type: <mismatch>."
     ]
   },
-  "RETRIES_EXCEEDED": {
-    "message": [
-      "The maximum number of retries has been exceeded."
-    ]
-  },
   "REUSE_OBSERVATION": {
     "message": [
       "An Observation can be used with a DataFrame only once."
diff --git a/python/pyspark/errors/exceptions/base.py b/python/pyspark/errors/exceptions/base.py
index 4fa7ea92e34c..b0455b64dabf 100644
--- a/python/pyspark/errors/exceptions/base.py
+++ b/python/pyspark/errors/exceptions/base.py
@@ -350,13 +350,6 @@ class PySparkPicklingError(PySparkException, PicklingError):
     """


-class RetriesExceeded(PySparkException):
-    """
-    Represents an exception which is considered retriable, but retry limits
-    were exceeded
-    """
-
-
 class PySparkKeyError(PySparkException, KeyError):
     """
     Wrapper class for KeyError to support error classes.
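One practical note on the retries.py change that follows: exhaustion is now signalled through the standard `warnings` machinery rather than a dedicated exception type, so applications can control its visibility with stdlib filters. A sketch (the message text is copied verbatim from the patch; whether to silence it is an application choice):

```python
import warnings

# Silence only the retry-exhaustion warning introduced by this change;
# the underlying exception still propagates to the caller.
warnings.filterwarnings(
    "ignore",
    message=r"\[RETRIES_EXCEEDED\] The maximum number of retries has been exceeded\.",
)
```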
diff --git a/python/pyspark/sql/connect/client/retries.py b/python/pyspark/sql/connect/client/retries.py
index 436da250d791..898d976f2628 100644
--- a/python/pyspark/sql/connect/client/retries.py
+++ b/python/pyspark/sql/connect/client/retries.py
@@ -19,12 +19,13 @@ import grpc
 import random
 import time
 import typing
+import warnings
 from google.rpc import error_details_pb2
 from grpc_status import rpc_status
 from typing import Optional, Callable, Generator, List, Type, cast
 from types import TracebackType
 from pyspark.sql.connect.logging import logger
-from pyspark.errors import PySparkRuntimeError, RetriesExceeded
+from pyspark.errors import PySparkRuntimeError

 """
 This module contains retry system. The system is designed to be
@@ -210,7 +211,7 @@ class Retrying:
             Do something that can throw exception

     In case error is considered retriable, it would be retried based on policies, and
-    RetriesExceeded will be raised if the retries limit would exceed.
+    it will be raised if the retries limit would exceed.

     Exceptions not considered retriable will be passed through transparently.
     """
@@ -277,7 +278,8 @@ class Retrying:

         # Exceeded retries
         logger.debug(f"Given up on retrying. error: {repr(exception)}")
-        raise RetriesExceeded(errorClass="RETRIES_EXCEEDED", messageParameters={}) from exception
+        warnings.warn("[RETRIES_EXCEEDED] The maximum number of retries has been exceeded.")
+        raise exception

     def __iter__(self) -> Generator[AttemptManager, None, None]:
         """
diff --git a/python/pyspark/sql/tests/connect/client/test_client_retries.py b/python/pyspark/sql/tests/connect/client/test_client_retries.py
index 400442363b47..fb3263e2824f 100644
--- a/python/pyspark/sql/tests/connect/client/test_client_retries.py
+++ b/python/pyspark/sql/tests/connect/client/test_client_retries.py
@@ -16,6 +16,7 @@
 #

 import unittest
+import warnings

 from pyspark.testing.connectutils import should_test_connect, connect_requirement_message
@@ -30,7 +31,6 @@ if should_test_connect:
         Retrying,
         DefaultPolicy,
     )
-    from pyspark.errors import RetriesExceeded
     from pyspark.sql.tests.connect.client.test_client import (
         TestPolicy,
         TestException,
@@ -91,7 +91,7 @@ class SparkConnectClientRetriesTestCase(unittest.TestCase):
             for attempt in Retrying(client._retry_policies, sleep=sleep_tracker.sleep):
                 with attempt:
                     raise TestException("Retryable error", grpc.StatusCode.UNAVAILABLE)
-        except RetriesExceeded:
+        except TestException:
             pass

         # tolerated at least 10 mins of fails
@@ -107,6 +107,29 @@ class SparkConnectClientRetriesTestCase(unittest.TestCase):

         self.assertEqual(client.get_retry_policies(), [policyA, policyB])

+    def test_warning_works(self):
+        client = SparkConnectClient("sc://foo/;token=bar")
+        policy = get_client_policies_map(client).get(DefaultPolicy)
+        self.assertIsNotNone(policy)
+
+        sleep_tracker = SleepTimeTracker()
+        with warnings.catch_warnings(record=True) as warning_list:
+            warnings.simplefilter("always")
+            try:
+                for attempt in Retrying(client._retry_policies, sleep=sleep_tracker.sleep):
+                    with attempt:
+                        raise TestException(
+                            msg="Some error message", code=grpc.StatusCode.UNAVAILABLE
+                        )
+            except TestException:
+                pass
+        self.assertEqual(len(sleep_tracker.times), policy.max_retries)
+        self.assertEqual(len(warning_list), 1)
+        self.assertEqual(
+            str(warning_list[0].message),
+            "[RETRIES_EXCEEDED] The maximum number of retries has been exceeded.",
+        )
+
     def test_default_policy_retries_retry_info(self):
         client = SparkConnectClient("sc://foo/;token=bar")
         policy = get_client_policies_map(client).get(DefaultPolicy)
@@ -124,7 +147,7 @@ class SparkConnectClientRetriesTestCase(unittest.TestCase):
                             code=grpc.StatusCode.UNIMPLEMENTED,
                             retry_delay=retry_delay,
                         )
-        except RetriesExceeded:
+        except TestException:
             pass
         expected_times = [
             min(policy.max_backoff, policy.initial_backoff * policy.backoff_multiplier**i)
@@ -151,7 +174,7 @@ class SparkConnectClientRetriesTestCase(unittest.TestCase):
                             grpc.StatusCode.UNAVAILABLE,
                             retry_delay,
                         )
-        except RetriesExceeded:
+        except TestException:
             pass
         expected_times = [retry_delay] * policy.max_retries
         self.assertListsAlmostEqual(sleep_tracker.times, expected_times, delta=policy.jitter)
@@ -173,7 +196,7 @@ class SparkConnectClientRetriesTestCase(unittest.TestCase):
                             grpc.StatusCode.UNAVAILABLE,
                             retry_delay,
                         )
-        except RetriesExceeded:
+        except TestException:
             pass

         expected_times = [policy.max_server_retry_delay] * policy.max_retries
@@ -204,7 +227,7 @@ class SparkConnectClientRetriesTestCase(unittest.TestCase):
                             grpc.StatusCode.UNAVAILABLE,
                             retry_delay,
                         )
-        except RetriesExceeded:
+        except TestException:
             pass

         expected_times = [initial_retry_delay] * 2 + [
diff --git a/python/pyspark/sql/tests/connect/test_connect_retry.py b/python/pyspark/sql/tests/connect/test_connect_retry.py
index 61ab0dcea862..21796869e385 100644
--- a/python/pyspark/sql/tests/connect/test_connect_retry.py
+++ b/python/pyspark/sql/tests/connect/test_connect_retry.py
@@ -18,7 +18,6 @@ import unittest
 from collections import defaultdict

-from pyspark.errors import RetriesExceeded
 from pyspark.testing.connectutils import (
     should_test_connect,
     connect_requirement_message,
@@ -88,7 +87,7 @@ class RetryTests(unittest.TestCase):

     def test_exceed_retries(self):
         # Exceed the retries.
-        with self.assertRaises(RetriesExceeded):
+        with self.assertRaises(TestError):
             for attempt in Retrying(TestPolicy(max_retries=2)):
                 with attempt:
                     self.stub(5, grpc.StatusCode.INTERNAL)
@@ -117,7 +116,7 @@ class RetryTests(unittest.TestCase):

     def test_specific_exception_exceed_retries(self):
         # Exceed the retries.
         policy = TestPolicySpecificError(max_retries=2, specific_code=grpc.StatusCode.UNAVAILABLE)
-        with self.assertRaises(RetriesExceeded):
+        with self.assertRaises(TestError):
             for attempt in Retrying(policy):
                 with attempt:
                     self.stub(5, grpc.StatusCode.UNAVAILABLE)
@@ -157,7 +156,7 @@ class RetryTests(unittest.TestCase):
         policy1 = TestPolicySpecificError(max_retries=2, specific_code=grpc.StatusCode.INTERNAL)
         policy2 = TestPolicySpecificError(max_retries=4, specific_code=grpc.StatusCode.INTERNAL)

-        with self.assertRaises(RetriesExceeded):
+        with self.assertRaises(TestError):
             for attempt in Retrying([policy1, policy2]):
                 with attempt:
                     self.stub(10, grpc.StatusCode.INTERNAL)
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientRetriesSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientRetriesSuite.scala
index 3408c15b73f0..c0738d7de325 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientRetriesSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientRetriesSuite.scala
@@ -96,7 +96,7 @@ class SparkConnectClientRetriesSuite
     val dummyFn = new DummyFn(new StatusRuntimeException(Status.UNAVAILABLE), numFails = 100)
     val retryHandler = new GrpcRetryHandler(RetryPolicy.defaultPolicies(), st.sleep)

-    assertThrows[RetriesExceeded] {
+    assertThrows[StatusRuntimeException] {
       retryHandler.retry {
         dummyFn.fn()
       }
@@ -143,7 +143,7 @@ class SparkConnectClientRetriesSuite
     val retryPolicy = RetryPolicy(canRetry = _ => true, maxRetries = Some(1), name = "TestPolicy")
     val retryHandler = new GrpcRetryHandler(retryPolicy, sleep = _ => {})

-    assertThrows[RetriesExceeded] { retryHandler.retry { dummyFn.fn() } }
+    assertThrows[StatusRuntimeException] { retryHandler.retry { dummyFn.fn() } }
     assert(dummyFn.counter == 2)
@@ -184,7 +184,7 @@ class SparkConnectClientRetriesSuite
     val errors = List.fill(10)(Status.INTERNAL).iterator
     var countAttempted = 0

-    assertThrows[RetriesExceeded](
+    assertThrows[StatusRuntimeException](
       new GrpcRetryHandler(List(policy1, policy2), sleep = _ => {}).retry({
         countAttempted += 1
         val e = errors.nextOption()
@@ -202,7 +202,7 @@ class SparkConnectClientRetriesSuite
       new DummyFn(createTestExceptionWithDetails(msg = "Some error message"), numFails = 100)
     val retryPolicies = RetryPolicy.defaultPolicies()
     val retryHandler = new GrpcRetryHandler(retryPolicies, sleep = _ => {})
-    assertThrows[RetriesExceeded] {
+    assertThrows[StatusRuntimeException] {
       retryHandler.retry { dummyFn.fn() }
     }
@@ -220,7 +220,7 @@ class SparkConnectClientRetriesSuite
     val retryPolicies = RetryPolicy.defaultPolicies()
     val retryHandler = new GrpcRetryHandler(retryPolicies, sleep = st.sleep)

-    assertThrows[RetriesExceeded] {
+    assertThrows[StatusRuntimeException] {
       retryHandler.retry { dummyFn.fn() }
     }
@@ -241,7 +241,7 @@ class SparkConnectClientRetriesSuite
     val retryPolicies = RetryPolicy.defaultPolicies()
     val retryHandler = new GrpcRetryHandler(retryPolicies, sleep = st.sleep)

-    assertThrows[RetriesExceeded] {
+    assertThrows[StatusRuntimeException] {
       retryHandler.retry { dummyFn.fn() }
     }
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
index 855d5651e3f7..3f4558ee97da 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
@@ -213,9 +213,8 @@ private[sql] object GrpcRetryHandler extends Logging {
           log"Non-Fatal error during RPC execution: ${MDC(ERROR, lastException)}, " +
             log"exceeded retries (currentRetryNum=${MDC(NUM_RETRY, currentRetryNum)})")

-        val error = new RetriesExceeded()
-        exceptionList.foreach(error.addSuppressed)
-        throw error
+        logWarning(log"[RETRIES_EXCEEDED] The maximum number of retries has been exceeded.")
+        throw lastException
       }

     def retry(): T = {
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetriesExceeded.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetriesExceeded.scala
deleted file mode 100644
index 77e1c0deab24..000000000000
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetriesExceeded.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connect.client
-
-/**
- * Represents an exception which was considered retriable but has exceeded retry limits.
- *
- * The actual exceptions incurred can be retrieved with getSuppressed()
- */
-class RetriesExceeded extends Throwable

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org