This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e263d79f [KYUUBI #2403] [Improvement] move addTimeoutMonitor to AbstractOperation because it was used in multiple engines
9e263d79f is described below

commit 9e263d79f916c8125a53693c9c0df552889168e2
Author: Min Zhao <[email protected]>
AuthorDate: Thu May 5 12:15:50 2022 +0800

    [KYUUBI #2403] [Improvement] move addTimeoutMonitor to AbstractOperation because it was used in multiple engines
    
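    ### _Why are the changes needed?_
    The Flink, Spark and Trino `ExecuteStatement` operations each carried their own
    private `addTimeoutMonitor` and `statementTimeoutCleaner`, and `FlinkOperation`
    and `TrinoOperation` each defined an identical `cleanup`. This patch moves that
    shared logic into `AbstractOperation` (`cleanup`, `addTimeoutMonitor(queryTimeout)`
    and a new `shutdownTimeoutMonitor()`) and removes the per-engine copies;
    `SparkOperation` keeps its `cleanup` as an override.
    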
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2466 from zhaomin1423/timeoutMonitor.
    
    Closes #2403
    
    3266a02b [Min Zhao] modify statementTimeoutCleaner access modifier
    db73a42d [Min Zhao] add shutdownTimeoutMonitor
    4f1d0e10 [Min Zhao] clear unused import
    acda7e6c [Min Zhao] handle cleanup
    eca266a3 [Min Zhao] [KYUUBI #2403] [Improvement] move addTimeoutMonitor to 
AbstractOperation because it was used in multiple engines
    f5256053 [Min Zhao] [KYUUBI #2403] [Improvement] move addTimeoutMonitor to 
AbstractOperation because it was used in multiple engines
    8e7820bc [Min Zhao] [KYUUBI #2403] [Improvement] move addTimeoutMonitor to 
AbstractOperation because it was used in multiple engines
    9b19704e [Min Zhao] [KYUUBI #2403] [Improvement] move addTimeoutMonitor to 
AbstractOperation because it was used in multiple engines
    
    Authored-by: Min Zhao <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../engine/flink/operation/ExecuteStatement.scala  | 19 +++------------
 .../engine/flink/operation/FlinkOperation.scala    |  8 -------
 .../engine/spark/operation/ExecuteStatement.scala  | 25 +++-----------------
 .../engine/spark/operation/SparkOperation.scala    |  2 +-
 .../engine/trino/operation/ExecuteStatement.scala  | 27 ++++------------------
 .../engine/trino/operation/TrinoOperation.scala    |  8 -------
 .../kyuubi/operation/AbstractOperation.scala       | 26 ++++++++++++++++++++-
 7 files changed, 37 insertions(+), 78 deletions(-)

diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index e872dc628..98e4a4a32 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
-import java.util.concurrent.{RejectedExecutionException, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.RejectedExecutionException
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -35,7 +35,6 @@ import org.apache.kyuubi.engine.flink.result.ResultSet
 import org.apache.kyuubi.operation.{OperationState, OperationType}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
-import org.apache.kyuubi.util.ThreadUtils
 
 class ExecuteStatement(
     session: Session,
@@ -48,8 +47,6 @@ class ExecuteStatement(
   private val operationLog: OperationLog =
     OperationLog.createOperationLog(session, getHandle)
 
-  private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
-
   override def getOperationLog: Option[OperationLog] = Option(operationLog)
 
   @VisibleForTesting
@@ -70,7 +67,7 @@ class ExecuteStatement(
   }
 
   override protected def runInternal(): Unit = {
-    addTimeoutMonitor()
+    addTimeoutMonitor(queryTimeout)
     if (shouldRunAsync) {
       val asyncOperation = new Runnable {
         override def run(): Unit = {
@@ -122,7 +119,7 @@ class ExecuteStatement(
     } catch {
       onError(cancel = true)
     } finally {
-      statementTimeoutCleaner.foreach(_.shutdown())
+      shutdownTimeoutMonitor()
     }
   }
 
@@ -181,14 +178,4 @@ class ExecuteStatement(
         warn(s"Failed to clean result set $resultId in session $sessionId", t)
     }
   }
-
-  private def addTimeoutMonitor(): Unit = {
-    if (queryTimeout > 0) {
-      val timeoutExecutor =
-        
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
-      val action: Runnable = () => cleanup(OperationState.TIMEOUT)
-      timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
-      statementTimeoutCleaner = Some(timeoutExecutor)
-    }
-  }
 }
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index d178d81ee..f70d755de 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -31,7 +31,6 @@ import org.apache.kyuubi.engine.flink.schema.RowSet
 import org.apache.kyuubi.engine.flink.session.FlinkSessionImpl
 import org.apache.kyuubi.operation.{AbstractOperation, OperationState}
 import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, 
FETCH_PRIOR, FetchOrientation}
-import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.OperationType.OperationType
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
@@ -111,13 +110,6 @@ abstract class FlinkOperation(
 
   override def shouldRunAsync: Boolean = false
 
-  protected def cleanup(targetState: OperationState): Unit = 
state.synchronized {
-    if (!isTerminalState(state)) {
-      setState(targetState)
-      Option(getBackgroundHandle).foreach(_.cancel(true))
-    }
-  }
-
   protected def onError(cancel: Boolean = false): PartialFunction[Throwable, 
Unit] = {
     // We should use Throwable instead of Exception since 
`java.lang.NoClassDefFoundError`
     // could be thrown.
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 630de7706..8525ccdee 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.engine.spark.operation
 
-import java.util.concurrent.{RejectedExecutionException, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.RejectedExecutionException
 
 import scala.collection.JavaConverters._
 
@@ -34,7 +34,6 @@ import org.apache.kyuubi.operation.{ArrayFetchIterator, 
IterableFetchIterator, O
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
-import org.apache.kyuubi.util.ThreadUtils
 
 class ExecuteStatement(
     session: Session,
@@ -44,8 +43,6 @@ class ExecuteStatement(
     incrementalCollect: Boolean)
   extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) with 
Logging {
 
-  private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
-
   private val operationLog: OperationLog = 
OperationLog.createOperationLog(session, getHandle)
   override def getOperationLog: Option[OperationLog] = Option(operationLog)
 
@@ -99,12 +96,12 @@ class ExecuteStatement(
     } catch {
       onError(cancel = true)
     } finally {
-      statementTimeoutCleaner.foreach(_.shutdown())
+      shutdownTimeoutMonitor()
     }
   }
 
   override protected def runInternal(): Unit = {
-    addTimeoutMonitor()
+    addTimeoutMonitor(queryTimeout)
     if (shouldRunAsync) {
       val asyncOperation = new Runnable {
         override def run(): Unit = {
@@ -130,22 +127,6 @@ class ExecuteStatement(
     }
   }
 
-  private def addTimeoutMonitor(): Unit = {
-    if (queryTimeout > 0) {
-      val timeoutExecutor =
-        
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
-      timeoutExecutor.schedule(
-        new Runnable {
-          override def run(): Unit = {
-            cleanup(OperationState.TIMEOUT)
-          }
-        },
-        queryTimeout,
-        TimeUnit.SECONDS)
-      statementTimeoutCleaner = Some(timeoutExecutor)
-    }
-  }
-
   override def cleanup(targetState: OperationState): Unit = {
     spark.sparkContext.removeSparkListener(operationListener)
     super.cleanup(targetState)
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 314b2276b..24f7e31ed 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -59,7 +59,7 @@ abstract class SparkOperation(opType: OperationType, session: 
Session)
   override def redactedStatement: String =
     redact(spark.sessionState.conf.stringRedactionPattern, statement)
 
-  protected def cleanup(targetState: OperationState): Unit = 
state.synchronized {
+  override def cleanup(targetState: OperationState): Unit = state.synchronized 
{
     if (!isTerminalState(state)) {
       setState(targetState)
       Option(getBackgroundHandle).foreach(_.cancel(true))
diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index 2f7470713..112d139f3 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -17,20 +17,15 @@
 
 package org.apache.kyuubi.engine.trino.operation
 
-import java.util.concurrent.{RejectedExecutionException, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.RejectedExecutionException
 
-import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.engine.trino.TrinoStatement
 import org.apache.kyuubi.engine.trino.event.TrinoOperationEvent
 import org.apache.kyuubi.events.EventBus
-import org.apache.kyuubi.operation.ArrayFetchIterator
-import org.apache.kyuubi.operation.IterableFetchIterator
-import org.apache.kyuubi.operation.OperationState
-import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, 
OperationState, OperationType}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
-import org.apache.kyuubi.util.ThreadUtils
 
 class ExecuteStatement(
     session: Session,
@@ -40,8 +35,6 @@ class ExecuteStatement(
     incrementalCollect: Boolean)
   extends TrinoOperation(OperationType.EXECUTE_STATEMENT, session) with 
Logging {
 
-  private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
-
   private val operationLog: OperationLog = 
OperationLog.createOperationLog(session, getHandle)
   override def getOperationLog: Option[OperationLog] = Option(operationLog)
 
@@ -56,7 +49,7 @@ class ExecuteStatement(
   }
 
   override protected def runInternal(): Unit = {
-    addTimeoutMonitor()
+    addTimeoutMonitor(queryTimeout)
     val trinoStatement = TrinoStatement(trinoContext, 
session.sessionManager.getConf, statement)
     trino = trinoStatement.getTrinoClient
     if (shouldRunAsync) {
@@ -101,17 +94,7 @@ class ExecuteStatement(
     } catch {
       onError(cancel = true)
     } finally {
-      statementTimeoutCleaner.foreach(_.shutdown())
-    }
-  }
-
-  private def addTimeoutMonitor(): Unit = {
-    if (queryTimeout > 0) {
-      val timeoutExecutor =
-        
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
-      val action: Runnable = () => cleanup(OperationState.TIMEOUT)
-      timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
-      statementTimeoutCleaner = Some(timeoutExecutor)
+      shutdownTimeoutMonitor()
     }
   }
 
diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
index 0da6824e1..23e14464b 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
@@ -37,7 +37,6 @@ import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
 import org.apache.kyuubi.operation.FetchOrientation.FETCH_PRIOR
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.OperationState
-import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.OperationType.OperationType
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
@@ -88,13 +87,6 @@ abstract class TrinoOperation(opType: OperationType, 
session: Session)
     cleanup(OperationState.CANCELED)
   }
 
-  protected def cleanup(targetState: OperationState): Unit = 
state.synchronized {
-    if (!isTerminalState(state)) {
-      setState(targetState)
-      Option(getBackgroundHandle).foreach(_.cancel(true))
-    }
-  }
-
   override def close(): Unit = {
     cleanup(OperationState.CLOSED)
     try {
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 272095804..7a9736e03 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.operation
 
-import java.util.concurrent.Future
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, 
TTableSchema}
@@ -29,6 +29,7 @@ import org.apache.kyuubi.operation.OperationState._
 import org.apache.kyuubi.operation.OperationType.OperationType
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.ThreadUtils
 
 abstract class AbstractOperation(opType: OperationType, session: Session)
   extends Operation with Logging {
@@ -41,6 +42,29 @@ abstract class AbstractOperation(opType: OperationType, 
session: Session)
 
   final private[kyuubi] val statementId = handle.identifier.toString
 
+  private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
+
+  protected def cleanup(targetState: OperationState): Unit = 
state.synchronized {
+    if (!isTerminalState(state)) {
+      setState(targetState)
+      Option(getBackgroundHandle).foreach(_.cancel(true))
+    }
+  }
+
+  protected def addTimeoutMonitor(queryTimeout: Long): Unit = {
+    if (queryTimeout > 0) {
+      val timeoutExecutor =
+        
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
+      val action: Runnable = () => cleanup(OperationState.TIMEOUT)
+      timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
+      statementTimeoutCleaner = Some(timeoutExecutor)
+    }
+  }
+
+  protected def shutdownTimeoutMonitor(): Unit = {
+    statementTimeoutCleaner.foreach(_.shutdown())
+  }
+
   override def getOperationLog: Option[OperationLog] = None
 
   @volatile protected var state: OperationState = INITIALIZED

Reply via email to