This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 13f11c7ab [KYUUBI #4981] Refactor code of closeBatchSession
13f11c7ab is described below

commit 13f11c7ab4d99894eafc6dfa9fba7228016632db
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Jun 20 16:36:27 2023 +0800

    [KYUUBI #4981] Refactor code of closeBatchSession
    
    ### _Why are the changes needed?_
    
    This PR aims to refactor the method `closeBatchSession`, by
    
    - extracting sub-methods `checkPermission` and `forceKill`
    - explicitly handling redirection when `metadata.kyuubiInstance != fe.connectionUrl` to make the logic clearer
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    Closes #4981 from pan3793/cancel.
    
    Closes #4981
    
    0d2e85acc [Cheng Pan] Refactor code of closeBatchSession
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/server/api/v1/BatchesResource.scala     | 59 +++++++++++-----------
 1 file changed, 30 insertions(+), 29 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 8271c8b88..cb0c63be0 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -40,7 +40,7 @@ import org.apache.kyuubi.client.exception.KyuubiRestException
 import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys._
-import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
+import org.apache.kyuubi.engine.{ApplicationInfo, KillResponse, 
KyuubiApplicationManager}
 import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, 
OperationState}
 import org.apache.kyuubi.server.api.ApiRequestContext
 import org.apache.kyuubi.server.api.v1.BatchesResource._
@@ -398,29 +398,37 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
   def closeBatchSession(
       @PathParam("batchId") batchId: String,
       @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): 
CloseBatchResponse = {
-    val sessionHandle = formatSessionHandle(batchId)
-
-    val userName = fe.getSessionUser(hs2ProxyUser)
 
-    sessionManager.getBatchSession(sessionHandle).map { batchSession =>
-      if (userName != batchSession.user) {
+    def checkPermission(operator: String, owner: String): Unit = {
+      if (operator != owner) {
         throw new WebApplicationException(
-          s"$userName is not allowed to close the session belong to 
${batchSession.user}",
+          s"$operator is not allowed to close the session belong to $owner",
           Status.METHOD_NOT_ALLOWED)
       }
+    }
+
+    def forceKill(clusterManager: Option[String], batchId: String): 
KillResponse = {
+      val (killed, message) = sessionManager.applicationManager
+        .killApplication(clusterManager, batchId)
+      info(s"Mark batch[$batchId] closed by ${fe.connectionUrl}")
+      sessionManager.updateMetadata(Metadata(identifier = batchId, 
peerInstanceClosed = true))
+      (killed, message)
+    }
+
+    val sessionHandle = formatSessionHandle(batchId)
+    val userName = fe.getSessionUser(hs2ProxyUser)
+
+    sessionManager.getBatchSession(sessionHandle).map { batchSession =>
+      checkPermission(userName, batchSession.user)
       sessionManager.closeSession(batchSession.handle)
-      val (success, msg) = batchSession.batchJobSubmissionOp.getKillMessage
-      new CloseBatchResponse(success, msg)
+      val (killed, msg) = batchSession.batchJobSubmissionOp.getKillMessage
+      new CloseBatchResponse(killed, msg)
     }.getOrElse {
       sessionManager.getBatchMetadata(batchId).map { metadata =>
-        if (userName != metadata.username) {
-          throw new WebApplicationException(
-            s"$userName is not allowed to close the session belong to 
${metadata.username}",
-            Status.METHOD_NOT_ALLOWED)
-        } else if 
(OperationState.isTerminal(OperationState.withName(metadata.state)) ||
-          metadata.kyuubiInstance == fe.connectionUrl) {
+        checkPermission(userName, metadata.username)
+        if 
(OperationState.isTerminal(OperationState.withName(metadata.state))) {
           new CloseBatchResponse(false, s"The batch[$metadata] has been 
terminated.")
-        } else {
+        } else if (metadata.kyuubiInstance != fe.connectionUrl) {
           info(s"Redirecting delete batch[$batchId] to 
${metadata.kyuubiInstance}")
           val internalRestClient = 
getInternalRestClient(metadata.kyuubiInstance)
           try {
@@ -428,20 +436,13 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
           } catch {
             case e: KyuubiRestException =>
               error(s"Error redirecting delete batch[$batchId] to 
${metadata.kyuubiInstance}", e)
-              val appMgrKillResp = 
sessionManager.applicationManager.killApplication(
-                metadata.clusterManager,
-                batchId)
-              info(
-                s"Marking batch[$batchId/${metadata.kyuubiInstance}] closed by 
${fe.connectionUrl}")
-              sessionManager.updateMetadata(Metadata(
-                identifier = batchId,
-                peerInstanceClosed = true))
-              if (appMgrKillResp._1) {
-                new CloseBatchResponse(appMgrKillResp._1, appMgrKillResp._2)
-              } else {
-                new CloseBatchResponse(false, Utils.stringifyException(e))
-              }
+              val (killed, msg) = forceKill(metadata.clusterManager, batchId)
+              new CloseBatchResponse(killed, if (killed) msg else 
Utils.stringifyException(e))
           }
+        } else { // should not happen, but handle this for safe
+          warn(s"Something wrong on deleting batch[$batchId], try forcibly 
killing application")
+          val (killed, msg) = forceKill(metadata.clusterManager, batchId)
+          new CloseBatchResponse(killed, msg)
         }
       }.getOrElse {
         error(s"Invalid batchId: $batchId")

Reply via email to