This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 981cf80f7ac [SPARK-44671][PYTHON][CONNECT] Retry ExecutePlan in case initial request didn't reach server in Python client
981cf80f7ac is described below

commit 981cf80f7acc01e24be52502d756ac28dd59a3f1
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Sat Aug 5 00:58:16 2023 +0900

    [SPARK-44671][PYTHON][CONNECT] Retry ExecutePlan in case initial request didn't reach server in Python client
    
    ### What changes were proposed in this pull request?
    
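    This applies the fix from https://github.com/apache/spark/pull/42282 to the Python client, for symmetry:
    if the initial ExecutePlan request never reached the server, the client now retries ExecutePlan.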
    
    ### Why are the changes needed?
    
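    Previously, if the initial ExecutePlan request never reached the server, the subsequent
    ReattachExecute failed with INVALID_HANDLE.OPERATION_NOT_FOUND, which failed the whole operation.
    See also https://github.com/apache/spark/pull/42282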
    
    ### Does this PR introduce _any_ user-facing change?
    
    See also https://github.com/apache/spark/pull/42282
    
    ### How was this patch tested?
    
    See also https://github.com/apache/spark/pull/42282
    
    Closes #42338 from HyukjinKwon/SPARK-44671.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 780bae928399947a351dd4b36afcfc7a8be06b13)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/connect/client/core.py     |  7 ++--
 python/pyspark/sql/connect/client/reattach.py | 51 ++++++++++++++++++++++-----
 2 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/sql/connect/client/core.py 
b/python/pyspark/sql/connect/client/core.py
index 816eb8344fe..b62621bc3c2 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -65,7 +65,10 @@ from google.rpc import error_details_pb2
 from pyspark.version import __version__
 from pyspark.resource.information import ResourceInformation
 from pyspark.sql.connect.client.artifact import ArtifactManager
-from pyspark.sql.connect.client.reattach import 
ExecutePlanResponseReattachableIterator
+from pyspark.sql.connect.client.reattach import (
+    ExecutePlanResponseReattachableIterator,
+    RetryException,
+)
 from pyspark.sql.connect.conversion import storage_level_to_proto, 
proto_to_storage_level
 import pyspark.sql.connect.proto as pb2
 import pyspark.sql.connect.proto.base_pb2_grpc as grpc_lib
@@ -1549,7 +1552,7 @@ class AttemptManager:
     ) -> Optional[bool]:
         if isinstance(exc_val, BaseException):
             # Swallow the exception.
-            if self._can_retry(exc_val):
+            if self._can_retry(exc_val) or isinstance(exc_val, RetryException):
                 self._retry_state.set_exception(exc_val)
                 return True
             # Bubble up the exception.
diff --git a/python/pyspark/sql/connect/client/reattach.py 
b/python/pyspark/sql/connect/client/reattach.py
index 702107d97f5..70c7d126ff1 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -21,10 +21,13 @@ check_dependencies(__name__)
 import warnings
 import uuid
 from collections.abc import Generator
-from typing import Optional, Dict, Any, Iterator, Iterable, Tuple
+from typing import Optional, Dict, Any, Iterator, Iterable, Tuple, Callable, 
cast
 from multiprocessing.pool import ThreadPool
 import os
 
+import grpc
+from grpc_status import rpc_status
+
 import pyspark.sql.connect.proto as pb2
 import pyspark.sql.connect.proto.base_pb2_grpc as grpc_lib
 
@@ -42,15 +45,12 @@ class ExecutePlanResponseReattachableIterator(Generator):
     Initial iterator is the result of an ExecutePlan on the request, but it 
can be reattached with
     ReattachExecute request. ReattachExecute request is provided the 
responseId of last returned
     ExecutePlanResponse on the iterator to return a new iterator from server 
that continues after
-    that.
+    that. If the initial ExecutePlan did not even reach the server, and hence 
reattach fails with
+    INVALID_HANDLE.OPERATION_NOT_FOUND, we attempt to retry ExecutePlan.
 
     In reattachable execute the server does buffer some responses in case the 
client needs to
     backtrack. To let server release this buffer sooner, this iterator 
asynchronously sends
     ReleaseExecute RPCs that instruct the server to release responses that it 
already processed.
-
-    Note: If the initial ExecutePlan did not even reach the server and 
execution didn't start,
-    the ReattachExecute can still fail with 
INVALID_HANDLE.OPERATION_NOT_FOUND, failing the whole
-    operation.
     """
 
     _release_thread_pool = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
@@ -93,6 +93,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
         # Initial iterator comes from ExecutePlan request.
         # Note: This is not retried, because no error would ever be thrown 
here, and GRPC will only
         # throw error on first self._has_next().
+        self._metadata = metadata
         self._iterator: Iterator[pb2.ExecutePlanResponse] = iter(
             self._stub.ExecutePlan(self._initial_request, metadata=metadata)
         )
@@ -139,7 +140,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
 
                         if self._current is None:
                             try:
-                                self._current = next(self._iterator)
+                                self._current = self._call_iter(lambda: 
next(self._iterator))
                             except StopIteration:
                                 pass
 
@@ -159,7 +160,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
                                 # shouldn't change
                                 assert not self._result_complete
                                 try:
-                                    self._current = next(self._iterator)
+                                    self._current = self._call_iter(lambda: 
next(self._iterator))
                                 except StopIteration:
                                     pass
                                 has_next = self._current is not None
@@ -226,6 +227,33 @@ class ExecutePlanResponseReattachableIterator(Generator):
         
ExecutePlanResponseReattachableIterator._release_thread_pool.apply_async(target)
         self._result_complete = True
 
+    def _call_iter(self, iter_fun: Callable) -> Any:
+        """
+        Call next() on the iterator. If this fails with this operationId not 
existing
+        on the server, this means that the initial ExecutePlan request didn't 
even reach the
+        server. In that case, attempt to start again with ExecutePlan.
+
+        Called inside retry block, so retryable failure will get handled 
upstream.
+        """
+        try:
+            return iter_fun()
+        except grpc.RpcError as e:
+            status = rpc_status.from_call(cast(grpc.Call, e))
+            if "INVALID_HANDLE.OPERATION_NOT_FOUND" in status.message:
+                if self._last_returned_response_id is not None:
+                    raise RuntimeError(
+                        "OPERATION_NOT_FOUND on the server but "
+                        "responses were already received from it.",
+                        e,
+                    )
+                # Try a new ExecutePlan, and throw upstream for retry.
+                self._iterator = iter(
+                    self._stub.ExecutePlan(self._initial_request, 
metadata=self._metadata)
+                )
+                raise RetryException()
+            else:
+                raise e
+
     def _create_reattach_execute_request(self) -> pb2.ReattachExecuteRequest:
         reattach = pb2.ReattachExecuteRequest(
             session_id=self._initial_request.session_id,
@@ -269,3 +297,10 @@ class ExecutePlanResponseReattachableIterator(Generator):
 
     def __del__(self) -> None:
         return self.close()
+
+
+class RetryException(Exception):
+    """
+    An exception that can be thrown upstream when inside retry and which will 
be retryable
+    regardless of policy.
+    """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to