This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f2539d2581 [KYUUBI #7248] JDBC engine should cancel the statement when 
receive cancel operation
f2539d2581 is described below

commit f2539d2581db1b1a1337b6834a7fbd79da471129
Author: ruanwenjun <[email protected]>
AuthorDate: Thu Nov 20 20:41:26 2025 +0800

    [KYUUBI #7248] JDBC engine should cancel the statement when receive cancel 
operation
    
    ### Why are the changes needed?
    close #7248
    - Cancel the JDBC statement when a Kyuubi cancel operation is received
    
    ### How was this patch tested?
    Tested by new unit test cases
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #7249 from ruanwenjun/dev_wenjun_fix7248.
    
    Closes #7248
    
    fa847ecbc [Cheng Pan] Apply suggestion from @pan3793
    734e6c467 [ruanwenjun] polish code
    76112f313 [ruanwenjun] change starrocks image to 3.3.13
    f2f9eed3e [ruanwenjun] add status assertion in new ut case
    33e0f0dd2 [ruanwenjun] add assert in ut
    c22feb5da [ruanwenjun] move todo to ExecuteStatement
    a132f36b8 [ruanwenjun] improve ut
    f97ceae4e [ruanwenjun] [KYUUBI #7248] Ensure jdbc engine statements are 
canceled when receive cancel operation
    
    Lead-authored-by: ruanwenjun <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/engine/jdbc/dialect/JdbcDialect.scala   | 13 +++++++++++
 .../engine/jdbc/operation/ExecuteStatement.scala   | 23 +++++++++++++++++--
 .../engine/jdbc/operation/JdbcOperation.scala      |  4 ----
 .../jdbc/mysql/OperationWithEngineSuite.scala      | 25 +++++++++++++++++++++
 .../StarRocksOperationWithEngineSuite.scala        | 26 ++++++++++++++++++++++
 .../jdbc/starrocks/WithStarRocksContainer.scala    |  4 ++--
 .../kyuubi/operation/HiveJDBCTestHelper.scala      | 26 +++++++++++++++++-----
 7 files changed, 108 insertions(+), 13 deletions(-)

diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
index 6c2d3b1e09..ec2b070144 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
@@ -81,6 +81,19 @@ abstract class JdbcDialect extends SupportServiceLoader with 
Logging {
   def getTRowSetGenerator(): JdbcTRowSetGenerator
 
   def getSchemaHelper(): SchemaHelper
+
+  def cancelStatement(jdbcStatement: Statement): Unit = {
+    if (jdbcStatement != null) {
+      jdbcStatement.cancel()
+      jdbcStatement.close()
+    }
+  }
+
+  def closeStatement(jdbcStatement: Statement): Unit = {
+    if (jdbcStatement != null) {
+      jdbcStatement.close()
+    }
+  }
 }
 
 object JdbcDialects extends Logging {
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
index d75c7f408c..3d611448f2 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
@@ -120,12 +120,31 @@ class ExecuteStatement(
     super.validateFetchOrientation(order)
   }
 
+  override def cancel(): Unit = withLockRequired {
+    if (!isTerminalState(state)) {
+      setState(OperationState.CANCELED)
+      // TODO: If `shouldRunAsync` is true, the statement is initialized 
lazily.
+      // When a SQL is submitted and immediately canceled, `jdbcStatement` may 
still be null,
+      // which can lead to the cancellation not taking effect.
+      if (jdbcStatement != null) {
+        dialect.cancelStatement(jdbcStatement)
+        jdbcStatement = null
+      } else {
+        warn(s"Ignore cancel operation $statementId due to jdbcStatement is 
null.")
+      }
+    }
+  }
+
   override def cleanup(targetState: OperationState): Unit = withLockRequired {
     try {
       super.cleanup(targetState)
     } finally {
-      if (jdbcStatement != null && !jdbcStatement.isClosed) {
-        jdbcStatement.close()
+      if (jdbcStatement != null) {
+        if (targetState == OperationState.CANCELED) {
+          dialect.cancelStatement(jdbcStatement)
+        } else {
+          dialect.closeStatement(jdbcStatement)
+        }
         jdbcStatement = null
       }
     }
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
index 555725944c..450d79b034 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
@@ -62,10 +62,6 @@ abstract class JdbcOperation(session: Session) extends 
AbstractOperation(session
     resp
   }
 
-  override def cancel(): Unit = {
-    cleanup(OperationState.CANCELED)
-  }
-
   override def close(): Unit = {
     cleanup(OperationState.CLOSED)
   }
diff --git 
a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala
 
b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala
index b8264c0699..0b89848305 100644
--- 
a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala
@@ -16,10 +16,14 @@
  */
 package org.apache.kyuubi.engine.jdbc.mysql
 
+import org.scalatest.concurrent.TimeLimits.failAfter
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+import 
org.apache.kyuubi.shaded.hive.service.rpc.thrift.TOperationState.{CANCELED_STATE,
 RUNNING_STATE}
 
 class OperationWithEngineSuite extends MySQLOperationSuite with 
HiveJDBCTestHelper {
 
@@ -75,4 +79,25 @@ class OperationWithEngineSuite extends MySQLOperationSuite 
with HiveJDBCTestHelp
       assert(tFetchResultsResp.getStatus.getStatusCode === 
TStatusCode.SUCCESS_STATUS)
     }
   }
+
+  test("MySQL - JDBC ExecuteStatement cancel operation should kill SQL 
statement") {
+    failAfter(20.seconds) {
+      withSessionHandle { (client, handle) =>
+        val executeReq = new TExecuteStatementReq()
+        executeReq.setSessionHandle(handle)
+        // The SQL will sleep 120s
+        executeReq.setStatement("SELECT sleep(120)")
+        executeReq.setRunAsync(true)
+        val executeResp = client.ExecuteStatement(executeReq)
+        assert(executeResp.getStatus.getStatusCode === 
TStatusCode.SUCCESS_STATUS)
+
+        val operationHandle = executeResp.getOperationHandle
+        waitForOperationStatusIn(client, operationHandle, Set(RUNNING_STATE))
+
+        val cancelResp = client.CancelOperation(new 
TCancelOperationReq(operationHandle))
+        assert(cancelResp.getStatus.getStatusCode === 
TStatusCode.SUCCESS_STATUS)
+        waitForOperationStatusIn(client, operationHandle, Set(CANCELED_STATE))
+      }
+    }
+  }
 }
diff --git 
a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala
 
b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala
index acbc028f89..49cc52b41a 100644
--- 
a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala
@@ -16,10 +16,15 @@
  */
 package org.apache.kyuubi.engine.jdbc.starrocks
 
+import scala.concurrent.duration.DurationInt
+
+import org.scalatest.concurrent.TimeLimits.failAfter
+
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+import 
org.apache.kyuubi.shaded.hive.service.rpc.thrift.TOperationState.{CANCELED_STATE,
 RUNNING_STATE}
 
 class StarRocksOperationWithEngineSuite extends StarRocksOperationSuite with 
HiveJDBCTestHelper {
 
@@ -75,4 +80,25 @@ class StarRocksOperationWithEngineSuite extends 
StarRocksOperationSuite with Hiv
       assert(tFetchResultsResp.getStatus.getStatusCode === 
TStatusCode.SUCCESS_STATUS)
     }
   }
+
+  test("StarRocks - JDBC ExecuteStatement cancel operation should kill SQL 
statement") {
+    failAfter(20.seconds) {
+      withSessionHandle { (client, handle) =>
+        val executeReq = new TExecuteStatementReq()
+        executeReq.setSessionHandle(handle)
+        // The SQL will sleep 120s
+        executeReq.setStatement("SELECT sleep(120)")
+        executeReq.setRunAsync(true)
+        val executeResp = client.ExecuteStatement(executeReq)
+        assert(executeResp.getStatus.getStatusCode === 
TStatusCode.SUCCESS_STATUS)
+
+        val operationHandle = executeResp.getOperationHandle
+        waitForOperationStatusIn(client, operationHandle, Set(RUNNING_STATE))
+
+        val cancelResp = client.CancelOperation(new 
TCancelOperationReq(operationHandle))
+        assert(cancelResp.getStatus.getStatusCode === 
TStatusCode.SUCCESS_STATUS)
+        waitForOperationStatusIn(client, operationHandle, Set(CANCELED_STATE))
+      }
+    }
+  }
 }
diff --git 
a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala
 
b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala
index 9c229a636c..8aa90e7861 100644
--- 
a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala
@@ -26,7 +26,7 @@ import org.apache.kyuubi.engine.jdbc.WithJdbcServerContainer
 
 trait WithStarRocksContainer extends WithJdbcServerContainer {
 
-  private val starrocksDockerImage = "starrocks/allin1-ubuntu:3.1.6"
+  private val starrocksDockerImage = "starrocks/allin1-ubuntu:3.3.13"
 
   private val STARROCKS_FE_MYSQL_PORT = 9030
   private val STARROCKS_FE_HTTP_PORT = 8030
@@ -47,7 +47,7 @@ trait WithStarRocksContainer extends WithJdbcServerContainer {
       .withStrategy(Wait.forListeningPorts(ports: _*))
       .withStrategy(forLogMessage(".*broker service already added into FE 
service.*", 1))
       .withStrategy(
-        forLogMessage(".*Enjoy the journal to StarRocks blazing-fast 
lake-house engine.*", 1)))
+        forLogMessage(".*Enjoy the journey to StarRocks blazing-fast 
lake-house engine.*", 1)))
 
   protected def feJdbcUrl: String = withContainers { container =>
     val queryServerHost: String = container.host
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
index 02cb9a0030..8d89cd9784 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
@@ -124,11 +124,27 @@ trait HiveJDBCTestHelper extends JDBCTestHelper {
   }
 
   def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
-    val req = new TGetOperationStatusReq(op)
-    var state = client.GetOperationStatus(req).getOperationState
-    eventually(timeout(90.seconds), interval(100.milliseconds)) {
-      state = client.GetOperationStatus(req).getOperationState
-      assert(!Set(INITIALIZED_STATE, PENDING_STATE, 
RUNNING_STATE).contains(state))
+    waitForOperationStatusIn(
+      client,
+      op,
+      Set(
+        FINISHED_STATE,
+        CANCELED_STATE,
+        CLOSED_STATE,
+        ERROR_STATE,
+        UKNOWN_STATE,
+        TIMEDOUT_STATE),
+      timeoutMs = 90000)
+  }
+
+  def waitForOperationStatusIn(
+      client: Iface,
+      op: TOperationHandle,
+      status: Set[TOperationState],
+      timeoutMs: Int = 5000): Unit = {
+    eventually(timeout(timeoutMs.milliseconds), interval(100.milliseconds)) {
+      val state = client.GetOperationStatus(new 
TGetOperationStatusReq(op)).getOperationState
+      assert(status.contains(state))
     }
   }
 

Reply via email to