This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new dcebf2648a [Improvement-17695][ProcedureTask] Support cancel procedure
task (#17696)
dcebf2648a is described below
commit dcebf2648a20b6412308e65c9ced5b054e5ff35a
Author: njnu-seafish <[email protected]>
AuthorDate: Thu Jan 22 18:00:41 2026 +0800
[Improvement-17695][ProcedureTask] Support cancel procedure task (#17696)
---
.../plugin/task/procedure/ProcedureTask.java | 35 +++++++++++++++++-----
1 file changed, 28 insertions(+), 7 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index d0b42aeecd..bca2d4bbf7 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -43,6 +43,7 @@ import org.apache.commons.lang3.StringUtils;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;
+import java.sql.Statement;
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
@@ -60,6 +61,8 @@ public class ProcedureTask extends AbstractTask {
private final ProcedureTaskExecutionContext procedureTaskExecutionContext;
+ private volatile Statement sessionStatement;
+
/**
* constructor
*
@@ -105,30 +108,48 @@ public class ProcedureTask extends AbstractTask {
}
String proceduerSql = formatSql(sqlParamsMap, paramsMap);
// call method
- try (CallableStatement stmt =
connection.prepareCall(proceduerSql)) {
+ try (CallableStatement tmpStatement =
connection.prepareCall(proceduerSql)) {
+ sessionStatement = tmpStatement;
// set timeout
- setTimeout(stmt);
+ setTimeout(tmpStatement);
// outParameterMap
- Map<Integer, Property> outParameterMap =
getOutParameterMap(stmt, sqlParamsMap, paramsMap);
+ Map<Integer, Property> outParameterMap =
getOutParameterMap(tmpStatement, sqlParamsMap, paramsMap);
- stmt.executeUpdate();
+ tmpStatement.executeUpdate();
// print the output parameters to the log
- printOutParameter(stmt, outParameterMap);
+ printOutParameter(tmpStatement, outParameterMap);
setExitStatusCode(EXIT_CODE_SUCCESS);
}
} catch (Exception e) {
+ if (exitStatusCode == TaskConstants.EXIT_CODE_KILL) {
+ log.info("This procedure task has been killed");
+ return;
+ }
setExitStatusCode(EXIT_CODE_FAILURE);
- log.error("procedure task error", e);
+ log.error("Failed to execute this procedure task", e);
throw new TaskException("Execute procedure task failed", e);
}
}
@Override
public void cancel() throws TaskException {
-
+ if (sessionStatement != null) {
+ try {
+ log.info("Try to cancel this procedure task");
+ sessionStatement.cancel();
+ setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+ log.info("This procedure task was canceled");
+ } catch (Exception ex) {
+ log.warn("Failed to cancel this procedure task", ex);
+ throw new TaskException("Cancel this procedure task failed",
ex);
+ }
+ } else {
+ log.info(
+ "Attempted to cancel this procedure task, but no active
statement exists. Possible reasons: task not started, already completed, or
canceled.");
+ }
}
private String formatSql(Map<Integer, Property> sqlParamsMap, Map<String,
Property> paramsMap) {