This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 5b3123e41 [KYUUBI #2379][SUB-TASK][KPIP-4] Implement BatchesResource
DELETE /batches/${batchId}
5b3123e41 is described below
commit 5b3123e41cddcdac80478e2b91d02ee16d8b1453
Author: Fei Wang <[email protected]>
AuthorDate: Fri May 6 10:52:52 2022 +0800
[KYUUBI #2379][SUB-TASK][KPIP-4] Implement BatchesResource DELETE
/batches/${batchId}
### _Why are the changes needed?_
To close #2379
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
<img width="1185" alt="image"
src="https://user-images.githubusercontent.com/6757692/166098400-c977d9e2-c900-4f3b-a87c-bb9a6504124a.png">
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #2528 from turboFei/kpip_4_2376_delete_batch.
Closes #2379
f099cbd6 [Fei Wang] comments
db3c4315 [Fei Wang] set to cluster mode
333e835d [Fei Wang] support proxy user
53d905d9 [Fei Wang] [SUB-TASK][KPIP-4] Implement BatchesResource DELETE
/batches/${batchId}
Authored-by: Fei Wang <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../kyuubi/operation/BatchJobSubmission.scala | 6 ++-
.../kyuubi/server/api/v1/BatchesResource.scala | 50 +++++++++++++++++++++-
.../server/api/v1/BatchesResourceSuite.scala | 47 ++++++++++++++++++++
3 files changed, 101 insertions(+), 2 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index ce3b81a12..ca1ab10c4 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -26,7 +26,7 @@ import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.{ApplicationOperation, ProcBuilder}
+import org.apache.kyuubi.engine.{ApplicationOperation, KillResponse,
ProcBuilder}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.log.OperationLog
@@ -70,6 +70,10 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl,
batchRequest: BatchReq
applicationManager.getApplicationInfo(builder.clusterManager(), batchId)
}
+ private[kyuubi] def killBatchApplication(): KillResponse = {
+ applicationManager.killApplication(builder.clusterManager(), batchId)
+ }
+
private val applicationCheckInterval =
session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 4735afb11..32d5cd8bd 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -18,7 +18,7 @@
package org.apache.kyuubi.server.api.v1
import javax.ws.rs._
-import javax.ws.rs.core.MediaType
+import javax.ws.rs.core.{MediaType, Response}
import scala.util.control.NonFatal
@@ -31,6 +31,7 @@ import org.apache.kyuubi.Logging
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource.REST_BATCH_PROTOCOL
import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
+import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
@Tag(name = "Batch")
@@ -88,6 +89,53 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
fe.connectionUrl,
batchOp.getStatus.state.toString)
}
+
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.APPLICATION_JSON)),
+ description = "close a batch session")
+ @DELETE
+ @Path("{batchId}")
+ def closeBatchSession(
+ @PathParam("batchId") batchId: String,
+ @QueryParam("killApp") killApp: Boolean,
+ @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Response =
{
+ var session: KyuubiBatchSessionImpl = null
+ try {
+ val sessionHandle = sessionManager.getBatchSessionHandle(batchId,
REST_BATCH_PROTOCOL)
+ session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+ } catch {
+ case NonFatal(e) =>
+ error(s"Invalid batchId: $batchId", e)
+ throw new NotFoundException(s"Invalid batchId: $batchId")
+ }
+
+ val sessionConf = Option(hs2ProxyUser).filter(_.nonEmpty).map(proxyUser =>
+ Map(KyuubiAuthenticationFactory.HS2_PROXY_USER ->
proxyUser)).getOrElse(Map())
+
+ var userName: String = null
+ try {
+ userName = fe.getUserName(sessionConf)
+ } catch {
+ case t: Throwable =>
+ throw new NotAllowedException(t.getMessage)
+ }
+
+ if (!session.user.equals(userName)) {
+ throw new NotAllowedException(
+ s"$userName is not allowed to close the session belong to
${session.user}")
+ }
+
+ if (killApp) {
+ val killResponse = session.batchJobSubmissionOp.killBatchApplication()
+ sessionManager.closeSession(session.handle)
+ Response.ok().entity(killResponse).build()
+ } else {
+ sessionManager.closeSession(session.handle)
+ Response.ok().build()
+ }
+ }
}
object BatchesResource {
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index fcfcc7777..e30a7f1fe 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -17,12 +17,15 @@
package org.apache.kyuubi.server.api.v1
+import java.util.Base64
+import java.util.UUID
import javax.ws.rs.client.Entity
import javax.ws.rs.core.MediaType
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL,
ENGINE_SPARK_MAX_LIFETIME}
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import
org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
@@ -67,5 +70,49 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
.request(MediaType.APPLICATION_JSON_TYPE)
.get()
assert(404 == getBatchResponse.getStatus)
+
+ // invalid user name
+ val encodeAuthorization = new
String(Base64.getEncoder.encode(batch.id.getBytes()), "UTF-8")
+ var deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.id}")
+ .queryParam("killApp", "true")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+ .delete()
+ assert(405 == deleteBatchResponse.getStatus)
+
+ // invalid batchId
+ deleteBatchResponse = webTarget.path(s"api/v1/batches/notValidUUID")
+ .queryParam("killApp", "true")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .delete()
+ assert(404 == deleteBatchResponse.getStatus)
+
+ // non-existed batch session
+ deleteBatchResponse =
webTarget.path(s"api/v1/batches/${UUID.randomUUID().toString}")
+ .queryParam("killApp", "true")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .delete()
+ assert(404 == deleteBatchResponse.getStatus)
+
+ // invalid proxy user
+ deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.id}")
+ .queryParam("hive.server2.proxy.user", "invalidProxy")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .delete()
+ assert(405 == deleteBatchResponse.getStatus)
+
+ // killApp is true
+ deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.id}")
+ .queryParam("killApp", "true")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .delete()
+ assert(200 == deleteBatchResponse.getStatus)
+ assert(deleteBatchResponse.hasEntity)
+
+ // close the closed batch session
+ deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.id}")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .delete()
+ assert(404 == deleteBatchResponse.getStatus)
}
}