This is an automated email from the ASF dual-hosted git repository.
hongdd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new b952b7b [KYUUBI #1821] Add trino ExecuteStatement
b952b7b is described below
commit b952b7b5d8d9ecc0a041bd91e1db8f20eb051495
Author: hongdongdong <[email protected]>
AuthorDate: Thu Feb 10 10:07:57 2022 +0800
[KYUUBI #1821] Add trino ExecuteStatement
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
Add trino ExecuteStatement
### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [X] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1830 from hddong/add-operation.
Closes #1821
067bde7a [hongdongdong] use flag instead of breakable
f4d6cbb9 [hongdongdong] fix
351e2bc9 [hongdongdong] move context to impl
69d7d9b2 [hongdongdong] fix wrong func name
9cb757a9 [hongdongdong] fix
a20f2d0f [hongdongdong] fix time unit
c5072dbf [hongdongdong] [KYUUBI #1821] Add trino ExecuteStatement
Authored-by: hongdongdong <[email protected]>
Signed-off-by: hongdongdong <[email protected]>
---
externals/kyuubi-trino-engine/pom.xml | 7 +
.../kyuubi/engine/trino/TrinoBackendService.scala} | 17 +-
.../org/apache/kyuubi/engine/trino/TrinoConf.scala | 9 +
.../apache/kyuubi/engine/trino/TrinoContext.scala | 16 +-
.../kyuubi/engine/trino/TrinoSqlEngine.scala | 61 +++++-
.../kyuubi/engine/trino/TrinoStatement.scala | 69 +++----
.../engine/trino/operation/ExecuteStatement.scala | 98 ++++++++++
.../engine/trino/operation/TrinoOperation.scala | 134 +++++++++++++
.../trino/operation/TrinoOperationManager.scala | 70 +++++++
.../engine/trino/session/TrinoSessionImpl.scala | 102 ++++++++++
.../engine/trino/session/TrinoSessionManager.scala | 81 ++++++++
.../kyuubi/engine/trino/WithTrinoEngine.scala | 66 +++++++
.../trino/operation/TrinoOperationSuite.scala | 216 +++++++++++++++++++++
.../kyuubi/engine/trino/session/SessionSuite.scala | 42 ++++
.../src/main/scala/org/apache/kyuubi/Utils.scala | 1 +
15 files changed, 926 insertions(+), 63 deletions(-)
diff --git a/externals/kyuubi-trino-engine/pom.xml
b/externals/kyuubi-trino-engine/pom.xml
index d51d2e0..a97fa9a 100644
--- a/externals/kyuubi-trino-engine/pom.xml
+++ b/externals/kyuubi-trino-engine/pom.xml
@@ -82,6 +82,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-hive-jdbc-shaded</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/TrinoContextSuite.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoBackendService.scala
similarity index 64%
rename from
externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/TrinoContextSuite.scala
rename to
externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoBackendService.scala
index dd9f6c5..6cc4141 100644
---
a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/TrinoContextSuite.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoBackendService.scala
@@ -17,16 +17,13 @@
package org.apache.kyuubi.engine.trino
-class TrinoContextSuite extends WithTrinoContainerServer {
+import org.apache.kyuubi.engine.trino.session.TrinoSessionManager
+import org.apache.kyuubi.service.AbstractBackendService
+import org.apache.kyuubi.session.SessionManager
- test("set current schema") {
- withTrinoContainer { trinoContext =>
- val trinoStatement = TrinoStatement(trinoContext, kyuubiConf, "select 1")
- assert("tiny" === trinoStatement.getCurrentDatabase)
+class TrinoBackendService
+ extends AbstractBackendService("TrinoBackendService") {
+
+ override val sessionManager: SessionManager = new TrinoSessionManager()
- trinoContext.setCurrentSchema("sf1")
- val trinoStatement2 = TrinoStatement(trinoContext, kyuubiConf, "select
1")
- assert("sf1" === trinoStatement2.getCurrentDatabase)
- }
- }
}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoConf.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoConf.scala
index 161a1d4..9441112 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoConf.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoConf.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.trino
+import java.time.Duration
+
import org.apache.kyuubi.config.ConfigBuilder
import org.apache.kyuubi.config.ConfigEntry
import org.apache.kyuubi.config.KyuubiConf
@@ -30,4 +32,11 @@ object TrinoConf {
.version("1.5.0")
.intConf
.createWithDefault(3)
+
+ val CLIENT_REQUEST_TIMEOUT: ConfigEntry[Long] =
+ buildConf("trino.client.request.timeout")
+ .doc("Timeout for Trino client request to trino cluster")
+ .version("1.5.0")
+ .timeConf
+ .createWithDefault(Duration.ofMinutes(2).toMillis)
}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoContext.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoContext.scala
index cad7c97..ed7a98f 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoContext.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoContext.scala
@@ -22,19 +22,11 @@ import java.util.concurrent.atomic.AtomicReference
import io.trino.client.ClientSession
import okhttp3.OkHttpClient
-class TrinoContext(
- val httpClient: OkHttpClient,
- val clientSession: AtomicReference[ClientSession]) {
-
- def getClientSession: ClientSession = clientSession.get
-
- def setCurrentSchema(schema: String): Unit = {
-
clientSession.set(ClientSession.builder(clientSession.get).withSchema(schema).build())
- }
-
-}
+case class TrinoContext(
+ httpClient: OkHttpClient,
+ clientSession: AtomicReference[ClientSession])
object TrinoContext {
def apply(httpClient: OkHttpClient, clientSession: ClientSession):
TrinoContext =
- new TrinoContext(httpClient, new AtomicReference(clientSession))
+ TrinoContext(httpClient, new AtomicReference(clientSession))
}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoSqlEngine.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoSqlEngine.scala
index 73ec32a..d05bc64 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoSqlEngine.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoSqlEngine.scala
@@ -17,19 +17,74 @@
package org.apache.kyuubi.engine.trino
+import java.util.concurrent.CountDownLatch
+
import org.apache.kyuubi.Logging
+import org.apache.kyuubi.Utils.TRINO_ENGINE_SHUTDOWN_PRIORITY
+import org.apache.kyuubi.Utils.addShutdownHook
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.trino.TrinoSqlEngine.countDownLatch
+import org.apache.kyuubi.engine.trino.TrinoSqlEngine.currentEngine
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
+import org.apache.kyuubi.ha.client.RetryPolicies
+import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister
+case class TrinoSqlEngine()
+ extends Serverable("TrinoSQLEngine") {
+
+ override val backendService = new TrinoBackendService()
+
+ override val frontendServices = Seq(new TrinoTBinaryFrontendService(this))
+
+ override def start(): Unit = {
+ super.start()
+ // Start engine self-terminating checker after all services are ready and
it can be reached by
+ // all servers in engine spaces.
+ backendService.sessionManager.startTerminatingChecker(() => {
+ assert(currentEngine.isDefined)
+ currentEngine.get.stop()
+ })
+ }
+
+ override protected def stopServer(): Unit = {
+ countDownLatch.countDown()
+ }
+}
+
object TrinoSqlEngine extends Logging {
+ private val countDownLatch = new CountDownLatch(1)
val kyuubiConf: KyuubiConf = KyuubiConf()
+ var currentEngine: Option[TrinoSqlEngine] = None
+
+ def startEngine(): Unit = {
+ currentEngine = Some(new TrinoSqlEngine())
+ currentEngine.foreach { engine =>
+ engine.initialize(kyuubiConf)
+ engine.start()
+ addShutdownHook(() => engine.stop(), TRINO_ENGINE_SHUTDOWN_PRIORITY + 1)
+ }
+ }
+
def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)
- // TODO start engine
- warn("Trino engine under development...")
- info(kyuubiConf.getAll)
+ try {
+ kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY,
RetryPolicies.N_TIME.toString)
+
+ startEngine()
+ // blocking main thread
+ countDownLatch.await()
+ } catch {
+ case t: Throwable if currentEngine.isDefined =>
+ currentEngine.foreach { engine =>
+ error(t)
+ engine.stop()
+ }
+ case t: Throwable => error("Create Trino Engine Failed", t)
+ }
}
}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala
index c13ae49..c1b2472 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala
@@ -27,7 +27,6 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration
import scala.concurrent.duration.Duration
-import scala.util.control.Breaks._
import com.google.common.base.Verify
import io.trino.client.ClientSession
@@ -46,7 +45,7 @@ import org.apache.kyuubi.engine.trino.TrinoStatement._
class TrinoStatement(trinoContext: TrinoContext, kyuubiConf: KyuubiConf, sql:
String) {
private lazy val trino = StatementClientFactory
- .newStatementClient(trinoContext.httpClient,
trinoContext.getClientSession, sql)
+ .newStatementClient(trinoContext.httpClient,
trinoContext.clientSession.get, sql)
private lazy val dataProcessingPoolSize =
kyuubiConf.get(DATA_PROCESSING_POOL_SIZE)
@@ -55,7 +54,7 @@ class TrinoStatement(trinoContext: TrinoContext, kyuubiConf:
KyuubiConf, sql: St
def getTrinoClient: StatementClient = trino
- def getCurrentDatabase: String = trinoContext.getClientSession.getSchema
+ def getCurrentDatabase: String = trinoContext.clientSession.get.getSchema
def getColumns: List[Column] = {
while (trino.isRunning) {
@@ -99,50 +98,44 @@ class TrinoStatement(trinoContext: TrinoContext,
kyuubiConf: KyuubiConf, sql: St
val rowBuffer = new ArrayList[List[Any]](MAX_BUFFERED_ROWS)
var bufferStart = System.nanoTime()
val result = ArrayBuffer[List[Any]]()
- try {
- breakable {
- while (!dataProcessing.isCompleted) {
- val atEnd = drainDetectingEnd(rowQueue, rowBuffer,
MAX_BUFFERED_ROWS, END_TOKEN)
- if (!atEnd) {
- // Flush if needed
- if (rowBuffer.size() >= MAX_BUFFERED_ROWS ||
- Duration.fromNanos(bufferStart).compareTo(MAX_BUFFER_TIME) >= 0)
{
- result ++= rowBuffer.asScala
- rowBuffer.clear()
- bufferStart = System.nanoTime()
- }
-
- val row = rowQueue.poll(MAX_BUFFER_TIME.toMillis,
duration.MILLISECONDS)
- row match {
- case END_TOKEN => break
- case null =>
- case _ => rowBuffer.add(row)
- }
- }
+
+ var getDataEnd = false
+ while (!dataProcessing.isCompleted && !getDataEnd) {
+ val atEnd = drainDetectingEnd(rowQueue, rowBuffer, MAX_BUFFERED_ROWS,
END_TOKEN)
+ if (!atEnd) {
+ // Flush if needed
+ if (rowBuffer.size() >= MAX_BUFFERED_ROWS ||
+ Duration.fromNanos(bufferStart).compareTo(MAX_BUFFER_TIME) >= 0) {
+ result ++= rowBuffer.asScala
+ rowBuffer.clear()
+ bufferStart = System.nanoTime()
}
- }
- if (!rowQueue.isEmpty()) {
- drainDetectingEnd(rowQueue, rowBuffer, Integer.MAX_VALUE, END_TOKEN)
- }
- result ++= rowBuffer.asScala
- val finalStatus = trino.finalStatusInfo()
- if (finalStatus.getError() != null) {
- val exception = KyuubiSQLException(
- s"Query ${finalStatus.getId} failed:
${finalStatus.getError.getMessage}")
- throw exception
+ val row = rowQueue.poll(MAX_BUFFER_TIME.toMillis,
duration.MILLISECONDS)
+ row match {
+ case END_TOKEN => getDataEnd = true
+ case null =>
+ case _ => rowBuffer.add(row)
+ }
}
+ }
+ if (!rowQueue.isEmpty()) {
+ drainDetectingEnd(rowQueue, rowBuffer, Integer.MAX_VALUE, END_TOKEN)
+ }
+ result ++= rowBuffer.asScala
- updateTrinoContext()
- } catch {
- case e: Exception =>
- throw KyuubiSQLException(e)
+ val finalStatus = trino.finalStatusInfo()
+ if (finalStatus.getError() != null) {
+ throw KyuubiSQLException(
+ s"Query ${finalStatus.getId} failed:
${finalStatus.getError.getMessage}")
}
+ updateTrinoContext()
+
result
}
def updateTrinoContext(): Unit = {
- val session = trinoContext.getClientSession
+ val session = trinoContext.clientSession.get
var builder = ClientSession.builder(session)
// update catalog and schema
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
new file mode 100644
index 0000000..e6ee4a1
--- /dev/null
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.trino.operation
+
+import java.util.concurrent.RejectedExecutionException
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.trino.TrinoStatement
+import org.apache.kyuubi.operation.ArrayFetchIterator
+import org.apache.kyuubi.operation.IterableFetchIterator
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.Session
+
+class ExecuteStatement(
+ session: Session,
+ override val statement: String,
+ override val shouldRunAsync: Boolean,
+ incrementalCollect: Boolean)
+ extends TrinoOperation(OperationType.EXECUTE_STATEMENT, session) with
Logging {
+
+ private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
+ override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
+ override protected def beforeRun(): Unit = {
+ OperationLog.setCurrentOperationLog(operationLog)
+ setState(OperationState.PENDING)
+ setHasResultSet(true)
+ }
+
+ override protected def afterRun(): Unit = {
+ OperationLog.removeCurrentOperationLog()
+ }
+
+ override protected def runInternal(): Unit = {
+ val trinoStatement = TrinoStatement(trinoContext,
session.sessionManager.getConf, statement)
+ trino = trinoStatement.getTrinoClient
+ if (shouldRunAsync) {
+ val asyncOperation = new Runnable {
+ override def run(): Unit = {
+ OperationLog.setCurrentOperationLog(operationLog)
+ executeStatement(trinoStatement)
+ }
+ }
+
+ try {
+ val trinoSessionManager = session.sessionManager
+ val backgroundHandle =
trinoSessionManager.submitBackgroundOperation(asyncOperation)
+ setBackgroundHandle(backgroundHandle)
+ } catch {
+ case rejected: RejectedExecutionException =>
+ setState(OperationState.ERROR)
+ val ke =
+ KyuubiSQLException("Error submitting query in background, query
rejected", rejected)
+ setOperationException(ke)
+ throw ke
+ }
+ } else {
+ executeStatement(trinoStatement)
+ }
+ }
+
+ private def executeStatement(trinoStatement: TrinoStatement): Unit = {
+ setState(OperationState.RUNNING)
+ try {
+ schema = trinoStatement.getColumns
+ val resultSet = trinoStatement.execute()
+ iter =
+ if (incrementalCollect) {
+ info("Execute in incremental collect mode")
+ new IterableFetchIterator(resultSet)
+ } else {
+ info("Execute in full collect mode")
+ new ArrayFetchIterator(resultSet.toArray)
+ }
+ setState(OperationState.FINISHED)
+ } catch {
+ onError(cancel = true)
+ }
+ }
+}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
new file mode 100644
index 0000000..298034b
--- /dev/null
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.trino.operation
+
+import java.io.IOException
+
+import io.trino.client.Column
+import io.trino.client.StatementClient
+import org.apache.hive.service.rpc.thrift.TRowSet
+import org.apache.hive.service.rpc.thrift.TTableSchema
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.trino.TrinoContext
+import org.apache.kyuubi.engine.trino.schema.RowSet
+import org.apache.kyuubi.engine.trino.schema.SchemaHelper
+import org.apache.kyuubi.engine.trino.session.TrinoSessionImpl
+import org.apache.kyuubi.operation.AbstractOperation
+import org.apache.kyuubi.operation.FetchIterator
+import org.apache.kyuubi.operation.FetchOrientation.FETCH_FIRST
+import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
+import org.apache.kyuubi.operation.FetchOrientation.FETCH_PRIOR
+import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.operation.OperationState.OperationState
+import org.apache.kyuubi.operation.OperationType.OperationType
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.Session
+
+abstract class TrinoOperation(opType: OperationType, session: Session)
+ extends AbstractOperation(opType, session) {
+
+ protected val trinoContext: TrinoContext =
session.asInstanceOf[TrinoSessionImpl].trinoContext
+
+ protected var trino: StatementClient = _
+
+ protected var schema: List[Column] = _
+
+ protected var iter: FetchIterator[List[Any]] = _
+
+ override def getResultSetSchema: TTableSchema =
SchemaHelper.toTTableSchema(schema)
+
+ override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ validateDefaultFetchOrientation(order)
+ assertState(OperationState.FINISHED)
+ setHasResultSet(true)
+ order match {
+ case FETCH_NEXT => iter.fetchNext()
+ case FETCH_PRIOR => iter.fetchPrior(rowSetSize);
+ case FETCH_FIRST => iter.fetchAbsolute(0);
+ }
+ val taken = iter.take(rowSetSize)
+ val resultRowSet = RowSet.toTRowSet(taken.toList, schema,
getProtocolVersion)
+ resultRowSet.setStartRowOffset(iter.getPosition)
+ resultRowSet
+ }
+
+ override protected def beforeRun(): Unit = {
+ setHasResultSet(true)
+ setState(OperationState.RUNNING)
+ }
+
+ override protected def afterRun(): Unit = {
+ state.synchronized {
+ if (!isTerminalState(state)) {
+ setState(OperationState.FINISHED)
+ }
+ }
+ OperationLog.removeCurrentOperationLog()
+ }
+
+ override def cancel(): Unit = {
+ cleanup(OperationState.CANCELED)
+ }
+
+ protected def cleanup(targetState: OperationState): Unit =
state.synchronized {
+ if (!isTerminalState(state)) {
+ setState(targetState)
+ Option(getBackgroundHandle).foreach(_.cancel(true))
+ }
+ }
+
+ override def close(): Unit = {
+ cleanup(OperationState.CLOSED)
+ try {
+ trino.close()
+ getOperationLog.foreach(_.close())
+ } catch {
+ case e: IOException =>
+ error(e.getMessage, e)
+ }
+ }
+
+ override def shouldRunAsync: Boolean = false
+
+ protected def onError(cancel: Boolean = false): PartialFunction[Throwable,
Unit] = {
+ // We should use Throwable instead of Exception since
`java.lang.NoClassDefFoundError`
+ // could be thrown.
+ case e: Throwable =>
+ if (cancel && trino.isRunning) trino.cancelLeafStage()
+ state.synchronized {
+ val errMsg = Utils.stringifyException(e)
+ if (state == OperationState.TIMEOUT) {
+ val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
+ setOperationException(ke)
+ throw ke
+ } else if (isTerminalState(state)) {
+ setOperationException(KyuubiSQLException(errMsg))
+ warn(s"Ignore exception in terminal state with $statementId:
$errMsg")
+ } else {
+ error(s"Error operating $opType: $errMsg", e)
+ val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
+ setOperationException(ke)
+ setState(OperationState.ERROR)
+ throw ke
+ }
+ }
+ }
+}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
new file mode 100644
index 0000000..0f3a12d
--- /dev/null
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.trino.operation
+
+import java.util
+
+import org.apache.kyuubi.config.KyuubiConf.OPERATION_INCREMENTAL_COLLECT
+import org.apache.kyuubi.operation.Operation
+import org.apache.kyuubi.operation.OperationManager
+import org.apache.kyuubi.session.Session
+
+class TrinoOperationManager extends OperationManager("TrinoOperationManager") {
+
+ def newExecuteStatementOperation(
+ session: Session,
+ statement: String,
+ confOverlay: Map[String, String],
+ runAsync: Boolean,
+ queryTimeout: Long): Operation = {
+ val incrementalCollect =
session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT)
+ val operation = new ExecuteStatement(session, statement, runAsync,
incrementalCollect)
+ addOperation(operation)
+ }
+
+ override def newGetTypeInfoOperation(session: Session): Operation = null
+
+ override def newGetCatalogsOperation(session: Session): Operation = null
+
+ override def newGetSchemasOperation(
+ session: Session,
+ catalog: String,
+ schema: String): Operation = null
+
+ override def newGetTablesOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ tableTypes: util.List[String]): Operation = null
+
+ override def newGetTableTypesOperation(session: Session): Operation = null
+
+ override def newGetColumnsOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ columnName: String): Operation = null
+
+ override def newGetFunctionsOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ functionName: String): Operation = null
+}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
new file mode 100644
index 0000000..1b68fc7
--- /dev/null
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.trino.session
+
+import java.net.URI
+import java.time.ZoneId
+import java.util.Collections
+import java.util.Locale
+import java.util.Optional
+import java.util.concurrent.TimeUnit
+
+import io.airlift.units.Duration
+import io.trino.client.ClientSession
+import okhttp3.OkHttpClient
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.Utils.currentUser
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.trino.TrinoConf
+import org.apache.kyuubi.engine.trino.TrinoContext
+import org.apache.kyuubi.session.AbstractSession
+import org.apache.kyuubi.session.SessionHandle
+import org.apache.kyuubi.session.SessionManager
+
+class TrinoSessionImpl(
+ protocol: TProtocolVersion,
+ user: String,
+ password: String,
+ ipAddress: String,
+ conf: Map[String, String],
+ sessionManager: SessionManager)
+ extends AbstractSession(protocol, user, password, ipAddress, conf,
sessionManager) {
+
+ var trinoContext: TrinoContext = _
+ private var clientSession: ClientSession = _
+
+ override val handle: SessionHandle = SessionHandle(protocol)
+
+ override def open(): Unit = {
+ normalizedConf.foreach {
+ case ("use:database", database) => clientSession =
createClientSession(database)
+ case _ => // do nothing
+ }
+
+ val httpClient = new OkHttpClient.Builder().build()
+
+ if (clientSession == null) {
+ clientSession = createClientSession()
+ }
+ trinoContext = TrinoContext(httpClient, clientSession)
+
+ super.open()
+ }
+
+ private def createClientSession(schema: String = null): ClientSession = {
+ val sessionConf = sessionManager.getConf
+ val connectionUrl =
sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_URL).getOrElse(
+ throw KyuubiSQLException("Trino server url can not be null!"))
+ val catalog =
sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG).getOrElse(
+ throw KyuubiSQLException("Trino default catalog can not be null!"))
+ val user =
sessionConf.getOption("kyuubi.trino.user").getOrElse(currentUser)
+ val clientRequestTimeout =
sessionConf.get(TrinoConf.CLIENT_REQUEST_TIMEOUT)
+
+ new ClientSession(
+ URI.create(connectionUrl),
+ user,
+ Optional.empty(),
+ "kyuubi",
+ Optional.empty(),
+ Collections.emptySet(),
+ null,
+ catalog,
+ schema,
+ null,
+ ZoneId.systemDefault(),
+ Locale.getDefault,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ null,
+ new Duration(clientRequestTimeout, TimeUnit.MILLISECONDS),
+ true)
+ }
+}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
new file mode 100644
index 0000000..83af04a
--- /dev/null
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.trino.session
+
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_LOG_DIR_ROOT
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.trino.TrinoSqlEngine
+import org.apache.kyuubi.engine.trino.operation.TrinoOperationManager
+import org.apache.kyuubi.session.SessionHandle
+import org.apache.kyuubi.session.SessionManager
+
+class TrinoSessionManager
+ extends SessionManager("TrinoSessionManager") {
+
+ val operationManager = new TrinoOperationManager()
+
+ override def initialize(conf: KyuubiConf): Unit = {
+ val absPath =
Utils.getAbsolutePathFromWork(conf.get(ENGINE_OPERATION_LOG_DIR_ROOT))
+ _operationLogRoot = Some(absPath.toAbsolutePath.toString)
+ super.initialize(conf)
+ }
+
+ override def openSession(
+ protocol: TProtocolVersion,
+ user: String,
+ password: String,
+ ipAddress: String,
+ conf: Map[String, String]): SessionHandle = {
+ info(s"Opening session for $user@$ipAddress")
+ val sessionImpl =
+ new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)
+
+ try {
+ val handle = sessionImpl.handle
+ sessionImpl.open()
+ setSession(handle, sessionImpl)
+ info(s"$user's trino session with $handle is opened, current opening
sessions" +
+ s" $getOpenSessionCount")
+ handle
+ } catch {
+ case e: Exception =>
+ sessionImpl.close()
+ throw KyuubiSQLException(e)
+ }
+ }
+
+ override def closeSession(sessionHandle: SessionHandle): Unit = {
+ super.closeSession(sessionHandle)
+ if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString) {
+ info("Session stopped due to shared level is Connection.")
+ stopSession()
+ }
+ }
+
+ private def stopSession(): Unit = {
+ TrinoSqlEngine.currentEngine.foreach(_.stop())
+ }
+
+ override protected def isServer: Boolean = false
+}
diff --git
a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/WithTrinoEngine.scala
b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/WithTrinoEngine.scala
new file mode 100644
index 0000000..14fe3e1
--- /dev/null
+++
b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/WithTrinoEngine.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.trino
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+
/**
 * Test harness that boots a Trino engine backed by a testcontainers-managed
 * Trino server for the duration of a suite, and tears it down afterwards.
 *
 * Concrete suites contribute extra engine settings via [[withKyuubiConf]].
 */
trait WithTrinoEngine extends KyuubiFunSuite with WithTrinoContainerServer {

  protected var engine: TrinoSqlEngine = _
  protected var connectionUrl: String = _

  override val kyuubiConf: KyuubiConf = TrinoSqlEngine.kyuubiConf

  // Additional engine configurations supplied by the concrete suite.
  def withKyuubiConf: Map[String, String]

  override def beforeAll(): Unit = {
    withContainers { trinoContainer =>
      // The engine speaks HTTP to Trino, so rewrite the container's JDBC URL scheme.
      val httpUrl = trinoContainer.jdbcUrl.replace("jdbc:trino", "http")
      startTrinoEngine(httpUrl)
      super.beforeAll()
    }
  }

  /** Starts the Trino engine pointed at the given container URL. */
  def startTrinoEngine(containerConnectionUrl: String): Unit = {
    kyuubiConf.set(KyuubiConf.ENGINE_TRINO_CONNECTION_URL, containerConnectionUrl)

    // Propagate suite-specific settings both as system properties and
    // into the shared engine conf.
    for ((key, value) <- withKyuubiConf) {
      System.setProperty(key, value)
      kyuubiConf.set(key, value)
    }

    TrinoSqlEngine.startEngine()
    engine = TrinoSqlEngine.currentEngine.get
    connectionUrl = engine.frontendServices.head.connectionUrl
  }

  override def afterAll(): Unit = {
    super.afterAll()
    stopTrinoEngine()
  }

  /** Stops the engine if it is running; safe to call more than once. */
  def stopTrinoEngine(): Unit = {
    Option(engine).foreach { runningEngine =>
      runningEngine.stop()
      engine = null
    }
  }

  // JDBC endpoint of the engine's thrift frontend for HiveJDBCTestHelper.
  protected def getJdbcUrl: String = s"jdbc:hive2://$connectionUrl/$schema;"
}
diff --git
a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala
b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala
new file mode 100644
index 0000000..4e2289f
--- /dev/null
+++
b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.trino.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.hive.service.rpc.thrift.TCancelOperationReq
+import org.apache.hive.service.rpc.thrift.TCloseOperationReq
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq
+import org.apache.hive.service.rpc.thrift.TFetchOrientation
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq
+import org.apache.hive.service.rpc.thrift.TOperationState
+import org.apache.hive.service.rpc.thrift.TStatusCode
+
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG
+import org.apache.kyuubi.engine.trino.WithTrinoEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
/**
 * End-to-end operation tests for the Trino engine, exercising statement
 * execution through both the JDBC layer and the raw thrift client against
 * Trino's in-memory catalog.
 */
class TrinoOperationSuite extends WithTrinoEngine with HiveJDBCTestHelper {
  override def withKyuubiConf: Map[String, String] = Map(
    ENGINE_TRINO_CONNECTION_CATALOG.key -> "memory")

  // Use the default schema; do not set it to 'default', since withSessionHandle
  // strips the suffix '/;'.
  override protected val schema = ""

  override protected def jdbcUrl: String = getJdbcUrl

  test("execute statement - select decimal") {
    withJdbcStatement() { statement =>
      val resultSet = statement.executeQuery("SELECT DECIMAL '1.2' as col1, DECIMAL '1.23' AS col2")
      assert(resultSet.next())
      assert(resultSet.getBigDecimal("col1") === new java.math.BigDecimal("1.2"))
      assert(resultSet.getBigDecimal("col2") === new java.math.BigDecimal("1.23"))
      val metaData = resultSet.getMetaData
      assert(metaData.getColumnType(1) === java.sql.Types.DECIMAL)
      assert(metaData.getColumnType(2) === java.sql.Types.DECIMAL)
      // DECIMAL '1.2' has precision 2 / scale 1; DECIMAL '1.23' has precision 3 / scale 2.
      assert(metaData.getPrecision(1) == 2)
      assert(metaData.getPrecision(2) == 3)
      assert(metaData.getScale(1) == 1)
      assert(metaData.getScale(2) == 2)
    }
  }

  test("test fetch orientation") {
    // Two-row result set so every orientation (NEXT, PRIOR, FIRST) is observable.
    val sql = "SELECT id FROM (VALUES 0, 1) as t(id)"

    withSessionHandle { (client, handle) =>
      val req = new TExecuteStatementReq()
      req.setSessionHandle(handle)
      req.setStatement(sql)
      val tExecuteStatementResp = client.ExecuteStatement(req)
      val opHandle = tExecuteStatementResp.getOperationHandle
      waitForOperationToComplete(client, opHandle)

      // fetch next from before first row
      val tFetchResultsReq1 = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1)
      val tFetchResultsResp1 = client.FetchResults(tFetchResultsReq1)
      assert(tFetchResultsResp1.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
      val idSeq1 = tFetchResultsResp1.getResults.getColumns.get(0).getI32Val.getValues.asScala.toSeq
      assertResult(Seq(0L))(idSeq1)

      // fetch next from first row
      val tFetchResultsReq2 = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1)
      val tFetchResultsResp2 = client.FetchResults(tFetchResultsReq2)
      assert(tFetchResultsResp2.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
      val idSeq2 = tFetchResultsResp2.getResults.getColumns.get(0).getI32Val.getValues.asScala.toSeq
      assertResult(Seq(1L))(idSeq2)

      // fetch prior from second row, expected got first row
      val tFetchResultsReq3 = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_PRIOR, 1)
      val tFetchResultsResp3 = client.FetchResults(tFetchResultsReq3)
      assert(tFetchResultsResp3.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
      val idSeq3 = tFetchResultsResp3.getResults.getColumns.get(0).getI32Val.getValues.asScala.toSeq
      assertResult(Seq(0L))(idSeq3)

      // fetch first: maxRows 3 exceeds the row count, so the full result comes back
      val tFetchResultsReq4 = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_FIRST, 3)
      val tFetchResultsResp4 = client.FetchResults(tFetchResultsReq4)
      assert(tFetchResultsResp4.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
      val idSeq4 = tFetchResultsResp4.getResults.getColumns.get(0).getI32Val.getValues.asScala.toSeq
      assertResult(Seq(0L, 1L))(idSeq4)
    }
  }

  test("get operation status") {
    val sql = "select date '2011-11-11' - interval '1' day"

    withSessionHandle { (client, handle) =>
      val req = new TExecuteStatementReq()
      req.setSessionHandle(handle)
      req.setStatement(sql)
      // NOTE(review): no runAsync here, so the statement presumably completes
      // synchronously before the status request is issued — confirm in ExecuteStatement.
      val tExecuteStatementResp = client.ExecuteStatement(req)
      val opHandle = tExecuteStatementResp.getOperationHandle
      val tGetOperationStatusReq = new TGetOperationStatusReq()
      tGetOperationStatusReq.setOperationHandle(opHandle)
      val resp = client.GetOperationStatus(tGetOperationStatusReq)
      val status = resp.getStatus
      assert(status.getStatusCode === TStatusCode.SUCCESS_STATUS)
      assert(resp.getOperationState === TOperationState.FINISHED_STATE)
      assert(resp.isHasResultSet)
    }
  }

  test("basic open | execute | close") {
    withThriftClient { client =>
      val req = new TOpenSessionReq()
      req.setUsername("hongdd")
      req.setPassword("anonymous")
      val tOpenSessionResp = client.OpenSession(req)

      val tExecuteStatementReq = new TExecuteStatementReq()
      tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
      tExecuteStatementReq.setRunAsync(true)
      tExecuteStatementReq.setStatement("show session")
      val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)

      val operationHandle = tExecuteStatementResp.getOperationHandle
      waitForOperationToComplete(client, operationHandle)
      // fetchType 1 retrieves the operation log rather than the result rows
      val tFetchResultsReq = new TFetchResultsReq()
      tFetchResultsReq.setOperationHandle(operationHandle)
      tFetchResultsReq.setFetchType(1)
      tFetchResultsReq.setMaxRows(1000)
      val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
      val logs = tFetchResultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
      assert(logs.exists(_.contains(classOf[ExecuteStatement].getCanonicalName)))

      // fetchType 0 retrieves the actual result set of SHOW SESSION
      tFetchResultsReq.setFetchType(0)
      val tFetchResultsResp1 = client.FetchResults(tFetchResultsReq)
      val rs = tFetchResultsResp1.getResults.getColumns.get(0).getStringVal.getValues.asScala
      assert(rs.contains("aggregation_operator_unspill_memory_limit"))

      val tCloseSessionReq = new TCloseSessionReq()
      tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
      val tCloseSessionResp = client.CloseSession(tCloseSessionReq)
      assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
    }
  }

  test("not allow to operate closed session or operation") {
    withThriftClient { client =>
      val req = new TOpenSessionReq()
      req.setUsername("hongdd")
      req.setPassword("anonymous")
      val tOpenSessionResp = client.OpenSession(req)

      val tExecuteStatementReq = new TExecuteStatementReq()
      tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
      tExecuteStatementReq.setStatement("show session")
      val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)

      // Close the operation, then verify fetching from it is rejected.
      val tCloseOperationReq = new TCloseOperationReq(tExecuteStatementResp.getOperationHandle)
      val tCloseOperationResp = client.CloseOperation(tCloseOperationReq)
      assert(tCloseOperationResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)

      val tFetchResultsReq = new TFetchResultsReq()
      tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
      tFetchResultsReq.setFetchType(0)
      tFetchResultsReq.setMaxRows(1000)
      val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
      assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
      assert(tFetchResultsResp.getStatus.getErrorMessage startsWith "Invalid OperationHandle" +
        " [type=EXECUTE_STATEMENT, identifier:")

      // Close the session, then verify executing against it is rejected.
      val tCloseSessionReq = new TCloseSessionReq()
      tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
      val tCloseSessionResp = client.CloseSession(tCloseSessionReq)
      assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
      val tExecuteStatementResp1 = client.ExecuteStatement(tExecuteStatementReq)

      val status = tExecuteStatementResp1.getStatus
      assert(status.getStatusCode === TStatusCode.ERROR_STATUS)
      assert(status.getErrorMessage startsWith s"Invalid SessionHandle [")
    }
  }

  test("cancel operation") {
    withThriftClient { client =>
      val req = new TOpenSessionReq()
      req.setUsername("hongdd")
      req.setPassword("anonymous")
      val tOpenSessionResp = client.OpenSession(req)

      val tExecuteStatementReq = new TExecuteStatementReq()
      tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
      tExecuteStatementReq.setStatement("show session")
      val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
      val tCancelOperationReq = new TCancelOperationReq(tExecuteStatementResp.getOperationHandle)
      val tCancelOperationResp = client.CancelOperation(tCancelOperationReq)
      assert(tCancelOperationResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
      // NOTE(review): fetching after cancel still returns SUCCESS here —
      // presumably cancel does not invalidate the handle; confirm intended semantics.
      val tFetchResultsReq = new TFetchResultsReq()
      tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
      tFetchResultsReq.setFetchType(0)
      tFetchResultsReq.setMaxRows(1000)
      val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
      assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
    }
  }
}
diff --git
a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/session/SessionSuite.scala
b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/session/SessionSuite.scala
new file mode 100644
index 0000000..f7637ef
--- /dev/null
+++
b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/session/SessionSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.trino.session
+
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG
+import org.apache.kyuubi.engine.trino.WithTrinoEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
/**
 * Session-level test for the Trino engine: runs against the in-memory
 * catalog with a SERVER-shared engine and checks that state created in a
 * session (a view) is visible to subsequent statements.
 */
class SessionSuite extends WithTrinoEngine with HiveJDBCTestHelper {

  override def withKyuubiConf: Map[String, String] = Map(
    ENGINE_TRINO_CONNECTION_CATALOG.key -> "memory",
    ENGINE_SHARE_LEVEL.key -> "SERVER")

  override protected val schema = "default"

  override protected def jdbcUrl: String = getJdbcUrl

  test("test session") {
    withJdbcStatement() { statement =>
      val createView = "create or replace view temp_view as select 1 as id"
      statement.executeQuery(createView)
      // The view created above must be queryable within the same session.
      val resultSet = statement.executeQuery("select * from temp_view")
      assert(resultSet.next())
    }
  }

}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 018c59c..938219a 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -203,6 +203,7 @@ object Utils extends Logging {
 // Hooks that need to be invoked before the SparkContext is stopped shall use a
 // higher priority.
val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
val FLINK_ENGINE_SHUTDOWN_PRIORITY = 50
+ val TRINO_ENGINE_SHUTDOWN_PRIORITY = 50
/**
* Add some operations that you want into ShutdownHook