This is an automated email from the ASF dual-hosted git repository.

weiraowang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3efcf0c5ac Fix message in MessageRetryRunner might disorder (#14725)
3efcf0c5ac is described below

commit 3efcf0c5acc55df05b09187e17042f7552847b04
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Aug 9 20:19:02 2023 +0800

    Fix message in MessageRetryRunner might disorder (#14725)
---
 .../server/worker/message/MessageRetryRunner.java  | 79 ++++++++++++++++------
 1 file changed, 60 insertions(+), 19 deletions(-)

diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
index 830f470a44..44befbe31d 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
@@ -27,12 +27,15 @@ import 
org.apache.dolphinscheduler.remote.command.MessageType;
 import org.apache.commons.collections4.MapUtils;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import lombok.Data;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
@@ -40,6 +43,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 
+import com.google.common.base.Objects;
+
 @Component
 @Slf4j
 public class MessageRetryRunner extends BaseDaemonThread {
@@ -48,15 +53,15 @@ public class MessageRetryRunner extends BaseDaemonThread {
         super("WorkerMessageRetryRunnerThread");
     }
 
-    private static long MESSAGE_RETRY_WINDOW = 
Duration.ofMinutes(5L).toMillis();
+    private static final long MESSAGE_RETRY_WINDOW = 
Duration.ofMinutes(5L).toMillis();
 
     @Lazy
     @Autowired
     private List<MessageSender> messageSenders;
 
-    private Map<MessageType, MessageSender<BaseMessage>> messageSenderMap = 
new HashMap<>();
+    private final Map<MessageType, MessageSender<BaseMessage>> 
messageSenderMap = new HashMap<>();
 
-    private Map<Integer, Map<MessageType, BaseMessage>> needToRetryMessages = 
new ConcurrentHashMap<>();
+    private final Map<Integer, List<TaskInstanceMessage>> needToRetryMessages 
= new ConcurrentHashMap<>(); // per-task retry queue kept in insertion order. NOTE(review): the retry loop removes a task's entry via iterator.remove() once its list is empty; that is not atomic with addRetryMessage's computeIfAbsent(...).add(...), so a message can be appended to a list that was just dropped from this map and then never retried
 
     @Override
     public synchronized void start() {
@@ -70,14 +75,14 @@ public class MessageRetryRunner extends BaseDaemonThread {
     }
 
     public void addRetryMessage(int taskInstanceId, @NonNull MessageType 
messageType, BaseMessage baseMessage) {
-        needToRetryMessages.computeIfAbsent(taskInstanceId, k -> new 
ConcurrentHashMap<>()).put(messageType,
-                baseMessage);
+        needToRetryMessages.computeIfAbsent(taskInstanceId, k -> 
Collections.synchronizedList(new ArrayList<>())) // NOTE(review): the retry loop walks this list with a plain for-each without synchronizing on it; Collections.synchronizedList requires holding the list's lock while iterating, so a concurrent add/remove (if called from other threads, as the synchronized wrapper suggests) can throw ConcurrentModificationException
+                .add(TaskInstanceMessage.of(taskInstanceId, messageType, 
baseMessage)); // NOTE(review): always appends; the old Map.put replaced an earlier message of the same type, so re-adding a type now queues a duplicate that removeRetryMessage will not fully clear
     }
 
     public void removeRetryMessage(int taskInstanceId, @NonNull MessageType 
messageType) {
-        Map<MessageType, BaseMessage> retryMessages = 
needToRetryMessages.get(taskInstanceId);
-        if (retryMessages != null) {
-            retryMessages.remove(messageType);
+        List<TaskInstanceMessage> taskInstanceMessages = 
needToRetryMessages.get(taskInstanceId);
+        if (taskInstanceMessages != null) {
+            taskInstanceMessages.remove(TaskInstanceMessage.of(taskInstanceId, 
messageType, null)); // a null payload is fine here because TaskInstanceMessage.equals ignores it. NOTE(review): List.remove(Object) removes only the first equal entry, so duplicates appended by addRetryMessage remain and keep being retried
         }
     }
 
@@ -86,10 +91,10 @@ public class MessageRetryRunner extends BaseDaemonThread {
     }
 
     public void updateMessageHost(int taskInstanceId, String 
messageReceiverHost) {
-        Map<MessageType, BaseMessage> needToRetryMessages = 
this.needToRetryMessages.get(taskInstanceId);
-        if (needToRetryMessages != null) {
-            needToRetryMessages.values().forEach(baseMessage -> {
-                baseMessage.setMessageReceiverAddress(messageReceiverHost);
+        List<TaskInstanceMessage> taskInstanceMessages = 
this.needToRetryMessages.get(taskInstanceId);
+        if (taskInstanceMessages != null) {
+            taskInstanceMessages.forEach(taskInstanceMessage -> { // point every pending retry of this task at the new receiver host
+                
taskInstanceMessage.getMessage().setMessageReceiverAddress(messageReceiverHost);
             });
         }
     }
@@ -102,21 +107,21 @@ public class MessageRetryRunner extends BaseDaemonThread {
                 }
 
                 long now = System.currentTimeMillis();
-                Iterator<Map.Entry<Integer, Map<MessageType, BaseMessage>>> 
iterator =
+                Iterator<Map.Entry<Integer, List<TaskInstanceMessage>>> 
iterator =
                         needToRetryMessages.entrySet().iterator();
                 while (iterator.hasNext()) {
-                    Map.Entry<Integer, Map<MessageType, BaseMessage>> 
taskEntry = iterator.next();
+                    Map.Entry<Integer, List<TaskInstanceMessage>> taskEntry = 
iterator.next();
                     Integer taskInstanceId = taskEntry.getKey();
-                    Map<MessageType, BaseMessage> retryMessageMap = 
taskEntry.getValue();
-                    if (retryMessageMap.isEmpty()) {
+                    List<TaskInstanceMessage> taskInstanceMessages = 
taskEntry.getValue();
+                    if (taskInstanceMessages.isEmpty()) {
                         iterator.remove();
                         continue;
                     }
                     LogUtils.setTaskInstanceIdMDC(taskInstanceId);
                     try {
-                        for (Map.Entry<MessageType, BaseMessage> messageEntry 
: retryMessageMap.entrySet()) {
-                            MessageType messageType = messageEntry.getKey();
-                            BaseMessage message = messageEntry.getValue();
+                        for (TaskInstanceMessage taskInstanceMessage : 
taskInstanceMessages) {
+                            MessageType messageType = 
taskInstanceMessage.getMessageType();
+                            BaseMessage message = 
taskInstanceMessage.getMessage();
                             if (now - message.getMessageSendTime() > 
MESSAGE_RETRY_WINDOW) {
                                 log.info("Begin retry send message to master, 
message: {}", message);
                                 message.setMessageSendTime(now);
@@ -144,4 +149,40 @@ public class MessageRetryRunner extends BaseDaemonThread {
     public void clearMessage() {
         needToRetryMessages.clear();
     }
+
+    /**
+     * If two messages have the same taskInstanceId and messageType, they are 
considered the same message; the message payload is ignored by equals/hashCode.
+     */
+    @Data // NOTE(review): Lombok skips equals/hashCode because they are written explicitly below, but still generates setters for the fields those methods compare
+    public static class TaskInstanceMessage {
+
+        private long taskInstanceId; // NOTE(review): long here, while addRetryMessage/removeRetryMessage/updateMessageHost take int ids
+        private MessageType messageType;
+        private BaseMessage message; // payload; deliberately excluded from equals/hashCode
+
+        public static TaskInstanceMessage of(long taskInstanceId, MessageType 
messageType, BaseMessage message) { // message may be null when the instance is only a lookup key, as in removeRetryMessage
+            TaskInstanceMessage taskInstanceMessage = new 
TaskInstanceMessage();
+            taskInstanceMessage.setTaskInstanceId(taskInstanceId);
+            taskInstanceMessage.setMessageType(messageType);
+            taskInstanceMessage.setMessage(message);
+            return taskInstanceMessage;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TaskInstanceMessage that = (TaskInstanceMessage) o;
+            return taskInstanceId == that.taskInstanceId && messageType == 
that.messageType; // message payload intentionally not compared
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(taskInstanceId, messageType); // Guava Objects.hashCode over the same fields equals() compares
+        }
+    }
 }

Reply via email to