This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
new 6c819ee [Bug] [dolphinscheduler-server] workflow is always running
when worker sendResult success but master not received (#7610)
6c819ee is described below
commit 6c819ee16d575e62039d98183b93fd3a06c628ed
Author: zwZjut <[email protected]>
AuthorDate: Fri Dec 24 16:04:15 2021 +0800
[Bug] [dolphinscheduler-server] workflow is always running when worker
sendResult success but master not received (#7610)
* [Feature][dolphinscheduler-api] parse traceId in http header for Cross
system delivery to #7237 (#7238)
* to #7237
* rerun test
Co-authored-by: honghuo.zw <[email protected]>
* cherry-pick 05aef27 and handle conflicts
* to #7065: fix ExecutorService and schedulerService (#7072)
Co-authored-by: honghuo.zw <[email protected]>
* [Feature][dolphinscheduler-api] access control of taskDefinition and
taskInstance in project to #7081 (#7082)
* to #7081
* fix #7081
* to #7081
Co-authored-by: honghuo.zw <[email protected]>
* cherry-pick 8ebe060 and handle conflicts
* cherry-pick 1f18444 and handle conflicts
* fix #6807: dolphinscheduler.zookeeper.env_vars ->
dolphinscheduler.registry.env_vars (#6808)
Co-authored-by: honghuo.zw <[email protected]>
Co-authored-by: Kirs <[email protected]>
* add default constructor (#6780)
Co-authored-by: honghuo.zw <[email protected]>
* to #7108 (#7109)
* to #7609
Co-authored-by: honghuo.zw <[email protected]>
Co-authored-by: Kirs <[email protected]>
---
.../server/worker/processor/DBTaskResponseProcessor.java | 6 ++++--
.../server/worker/processor/TaskCallbackService.java | 9 ++++-----
.../server/worker/processor/TaskKillProcessor.java | 2 ++
3 files changed, 10 insertions(+), 7 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
index 6da9fdd..f9e206e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
@@ -47,13 +47,15 @@ public class DBTaskResponseProcessor implements
NettyRequestProcessor {
DBTaskResponseCommand taskResponseCommand = JSONUtils.parseObject(
command.getBody(), DBTaskResponseCommand.class);
- if (taskResponseCommand == null){
+ if (taskResponseCommand == null) {
return;
}
- if (taskResponseCommand.getStatus() ==
ExecutionStatus.SUCCESS.getCode()){
+ if (taskResponseCommand.getStatus() ==
ExecutionStatus.SUCCESS.getCode()) {
ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
logger.debug("removeResponseCache: taskinstance id:{}",
taskResponseCommand.getTaskInstanceId());
+
TaskCallbackService.remove(taskResponseCommand.getTaskInstanceId());
+ logger.debug("remove REMOTE_CHANNELS, task instance id:{}",
taskResponseCommand.getTaskInstanceId());
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 96ec36b..09b2c3a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -19,14 +19,13 @@ package org.apache.dolphinscheduler.server.worker.processor;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,7 +124,7 @@ public class TaskCallbackService {
*
* @param taskInstanceId taskInstanceId
*/
- public void remove(int taskInstanceId) {
+ public static void remove(int taskInstanceId) {
REMOTE_CHANNELS.remove(taskInstanceId);
}
@@ -156,7 +155,7 @@ public class TaskCallbackService {
@Override
public void operationComplete(ChannelFuture future) throws
Exception {
if (future.isSuccess()) {
- remove(taskInstanceId);
+ // remove(taskInstanceId);
return;
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index c0ecd67..4341b8c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -99,6 +99,8 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
TaskKillResponseCommand taskKillResponseCommand =
buildKillTaskResponseCommand(killCommand, result);
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(),
taskKillResponseCommand.convert2Command());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
+ TaskCallbackService.remove(killCommand.getTaskInstanceId());
+ logger.info("remove REMOTE_CHANNELS, task instance id:{}",
killCommand.getTaskInstanceId());
}
/**