This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 812d7a8f267076df595c25765274b8b7853b42fc
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Jul 13 20:46:33 2022 +0800

    Fix taskInstance's host is not worker nettyServer address (#10926)
    
    * Fix taskInstance's host is not worker nettyServer address
    
    * Remove unnecessary mock
    
    (cherry picked from commit df0416c1937b8a3a774c6070b958769c48a6a5e0)
---
 .../server/master/processor/TaskExecuteResponseProcessor.java  |  4 +++-
 .../server/master/processor/TaskExecuteRunningProcessor.java   |  4 +++-
 .../server/master/processor/queue/TaskEvent.java               |  9 ++++-----
 .../server/master/processor/queue/TaskResponseServiceTest.java | 10 ++++++----
 4 files changed, 16 insertions(+), 11 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
index 7f438e1d9c..775aded34a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
@@ -60,7 +60,9 @@ public class TaskExecuteResponseProcessor implements 
NettyRequestProcessor {
 
         TaskExecuteResultCommand taskExecuteResultMessage = 
JSONUtils.parseObject(command.getBody(),
                                                                                
   TaskExecuteResultCommand.class);
-        TaskEvent taskResultEvent = 
TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
+        TaskEvent taskResultEvent = 
TaskEvent.newResultEvent(taskExecuteResultMessage,
+                                                             channel,
+                                                             
taskExecuteResultMessage.getMessageSenderAddress());
         try {
             
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
                                                         
taskResultEvent.getTaskInstanceId());
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
index 96ff1ca405..47a66a71f0 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
@@ -57,7 +57,9 @@ public class TaskExecuteRunningProcessor implements 
NettyRequestProcessor {
         TaskExecuteRunningCommand taskExecuteRunningMessage = 
JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
         logger.info("taskExecuteRunningCommand: {}", 
taskExecuteRunningMessage);
 
-        TaskEvent taskEvent = 
TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);
+        TaskEvent taskEvent = 
TaskEvent.newRunningEvent(taskExecuteRunningMessage,
+                                                        channel,
+                                                        
taskExecuteRunningMessage.getMessageSenderAddress());
         taskEventService.addEvent(taskEvent);
     }
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 7db0bc6696..797ccc8a7c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
-import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 
 import java.util.Date;
 
@@ -105,7 +104,7 @@ public class TaskEvent {
         return event;
     }
 
-    public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, 
Channel channel) {
+    public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, 
Channel channel, String workerAddress) {
         TaskEvent event = new TaskEvent();
         event.setProcessInstanceId(command.getProcessInstanceId());
         event.setTaskInstanceId(command.getTaskInstanceId());
@@ -114,12 +113,12 @@ public class TaskEvent {
         event.setExecutePath(command.getExecutePath());
         event.setLogPath(command.getLogPath());
         event.setChannel(channel);
-        event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
+        event.setWorkerAddress(workerAddress);
         event.setEvent(TaskEventType.RUNNING);
         return event;
     }
 
-    public static TaskEvent newResultEvent(TaskExecuteResultCommand command, 
Channel channel) {
+    public static TaskEvent newResultEvent(TaskExecuteResultCommand command, 
Channel channel, String workerAddress) {
         TaskEvent event = new TaskEvent();
         event.setProcessInstanceId(command.getProcessInstanceId());
         event.setTaskInstanceId(command.getTaskInstanceId());
@@ -132,7 +131,7 @@ public class TaskEvent {
         event.setAppIds(command.getAppIds());
         event.setVarPool(command.getVarPool());
         event.setChannel(channel);
-        event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
+        event.setWorkerAddress(workerAddress);
         event.setEvent(TaskEventType.RESULT);
         return event;
     }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index 3854ad77b0..6f4fe9ceb7 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -75,8 +75,6 @@ public class TaskResponseServiceTest {
     public void before() {
         taskEventService.start();
 
-        
Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1",
 1234));
-
         TaskExecuteRunningCommand taskExecuteRunningMessage = new 
TaskExecuteRunningCommand("127.0.0.1:5678",
                                                                                
             "127.0.0.1:1234",
                                                                                
             System.currentTimeMillis());
@@ -88,7 +86,9 @@ public class TaskResponseServiceTest {
         taskExecuteRunningMessage.setHost("127.*.*.*");
         taskExecuteRunningMessage.setStartTime(new Date());
 
-        ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, 
channel);
+        ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage,
+                                             channel,
+                                             
taskExecuteRunningMessage.getMessageSenderAddress());
 
         TaskExecuteResultCommand taskExecuteResultMessage = new 
TaskExecuteResultCommand(NetUtils.getAddr(1234),
                                                                                
          NetUtils.getAddr(5678),
@@ -100,7 +100,9 @@ public class TaskResponseServiceTest {
         taskExecuteResultMessage.setVarPool("varPol");
         taskExecuteResultMessage.setAppIds("ids");
         taskExecuteResultMessage.setProcessId(1);
-        resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, 
channel);
+        resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage,
+                                               channel,
+                                               
taskExecuteResultMessage.getMessageSenderAddress());
 
         taskInstance = new TaskInstance();
         taskInstance.setId(22);

Reply via email to