This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f8a44ff719 [Bug][Master] send ACK event timeout (#15346)
f8a44ff719 is described below
commit f8a44ff7192f7dc998f7568b229b48a2bf7ee769
Author: Gallardot <[email protected]>
AuthorDate: Thu Dec 21 18:54:44 2023 +0800
[Bug][Master] send ACK event timeout (#15346)
---
README.md | 5 ++---
README_zh_CN.md | 4 ++--
.../master/event/TaskResultEventHandler.java | 22 ++++++++++++++++------
3 files changed, 20 insertions(+), 11 deletions(-)
diff --git a/README.md b/README.md
index d30b55d95d..d6ce0e2f9a 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
[](https://www.apache.org/licenses/LICENSE-2.0.html)

[](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler)
-[](https://twitter.com/dolphinschedule)
+[](https://twitter.com/dolphinschedule)
<!-- markdown-link-check-disable-line -->
[](https://s.apache.org/dolphinscheduler-slack)
[](README_zh_CN.md)
@@ -65,7 +65,7 @@ find the good first issue in
[here](https://github.com/apache/dolphinscheduler/c
Welcome to join the Apache DolphinScheduler community by:
- Join the [DolphinScheduler
Slack](https://s.apache.org/dolphinscheduler-slack) to keep in touch with the
community
-- Follow the [DolphinScheduler Twitter](https://twitter.com/dolphinschedule)
and get the latest news
+- Follow the [DolphinScheduler Twitter](https://twitter.com/dolphinschedule)
and get the latest news <!-- markdown-link-check-disable-line -->
- Subscribe DolphinScheduler mail list, [email protected] for
user and [email protected] for developer
# Landscapes
@@ -75,5 +75,4 @@ Welcome to join the Apache DolphinScheduler community by:
<img src="https://landscape.cncf.io/images/left-logo.svg"
width="150"/> <img
src="https://landscape.cncf.io/images/right-logo.svg" width="200"/>
<br/><br/>
DolphinScheduler enriches the <a
href="https://landscape.cncf.io/?landscape=observability-and-analysis&license=apache-license-2-0">CNCF
CLOUD NATIVE Landscape.</a >
-
</p >
diff --git a/README_zh_CN.md b/README_zh_CN.md
index 2e955811e2..dcadcc6427 100644
--- a/README_zh_CN.md
+++ b/README_zh_CN.md
@@ -3,7 +3,7 @@
[](https://www.apache.org/licenses/LICENSE-2.0.html)
[]()
[](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler)
-[](https://twitter.com/dolphinschedule)
+[](https://twitter.com/dolphinschedule)
<!-- markdown-link-check-disable-line -->
[](https://s.apache.org/dolphinscheduler-slack)
[](README.md)
@@ -61,7 +61,7 @@ DolphinScheduler 的主要特性如下:
欢迎通过以方式加入社区:
- 加入 [DolphinScheduler Slack](https://s.apache.org/dolphinscheduler-slack)
-- 关注 [DolphinScheduler Twitter](https://twitter.com/dolphinschedule) 来获取最新消息
+- 关注 [DolphinScheduler Twitter](https://twitter.com/dolphinschedule) 来获取最新消息
<!-- markdown-link-check-disable-line -->
- 订阅 DolphinScheduler 邮件列表, 用户订阅 [email protected] 开发者请订阅
[email protected]
# Landscapes
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
index 854812264f..f3d3a7480a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
@@ -35,10 +35,13 @@ import
org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
+@Slf4j
public class TaskResultEventHandler implements TaskEventHandler {
@Autowired
@@ -99,11 +102,13 @@ public class TaskResultEventHandler implements
TaskEventHandler {
taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance);
taskInstanceDao.updateById(taskInstance);
- sendAckToWorker(taskEvent);
} catch (Exception ex) {
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", ex);
}
+
+ sendAckToWorker(taskEvent);
+
TaskStateEvent stateEvent = TaskStateEvent.builder()
.processInstanceId(taskEvent.getProcessInstanceId())
.taskInstanceId(taskEvent.getTaskInstanceId())
@@ -115,11 +120,16 @@ public class TaskResultEventHandler implements
TaskEventHandler {
}
public void sendAckToWorker(TaskEvent taskEvent) {
- ITaskInstanceExecutionEventAckListener
instanceExecutionEventAckListener =
- SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(taskEvent.getWorkerAddress(),
ITaskInstanceExecutionEventAckListener.class);
-
instanceExecutionEventAckListener.handleTaskInstanceExecutionFinishEventAck(
-
TaskInstanceExecutionFinishEventAck.success(taskEvent.getTaskInstanceId()));
+ try {
+ ITaskInstanceExecutionEventAckListener
instanceExecutionEventAckListener =
+ SingletonJdkDynamicRpcClientProxyFactory
+ .getProxyClient(taskEvent.getWorkerAddress(),
ITaskInstanceExecutionEventAckListener.class);
+
instanceExecutionEventAckListener.handleTaskInstanceExecutionFinishEventAck(
+
TaskInstanceExecutionFinishEventAck.success(taskEvent.getTaskInstanceId()));
+ } catch (Exception e) {
+ // master ignore the exception, worker will retry to send this TaskEventType.RESULT event again.
+ log.warn("send ack to worker error, taskInstanceId: {}", taskEvent.getTaskInstanceId(), e);
+ }
}
@Override