This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new bdc28acf4 [KYUUBI #5392] Add query timeout monitor on server-side in
ExecuteStatement
bdc28acf4 is described below
commit bdc28acf411e2cb48c1e5e7699f4aa2db286045a
Author: Bowen Liang <[email protected]>
AuthorDate: Wed Oct 18 21:46:06 2023 +0800
[KYUUBI #5392] Add query timeout monitor on server-side in ExecuteStatement
### _Why are the changes needed?_
As reported in #5392, currently the server is unable to guarantee that the
statement timed-out when the engine may have no proper response for the
server's request therefore the query timeout does not work.
Introduce a server-side statement query timeout monitor, to ensure the
time-out query statements are set to TIMEOUT state and help the JDBC client get
out of the blocked status.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before make a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5398 from bowenliang123/stmt-timeout.
Closes #5392
f5733b3f9 [Bowen Liang] use addTimeoutMonitor for server-side query timeout
checks
Authored-by: Bowen Liang <[email protected]>
Signed-off-by: liangbowen <[email protected]>
---
.../main/scala/org/apache/kyuubi/config/KyuubiConf.scala | 9 +++++++++
.../org/apache/kyuubi/operation/ExecuteStatement.scala | 13 ++++++++++++-
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 50006b95e..d96f536a8 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1876,6 +1876,15 @@ object KyuubiConf {
.checkValue(_ >= 1000, "must >= 1s if set")
.createOptional
+ val OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED: ConfigEntry[Boolean] =
+ buildConf("kyuubi.operation.query.timeout.monitor.enabled")
+ .doc("Whether to monitor timeout query timeout check on server side.")
+ .version("1.8.0")
+ .serverOnly
+ .internal
+ .booleanConf
+ .createWithDefault(true)
+
val OPERATION_RESULT_MAX_ROWS: ConfigEntry[Int] =
buildConf("kyuubi.operation.result.max.rows")
.doc("Max rows of Spark query results. Rows exceeding the limit would be
ignored. " +
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 4767cbf12..86bd3f8c8 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -25,6 +25,7 @@ import org.apache.hive.service.rpc.thrift.TOperationState._
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
+import
org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.log.OperationLog
@@ -58,6 +59,10 @@ class ExecuteStatement(
OperationLog.removeCurrentOperationLog()
}
+ private val isTimeoutMonitorEnabled: Boolean = confOverlay.getOrElse[String](
+ OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED.key,
+ OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED.defaultValStr).toBoolean
+
private def executeStatement(): Unit = {
try {
// We need to avoid executing query in sync mode, because there is no
heartbeat mechanism
@@ -84,7 +89,7 @@ class ExecuteStatement(
var lastStateUpdateTime: Long = 0L
val stateUpdateInterval =
session.sessionManager.getConf.get(KyuubiConf.OPERATION_STATUS_UPDATE_INTERVAL)
- while (!isComplete) {
+ while (!isComplete && !isTerminalState(state)) {
fetchQueryLog()
verifyTStatus(statusResp.getStatus)
if (statusResp.getProgressUpdateResponse != null) {
@@ -143,6 +148,9 @@ class ExecuteStatement(
// see if anymore log could be fetched
fetchQueryLog()
} catch onError()
+ finally {
+ shutdownTimeoutMonitor()
+ }
private def fetchQueryLog(): Unit = {
getOperationLog.foreach { logger =>
@@ -157,6 +165,9 @@ class ExecuteStatement(
}
override protected def runInternal(): Unit = {
+ if (isTimeoutMonitorEnabled) {
+ addTimeoutMonitor(queryTimeout)
+ }
executeStatement()
val sessionManager = session.sessionManager
val asyncOperation: Runnable = () => waitStatementComplete()