This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 9e263d79f [KYUUBI #2403] [Improvement] move addTimeoutMonitor to
AbstractOperation because it was used in multiple engines
9e263d79f is described below
commit 9e263d79f916c8125a53693c9c0df552889168e2
Author: Min Zhao <[email protected]>
AuthorDate: Thu May 5 12:15:50 2022 +0800
[KYUUBI #2403] [Improvement] move addTimeoutMonitor to AbstractOperation
because it was used in multiple engines
### _Why are the changes needed?_
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #2466 from zhaomin1423/timeoutMonitor.
Closes #2403
3266a02b [Min Zhao] modify statementTimeoutCleaner access modifier
db73a42d [Min Zhao] add shutdownTimeoutMonitor
4f1d0e10 [Min Zhao] clear unused import
acda7e6c [Min Zhao] handle cleanup
eca266a3 [Min Zhao] [KYUUBI #2403] [Improvement] move addTimeoutMonitor to
AbstractOperation because it was used in multiple engines
f5256053 [Min Zhao] [KYUUBI #2403] [Improvement] move addTimeoutMonitor to
AbstractOperation because it was used in multiple engines
8e7820bc [Min Zhao] [KYUUBI #2403] [Improvement] move addTimeoutMonitor to
AbstractOperation because it was used in multiple engines
9b19704e [Min Zhao] [KYUUBI #2403] [Improvement] move addTimeoutMonitor to
AbstractOperation because it was used in multiple engines
Authored-by: Min Zhao <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../engine/flink/operation/ExecuteStatement.scala | 19 +++------------
.../engine/flink/operation/FlinkOperation.scala | 8 -------
.../engine/spark/operation/ExecuteStatement.scala | 25 +++-----------------
.../engine/spark/operation/SparkOperation.scala | 2 +-
.../engine/trino/operation/ExecuteStatement.scala | 27 ++++------------------
.../engine/trino/operation/TrinoOperation.scala | 8 -------
.../kyuubi/operation/AbstractOperation.scala | 26 ++++++++++++++++++++-
7 files changed, 37 insertions(+), 78 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index e872dc628..98e4a4a32 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.flink.operation
-import java.util.concurrent.{RejectedExecutionException,
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.RejectedExecutionException
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -35,7 +35,6 @@ import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
-import org.apache.kyuubi.util.ThreadUtils
class ExecuteStatement(
session: Session,
@@ -48,8 +47,6 @@ class ExecuteStatement(
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
- private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
-
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@VisibleForTesting
@@ -70,7 +67,7 @@ class ExecuteStatement(
}
override protected def runInternal(): Unit = {
- addTimeoutMonitor()
+ addTimeoutMonitor(queryTimeout)
if (shouldRunAsync) {
val asyncOperation = new Runnable {
override def run(): Unit = {
@@ -122,7 +119,7 @@ class ExecuteStatement(
} catch {
onError(cancel = true)
} finally {
- statementTimeoutCleaner.foreach(_.shutdown())
+ shutdownTimeoutMonitor()
}
}
@@ -181,14 +178,4 @@ class ExecuteStatement(
warn(s"Failed to clean result set $resultId in session $sessionId", t)
}
}
-
- private def addTimeoutMonitor(): Unit = {
- if (queryTimeout > 0) {
- val timeoutExecutor =
-
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
- val action: Runnable = () => cleanup(OperationState.TIMEOUT)
- timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
- statementTimeoutCleaner = Some(timeoutExecutor)
- }
- }
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index d178d81ee..f70d755de 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -31,7 +31,6 @@ import org.apache.kyuubi.engine.flink.schema.RowSet
import org.apache.kyuubi.engine.flink.session.FlinkSessionImpl
import org.apache.kyuubi.operation.{AbstractOperation, OperationState}
import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT,
FETCH_PRIOR, FetchOrientation}
-import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.OperationType.OperationType
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -111,13 +110,6 @@ abstract class FlinkOperation(
override def shouldRunAsync: Boolean = false
- protected def cleanup(targetState: OperationState): Unit =
state.synchronized {
- if (!isTerminalState(state)) {
- setState(targetState)
- Option(getBackgroundHandle).foreach(_.cancel(true))
- }
- }
-
protected def onError(cancel: Boolean = false): PartialFunction[Throwable,
Unit] = {
// We should use Throwable instead of Exception since
`java.lang.NoClassDefFoundError`
// could be thrown.
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 630de7706..8525ccdee 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark.operation
-import java.util.concurrent.{RejectedExecutionException,
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.RejectedExecutionException
import scala.collection.JavaConverters._
@@ -34,7 +34,6 @@ import org.apache.kyuubi.operation.{ArrayFetchIterator,
IterableFetchIterator, O
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
-import org.apache.kyuubi.util.ThreadUtils
class ExecuteStatement(
session: Session,
@@ -44,8 +43,6 @@ class ExecuteStatement(
incrementalCollect: Boolean)
extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) with
Logging {
- private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
-
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@@ -99,12 +96,12 @@ class ExecuteStatement(
} catch {
onError(cancel = true)
} finally {
- statementTimeoutCleaner.foreach(_.shutdown())
+ shutdownTimeoutMonitor()
}
}
override protected def runInternal(): Unit = {
- addTimeoutMonitor()
+ addTimeoutMonitor(queryTimeout)
if (shouldRunAsync) {
val asyncOperation = new Runnable {
override def run(): Unit = {
@@ -130,22 +127,6 @@ class ExecuteStatement(
}
}
- private def addTimeoutMonitor(): Unit = {
- if (queryTimeout > 0) {
- val timeoutExecutor =
-
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
- timeoutExecutor.schedule(
- new Runnable {
- override def run(): Unit = {
- cleanup(OperationState.TIMEOUT)
- }
- },
- queryTimeout,
- TimeUnit.SECONDS)
- statementTimeoutCleaner = Some(timeoutExecutor)
- }
- }
-
override def cleanup(targetState: OperationState): Unit = {
spark.sparkContext.removeSparkListener(operationListener)
super.cleanup(targetState)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 314b2276b..24f7e31ed 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -59,7 +59,7 @@ abstract class SparkOperation(opType: OperationType, session:
Session)
override def redactedStatement: String =
redact(spark.sessionState.conf.stringRedactionPattern, statement)
- protected def cleanup(targetState: OperationState): Unit =
state.synchronized {
+ override def cleanup(targetState: OperationState): Unit = state.synchronized
{
if (!isTerminalState(state)) {
setState(targetState)
Option(getBackgroundHandle).foreach(_.cancel(true))
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index 2f7470713..112d139f3 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -17,20 +17,15 @@
package org.apache.kyuubi.engine.trino.operation
-import java.util.concurrent.{RejectedExecutionException,
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.RejectedExecutionException
-import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.trino.TrinoStatement
import org.apache.kyuubi.engine.trino.event.TrinoOperationEvent
import org.apache.kyuubi.events.EventBus
-import org.apache.kyuubi.operation.ArrayFetchIterator
-import org.apache.kyuubi.operation.IterableFetchIterator
-import org.apache.kyuubi.operation.OperationState
-import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator,
OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
-import org.apache.kyuubi.util.ThreadUtils
class ExecuteStatement(
session: Session,
@@ -40,8 +35,6 @@ class ExecuteStatement(
incrementalCollect: Boolean)
extends TrinoOperation(OperationType.EXECUTE_STATEMENT, session) with
Logging {
- private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
-
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@@ -56,7 +49,7 @@ class ExecuteStatement(
}
override protected def runInternal(): Unit = {
- addTimeoutMonitor()
+ addTimeoutMonitor(queryTimeout)
val trinoStatement = TrinoStatement(trinoContext,
session.sessionManager.getConf, statement)
trino = trinoStatement.getTrinoClient
if (shouldRunAsync) {
@@ -101,17 +94,7 @@ class ExecuteStatement(
} catch {
onError(cancel = true)
} finally {
- statementTimeoutCleaner.foreach(_.shutdown())
- }
- }
-
- private def addTimeoutMonitor(): Unit = {
- if (queryTimeout > 0) {
- val timeoutExecutor =
-
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
- val action: Runnable = () => cleanup(OperationState.TIMEOUT)
- timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
- statementTimeoutCleaner = Some(timeoutExecutor)
+ shutdownTimeoutMonitor()
}
}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
index 0da6824e1..23e14464b 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
@@ -37,7 +37,6 @@ import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.FetchOrientation.FETCH_PRIOR
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState
-import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.OperationType.OperationType
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -88,13 +87,6 @@ abstract class TrinoOperation(opType: OperationType,
session: Session)
cleanup(OperationState.CANCELED)
}
- protected def cleanup(targetState: OperationState): Unit =
state.synchronized {
- if (!isTerminalState(state)) {
- setState(targetState)
- Option(getBackgroundHandle).foreach(_.cancel(true))
- }
- }
-
override def close(): Unit = {
cleanup(OperationState.CLOSED)
try {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 272095804..7a9736e03 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.operation
-import java.util.concurrent.Future
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
import org.apache.commons.lang3.StringUtils
import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet,
TTableSchema}
@@ -29,6 +29,7 @@ import org.apache.kyuubi.operation.OperationState._
import org.apache.kyuubi.operation.OperationType.OperationType
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.ThreadUtils
abstract class AbstractOperation(opType: OperationType, session: Session)
extends Operation with Logging {
@@ -41,6 +42,29 @@ abstract class AbstractOperation(opType: OperationType,
session: Session)
final private[kyuubi] val statementId = handle.identifier.toString
+ private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
+
+ protected def cleanup(targetState: OperationState): Unit =
state.synchronized {
+ if (!isTerminalState(state)) {
+ setState(targetState)
+ Option(getBackgroundHandle).foreach(_.cancel(true))
+ }
+ }
+
+ protected def addTimeoutMonitor(queryTimeout: Long): Unit = {
+ if (queryTimeout > 0) {
+ val timeoutExecutor =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
+ val action: Runnable = () => cleanup(OperationState.TIMEOUT)
+ timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
+ statementTimeoutCleaner = Some(timeoutExecutor)
+ }
+ }
+
+ protected def shutdownTimeoutMonitor(): Unit = {
+ statementTimeoutCleaner.foreach(_.shutdown())
+ }
+
override def getOperationLog: Option[OperationLog] = None
@volatile protected var state: OperationState = INITIALIZED