This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new df0416c193 Fix taskInstance's host is not worker nettyServer address
(#10926)
df0416c193 is described below
commit df0416c1937b8a3a774c6070b958769c48a6a5e0
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Jul 13 20:46:33 2022 +0800
Fix taskInstance's host is not worker nettyServer address (#10926)
* Fix taskInstance's host is not worker nettyServer address
* Remove unnecessary mock
---
.../server/master/processor/TaskExecuteResponseProcessor.java | 4 +++-
.../server/master/processor/TaskExecuteRunningProcessor.java | 4 +++-
.../server/master/processor/queue/TaskEvent.java | 9 ++++-----
.../server/master/processor/queue/TaskResponseServiceTest.java | 10 ++++++----
4 files changed, 16 insertions(+), 11 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
index 7f438e1d9c..775aded34a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
@@ -60,7 +60,9 @@ public class TaskExecuteResponseProcessor implements
NettyRequestProcessor {
TaskExecuteResultCommand taskExecuteResultMessage =
JSONUtils.parseObject(command.getBody(),
TaskExecuteResultCommand.class);
- TaskEvent taskResultEvent =
TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
+ TaskEvent taskResultEvent =
TaskEvent.newResultEvent(taskExecuteResultMessage,
+ channel,
+
taskExecuteResultMessage.getMessageSenderAddress());
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
taskResultEvent.getTaskInstanceId());
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
index 96ff1ca405..47a66a71f0 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
@@ -57,7 +57,9 @@ public class TaskExecuteRunningProcessor implements
NettyRequestProcessor {
TaskExecuteRunningCommand taskExecuteRunningMessage =
JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
logger.info("taskExecuteRunningCommand: {}",
taskExecuteRunningMessage);
- TaskEvent taskEvent =
TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);
+ TaskEvent taskEvent =
TaskEvent.newRunningEvent(taskExecuteRunningMessage,
+ channel,
+
taskExecuteRunningMessage.getMessageSenderAddress());
taskEventService.addEvent(taskEvent);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index e383cad612..a2cee4986c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -22,7 +22,6 @@ import
org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
-import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date;
@@ -106,7 +105,7 @@ public class TaskEvent {
return event;
}
- public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command,
Channel channel) {
+ public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command,
Channel channel, String workerAddress) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
@@ -115,12 +114,12 @@ public class TaskEvent {
event.setExecutePath(command.getExecutePath());
event.setLogPath(command.getLogPath());
event.setChannel(channel);
- event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
+ event.setWorkerAddress(workerAddress);
event.setEvent(TaskEventType.RUNNING);
return event;
}
- public static TaskEvent newResultEvent(TaskExecuteResultCommand command,
Channel channel) {
+ public static TaskEvent newResultEvent(TaskExecuteResultCommand command,
Channel channel, String workerAddress) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
@@ -133,7 +132,7 @@ public class TaskEvent {
event.setAppIds(command.getAppIds());
event.setVarPool(command.getVarPool());
event.setChannel(channel);
- event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
+ event.setWorkerAddress(workerAddress);
event.setEvent(TaskEventType.RESULT);
return event;
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index 3854ad77b0..6f4fe9ceb7 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -75,8 +75,6 @@ public class TaskResponseServiceTest {
public void before() {
taskEventService.start();
-
Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1",
1234));
-
TaskExecuteRunningCommand taskExecuteRunningMessage = new
TaskExecuteRunningCommand("127.0.0.1:5678",
"127.0.0.1:1234",
System.currentTimeMillis());
@@ -88,7 +86,9 @@ public class TaskResponseServiceTest {
taskExecuteRunningMessage.setHost("127.*.*.*");
taskExecuteRunningMessage.setStartTime(new Date());
- ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage,
channel);
+ ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage,
+ channel,
+
taskExecuteRunningMessage.getMessageSenderAddress());
TaskExecuteResultCommand taskExecuteResultMessage = new
TaskExecuteResultCommand(NetUtils.getAddr(1234),
NetUtils.getAddr(5678),
@@ -100,7 +100,9 @@ public class TaskResponseServiceTest {
taskExecuteResultMessage.setVarPool("varPol");
taskExecuteResultMessage.setAppIds("ids");
taskExecuteResultMessage.setProcessId(1);
- resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage,
channel);
+ resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage,
+ channel,
+
taskExecuteResultMessage.getMessageSenderAddress());
taskInstance = new TaskInstance();
taskInstance.setId(22);