This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2d2bedf4aa16 [SPARK-48056][CONNECT][FOLLOW-UP] Scala Client re-execute
plan if a SESSION_NOT_FOUND error is raised and no partial response was received
2d2bedf4aa16 is described below
commit 2d2bedf4aa16adfc2f45c192c4b7b954788b3acd
Author: Changgyoo Park <[email protected]>
AuthorDate: Fri Jun 14 16:31:37 2024 +0800
[SPARK-48056][CONNECT][FOLLOW-UP] Scala Client re-execute plan if a
SESSION_NOT_FOUND error is raised and no partial response was received
### What changes were proposed in this pull request?
This change lets a Scala Spark Connect client re-execute a plan when it
receives a SESSION_NOT_FOUND error from the Spark Connect service, provided it
has not yet received any partial responses.
This is the Scala counterpart of the earlier fix for the same issue in the
Python client: https://github.com/apache/spark/pull/46297.
### Why are the changes needed?
Spark Connect clients often get a spurious error from the Spark Connect
service when the service is busy or the network is congested. The client then
immediately attempts to reattach before the service knows about the client's
session, and the query fails.
### Does this PR introduce _any_ user-facing change?
Previously, a Scala Spark Connect client would fail with the error code
"INVALID_HANDLE.SESSION_NOT_FOUND" on its very first request to the service.
With this change, the client automatically retries.
### How was this patch tested?
Added a unit test.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46971 from changgyoopark-db/SPARK-48056.
Authored-by: Changgyoo Park <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../connect/client/SparkConnectClientSuite.scala | 28 ++++++++++++++++++++++
.../ExecutePlanResponseReattachableIterator.scala | 14 +++++++----
2 files changed, 38 insertions(+), 4 deletions(-)
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 55f962b2a52c..46aeaeff43d2 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -530,6 +530,25 @@ class SparkConnectClientSuite extends ConnectFunSuite with
BeforeAndAfterEach {
assert(reattachableIter.resultComplete)
}
+ test("SPARK-48056: Client execute gets INVALID_HANDLE.SESSION_NOT_FOUND and
proceeds") {
+ startDummyServer(0)
+ client = SparkConnectClient
+ .builder()
+ .connectionString(s"sc://localhost:${server.getPort}")
+ .enableReattachableExecute()
+ .build()
+ service.errorToThrowOnExecute = Some(
+ new StatusRuntimeException(
+ Status.INTERNAL.withDescription("INVALID_HANDLE.SESSION_NOT_FOUND")))
+
+ val plan = buildPlan("select * from range(1)")
+ val iter = client.execute(plan)
+ val reattachableIter =
+ ExecutePlanResponseReattachableIterator.fromIterator(iter)
+ reattachableIter.foreach(_ => ())
+ assert(reattachableIter.resultComplete)
+ }
+
test("GRPC stub unary call throws error immediately") {
// Spark Connect error retry handling depends on the error being returned
from the unary
// call immediately.
@@ -609,6 +628,8 @@ class DummySparkConnectService() extends
SparkConnectServiceGrpc.SparkConnectSer
private val inputArtifactRequests: mutable.ListBuffer[AddArtifactsRequest] =
mutable.ListBuffer.empty
+ var errorToThrowOnExecute: Option[Throwable] = None
+
private[sql] def getAndClearLatestInputPlan(): proto.Plan = {
val plan = inputPlan
inputPlan = null
@@ -624,6 +645,13 @@ class DummySparkConnectService() extends
SparkConnectServiceGrpc.SparkConnectSer
override def executePlan(
request: ExecutePlanRequest,
responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+ if (errorToThrowOnExecute.isDefined) {
+ val error = errorToThrowOnExecute.get
+ errorToThrowOnExecute = None
+ responseObserver.onError(error)
+ return
+ }
+
// Reply with a dummy response using the same client ID
val requestSessionId = request.getSessionId
val operationId = if (request.hasOperationId) {
diff --git
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index 74f13272a365..f3c13c9c2c4d 100644
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -42,7 +42,8 @@ import
org.apache.spark.sql.connect.client.GrpcRetryHandler.RetryException
* ReattachExecute request. ReattachExecute request is provided the responseId
of last returned
* ExecutePlanResponse on the iterator to return a new iterator from server
that continues after
* that. If the initial ExecutePlan did not even reach the server, and hence
reattach fails with
- * INVALID_HANDLE.OPERATION_NOT_FOUND, we attempt to retry ExecutePlan.
+ * INVALID_HANDLE.OPERATION_NOT_FOUND or INVALID_HANDLE.SESSION_NOT_FOUND, we
attempt to retry
+ * ExecutePlan.
*
* In reattachable execute the server does buffer some responses in case the
client needs to
* backtrack. To let server release this buffer sooner, this iterator
asynchronously sends
@@ -66,7 +67,8 @@ class ExecutePlanResponseReattachableIterator(
// Add operation id, if not present.
// with operationId set by the client, the client can use it to try to
reattach on error
// even before getting the first response. If the operation in fact didn't
even reach the
- // server, that will end with INVALID_HANDLE.OPERATION_NOT_FOUND error.
+ // server, that will end with INVALID_HANDLE.OPERATION_NOT_FOUND or
+ // INVALID_HANDLE.SESSION_NOT_FOUND error.
UUID.randomUUID.toString
}
@@ -234,10 +236,14 @@ class ExecutePlanResponseReattachableIterator(
} catch {
case ex: StatusRuntimeException
if Option(StatusProto.fromThrowable(ex))
-
.exists(_.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND")) =>
+ .exists(ex => {
+ ex.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND") ||
+ ex.getMessage.contains("INVALID_HANDLE.SESSION_NOT_FOUND")
+ }) =>
if (lastReturnedResponseId.isDefined) {
throw new IllegalStateException(
- "OPERATION_NOT_FOUND on the server but responses were already
received from it.",
+ "OPERATION_NOT_FOUND/SESSION_NOT_FOUND on the server but responses
were already " +
+ "received from it.",
ex)
}
// Try a new ExecutePlan, and throw upstream for retry.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]