This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.0-prepare by this push:
new f034a09d25 [Bug-11650][worker] #11650 fix SQL type task, stop task
cause NPE (#11668) (#11958)
f034a09d25 is described below
commit f034a09d25188ed432572780041f804ad8e4f2c2
Author: caishunfeng <[email protected]>
AuthorDate: Thu Sep 15 14:24:34 2022 +0800
[Bug-11650][worker] #11650 fix SQL type task, stop task cause NPE (#11668)
(#11958)
Co-authored-by: 冯剑 <[email protected]>
---
.../server/worker/processor/TaskKillProcessor.java | 39 ++++++++++++++--------
1 file changed, 26 insertions(+), 13 deletions(-)
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 3aadb5a5cc..d73cc5b3db 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.server.worker.processor;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+
+import io.micrometer.core.lang.NonNull;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import lombok.NonNull;
-import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@@ -43,6 +43,11 @@ import
org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.apache.dolphinscheduler.service.log.LogClientService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -78,7 +83,7 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST ==
command.getType(),
- String.format("invalid command type : %s", command.getType()));
+ String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand killCommand =
JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
if (killCommand == null) {
logger.error("task kill request command is null");
@@ -88,7 +93,7 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
int taskInstanceId = killCommand.getTaskInstanceId();
TaskExecutionContext taskExecutionContext =
-
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
+
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
logger.error("taskRequest cache is null, taskInstanceId: {}",
killCommand.getTaskInstanceId());
return;
@@ -110,7 +115,7 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
taskExecutionContext.setCurrentExecutionStatus(
- result.getLeft() ? TaskExecutionStatus.SUCCESS :
TaskExecutionStatus.FAILURE);
+ result.getLeft() ? TaskExecutionStatus.SUCCESS :
TaskExecutionStatus.FAILURE);
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA,
result.getRight()));
sendTaskKillResponseCommand(channel, taskExecutionContext);
@@ -123,7 +128,9 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
private void sendTaskKillResponseCommand(Channel channel,
TaskExecutionContext taskExecutionContext) {
TaskKillResponseCommand taskKillResponseCommand = new
TaskKillResponseCommand();
taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus());
-
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
+ if (taskExecutionContext.getAppIds() != null) {
+
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
+ }
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost());
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
@@ -138,14 +145,20 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
});
}
+ /**
+ * do kill
+ *
+ * @return kill result
+ */
private Pair<Boolean, List<String>> doKill(TaskExecutionContext
taskExecutionContext) {
// kill system process
boolean processFlag =
killProcess(taskExecutionContext.getTenantCode(),
taskExecutionContext.getProcessId());
+
// find log and kill yarn job
Pair<Boolean, List<String>> yarnResult =
killYarnJob(Host.of(taskExecutionContext.getHost()),
- taskExecutionContext.getLogPath(),
- taskExecutionContext.getExecutePath(),
- taskExecutionContext.getTenantCode());
+ taskExecutionContext.getLogPath(),
+ taskExecutionContext.getExecutePath(),
+ taskExecutionContext.getTenantCode());
return Pair.of(processFlag && yarnResult.getLeft(),
yarnResult.getRight());
}
@@ -200,8 +213,8 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
/**
* kill yarn job
*
- * @param host host
- * @param logPath logPath
+ * @param host host
+ * @param logPath logPath
* @param executePath executePath
* @param tenantCode tenantCode
* @return Pair<Boolean, List < String>> yarn kill result
@@ -212,7 +225,7 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
String tenantCode) {
if (logPath == null || executePath == null || tenantCode == null) {
logger.error("Kill yarn job error, the input params is illegal,
host: {}, logPath: {}, executePath: {}, tenantCode: {}",
- host, logPath, executePath, tenantCode);
+ host, logPath, executePath, tenantCode);
return Pair.of(false, Collections.emptyList());
}
try (LogClientService logClient = new LogClientService()) {
@@ -233,4 +246,4 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
return Pair.of(false, Collections.emptyList());
}
-}
+}
\ No newline at end of file