This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new dcebf2648a [Improvement-17695][ProcedureTask] Support cancel procedure task (#17696)
dcebf2648a is described below

commit dcebf2648a20b6412308e65c9ced5b054e5ff35a
Author: njnu-seafish <[email protected]>
AuthorDate: Thu Jan 22 18:00:41 2026 +0800

    [Improvement-17695][ProcedureTask] Support cancel procedure task (#17696)
---
 .../plugin/task/procedure/ProcedureTask.java       | 35 +++++++++++++++++-----
 1 file changed, 28 insertions(+), 7 deletions(-)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index d0b42aeecd..bca2d4bbf7 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -43,6 +43,7 @@ import org.apache.commons.lang3.StringUtils;
 import java.sql.CallableStatement;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.sql.Types;
 import java.util.HashMap;
 import java.util.Map;
@@ -60,6 +61,8 @@ public class ProcedureTask extends AbstractTask {
 
     private final ProcedureTaskExecutionContext procedureTaskExecutionContext;
 
+    private volatile Statement sessionStatement;
+
     /**
      * constructor
      *
@@ -105,30 +108,48 @@ public class ProcedureTask extends AbstractTask {
             }
             String proceduerSql = formatSql(sqlParamsMap, paramsMap);
             // call method
-            try (CallableStatement stmt = 
connection.prepareCall(proceduerSql)) {
+            try (CallableStatement tmpStatement = 
connection.prepareCall(proceduerSql)) {
+                sessionStatement = tmpStatement;
                 // set timeout
-                setTimeout(stmt);
+                setTimeout(tmpStatement);
 
                 // outParameterMap
-                Map<Integer, Property> outParameterMap = 
getOutParameterMap(stmt, sqlParamsMap, paramsMap);
+                Map<Integer, Property> outParameterMap = 
getOutParameterMap(tmpStatement, sqlParamsMap, paramsMap);
 
-                stmt.executeUpdate();
+                tmpStatement.executeUpdate();
 
                 // print the output parameters to the log
-                printOutParameter(stmt, outParameterMap);
+                printOutParameter(tmpStatement, outParameterMap);
 
                 setExitStatusCode(EXIT_CODE_SUCCESS);
             }
         } catch (Exception e) {
+            if (exitStatusCode == TaskConstants.EXIT_CODE_KILL) {
+                log.info("This procedure task has been killed");
+                return;
+            }
             setExitStatusCode(EXIT_CODE_FAILURE);
-            log.error("procedure task error", e);
+            log.error("Failed to execute this procedure task", e);
             throw new TaskException("Execute procedure task failed", e);
         }
     }
 
     @Override
     public void cancel() throws TaskException {
-
+        if (sessionStatement != null) {
+            try {
+                log.info("Try to cancel this procedure task");
+                sessionStatement.cancel();
+                setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+                log.info("This procedure task was canceled");
+            } catch (Exception ex) {
+                log.warn("Failed to cancel this procedure task", ex);
+                throw new TaskException("Cancel this procedure task failed", 
ex);
+            }
+        } else {
+            log.info(
+                    "Attempted to cancel this procedure task, but no active 
statement exists. Possible reasons: task not started, already completed, or 
canceled.");
+        }
     }
 
     private String formatSql(Map<Integer, Property> sqlParamsMap, Map<String, 
Property> paramsMap) {

Reply via email to