This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 178c39e  [KYUUBI #1351] Implement api: 
/${version}/sessions/${identifier}/operations
178c39e is described below

commit 178c39e221e81b9dc6797b58a03d81285e9a635e
Author: simon <[email protected]>
AuthorDate: Tue Nov 16 19:55:36 2021 +0800

    [KYUUBI #1351] Implement api: /${version}/sessions/${identifier}/operations
    
    ### _Why are the changes needed?_
    This is a subtask of umbrella issue #KPIP-1
    
    - /${version}/sessions/${identifier}/operations
      - mapping:
        - ICLIService#executeStatement
        - ICLIService#executeStatementAsync
        - ICLIService#getTypeInfo
        - ICLIService#getCatalogs
        - ICLIService#getSchemas
        - ICLIService#getTables
        - ICLIService#getTableTypes
        - ICLIService#getColumns
        - ICLIService#getFunctions
      - desc: create a new operation under a session
      - method: POST
      - params:
        - operation: e.g. GetTableTypes
        - addition: required params to create an Operation
      - returns: an instance of OperationHandle
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #1353 from simon824/r1.
    
    Closes #1351
    
    98ffcda4 [simon] rename
    7c0c3b40 [simon] ut fix
    586a0b8a [simon] rename url
    1ea6f322 [simon] codestyle
    48b5d79d [simon] fix
    543caf3c [simon] separate
    0ff30af2 [simon] codestyle
    203cb164 [simon] Merge remote-tracking branch 'upstream/master' into r1
    e578cb42 [simon] fix codestype
    006ce4d4 [simon] bugfix
    aeb0de63 [simon] operationtype
    3b6dceec [simon] init
    
    Authored-by: simon <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../kyuubi/server/api/v1/SessionsResource.scala    | 108 ++++++++++++++++++++-
 .../org/apache/kyuubi/server/api/v1/dto.scala      |  31 ++++++
 .../server/api/v1/SessionsResourceSuite.scala      |  79 +++++++++++++++
 3 files changed, 216 insertions(+), 2 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
index e39e647..7ac06dc 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
@@ -28,6 +28,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, 
TProtocolVersion}
 
 import org.apache.kyuubi.Utils.error
 import org.apache.kyuubi.cli.HandleIdentifier
+import org.apache.kyuubi.operation.OperationHandle
 import org.apache.kyuubi.server.api.ApiRequestContext
 import org.apache.kyuubi.session.SessionHandle
 
@@ -48,7 +49,6 @@ private[v1] class SessionsResource extends ApiRequestContext {
   @Path("{sessionHandle}")
   def sessionInfo(@PathParam("sessionHandle") sessionHandleStr: String): 
SessionDetail = {
     val sessionHandle = getSessionHandle(sessionHandleStr)
-
     try {
       val session = backendService.sessionManager.getSession(sessionHandle)
       SessionDetail(session.user, session.ipAddress, session.createTime, 
sessionHandle,
@@ -109,6 +109,111 @@ private[v1] class SessionsResource extends 
ApiRequestContext {
     Response.ok().build()
   }
 
+  @POST
+  @Path("{sessionHandle}/operations/statement")
+  def executeStatement(@PathParam("sessionHandle") sessionHandleStr: String,
+                       request: StatementRequest): OperationHandle = {
+    val sessionHandle = getSessionHandle(sessionHandleStr)
+    try {
+      backendService.executeStatement(sessionHandle, request.statement, 
request.runAsync,
+        request.queryTimeout)
+    } catch {
+      case NonFatal(_) =>
+        throw new NotFoundException(s"Error executing statement")
+    }
+  }
+
+  @POST
+  @Path("{sessionHandle}/operations/typeinfo")
+  def getTypeInfo(@PathParam("sessionHandle") sessionHandleStr: String): 
OperationHandle = {
+    val sessionHandle = getSessionHandle(sessionHandleStr)
+    try {
+      backendService.getTypeInfo(sessionHandle)
+    } catch {
+      case NonFatal(_) =>
+        throw new NotFoundException(s"Error getting type information")
+    }
+  }
+
+  @POST
+  @Path("{sessionHandle}/operations/catalogs")
+  def getCatalogs(@PathParam("sessionHandle") sessionHandleStr: String): 
OperationHandle = {
+    val sessionHandle = getSessionHandle(sessionHandleStr)
+    try {
+      backendService.getCatalogs(sessionHandle)
+    } catch {
+      case NonFatal(_) =>
+        throw new NotFoundException(s"Error getting catalogs")
+    }
+  }
+
+  @POST
+  @Path("{sessionHandle}/operations/schemas")
+  def getSchemas(@PathParam("sessionHandle") sessionHandleStr: String,
+                 request: GetSchemasRequest): OperationHandle = {
+    val sessionHandle = getSessionHandle(sessionHandleStr)
+    try {
+      backendService.getSchemas(sessionHandle, request.catalogName, 
request.schemaName)
+    } catch {
+      case NonFatal(_) =>
+        throw new NotFoundException(s"Error getting schemas")
+    }
+  }
+
+  @POST
+  @Path("{sessionHandle}/operations/tables")
+  def getTables(@PathParam("sessionHandle") sessionHandleStr: String,
+                request: GetTablesRequest): OperationHandle = {
+    val sessionHandle = getSessionHandle(sessionHandleStr)
+    try {
+      backendService.getTables(sessionHandle, request.catalogName, 
request.schemaName,
+        request.tableName, request.tableTypes)
+    } catch {
+      case NonFatal(_) =>
+        throw new NotFoundException(s"Error getting tables")
+    }
+  }
+
+  @POST
+  @Path("{sessionHandle}/operations/tabletypes")
+  def getTableTypes(@PathParam("sessionHandle") sessionHandleStr: String): 
OperationHandle = {
+    val sessionHandle = getSessionHandle(sessionHandleStr)
+    try {
+      backendService.getTableTypes(sessionHandle)
+    } catch {
+      case NonFatal(_) =>
+        throw new NotFoundException(s"Error getting table types")
+    }
+  }
+
+  @POST
+  @Path("{sessionHandle}/operations/columns")
+  def getColumns(@PathParam("sessionHandle") sessionHandleStr: String,
+                 request: GetColumnsRequest): OperationHandle = {
+    val sessionHandle = getSessionHandle(sessionHandleStr)
+    try {
+      backendService.getColumns(sessionHandle, request.catalogName, 
request.schemaName,
+        request.tableName, request.columnName)
+    } catch {
+      case NonFatal(_) =>
+        throw new NotFoundException(s"Error getting columns")
+    }
+  }
+
+  @POST
+  @Path("{sessionHandle}/operations/functions")
+  def getFunctions(@PathParam("sessionHandle") sessionHandleStr: String,
+                   request: GetFunctionsRequest): OperationHandle = {
+    val sessionHandle = getSessionHandle(sessionHandleStr)
+    try {
+      backendService.getFunctions(sessionHandle, request.catalogName, 
request.schemaName,
+        request.functionName)
+    } catch {
+      case NonFatal(_) =>
+        throw new NotFoundException(s"Error getting functions")
+    }
+  }
+
   def getSessionHandle(sessionHandleStr: String): SessionHandle = {
     try {
       val splitSessionHandle = sessionHandleStr.split("\\|")
@@ -125,6 +230,5 @@ private[v1] class SessionsResource extends 
ApiRequestContext {
         error(s"Error getting sessionHandle by $sessionHandleStr.", e)
         throw new NotFoundException(s"Error getting sessionHandle by 
$sessionHandleStr.")
     }
-
   }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index e3c2a8d..90fcde3 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -55,3 +55,34 @@ case class SessionOpenRequest(
   ipAddr: String,
   configs: Map[String, String]
 )
+
+case class StatementRequest(
+  statement: String,
+  runAsync: Boolean,
+  queryTimeout: Long
+)
+
+case class GetSchemasRequest(
+  catalogName: String,
+  schemaName: String
+)
+
+case class GetTablesRequest(
+  catalogName: String,
+  schemaName: String,
+  tableName: String,
+  tableTypes: java.util.List[String]
+)
+
+case class GetColumnsRequest(
+  catalogName: String,
+  schemaName: String,
+  tableName: String,
+  columnName: String
+)
+
+case class GetFunctionsRequest(
+  catalogName: String,
+  schemaName: String,
+  functionName: String
+)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
index aa78882..3deafbf 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
@@ -17,12 +17,14 @@
 
 package org.apache.kyuubi.server.api.v1
 
+import java.util
 import javax.ws.rs.client.Entity
 import javax.ws.rs.core.{MediaType, Response}
 
 import scala.concurrent.duration._
 
 import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
+import org.apache.kyuubi.operation.{OperationHandle, OperationType}
 import org.apache.kyuubi.session.SessionHandle
 
 class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper 
{
@@ -197,4 +199,81 @@ class SessionsResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper {
       assert(404 == response.getStatus)
     }
   }
+
+  test("test submit operation and get operation handle") {
+    val requestObj = SessionOpenRequest(
+      1, "admin", "123456", "localhost", Map("testConfig" -> "testValue"))
+
+    withKyuubiRestServer { (_, _, _, webTarget) =>
+      var response: Response = webTarget.path("api/v1/sessions")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+
+      val sessionHandle = response.readEntity(classOf[SessionHandle])
+      val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
+        
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
+
+      val pathPrefix = s"api/v1/sessions/$serializedSessionHandle"
+
+      val statementReq = StatementRequest("show tables", true, 3000)
+      response = webTarget
+        
.path(s"$pathPrefix/operations/statement").request(MediaType.APPLICATION_JSON_TYPE)
+        .post(Entity.entity(statementReq, MediaType.APPLICATION_JSON_TYPE))
+      assert(200 == response.getStatus)
+      var operationHandle = response.readEntity(classOf[OperationHandle])
+      assert(operationHandle.typ == OperationType.EXECUTE_STATEMENT)
+
+      response = webTarget.path(s"$pathPrefix/operations/typeinfo").request()
+        .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+      assert(200 == response.getStatus)
+      operationHandle = response.readEntity(classOf[OperationHandle])
+      assert(operationHandle.typ == OperationType.GET_TYPE_INFO)
+
+      response = webTarget.path(s"$pathPrefix/operations/catalogs")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+      assert(200 == response.getStatus)
+      operationHandle = response.readEntity(classOf[OperationHandle])
+      assert(operationHandle.typ == OperationType.GET_CATALOGS)
+
+      val getSchemasReq = GetSchemasRequest("default", "default")
+      response = webTarget.path(s"$pathPrefix/operations/schemas")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .post(Entity.entity(getSchemasReq, MediaType.APPLICATION_JSON_TYPE))
+      assert(200 == response.getStatus)
+      operationHandle = response.readEntity(classOf[OperationHandle])
+      assert(operationHandle.typ == OperationType.GET_SCHEMAS)
+
+      val tableTypes = new util.ArrayList[String]()
+      val getTablesReq = GetTablesRequest("default", "default", "default", 
tableTypes)
+      response = webTarget.path(s"$pathPrefix/operations/tables")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .post(Entity.entity(getTablesReq, MediaType.APPLICATION_JSON_TYPE))
+      assert(200 == response.getStatus)
+      operationHandle = response.readEntity(classOf[OperationHandle])
+      assert(operationHandle.typ == OperationType.GET_TABLES)
+
+      response = webTarget.path(s"$pathPrefix/operations/tabletypes").request()
+        .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+      assert(200 == response.getStatus)
+      operationHandle = response.readEntity(classOf[OperationHandle])
+      assert(operationHandle.typ == OperationType.GET_TABLE_TYPES)
+
+      val getColumnsReq = GetColumnsRequest("default", "default", "default", 
"default")
+      response = webTarget.path(s"$pathPrefix/operations/columns")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .post(Entity.entity(getColumnsReq, MediaType.APPLICATION_JSON_TYPE))
+      assert(200 == response.getStatus)
+      operationHandle = response.readEntity(classOf[OperationHandle])
+      assert(operationHandle.typ == OperationType.GET_COLUMNS)
+
+      var getFunctionsReq = GetFunctionsRequest("default", "default", 
"default")
+      response = webTarget.path(s"$pathPrefix/operations/functions")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .post(Entity.entity(getFunctionsReq, MediaType.APPLICATION_JSON_TYPE))
+      assert(200 == response.getStatus)
+      operationHandle = response.readEntity(classOf[OperationHandle])
+      assert(operationHandle.typ == OperationType.GET_FUNCTIONS)
+    }
+  }
 }

Reply via email to