This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2c8ea75d92ee [SPARK-46660][CONNECT] ReattachExecute requests updates aliveness of SessionHolder 2c8ea75d92ee is described below commit 2c8ea75d92ee49284a0eb2aaecb010d1112d5bef Author: vicennial <venkata.gud...@databricks.com> AuthorDate: Mon Jan 15 17:35:26 2024 +0900 [SPARK-46660][CONNECT] ReattachExecute requests updates aliveness of SessionHolder ### What changes were proposed in this pull request? This PR makes `SparkConnectReattachExecuteHandler` fetch the `ExecuteHolder` via the `SessionHolder`, which in turn refreshes its aliveness. Further, this makes it consistent with `SparkConnectReleaseExecuteHandler`. ### Why are the changes needed? Currently in ReattachExecute, we fetch the ExecuteHolder directly without going through the SessionHolder and hence the "aliveness" of the SessionHolder is not refreshed. This would result in long-running queries (which do not send `ReleaseExecute` requests specifically) failing because the `SessionHolder` would expire from the cache during an active query execution. ### Does this PR introduce _any_ user-facing change? Yes, it fixes the bug where long-running queries may fail when their corresponding `SessionHolder` is expired during active query execution. ### How was this patch tested? New unit test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44670 from vicennial/SPARK-46660. 
Authored-by: vicennial <venkata.gud...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../SparkConnectReattachExecuteHandler.scala | 34 +++++++++++----------- .../execution/ReattachableExecuteSuite.scala | 19 ++++++++++++ .../service/SparkConnectServiceE2ESuite.scala | 12 ++++---- .../sql/tests/connect/client/test_reattach.py | 14 ++++----- 4 files changed, 47 insertions(+), 32 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala index 393b832de878..ecad8c9c73a1 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala @@ -29,25 +29,25 @@ class SparkConnectReattachExecuteHandler( extends Logging { def handle(v: proto.ReattachExecuteRequest): Unit = { - val executeHolder = SparkConnectService.executionManager - .getExecuteHolder(ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId)) - .getOrElse { - if (SparkConnectService.executionManager - .getAbandonedTombstone( - ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId)) - .isDefined) { - logDebug(s"Reattach operation abandoned: ${v.getOperationId}") - throw new SparkSQLException( - errorClass = "INVALID_HANDLE.OPERATION_ABANDONED", - messageParameters = Map("handle" -> v.getOperationId)) + val sessionHolder = SparkConnectService.sessionManager + .getIsolatedSession(SessionKey(v.getUserContext.getUserId, v.getSessionId)) - } else { - logDebug(s"Reattach operation not found: ${v.getOperationId}") - throw new SparkSQLException( - errorClass = "INVALID_HANDLE.OPERATION_NOT_FOUND", - messageParameters = Map("handle" -> v.getOperationId)) - } + val 
executeHolder = sessionHolder.executeHolder(v.getOperationId).getOrElse { + if (SparkConnectService.executionManager + .getAbandonedTombstone( + ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId)) + .isDefined) { + logDebug(s"Reattach operation abandoned: ${v.getOperationId}") + throw new SparkSQLException( + errorClass = "INVALID_HANDLE.OPERATION_ABANDONED", + messageParameters = Map("handle" -> v.getOperationId)) + } else { + logDebug(s"Reattach operation not found: ${v.getOperationId}") + throw new SparkSQLException( + errorClass = "INVALID_HANDLE.OPERATION_NOT_FOUND", + messageParameters = Map("handle" -> v.getOperationId)) } + } if (!executeHolder.reattachable) { logWarning(s"Reattach to not reattachable operation.") throw new SparkSQLException( diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala index f80229c61980..cf1a2d5032af 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala @@ -394,4 +394,23 @@ class ReattachableExecuteSuite extends SparkConnectServerTest { assertEventuallyNoActiveExecutions() } } + + test("SPARK-46660: reattach updates aliveness of session holder") { + withRawBlockingStub { stub => + val operationId = UUID.randomUUID().toString + val iter = stub.executePlan( + buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId)) + iter.next() // open the iterator, guarantees that the RPC reached the server + + val executionHolder = getExecutionHolder + val lastAccessTime = executionHolder.sessionHolder.getSessionHolderInfo.lastAccessTimeMs + + // send reattach + val iter2 = stub.reattachExecute(buildReattachExecuteRequest(operationId, 
None)) + iter2.next() // open the iterator, guarantees that the RPC reached the server + val newAccessTime = executionHolder.sessionHolder.getSessionHolderInfo.lastAccessTimeMs + + assert(newAccessTime > lastAccessTime, "reattach should update session holder access time") + } + } } diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala index c0b7eaf5823d..7776148077fc 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala @@ -66,22 +66,22 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest { // query1 and query2 could get either an: // OPERATION_CANCELED if it happens fast - when closing the session interrupted the queries, // and that error got pushed to the client buffers before the client got disconnected. - // OPERATION_ABANDONED if it happens slow - when closing the session interrupted the client - // RPCs before it pushed out the error above. The client would then get an + // INVALID_HANDLE.SESSION_CLOSED if it happens slow - when closing the session interrupted the + // client RPCs before it pushed out the error above. The client would then get an // INVALID_CURSOR.DISCONNECTED, which it will retry with a ReattachExecute, and then get an - // INVALID_HANDLE.OPERATION_ABANDONED. + // INVALID_HANDLE.SESSION_CLOSED. 
val query1Error = intercept[SparkException] { while (query1.hasNext) query1.next() } assert( query1Error.getMessage.contains("OPERATION_CANCELED") || - query1Error.getMessage.contains("INVALID_HANDLE.OPERATION_ABANDONED")) + query1Error.getMessage.contains("INVALID_HANDLE.SESSION_CLOSED")) val query2Error = intercept[SparkException] { while (query2.hasNext) query2.next() } assert( query2Error.getMessage.contains("OPERATION_CANCELED") || - query2Error.getMessage.contains("INVALID_HANDLE.OPERATION_ABANDONED")) + query2Error.getMessage.contains("INVALID_HANDLE.SESSION_CLOSED")) // No other requests should be allowed in the session, failing with SESSION_CLOSED val requestError = intercept[SparkException] { @@ -115,7 +115,7 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest { } assert( queryAError.getMessage.contains("OPERATION_CANCELED") || - queryAError.getMessage.contains("INVALID_HANDLE.OPERATION_ABANDONED")) + queryAError.getMessage.contains("INVALID_HANDLE.SESSION_CLOSED")) // B's query can run. while (queryB.hasNext) queryB.next() diff --git a/python/pyspark/sql/tests/connect/client/test_reattach.py b/python/pyspark/sql/tests/connect/client/test_reattach.py index 5f2cb3c49377..cea0be7008cc 100644 --- a/python/pyspark/sql/tests/connect/client/test_reattach.py +++ b/python/pyspark/sql/tests/connect/client/test_reattach.py @@ -84,10 +84,10 @@ class SparkConnectReattachTestCase(ReusedConnectTestCase, SQLTestUtils, PandasOn # query1 and query2 could get either an: # OPERATION_CANCELED if it happens fast - when closing the session interrupted the queries, # and that error got pushed to the client buffers before the client got disconnected. - # OPERATION_ABANDONED if it happens slow - when closing the session interrupted the client - # RPCs before it pushed out the error above. The client would then get an + # INVALID_HANDLE.SESSION_CLOSED if it happens slow - when closing the session interrupted + # the client RPCs before it pushed out the error above. 
The client would then get an # INVALID_CURSOR.DISCONNECTED, which it will retry with a ReattachExecute, and then get an - # INVALID_HANDLE.OPERATION_ABANDONED. + # INVALID_HANDLE.SESSION_CLOSED. def check_error(q): try: @@ -97,14 +97,10 @@ class SparkConnectReattachTestCase(ReusedConnectTestCase, SQLTestUtils, PandasOn e = check_error(query1) self.assertIsNotNone(e, "An exception has to be thrown") - self.assertTrue( - "OPERATION_CANCELED" in str(e) or "INVALID_HANDLE.OPERATION_ABANDONED" in str(e) - ) + self.assertTrue("OPERATION_CANCELED" in str(e) or "INVALID_HANDLE.SESSION_CLOSED" in str(e)) e = check_error(query2) self.assertIsNotNone(e, "An exception has to be thrown") - self.assertTrue( - "OPERATION_CANCELED" in str(e) or "INVALID_HANDLE.OPERATION_ABANDONED" in str(e) - ) + self.assertTrue("OPERATION_CANCELED" in str(e) or "INVALID_HANDLE.SESSION_CLOSED" in str(e)) # query3 has not been submitted before, so it should now fail with SESSION_CLOSED e = check_error(query3) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org