This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 9fb62772a [KYUUBI #2402] [Improvement] addTimeoutMonitor for trino
engine when it run query async
9fb62772a is described below
commit 9fb62772abe13ab3a5febd76ad6c96f8648d6137
Author: Min Zhao <[email protected]>
AuthorDate: Mon Apr 18 10:56:03 2022 +0800
[KYUUBI #2402] [Improvement] addTimeoutMonitor for trino engine when it run
query async
### _Why are the changes needed?_
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #2404 from zhaomin1423/trino_async_addTimeoutMonitor.
Closes #2402
5e08e677 [Min Zhao] [KYUUBI #2402] [Improvement] addTimeoutMonitor for
trino engine when it run query async
Authored-by: Min Zhao <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../engine/trino/operation/ExecuteStatement.scala | 19 ++++++++++++++++++-
.../trino/operation/TrinoOperationManager.scala | 3 ++-
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index e6ee4a1a5..1d7e28536 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.trino.operation
-import java.util.concurrent.RejectedExecutionException
+import java.util.concurrent.{RejectedExecutionException,
ScheduledExecutorService, TimeUnit}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Logging
@@ -28,14 +28,18 @@ import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.ThreadUtils
class ExecuteStatement(
session: Session,
override val statement: String,
override val shouldRunAsync: Boolean,
+ queryTimeout: Long,
incrementalCollect: Boolean)
extends TrinoOperation(OperationType.EXECUTE_STATEMENT, session) with
Logging {
+ private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
+
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@@ -50,6 +54,7 @@ class ExecuteStatement(
}
override protected def runInternal(): Unit = {
+ addTimeoutMonitor()
val trinoStatement = TrinoStatement(trinoContext,
session.sessionManager.getConf, statement)
trino = trinoStatement.getTrinoClient
if (shouldRunAsync) {
@@ -93,6 +98,18 @@ class ExecuteStatement(
setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
+ } finally {
+ statementTimeoutCleaner.foreach(_.shutdown())
+ }
+ }
+
+ private def addTimeoutMonitor(): Unit = {
+ if (queryTimeout > 0) {
+ val timeoutExecutor =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
+ val action: Runnable = () => cleanup(OperationState.TIMEOUT)
+ timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
+ statementTimeoutCleaner = Some(timeoutExecutor)
}
}
}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
index 67367bba6..0ddf7ad08 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
@@ -35,7 +35,8 @@ class TrinoOperationManager extends
OperationManager("TrinoOperationManager") {
runAsync: Boolean,
queryTimeout: Long): Operation = {
val incrementalCollect =
session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT)
- val operation = new ExecuteStatement(session, statement, runAsync,
incrementalCollect)
+ val operation =
+ new ExecuteStatement(session, statement, runAsync, queryTimeout,
incrementalCollect)
addOperation(operation)
}