This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 981cf80f7ac [SPARK-44671][PYTHON][CONNECT] Retry ExecutePlan in case initial request didn't reach server in Python client 981cf80f7ac is described below commit 981cf80f7acc01e24be52502d756ac28dd59a3f1 Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Sat Aug 5 00:58:16 2023 +0900 [SPARK-44671][PYTHON][CONNECT] Retry ExecutePlan in case initial request didn't reach server in Python client ### What changes were proposed in this pull request? This applies the same fix as https://github.com/apache/spark/pull/42282 to the Python client, for symmetry. ### Why are the changes needed? See also https://github.com/apache/spark/pull/42282 ### Does this PR introduce _any_ user-facing change? See also https://github.com/apache/spark/pull/42282 ### How was this patch tested? See also https://github.com/apache/spark/pull/42282 Closes #42338 from HyukjinKwon/SPARK-44671. Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 780bae928399947a351dd4b36afcfc7a8be06b13) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/sql/connect/client/core.py | 7 ++-- python/pyspark/sql/connect/client/reattach.py | 51 ++++++++++++++++++++++----- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index 816eb8344fe..b62621bc3c2 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -65,7 +65,10 @@ from google.rpc import error_details_pb2 from pyspark.version import __version__ from pyspark.resource.information import ResourceInformation from pyspark.sql.connect.client.artifact import ArtifactManager -from pyspark.sql.connect.client.reattach import ExecutePlanResponseReattachableIterator +from pyspark.sql.connect.client.reattach import ( + ExecutePlanResponseReattachableIterator, + RetryException, +) from 
pyspark.sql.connect.conversion import storage_level_to_proto, proto_to_storage_level import pyspark.sql.connect.proto as pb2 import pyspark.sql.connect.proto.base_pb2_grpc as grpc_lib @@ -1549,7 +1552,7 @@ class AttemptManager: ) -> Optional[bool]: if isinstance(exc_val, BaseException): # Swallow the exception. - if self._can_retry(exc_val): + if self._can_retry(exc_val) or isinstance(exc_val, RetryException): self._retry_state.set_exception(exc_val) return True # Bubble up the exception. diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py index 702107d97f5..70c7d126ff1 100644 --- a/python/pyspark/sql/connect/client/reattach.py +++ b/python/pyspark/sql/connect/client/reattach.py @@ -21,10 +21,13 @@ check_dependencies(__name__) import warnings import uuid from collections.abc import Generator -from typing import Optional, Dict, Any, Iterator, Iterable, Tuple +from typing import Optional, Dict, Any, Iterator, Iterable, Tuple, Callable, cast from multiprocessing.pool import ThreadPool import os +import grpc +from grpc_status import rpc_status + import pyspark.sql.connect.proto as pb2 import pyspark.sql.connect.proto.base_pb2_grpc as grpc_lib @@ -42,15 +45,12 @@ class ExecutePlanResponseReattachableIterator(Generator): Initial iterator is the result of an ExecutePlan on the request, but it can be reattached with ReattachExecute request. ReattachExecute request is provided the responseId of last returned ExecutePlanResponse on the iterator to return a new iterator from server that continues after - that. + that. If the initial ExecutePlan did not even reach the server, and hence reattach fails with + INVALID_HANDLE.OPERATION_NOT_FOUND, we attempt to retry ExecutePlan. In reattachable execute the server does buffer some responses in case the client needs to backtrack. 
To let server release this buffer sooner, this iterator asynchronously sends ReleaseExecute RPCs that instruct the server to release responses that it already processed. - - Note: If the initial ExecutePlan did not even reach the server and execution didn't start, - the ReattachExecute can still fail with INVALID_HANDLE.OPERATION_NOT_FOUND, failing the whole - operation. """ _release_thread_pool = ThreadPool(os.cpu_count() if os.cpu_count() else 8) @@ -93,6 +93,7 @@ class ExecutePlanResponseReattachableIterator(Generator): # Initial iterator comes from ExecutePlan request. # Note: This is not retried, because no error would ever be thrown here, and GRPC will only # throw error on first self._has_next(). + self._metadata = metadata self._iterator: Iterator[pb2.ExecutePlanResponse] = iter( self._stub.ExecutePlan(self._initial_request, metadata=metadata) ) @@ -139,7 +140,7 @@ class ExecutePlanResponseReattachableIterator(Generator): if self._current is None: try: - self._current = next(self._iterator) + self._current = self._call_iter(lambda: next(self._iterator)) except StopIteration: pass @@ -159,7 +160,7 @@ class ExecutePlanResponseReattachableIterator(Generator): # shouldn't change assert not self._result_complete try: - self._current = next(self._iterator) + self._current = self._call_iter(lambda: next(self._iterator)) except StopIteration: pass has_next = self._current is not None @@ -226,6 +227,33 @@ class ExecutePlanResponseReattachableIterator(Generator): ExecutePlanResponseReattachableIterator._release_thread_pool.apply_async(target) self._result_complete = True + def _call_iter(self, iter_fun: Callable) -> Any: + """ + Call next() on the iterator. If this fails with this operationId not existing + on the server, this means that the initial ExecutePlan request didn't even reach the + server. In that case, attempt to start again with ExecutePlan. + + Called inside retry block, so retryable failure will get handled upstream. 
+ """ + try: + return iter_fun() + except grpc.RpcError as e: + status = rpc_status.from_call(cast(grpc.Call, e)) + if "INVALID_HANDLE.OPERATION_NOT_FOUND" in status.message: + if self._last_returned_response_id is not None: + raise RuntimeError( + "OPERATION_NOT_FOUND on the server but " + "responses were already received from it.", + e, + ) + # Try a new ExecutePlan, and throw upstream for retry. + self._iterator = iter( + self._stub.ExecutePlan(self._initial_request, metadata=self._metadata) + ) + raise RetryException() + else: + raise e + def _create_reattach_execute_request(self) -> pb2.ReattachExecuteRequest: reattach = pb2.ReattachExecuteRequest( session_id=self._initial_request.session_id, @@ -269,3 +297,10 @@ class ExecutePlanResponseReattachableIterator(Generator): def __del__(self) -> None: return self.close() + + +class RetryException(Exception): + """ + An exception that can be thrown upstream when inside retry and which will be retryable + regardless of policy. + """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org