This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 178c39e [KYUUBI #1351] Implement api:
/${version}/sessions/${identifier}/operations
178c39e is described below
commit 178c39e221e81b9dc6797b58a03d81285e9a635e
Author: simon <[email protected]>
AuthorDate: Tue Nov 16 19:55:36 2021 +0800
[KYUUBI #1351] Implement api: /${version}/sessions/${identifier}/operations
### _Why are the changes needed?_
This is a subtask of umbrella issue #KPIP-1
- /${version}/sessions/${identifier}/operations
- mapping:
- ICLIService#executeStatement
- ICLIService#executeStatementAsync
- ICLIService#getTypeInfo
- ICLIService#getCatalogs
- ICLIService#getSchemas
- ICLIService#getTables
- ICLIService#getTableTypes
- ICLIService#getColumns
- ICLIService#getFunctions
- desc: create a new operation under a session
- method: POST
- params:
- operation: e.g. GetTableTypes
- addition: required params to create an Operation
- returns: an instance of OperationHandle
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1353 from simon824/r1.
Closes #1351
98ffcda4 [simon] rename
7c0c3b40 [simon] ut fix
586a0b8a [simon] rename url
1ea6f322 [simon] codestyle
48b5d79d [simon] fix
543caf3c [simon] separate
0ff30af2 [simon] codestyle
203cb164 [simon] Merge remote-tracking branch 'upstream/master' into r1
e578cb42 [simon] fix codestype
006ce4d4 [simon] bugfix
aeb0de63 [simon] operationtype
3b6dceec [simon] init
Authored-by: simon <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../kyuubi/server/api/v1/SessionsResource.scala | 108 ++++++++++++++++++++-
.../org/apache/kyuubi/server/api/v1/dto.scala | 31 ++++++
.../server/api/v1/SessionsResourceSuite.scala | 79 +++++++++++++++
3 files changed, 216 insertions(+), 2 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
index e39e647..7ac06dc 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
@@ -28,6 +28,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType,
TProtocolVersion}
import org.apache.kyuubi.Utils.error
import org.apache.kyuubi.cli.HandleIdentifier
+import org.apache.kyuubi.operation.OperationHandle
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.session.SessionHandle
@@ -48,7 +49,6 @@ private[v1] class SessionsResource extends ApiRequestContext {
@Path("{sessionHandle}")
def sessionInfo(@PathParam("sessionHandle") sessionHandleStr: String):
SessionDetail = {
val sessionHandle = getSessionHandle(sessionHandleStr)
-
try {
val session = backendService.sessionManager.getSession(sessionHandle)
SessionDetail(session.user, session.ipAddress, session.createTime,
sessionHandle,
@@ -109,6 +109,111 @@ private[v1] class SessionsResource extends
ApiRequestContext {
Response.ok().build()
}
+ @POST
+ @Path("{sessionHandle}/operations/statement")
+ def executeStatement(@PathParam("sessionHandle") sessionHandleStr: String,
+ request: StatementRequest): OperationHandle = {
+ val sessionHandle = getSessionHandle(sessionHandleStr)
+ try {
+ backendService.executeStatement(sessionHandle, request.statement,
request.runAsync,
+ request.queryTimeout)
+ } catch {
+ case NonFatal(_) =>
+ throw new NotFoundException(s"Error executing statement")
+ }
+ }
+
+ @POST
+ @Path("{sessionHandle}/operations/typeinfo")
+ def getTypeInfo(@PathParam("sessionHandle") sessionHandleStr: String):
OperationHandle = {
+ val sessionHandle = getSessionHandle(sessionHandleStr)
+ try {
+ backendService.getTypeInfo(sessionHandle)
+ } catch {
+ case NonFatal(_) =>
+ throw new NotFoundException(s"Error getting type information")
+ }
+ }
+
+ @POST
+ @Path("{sessionHandle}/operations/catalogs")
+ def getCatalogs(@PathParam("sessionHandle") sessionHandleStr: String):
OperationHandle = {
+ val sessionHandle = getSessionHandle(sessionHandleStr)
+ try {
+ backendService.getCatalogs(sessionHandle)
+ } catch {
+ case NonFatal(_) =>
+ throw new NotFoundException(s"Error getting catalogs")
+ }
+ }
+
+ @POST
+ @Path("{sessionHandle}/operations/schemas")
+ def getSchemas(@PathParam("sessionHandle") sessionHandleStr: String,
+ request: GetSchemasRequest): OperationHandle = {
+ val sessionHandle = getSessionHandle(sessionHandleStr)
+ try {
+ backendService.getSchemas(sessionHandle, request.catalogName,
request.schemaName)
+ } catch {
+ case NonFatal(_) =>
+ throw new NotFoundException(s"Error getting schemas")
+ }
+ }
+
+ @POST
+ @Path("{sessionHandle}/operations/tables")
+ def getTables(@PathParam("sessionHandle") sessionHandleStr: String,
+ request: GetTablesRequest): OperationHandle = {
+ val sessionHandle = getSessionHandle(sessionHandleStr)
+ try {
+ backendService.getTables(sessionHandle, request.catalogName,
request.schemaName,
+ request.tableName, request.tableTypes)
+ } catch {
+ case NonFatal(_) =>
+ throw new NotFoundException(s"Error getting tables")
+ }
+ }
+
+ @POST
+ @Path("{sessionHandle}/operations/tabletypes")
+ def getTableTypes(@PathParam("sessionHandle") sessionHandleStr: String):
OperationHandle = {
+ val sessionHandle = getSessionHandle(sessionHandleStr)
+ try {
+ backendService.getTableTypes(sessionHandle)
+ } catch {
+ case NonFatal(_) =>
+ throw new NotFoundException(s"Error getting table types")
+ }
+ }
+
+ @POST
+ @Path("{sessionHandle}/operations/columns")
+ def getColumns(@PathParam("sessionHandle") sessionHandleStr: String,
+ request: GetColumnsRequest): OperationHandle = {
+ val sessionHandle = getSessionHandle(sessionHandleStr)
+ try {
+ backendService.getColumns(sessionHandle, request.catalogName,
request.schemaName,
+ request.tableName, request.columnName)
+ } catch {
+ case NonFatal(_) =>
+ throw new NotFoundException(s"Error getting columns")
+ }
+ }
+
+ @POST
+ @Path("{sessionHandle}/operations/functions")
+ def getFunctions(@PathParam("sessionHandle") sessionHandleStr: String,
+ request: GetFunctionsRequest): OperationHandle = {
+ val sessionHandle = getSessionHandle(sessionHandleStr)
+ try {
+ backendService.getFunctions(sessionHandle, request.catalogName,
request.schemaName,
+ request.functionName)
+ } catch {
+ case NonFatal(_) =>
+ throw new NotFoundException(s"Error getting functions")
+ }
+ }
+
def getSessionHandle(sessionHandleStr: String): SessionHandle = {
try {
val splitSessionHandle = sessionHandleStr.split("\\|")
@@ -125,6 +230,5 @@ private[v1] class SessionsResource extends
ApiRequestContext {
error(s"Error getting sessionHandle by $sessionHandleStr.", e)
throw new NotFoundException(s"Error getting sessionHandle by
$sessionHandleStr.")
}
-
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index e3c2a8d..90fcde3 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -55,3 +55,34 @@ case class SessionOpenRequest(
ipAddr: String,
configs: Map[String, String]
)
+
+case class StatementRequest(
+ statement: String,
+ runAsync: Boolean,
+ queryTimeout: Long
+)
+
+case class GetSchemasRequest(
+ catalogName: String,
+ schemaName: String
+)
+
+case class GetTablesRequest(
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ tableTypes: java.util.List[String]
+)
+
+case class GetColumnsRequest(
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ columnName: String
+)
+
+case class GetFunctionsRequest(
+ catalogName: String,
+ schemaName: String,
+ functionName: String
+)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
index aa78882..3deafbf 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
@@ -17,12 +17,14 @@
package org.apache.kyuubi.server.api.v1
+import java.util
import javax.ws.rs.client.Entity
import javax.ws.rs.core.{MediaType, Response}
import scala.concurrent.duration._
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
+import org.apache.kyuubi.operation.{OperationHandle, OperationType}
import org.apache.kyuubi.session.SessionHandle
class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper
{
@@ -197,4 +199,81 @@ class SessionsResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(404 == response.getStatus)
}
}
+
+ test("test submit operation and get operation handle") {
+ val requestObj = SessionOpenRequest(
+ 1, "admin", "123456", "localhost", Map("testConfig" -> "testValue"))
+
+ withKyuubiRestServer { (_, _, _, webTarget) =>
+ var response: Response = webTarget.path("api/v1/sessions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+
+ val sessionHandle = response.readEntity(classOf[SessionHandle])
+ val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
+
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
+
+ val pathPrefix = s"api/v1/sessions/$serializedSessionHandle"
+
+ val statementReq = StatementRequest("show tables", true, 3000)
+ response = webTarget
+
.path(s"$pathPrefix/operations/statement").request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(statementReq, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ var operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle.typ == OperationType.EXECUTE_STATEMENT)
+
+ response = webTarget.path(s"$pathPrefix/operations/typeinfo").request()
+ .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle.typ == OperationType.GET_TYPE_INFO)
+
+ response = webTarget.path(s"$pathPrefix/operations/catalogs")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle.typ == OperationType.GET_CATALOGS)
+
+ val getSchemasReq = GetSchemasRequest("default", "default")
+ response = webTarget.path(s"$pathPrefix/operations/schemas")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(getSchemasReq, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle.typ == OperationType.GET_SCHEMAS)
+
+ val tableTypes = new util.ArrayList[String]()
+ val getTablesReq = GetTablesRequest("default", "default", "default",
tableTypes)
+ response = webTarget.path(s"$pathPrefix/operations/tables")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(getTablesReq, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle.typ == OperationType.GET_TABLES)
+
+ response = webTarget.path(s"$pathPrefix/operations/tabletypes").request()
+ .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle.typ == OperationType.GET_TABLE_TYPES)
+
+ val getColumnsReq = GetColumnsRequest("default", "default", "default",
"default")
+ response = webTarget.path(s"$pathPrefix/operations/columns")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(getColumnsReq, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle.typ == OperationType.GET_COLUMNS)
+
+ var getFunctionsReq = GetFunctionsRequest("default", "default",
"default")
+ response = webTarget.path(s"$pathPrefix/operations/functions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(getFunctionsReq, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle.typ == OperationType.GET_FUNCTIONS)
+ }
+ }
}