This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch 3.1.4-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.4-prepare by this push:
new 299619c511 cherry-pick Fix MessageSender may be null due to class
initialize late #13446
299619c511 is described below
commit 299619c51161a95ff048aebdee4e669ee00a123f
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Jan 28 20:53:32 2023 +0800
cherry-pick Fix MessageSender may be null due to class initialize late
#13446
---
.../server/worker/message/MessageRetryRunner.java | 22 +++++++++-------------
.../server/worker/rpc/WorkerMessageSender.java | 9 ++++-----
2 files changed, 13 insertions(+), 18 deletions(-)
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
index 8cf0550ba2..356bf42ce8 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
@@ -30,17 +30,17 @@ import org.apache.commons.collections.MapUtils;
import java.time.Duration;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.PostConstruct;
-
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@@ -60,25 +60,21 @@ public class MessageRetryRunner extends BaseDaemonThread {
private static long MESSAGE_RETRY_WINDOW =
Duration.ofMinutes(5L).toMillis();
+ @Lazy
@Autowired
- private ApplicationContext applicationContext;
+ private List<MessageSender> messageSenders;
private Map<CommandType, MessageSender<BaseCommand>> messageSenderMap =
new HashMap<>();
private Map<Integer, Map<CommandType, BaseCommand>> needToRetryMessages =
new ConcurrentHashMap<>();
- @PostConstruct
- public void init() {
- Map<String, MessageSender> messageSenders =
applicationContext.getBeansOfType(MessageSender.class);
- messageSenders.values().forEach(messageSender -> {
- messageSenderMap.put(messageSender.getMessageType(),
messageSender);
- logger.info("Injected message sender: {}",
messageSender.getClass().getName());
- });
- }
-
@Override
public synchronized void start() {
logger.info("Message retry runner staring");
+ messageSenders.forEach(messageSender -> {
+ messageSenderMap.put(messageSender.getMessageType(),
messageSender);
+ logger.info("Injected message sender: {}",
messageSender.getClass().getName());
+ });
super.start();
logger.info("Message retry runner started");
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java
index 5a2752ca34..393dd78b4d 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java
@@ -25,6 +25,7 @@ import
org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.message.MessageSender;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
@@ -32,7 +33,6 @@ import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import lombok.NonNull;
@@ -46,15 +46,14 @@ public class WorkerMessageSender {
private MessageRetryRunner messageRetryRunner;
@Autowired
- private ApplicationContext applicationContext;
+ private List<MessageSender> messageSenders;
private Map<CommandType, MessageSender> messageSenderMap = new HashMap<>();
@PostConstruct
public void init() {
- Map<String, MessageSender> messageSenders =
applicationContext.getBeansOfType(MessageSender.class);
- messageSenders.values().forEach(messageSender ->
messageSenderMap.put(messageSender.getMessageType(),
-
messageSender));
+ messageSenders.forEach(messageSender ->
messageSenderMap.put(messageSender.getMessageType(),
+ messageSender));
}
// todo: use message rather than context