This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fb62772a [KYUUBI #2402] [Improvement] addTimeoutMonitor for trino 
engine when it run query async
9fb62772a is described below

commit 9fb62772abe13ab3a5febd76ad6c96f8648d6137
Author: Min Zhao <[email protected]>
AuthorDate: Mon Apr 18 10:56:03 2022 +0800

    [KYUUBI #2402] [Improvement] addTimeoutMonitor for trino engine when it run query async
    
    ### _Why are the changes needed?_
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2404 from zhaomin1423/trino_async_addTimeoutMonitor.
    
    Closes #2402
    
    5e08e677 [Min Zhao] [KYUUBI #2402] [Improvement] addTimeoutMonitor for 
trino engine when it run query async
    
    Authored-by: Min Zhao <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../engine/trino/operation/ExecuteStatement.scala     | 19 ++++++++++++++++++-
 .../trino/operation/TrinoOperationManager.scala       |  3 ++-
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index e6ee4a1a5..1d7e28536 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.engine.trino.operation
 
-import java.util.concurrent.RejectedExecutionException
+import java.util.concurrent.{RejectedExecutionException, 
ScheduledExecutorService, TimeUnit}
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.Logging
@@ -28,14 +28,18 @@ import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.operation.OperationType
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.ThreadUtils
 
 class ExecuteStatement(
     session: Session,
     override val statement: String,
     override val shouldRunAsync: Boolean,
+    queryTimeout: Long,
     incrementalCollect: Boolean)
   extends TrinoOperation(OperationType.EXECUTE_STATEMENT, session) with 
Logging {
 
+  private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
+
   private val operationLog: OperationLog = 
OperationLog.createOperationLog(session, getHandle)
   override def getOperationLog: Option[OperationLog] = Option(operationLog)
 
@@ -50,6 +54,7 @@ class ExecuteStatement(
   }
 
   override protected def runInternal(): Unit = {
+    addTimeoutMonitor()
     val trinoStatement = TrinoStatement(trinoContext, 
session.sessionManager.getConf, statement)
     trino = trinoStatement.getTrinoClient
     if (shouldRunAsync) {
@@ -93,6 +98,18 @@ class ExecuteStatement(
       setState(OperationState.FINISHED)
     } catch {
       onError(cancel = true)
+    } finally {
+      statementTimeoutCleaner.foreach(_.shutdown())
+    }
+  }
+
+  private def addTimeoutMonitor(): Unit = {
+    if (queryTimeout > 0) {
+      val timeoutExecutor =
+        
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
+      val action: Runnable = () => cleanup(OperationState.TIMEOUT)
+      timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
+      statementTimeoutCleaner = Some(timeoutExecutor)
     }
   }
 }
diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
index 67367bba6..0ddf7ad08 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
@@ -35,7 +35,8 @@ class TrinoOperationManager extends 
OperationManager("TrinoOperationManager") {
       runAsync: Boolean,
       queryTimeout: Long): Operation = {
     val incrementalCollect = 
session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT)
-    val operation = new ExecuteStatement(session, statement, runAsync, 
incrementalCollect)
+    val operation =
+      new ExecuteStatement(session, statement, runAsync, queryTimeout, 
incrementalCollect)
     addOperation(operation)
   }
 

Reply via email to