This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 147c83bf5 [KYUUBI #2376][SUB-TASK][KPIP-4] Implement BatchesResource
GET /batches/${batchId}
147c83bf5 is described below
commit 147c83bf5532e202594ac97c220d53ef5396ddfe
Author: Fei Wang <[email protected]>
AuthorDate: Thu May 5 12:07:58 2022 +0800
[KYUUBI #2376][SUB-TASK][KPIP-4] Implement BatchesResource GET
/batches/${batchId}
### _Why are the changes needed?_
To close #2376
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #2527 from turboFei/kpip_4_2376_get_batch.
Closes #2376
ea1dbd6c [Fei Wang] [SUB-TASK][KPIP-4] Implement BatchesResource GET
/batches/${batchId}
Authored-by: Fei Wang <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../kyuubi/server/api/v1/BatchesResource.scala | 31 +++++++++++++++++++---
.../kyuubi/session/KyuubiSessionManager.scala | 4 +++
.../server/api/v1/BatchesResourceSuite.scala | 26 +++++++++++++-----
3 files changed, 51 insertions(+), 10 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index bf30bc88a..4735afb11 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -17,9 +17,11 @@
package org.apache.kyuubi.server.api.v1
-import javax.ws.rs.{Consumes, POST, Produces}
+import javax.ws.rs._
import javax.ws.rs.core.MediaType
+import scala.util.control.NonFatal
+
import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
@@ -54,8 +56,31 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
ipAddress,
Option(request.conf).getOrElse(Map()),
request)
- val batchOp = sessionManager.getSession(sessionHandle).asInstanceOf[
- KyuubiBatchSessionImpl].batchJobSubmissionOp
+ val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+ buildBatch(session)
+ }
+
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.APPLICATION_JSON)),
+ description = "get the batch info via batch id")
+ @GET
+ @Path("{batchId}")
+ def batchInfo(@PathParam("batchId") batchId: String): Batch = {
+ try {
+ val sessionHandle = sessionManager.getBatchSessionHandle(batchId,
REST_BATCH_PROTOCOL)
+ val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+ buildBatch(session)
+ } catch {
+ case NonFatal(e) =>
+ error(s"Invalid batchId: $batchId", e)
+ throw new NotFoundException(s"Invalid batchId: $batchId")
+ }
+ }
+
+ private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
+ val batchOp = session.batchJobSubmissionOp
Batch(
batchOp.batchId,
batchOp.batchType,
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 4d1b4d185..b147228ba 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -151,6 +151,10 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
SessionHandle(HandleIdentifier(UUID.randomUUID(),
STATIC_BATCH_SECRET_UUID), protocol)
}
+ def getBatchSessionHandle(batchId: String, protocol: TProtocolVersion):
SessionHandle = {
+ SessionHandle(HandleIdentifier(UUID.fromString(batchId),
STATIC_BATCH_SECRET_UUID), protocol)
+ }
+
override def start(): Unit = synchronized {
MetricsSystem.tracing { ms =>
ms.registerGauge(CONN_OPEN, getOpenSessionCount, 0)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index cabf3e1bb..fcfcc7777 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -42,18 +42,30 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
val response = webTarget.path("api/v1/batches")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
-
assert(200 == response.getStatus)
-
- val batch = response.readEntity(classOf[Batch])
+ var batch = response.readEntity(classOf[Batch])
assert(batch.kyuubiInstance === fe.connectionUrl)
+ assert(batch.batchType === "spark")
- val requestObj2 = requestObj.copy(conf = requestObj.conf ++
+ val proxyUserRequest = requestObj.copy(conf = requestObj.conf ++
Map(KyuubiAuthenticationFactory.HS2_PROXY_USER -> "root"))
- val response2 = webTarget.path("api/v1/batches")
+ val proxyUserResponse = webTarget.path("api/v1/batches")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(proxyUserRequest, MediaType.APPLICATION_JSON_TYPE))
+ assert(500 == proxyUserResponse.getStatus)
+
+ var getBatchResponse = webTarget.path(s"api/v1/batches/${batch.id}")
.request(MediaType.APPLICATION_JSON_TYPE)
- .post(Entity.entity(requestObj2, MediaType.APPLICATION_JSON_TYPE))
+ .get()
+ assert(200 == getBatchResponse.getStatus)
+ batch = getBatchResponse.readEntity(classOf[Batch])
+ assert(batch.kyuubiInstance === fe.connectionUrl)
+ assert(batch.batchType === "spark")
- assert(500 == response2.getStatus)
+ // invalid batchId
+ getBatchResponse = webTarget.path(s"api/v1/batches/invalidBatchId")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .get()
+ assert(404 == getBatchResponse.getStatus)
}
}