This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new a6a366fb40 [Improvement-17697][SqlTask] Support cancel sql task (#17692)
a6a366fb40 is described below
commit a6a366fb40ba3c3305500343036727c40f784e22
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Nov 21 17:54:32 2025 +0800
[Improvement-17697][SqlTask] Support cancel sql task (#17692)
---
.../dolphinscheduler/plugin/task/sql/SqlTask.java | 43 +++++++++++-----------
1 file changed, 21 insertions(+), 22 deletions(-)
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index 0a866fc499..5b96c1d004 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -46,6 +46,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -81,12 +82,13 @@ public class SqlTask extends AbstractTask {
*/
private static final int QUERY_LIMIT = 10000;
- private SQLTaskExecutionContext sqlTaskExecutionContext;
-
- public static final int TEST_FLAG_YES = 1;
+ private final SQLTaskExecutionContext sqlTaskExecutionContext;
private final DbType dbType;
+ private Connection sessionConnection;
+ private Statement sessionStatement;
+
public SqlTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.taskExecutionContext = taskRequest;
@@ -153,6 +155,10 @@ public class SqlTask extends AbstractTask {
setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
} catch (Exception e) {
+ if (exitStatusCode == TaskConstants.EXIT_CODE_KILL) {
+ log.info("sql task has been killed");
+ return;
+ }
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
log.error("sql task error", e);
throw new TaskException("Execute sql task failed", e);
@@ -161,7 +167,14 @@ public class SqlTask extends AbstractTask {
@Override
public void cancel() throws TaskException {
-
+ try {
+ if (sessionStatement != null) {
+ sessionStatement.cancel();
+ }
+ exitStatusCode = TaskConstants.EXIT_CODE_KILL;
+ } catch (Exception e) {
+ throw new TaskException("Cancel sql task failed", e);
+ }
}
/**
@@ -178,7 +191,7 @@ public class SqlTask extends AbstractTask {
Connection connection =
DataSourceClientProvider.getAdHocConnection(DbType.valueOf(sqlParameters.getType()),
baseConnectionParam)) {
-
+ sessionConnection = connection;
// pre execute
executeUpdate(connection, preStatementsBinds, "pre");
@@ -311,8 +324,8 @@ public class SqlTask extends AbstractTask {
String handlerType) throws Exception {
int result = 0;
for (SqlBinds sqlBind : statementsBinds) {
- try (PreparedStatement statement =
prepareStatementAndBind(connection, sqlBind)) {
- result = statement.executeUpdate();
+ try (PreparedStatement tmpStatement =
prepareStatementAndBind(connection, sqlBind)) {
+ result = tmpStatement.executeUpdate();
log.info("{} statement execute update result: {}, for sql:
{}", handlerType, result,
sqlBind.getSql());
}
@@ -320,21 +333,6 @@ public class SqlTask extends AbstractTask {
return String.valueOf(result);
}
- /**
- * close jdbc resource
- *
- * @param connection connection
- */
- private void close(Connection connection) {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- log.error("close connection error : {}", e.getMessage(), e);
- }
- }
- }
-
/**
* preparedStatement bind
*
@@ -363,6 +361,7 @@ public class SqlTask extends AbstractTask {
}
log.info("prepare statement replace sql : {}, sql parameters :
{}", sqlBinds.getSql(),
sqlBinds.getParamsMap());
+ sessionStatement = stmt;
return stmt;
} catch (Exception exception) {
throw new TaskException("SQL task prepareStatementAndBind error",
exception);