This is an automated email from the ASF dual-hosted git repository.
lsm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 622190197d [KYUUBI #6843] [FOLLOWUP] Fix 'query-timeout-thread' thread leak
622190197d is described below
commit 622190197ddacd1a756ae9e0209c649840adbcc2
Author: senmiaoliu <[email protected]>
AuthorDate: Fri Jan 10 10:30:00 2025 +0800
[KYUUBI #6843] [FOLLOWUP] Fix 'query-timeout-thread' thread leak
### Why are the changes needed?
If the session manager's ThreadPoolExecutor rejects the asyncOperation, the
query-timeout-thread must be shut down in the catch block; otherwise the
thread leaks. The same fix is needed in the JDBC and Chat engines.
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
Closes #6873 from lsm1/branch-followup-6843.
Closes #6843
aed9088c8 [senmiaoliu] fix query timeout checker leak in chat engine and jdbc engine
Authored-by: senmiaoliu <[email protected]>
Signed-off-by: senmiaoliu <[email protected]>
---
.../engine/chat/operation/ExecuteStatement.scala | 20 ++++++++++++++++----
.../engine/jdbc/operation/ExecuteStatement.scala | 17 ++++++++++++++---
2 files changed, 30 insertions(+), 7 deletions(-)
diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala
index 754a519324..f5d93d45d7 100644
--- a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala
@@ -16,7 +16,9 @@
*/
package org.apache.kyuubi.engine.chat.operation
-import org.apache.kyuubi.Logging
+import java.util.concurrent.RejectedExecutionException
+
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.chat.provider.ChatProvider
import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
@@ -41,9 +43,19 @@ class ExecuteStatement(
executeStatement()
}
}
- val chatSessionManager = session.sessionManager
- val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation)
- setBackgroundHandle(backgroundHandle)
+ try {
+ val chatSessionManager = session.sessionManager
+ val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation)
+ setBackgroundHandle(backgroundHandle)
+ } catch {
+ case rejected: RejectedExecutionException =>
+ setState(OperationState.ERROR)
+ val ke =
+ KyuubiSQLException("Error submitting query in background, query rejected", rejected)
+ setOperationException(ke)
+ shutdownTimeoutMonitor()
+ throw ke
+ }
} else {
executeStatement()
}
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
index af9e9a1027..d75c7f408c 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.jdbc.operation
import java.sql.{Connection, Statement, Types}
+import java.util.concurrent.RejectedExecutionException
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.jdbc.schema.{Column, Row, Schema}
@@ -50,9 +51,19 @@ class ExecuteStatement(
executeStatement()
}
}
- val jdbcSessionManager = session.sessionManager
- val backgroundHandle = jdbcSessionManager.submitBackgroundOperation(asyncOperation)
- setBackgroundHandle(backgroundHandle)
+ try {
+ val jdbcSessionManager = session.sessionManager
+ val backgroundHandle = jdbcSessionManager.submitBackgroundOperation(asyncOperation)
+ setBackgroundHandle(backgroundHandle)
+ } catch {
+ case rejected: RejectedExecutionException =>
+ setState(OperationState.ERROR)
+ val ke =
+ KyuubiSQLException("Error submitting query in background, query rejected", rejected)
+ setOperationException(ke)
+ shutdownTimeoutMonitor()
+ throw ke
+ }
} else {
executeStatement()
}