This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 04558fc90fe [SPARK-44833][CONNECT] Fix sending Reattach too fast after
Execute
04558fc90fe is described below
commit 04558fc90fe2df2d2791c98c5f7b15ee26e250eb
Author: Juliusz Sompolski <[email protected]>
AuthorDate: Wed Sep 6 14:21:47 2023 +0900
[SPARK-44833][CONNECT] Fix sending Reattach too fast after Execute
### What changes were proposed in this pull request?
Redo the retry logic so that getting a new iterator via ReattachExecute
does not depend on "firstTry"; instead, "callIter" contains the logic that unsets
the iterator when a new one is needed.
### Why are the changes needed?
After an "INVALID_HANDLE.OPERATION_NOT_FOUND" error, client would realize
that the failure in ReattachExecute was because the initial ExecutePlan didn't
reach the server. It would then call another ExecutePlan, and it will throw a
RetryException to let the retry logic handle retrying. However, the retry logic
would then immediately send a ReattachExecute, and the client will want to use
the iterator of the reattach.
However, on the server the ExecutePlan and ReattachExecute could race with
each other:
* ExecutePlan didn't reach
executeHolder.runGrpcResponseSender(responseSender) in
SparkConnectExecutePlanHandler yet.
* ReattachExecute races around and reaches
executeHolder.runGrpcResponseSender(responseSender) in
SparkConnectReattachExecuteHandler first.
* When ExecutePlan reaches
executeHolder.runGrpcResponseSender(responseSender), and
executionObserver.attachConsumer(this) is called in ExecuteGrpcResponseSender
of ExecutePlan, it will kick out the ExecuteGrpcResponseSender of
ReattachExecute.
So even though ReattachExecute came later, it would get interrupted by the
earlier ExecutePlan and finish with an INVALID_CURSOR.DISCONNECTED error.
After this change, such a race between ExecutePlan / ReattachExecute can
still happen, but the client should no longer send these requests in such
quick succession.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Integration testing.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42806 from juliuszsompolski/SPARK-44833.
Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit e4d17e9a1fb64454a6a007171837d159633e91fb)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../ExecutePlanResponseReattachableIterator.scala | 33 ++++++++--------------
python/pyspark/sql/connect/client/reattach.py | 31 ++++++++++----------
2 files changed, 28 insertions(+), 36 deletions(-)
diff --git
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index aeb452faecf..9bf7de33da8 100644
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -91,8 +91,8 @@ class ExecutePlanResponseReattachableIterator(
// Initial iterator comes from ExecutePlan request.
// Note: This is not retried, because no error would ever be thrown here,
and GRPC will only
// throw error on first iter.hasNext() or iter.next()
- private var iter: java.util.Iterator[proto.ExecutePlanResponse] =
- rawBlockingStub.executePlan(initialRequest)
+ private var iter: Option[java.util.Iterator[proto.ExecutePlanResponse]] =
+ Some(rawBlockingStub.executePlan(initialRequest))
override def next(): proto.ExecutePlanResponse = synchronized {
// hasNext will trigger reattach in case the stream completed without
resultComplete
@@ -102,15 +102,7 @@ class ExecutePlanResponseReattachableIterator(
try {
// Get next response, possibly triggering reattach in case of stream
error.
- var firstTry = true
val ret = retry {
- if (firstTry) {
- // on first try, we use the existing iter.
- firstTry = false
- } else {
- // on retry, the iter is borked, so we need a new one
- iter =
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
- }
callIter(_.next())
}
@@ -134,23 +126,15 @@ class ExecutePlanResponseReattachableIterator(
// After response complete response
return false
}
- var firstTry = true
try {
retry {
- if (firstTry) {
- // on first try, we use the existing iter.
- firstTry = false
- } else {
- // on retry, the iter is borked, so we need a new one
- iter =
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
- }
var hasNext = callIter(_.hasNext())
// Graceful reattach:
// If iter ended, but there was no ResultComplete, it means that there
is more,
// and we need to reattach.
if (!hasNext && !resultComplete) {
do {
- iter =
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+ iter = None // unset iterator for new ReattachExecute to be called
in _call_iter
assert(!resultComplete) // shouldn't change...
hasNext = callIter(_.hasNext())
// It's possible that the new iter will be empty, so we need to
loop to get another.
@@ -208,7 +192,10 @@ class ExecutePlanResponseReattachableIterator(
*/
private def callIter[V](iterFun:
java.util.Iterator[proto.ExecutePlanResponse] => V) = {
try {
- iterFun(iter)
+ if (iter.isEmpty) {
+ iter =
Some(rawBlockingStub.reattachExecute(createReattachExecuteRequest()))
+ }
+ iterFun(iter.get)
} catch {
case ex: StatusRuntimeException
if Option(StatusProto.fromThrowable(ex))
@@ -219,8 +206,12 @@ class ExecutePlanResponseReattachableIterator(
ex)
}
// Try a new ExecutePlan, and throw upstream for retry.
- iter = rawBlockingStub.executePlan(initialRequest)
+ iter = Some(rawBlockingStub.executePlan(initialRequest))
throw new GrpcRetryHandler.RetryException
+ case NonFatal(e) =>
+ // Remove the iterator, so that a new one will be created after retry.
+ iter = None
+ throw e
}
}
diff --git a/python/pyspark/sql/connect/client/reattach.py
b/python/pyspark/sql/connect/client/reattach.py
index c6b1beaa121..d3765fb6696 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -131,15 +131,6 @@ class ExecutePlanResponseReattachableIterator(Generator):
can_retry=SparkConnectClient.retry_exception,
**self._retry_policy
):
with attempt:
- # on first try, we use the existing iterator.
- if not attempt.is_first_try():
- # on retry, the iterator is borked, so we need a
new one
- self._iterator = iter(
- self._stub.ReattachExecute(
- self._create_reattach_execute_request(),
metadata=self._metadata
- )
- )
-
if self._current is None:
try:
self._current = self._call_iter(lambda:
next(self._iterator))
@@ -154,12 +145,8 @@ class ExecutePlanResponseReattachableIterator(Generator):
# arrive, we keep reattaching.
if not self._result_complete and not has_next:
while not has_next:
- self._iterator = iter(
- self._stub.ReattachExecute(
-
self._create_reattach_execute_request(),
- metadata=self._metadata,
- )
- )
+ # unset iterator for new ReattachExecute to be
called in _call_iter
+ self._iterator = None
# shouldn't change
assert not self._result_complete
try:
@@ -238,6 +225,14 @@ class ExecutePlanResponseReattachableIterator(Generator):
Called inside retry block, so retryable failure will get handled
upstream.
"""
+ if self._iterator is None:
+ # we get a new iterator with ReattachExecute if it was unset.
+ self._iterator = iter(
+ self._stub.ReattachExecute(
+ self._create_reattach_execute_request(),
metadata=self._metadata
+ )
+ )
+
try:
return iter_fun()
except grpc.RpcError as e:
@@ -255,7 +250,13 @@ class ExecutePlanResponseReattachableIterator(Generator):
)
raise RetryException()
else:
+ # Remove the iterator, so that a new one will be created after
retry.
+ self._iterator = None
raise e
+ except Exception as e:
+ # Remove the iterator, so that a new one will be created after
retry.
+ self._iterator = None
+ raise e
def _create_reattach_execute_request(self) -> pb2.ReattachExecuteRequest:
reattach = pb2.ReattachExecuteRequest(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]