This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new a6a366fb40 [Improvement-17697][SqlTask] Support cancel sql task 
(#17692)
a6a366fb40 is described below

commit a6a366fb40ba3c3305500343036727c40f784e22
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Nov 21 17:54:32 2025 +0800

    [Improvement-17697][SqlTask] Support cancel sql task (#17692)
---
 .../dolphinscheduler/plugin/task/sql/SqlTask.java  | 43 +++++++++++-----------
 1 file changed, 21 insertions(+), 22 deletions(-)

diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index 0a866fc499..5b96c1d004 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -46,6 +46,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -81,12 +82,13 @@ public class SqlTask extends AbstractTask {
      */
     private static final int QUERY_LIMIT = 10000;
 
-    private SQLTaskExecutionContext sqlTaskExecutionContext;
-
-    public static final int TEST_FLAG_YES = 1;
+    private final SQLTaskExecutionContext sqlTaskExecutionContext;
 
     private final DbType dbType;
 
+    private Connection sessionConnection;
+    private Statement sessionStatement;
+
     public SqlTask(TaskExecutionContext taskRequest) {
         super(taskRequest);
         this.taskExecutionContext = taskRequest;
@@ -153,6 +155,10 @@ public class SqlTask extends AbstractTask {
             setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
 
         } catch (Exception e) {
+            if (exitStatusCode == TaskConstants.EXIT_CODE_KILL) {
+                log.info("sql task has been killed");
+                return;
+            }
             setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
             log.error("sql task error", e);
             throw new TaskException("Execute sql task failed", e);
@@ -161,7 +167,14 @@ public class SqlTask extends AbstractTask {
 
     @Override
     public void cancel() throws TaskException {
-
+        try {
+            if (sessionStatement != null) {
+                sessionStatement.cancel();
+            }
+            exitStatusCode = TaskConstants.EXIT_CODE_KILL;
+        } catch (Exception e) {
+            throw new TaskException("Cancel sql task failed", e);
+        }
     }
 
     /**
@@ -178,7 +191,7 @@ public class SqlTask extends AbstractTask {
                 Connection connection =
                         
DataSourceClientProvider.getAdHocConnection(DbType.valueOf(sqlParameters.getType()),
                                 baseConnectionParam)) {
-
+            sessionConnection = connection;
             // pre execute
             executeUpdate(connection, preStatementsBinds, "pre");
 
@@ -311,8 +324,8 @@ public class SqlTask extends AbstractTask {
                                  String handlerType) throws Exception {
         int result = 0;
         for (SqlBinds sqlBind : statementsBinds) {
-            try (PreparedStatement statement = 
prepareStatementAndBind(connection, sqlBind)) {
-                result = statement.executeUpdate();
+            try (PreparedStatement tmpStatement = 
prepareStatementAndBind(connection, sqlBind)) {
+                result = tmpStatement.executeUpdate();
                 log.info("{} statement execute update result: {}, for sql: 
{}", handlerType, result,
                         sqlBind.getSql());
             }
@@ -320,21 +333,6 @@ public class SqlTask extends AbstractTask {
         return String.valueOf(result);
     }
 
-    /**
-     * close jdbc resource
-     *
-     * @param connection connection
-     */
-    private void close(Connection connection) {
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (SQLException e) {
-                log.error("close connection error : {}", e.getMessage(), e);
-            }
-        }
-    }
-
     /**
      * preparedStatement bind
      *
@@ -363,6 +361,7 @@ public class SqlTask extends AbstractTask {
             }
             log.info("prepare statement replace sql : {}, sql parameters : 
{}", sqlBinds.getSql(),
                     sqlBinds.getParamsMap());
+            sessionStatement = stmt;
             return stmt;
         } catch (Exception exception) {
             throw new TaskException("SQL task prepareStatementAndBind error", 
exception);

Reply via email to