This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 147c83bf5 [KYUUBI #2376][SUB-TASK][KPIP-4] Implement BatchesResource 
GET /batches/${batchId}
147c83bf5 is described below

commit 147c83bf5532e202594ac97c220d53ef5396ddfe
Author: Fei Wang <[email protected]>
AuthorDate: Thu May 5 12:07:58 2022 +0800

    [KYUUBI #2376][SUB-TASK][KPIP-4] Implement BatchesResource GET 
/batches/${batchId}
    
    ### _Why are the changes needed?_
    
    To close #2376
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #2527 from turboFei/kpip_4_2376_get_batch.
    
    Closes #2376
    
    ea1dbd6c [Fei Wang] [SUB-TASK][KPIP-4] Implement BatchesResource GET 
/batches/${batchId}
    
    Authored-by: Fei Wang <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../kyuubi/server/api/v1/BatchesResource.scala     | 31 +++++++++++++++++++---
 .../kyuubi/session/KyuubiSessionManager.scala      |  4 +++
 .../server/api/v1/BatchesResourceSuite.scala       | 26 +++++++++++++-----
 3 files changed, 51 insertions(+), 10 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index bf30bc88a..4735afb11 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -17,9 +17,11 @@
 
 package org.apache.kyuubi.server.api.v1
 
-import javax.ws.rs.{Consumes, POST, Produces}
+import javax.ws.rs._
 import javax.ws.rs.core.MediaType
 
+import scala.util.control.NonFatal
+
 import io.swagger.v3.oas.annotations.media.Content
 import io.swagger.v3.oas.annotations.responses.ApiResponse
 import io.swagger.v3.oas.annotations.tags.Tag
@@ -54,8 +56,31 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
       ipAddress,
       Option(request.conf).getOrElse(Map()),
       request)
-    val batchOp = sessionManager.getSession(sessionHandle).asInstanceOf[
-      KyuubiBatchSessionImpl].batchJobSubmissionOp
+    val session = 
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+    buildBatch(session)
+  }
+
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON)),
+    description = "get the batch info via batch id")
+  @GET
+  @Path("{batchId}")
+  def batchInfo(@PathParam("batchId") batchId: String): Batch = {
+    try {
+      val sessionHandle = sessionManager.getBatchSessionHandle(batchId, 
REST_BATCH_PROTOCOL)
+      val session = 
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+      buildBatch(session)
+    } catch {
+      case NonFatal(e) =>
+        error(s"Invalid batchId: $batchId", e)
+        throw new NotFoundException(s"Invalid batchId: $batchId")
+    }
+  }
+
+  private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
+    val batchOp = session.batchJobSubmissionOp
     Batch(
       batchOp.batchId,
       batchOp.batchType,
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 4d1b4d185..b147228ba 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -151,6 +151,10 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
     SessionHandle(HandleIdentifier(UUID.randomUUID(), 
STATIC_BATCH_SECRET_UUID), protocol)
   }
 
+  def getBatchSessionHandle(batchId: String, protocol: TProtocolVersion): 
SessionHandle = {
+    SessionHandle(HandleIdentifier(UUID.fromString(batchId), 
STATIC_BATCH_SECRET_UUID), protocol)
+  }
+
   override def start(): Unit = synchronized {
     MetricsSystem.tracing { ms =>
       ms.registerGauge(CONN_OPEN, getOpenSessionCount, 0)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index cabf3e1bb..fcfcc7777 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -42,18 +42,30 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper {
     val response = webTarget.path("api/v1/batches")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
-
     assert(200 == response.getStatus)
-
-    val batch = response.readEntity(classOf[Batch])
+    var batch = response.readEntity(classOf[Batch])
     assert(batch.kyuubiInstance === fe.connectionUrl)
+    assert(batch.batchType === "spark")
 
-    val requestObj2 = requestObj.copy(conf = requestObj.conf ++
+    val proxyUserRequest = requestObj.copy(conf = requestObj.conf ++
       Map(KyuubiAuthenticationFactory.HS2_PROXY_USER -> "root"))
-    val response2 = webTarget.path("api/v1/batches")
+    val proxyUserResponse = webTarget.path("api/v1/batches")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(proxyUserRequest, MediaType.APPLICATION_JSON_TYPE))
+    assert(500 == proxyUserResponse.getStatus)
+
+    var getBatchResponse = webTarget.path(s"api/v1/batches/${batch.id}")
       .request(MediaType.APPLICATION_JSON_TYPE)
-      .post(Entity.entity(requestObj2, MediaType.APPLICATION_JSON_TYPE))
+      .get()
+    assert(200 == getBatchResponse.getStatus)
+    batch = getBatchResponse.readEntity(classOf[Batch])
+    assert(batch.kyuubiInstance === fe.connectionUrl)
+    assert(batch.batchType === "spark")
 
-    assert(500 == response2.getStatus)
+    // invalid batchId
+    getBatchResponse = webTarget.path(s"api/v1/batches/invalidBatchId")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .get()
+    assert(404 == getBatchResponse.getStatus)
   }
 }

Reply via email to