This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b3123e41 [KYUUBI #2379][SUB-TASK][KPIP-4] Implement BatchesResource 
DELETE /batches/${batchId}
5b3123e41 is described below

commit 5b3123e41cddcdac80478e2b91d02ee16d8b1453
Author: Fei Wang <[email protected]>
AuthorDate: Fri May 6 10:52:52 2022 +0800

    [KYUUBI #2379][SUB-TASK][KPIP-4] Implement BatchesResource DELETE 
/batches/${batchId}
    
    ### _Why are the changes needed?_
    
    To close #2379
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    <img width="1185" alt="image" 
src="https://user-images.githubusercontent.com/6757692/166098400-c977d9e2-c900-4f3b-a87c-bb9a6504124a.png";>
    
    - [x] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #2528 from turboFei/kpip_4_2376_delete_batch.
    
    Closes #2379
    
    f099cbd6 [Fei Wang] comments
    db3c4315 [Fei Wang] set to cluster mode
    333e835d [Fei Wang] support proxy user
    53d905d9 [Fei Wang] [SUB-TASK][KPIP-4] Implement BatchesResource DELETE 
/batches/${batchId}
    
    Authored-by: Fei Wang <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../kyuubi/operation/BatchJobSubmission.scala      |  6 ++-
 .../kyuubi/server/api/v1/BatchesResource.scala     | 50 +++++++++++++++++++++-
 .../server/api/v1/BatchesResourceSuite.scala       | 47 ++++++++++++++++++++
 3 files changed, 101 insertions(+), 2 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index ce3b81a12..ca1ab10c4 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -26,7 +26,7 @@ import org.apache.hive.service.rpc.thrift._
 
 import org.apache.kyuubi.KyuubiException
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.{ApplicationOperation, ProcBuilder}
+import org.apache.kyuubi.engine.{ApplicationOperation, KillResponse, 
ProcBuilder}
 import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.log.OperationLog
@@ -70,6 +70,10 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, 
batchRequest: BatchReq
     applicationManager.getApplicationInfo(builder.clusterManager(), batchId)
   }
 
+  private[kyuubi] def killBatchApplication(): KillResponse = {
+    applicationManager.killApplication(builder.clusterManager(), batchId)
+  }
+
   private val applicationCheckInterval =
     session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL)
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 4735afb11..32d5cd8bd 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.server.api.v1
 
 import javax.ws.rs._
-import javax.ws.rs.core.MediaType
+import javax.ws.rs.core.{MediaType, Response}
 
 import scala.util.control.NonFatal
 
@@ -31,6 +31,7 @@ import org.apache.kyuubi.Logging
 import org.apache.kyuubi.server.api.ApiRequestContext
 import org.apache.kyuubi.server.api.v1.BatchesResource.REST_BATCH_PROTOCOL
 import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
+import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
 import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
 
 @Tag(name = "Batch")
@@ -88,6 +89,53 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
       fe.connectionUrl,
       batchOp.getStatus.state.toString)
   }
+
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON)),
+    description = "close a batch session")
+  @DELETE
+  @Path("{batchId}")
+  def closeBatchSession(
+      @PathParam("batchId") batchId: String,
+      @QueryParam("killApp") killApp: Boolean,
+      @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Response = 
{
+    var session: KyuubiBatchSessionImpl = null
+    try {
+      val sessionHandle = sessionManager.getBatchSessionHandle(batchId, 
REST_BATCH_PROTOCOL)
+      session = 
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+    } catch {
+      case NonFatal(e) =>
+        error(s"Invalid batchId: $batchId", e)
+        throw new NotFoundException(s"Invalid batchId: $batchId")
+    }
+
+    val sessionConf = Option(hs2ProxyUser).filter(_.nonEmpty).map(proxyUser =>
+      Map(KyuubiAuthenticationFactory.HS2_PROXY_USER -> 
proxyUser)).getOrElse(Map())
+
+    var userName: String = null
+    try {
+      userName = fe.getUserName(sessionConf)
+    } catch {
+      case t: Throwable =>
+        throw new NotAllowedException(t.getMessage)
+    }
+
+    if (!session.user.equals(userName)) {
+      throw new NotAllowedException(
+        s"$userName is not allowed to close the session belong to 
${session.user}")
+    }
+
+    if (killApp) {
+      val killResponse = session.batchJobSubmissionOp.killBatchApplication()
+      sessionManager.closeSession(session.handle)
+      Response.ok().entity(killResponse).build()
+    } else {
+      sessionManager.closeSession(session.handle)
+      Response.ok().build()
+    }
+  }
 }
 
 object BatchesResource {
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index fcfcc7777..e30a7f1fe 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.kyuubi.server.api.v1
 
+import java.util.Base64
+import java.util.UUID
 import javax.ws.rs.client.Entity
 import javax.ws.rs.core.MediaType
 
 import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, 
ENGINE_SPARK_MAX_LIFETIME}
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import 
org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
 import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
 
 class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
@@ -67,5 +70,49 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper {
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
     assert(404 == getBatchResponse.getStatus)
+
+    // invalid user name
+    val encodeAuthorization = new 
String(Base64.getEncoder.encode(batch.id.getBytes()), "UTF-8")
+    var deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.id}")
+      .queryParam("killApp", "true")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+      .delete()
+    assert(405 == deleteBatchResponse.getStatus)
+
+    // invalid batchId
+    deleteBatchResponse = webTarget.path(s"api/v1/batches/notValidUUID")
+      .queryParam("killApp", "true")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .delete()
+    assert(404 == deleteBatchResponse.getStatus)
+
+    // non-existed batch session
+    deleteBatchResponse = 
webTarget.path(s"api/v1/batches/${UUID.randomUUID().toString}")
+      .queryParam("killApp", "true")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .delete()
+    assert(404 == deleteBatchResponse.getStatus)
+
+    // invalid proxy user
+    deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.id}")
+      .queryParam("hive.server2.proxy.user", "invalidProxy")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .delete()
+    assert(405 == deleteBatchResponse.getStatus)
+
+    // killApp is true
+    deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.id}")
+      .queryParam("killApp", "true")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .delete()
+    assert(200 == deleteBatchResponse.getStatus)
+    assert(deleteBatchResponse.hasEntity)
+
+    // close the closed batch session
+    deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.id}")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .delete()
+    assert(404 == deleteBatchResponse.getStatus)
   }
 }

Reply via email to