This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.10 by this push:
new acb5095c26 [KYUUBI #7248] JDBC engine should cancel the statement when
receive cancel operation
acb5095c26 is described below
commit acb5095c263cfc73b71787c84f2b3d16954a9642
Author: ruanwenjun <[email protected]>
AuthorDate: Thu Nov 20 20:41:26 2025 +0800
[KYUUBI #7248] JDBC engine should cancel the statement when receive cancel
operation
### Why are the changes needed?
close #7248
- Cancel the jdbc statement when receive a kyuubi cancel operation
### How was this patch tested?
Test by new ut case
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #7249 from ruanwenjun/dev_wenjun_fix7248.
Closes #7248
fa847ecbc [Cheng Pan] Apply suggestion from @pan3793
734e6c467 [ruanwenjun] polish code
76112f313 [ruanwenjun] change starrocks image to 3.3.13
f2f9eed3e [ruanwenjun] add status assertion in new ut case
33e0f0dd2 [ruanwenjun] add assert in ut
c22feb5da [ruanwenjun] move todo to ExecuteStatement
a132f36b8 [ruanwenjun] improve ut
f97ceae4e [ruanwenjun] [KYUUBI #7248] Ensure jdbc engine statements are
canceled when receive cancel operation
Lead-authored-by: ruanwenjun <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit f2539d2581db1b1a1337b6834a7fbd79da471129)
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/engine/jdbc/dialect/JdbcDialect.scala | 13 +++++++++++
.../engine/jdbc/operation/ExecuteStatement.scala | 23 +++++++++++++++++--
.../engine/jdbc/operation/JdbcOperation.scala | 4 ----
.../jdbc/mysql/OperationWithEngineSuite.scala | 25 +++++++++++++++++++++
.../StarRocksOperationWithEngineSuite.scala | 26 ++++++++++++++++++++++
.../jdbc/starrocks/WithStarRocksContainer.scala | 4 ++--
.../kyuubi/operation/HiveJDBCTestHelper.scala | 26 +++++++++++++++++-----
7 files changed, 108 insertions(+), 13 deletions(-)
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
index 6c2d3b1e09..ec2b070144 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
@@ -81,6 +81,19 @@ abstract class JdbcDialect extends SupportServiceLoader with
Logging {
def getTRowSetGenerator(): JdbcTRowSetGenerator
def getSchemaHelper(): SchemaHelper
+
+ def cancelStatement(jdbcStatement: Statement): Unit = {
+ if (jdbcStatement != null) {
+ jdbcStatement.cancel()
+ jdbcStatement.close()
+ }
+ }
+
+ def closeStatement(jdbcStatement: Statement): Unit = {
+ if (jdbcStatement != null) {
+ jdbcStatement.close()
+ }
+ }
}
object JdbcDialects extends Logging {
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
index d75c7f408c..3d611448f2 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
@@ -120,12 +120,31 @@ class ExecuteStatement(
super.validateFetchOrientation(order)
}
+ override def cancel(): Unit = withLockRequired {
+ if (!isTerminalState(state)) {
+ setState(OperationState.CANCELED)
+ // TODO: If `shouldRunAsync` is true, the statement is initialized
lazily.
+ // When a SQL is submitted and immediately canceled, `jdbcStatement` may
still be null,
+ // which can lead to the cancellation not taking effect.
+ if (jdbcStatement != null) {
+ dialect.cancelStatement(jdbcStatement)
+ jdbcStatement = null
+ } else {
+ warn(s"Ignore cancel operation $statementId due to jdbcStatement is
null.")
+ }
+ }
+ }
+
override def cleanup(targetState: OperationState): Unit = withLockRequired {
try {
super.cleanup(targetState)
} finally {
- if (jdbcStatement != null && !jdbcStatement.isClosed) {
- jdbcStatement.close()
+ if (jdbcStatement != null) {
+ if (targetState == OperationState.CANCELED) {
+ dialect.cancelStatement(jdbcStatement)
+ } else {
+ dialect.closeStatement(jdbcStatement)
+ }
jdbcStatement = null
}
}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
index 555725944c..450d79b034 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
@@ -62,10 +62,6 @@ abstract class JdbcOperation(session: Session) extends
AbstractOperation(session
resp
}
- override def cancel(): Unit = {
- cleanup(OperationState.CANCELED)
- }
-
override def close(): Unit = {
cleanup(OperationState.CLOSED)
}
diff --git
a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala
b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala
index b8264c0699..0b89848305 100644
---
a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala
+++
b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala
@@ -16,10 +16,14 @@
*/
package org.apache.kyuubi.engine.jdbc.mysql
+import org.scalatest.concurrent.TimeLimits.failAfter
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+import
org.apache.kyuubi.shaded.hive.service.rpc.thrift.TOperationState.{CANCELED_STATE,
RUNNING_STATE}
class OperationWithEngineSuite extends MySQLOperationSuite with
HiveJDBCTestHelper {
@@ -75,4 +79,25 @@ class OperationWithEngineSuite extends MySQLOperationSuite
with HiveJDBCTestHelp
assert(tFetchResultsResp.getStatus.getStatusCode ===
TStatusCode.SUCCESS_STATUS)
}
}
+
+ test("MySQL - JDBC ExecuteStatement cancel operation should kill SQL
statement") {
+ failAfter(20.seconds) {
+ withSessionHandle { (client, handle) =>
+ val executeReq = new TExecuteStatementReq()
+ executeReq.setSessionHandle(handle)
+ // The SQL will sleep 120s
+ executeReq.setStatement("SELECT sleep(120)")
+ executeReq.setRunAsync(true)
+ val executeResp = client.ExecuteStatement(executeReq)
+ assert(executeResp.getStatus.getStatusCode ===
TStatusCode.SUCCESS_STATUS)
+
+ val operationHandle = executeResp.getOperationHandle
+ waitForOperationStatusIn(client, operationHandle, Set(RUNNING_STATE))
+
+ val cancelResp = client.CancelOperation(new
TCancelOperationReq(operationHandle))
+ assert(cancelResp.getStatus.getStatusCode ===
TStatusCode.SUCCESS_STATUS)
+ waitForOperationStatusIn(client, operationHandle, Set(CANCELED_STATE))
+ }
+ }
+ }
}
diff --git
a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala
b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala
index acbc028f89..49cc52b41a 100644
---
a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala
+++
b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala
@@ -16,10 +16,15 @@
*/
package org.apache.kyuubi.engine.jdbc.starrocks
+import scala.concurrent.duration.DurationInt
+
+import org.scalatest.concurrent.TimeLimits.failAfter
+
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+import
org.apache.kyuubi.shaded.hive.service.rpc.thrift.TOperationState.{CANCELED_STATE,
RUNNING_STATE}
class StarRocksOperationWithEngineSuite extends StarRocksOperationSuite with
HiveJDBCTestHelper {
@@ -75,4 +80,25 @@ class StarRocksOperationWithEngineSuite extends
StarRocksOperationSuite with Hiv
assert(tFetchResultsResp.getStatus.getStatusCode ===
TStatusCode.SUCCESS_STATUS)
}
}
+
+ test("StarRocks - JDBC ExecuteStatement cancel operation should kill SQL
statement") {
+ failAfter(20.seconds) {
+ withSessionHandle { (client, handle) =>
+ val executeReq = new TExecuteStatementReq()
+ executeReq.setSessionHandle(handle)
+ // The SQL will sleep 120s
+ executeReq.setStatement("SELECT sleep(120)")
+ executeReq.setRunAsync(true)
+ val executeResp = client.ExecuteStatement(executeReq)
+ assert(executeResp.getStatus.getStatusCode ===
TStatusCode.SUCCESS_STATUS)
+
+ val operationHandle = executeResp.getOperationHandle
+ waitForOperationStatusIn(client, operationHandle, Set(RUNNING_STATE))
+
+ val cancelResp = client.CancelOperation(new
TCancelOperationReq(operationHandle))
+ assert(cancelResp.getStatus.getStatusCode ===
TStatusCode.SUCCESS_STATUS)
+ waitForOperationStatusIn(client, operationHandle, Set(CANCELED_STATE))
+ }
+ }
+ }
}
diff --git
a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala
b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala
index 9c229a636c..8aa90e7861 100644
---
a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala
+++
b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala
@@ -26,7 +26,7 @@ import org.apache.kyuubi.engine.jdbc.WithJdbcServerContainer
trait WithStarRocksContainer extends WithJdbcServerContainer {
- private val starrocksDockerImage = "starrocks/allin1-ubuntu:3.1.6"
+ private val starrocksDockerImage = "starrocks/allin1-ubuntu:3.3.13"
private val STARROCKS_FE_MYSQL_PORT = 9030
private val STARROCKS_FE_HTTP_PORT = 8030
@@ -47,7 +47,7 @@ trait WithStarRocksContainer extends WithJdbcServerContainer {
.withStrategy(Wait.forListeningPorts(ports: _*))
.withStrategy(forLogMessage(".*broker service already added into FE
service.*", 1))
.withStrategy(
- forLogMessage(".*Enjoy the journal to StarRocks blazing-fast
lake-house engine.*", 1)))
+ forLogMessage(".*Enjoy the journey to StarRocks blazing-fast
lake-house engine.*", 1)))
protected def feJdbcUrl: String = withContainers { container =>
val queryServerHost: String = container.host
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
index 02cb9a0030..8d89cd9784 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
@@ -124,11 +124,27 @@ trait HiveJDBCTestHelper extends JDBCTestHelper {
}
def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
- val req = new TGetOperationStatusReq(op)
- var state = client.GetOperationStatus(req).getOperationState
- eventually(timeout(90.seconds), interval(100.milliseconds)) {
- state = client.GetOperationStatus(req).getOperationState
- assert(!Set(INITIALIZED_STATE, PENDING_STATE,
RUNNING_STATE).contains(state))
+ waitForOperationStatusIn(
+ client,
+ op,
+ Set(
+ FINISHED_STATE,
+ CANCELED_STATE,
+ CLOSED_STATE,
+ ERROR_STATE,
+ UKNOWN_STATE,
+ TIMEDOUT_STATE),
+ timeoutMs = 90000)
+ }
+
+ def waitForOperationStatusIn(
+ client: Iface,
+ op: TOperationHandle,
+ status: Set[TOperationState],
+ timeoutMs: Int = 5000): Unit = {
+ eventually(timeout(timeoutMs.milliseconds), interval(100.milliseconds)) {
+ val state = client.GetOperationStatus(new
TGetOperationStatusReq(op)).getOperationState
+ assert(status.contains(state))
}
}