This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 45694aa697 Fix MessageSender may be null due to class initialize late (#13446)
45694aa697 is described below
commit 45694aa69791d0fa5447ec5e0223b1accbe94ec5
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Jan 28 20:53:32 2023 +0800
Fix MessageSender may be null due to class initialize late (#13446)
---
.../server/worker/message/MessageRetryRunner.java | 21 ++++++++-------------
.../server/worker/rpc/WorkerMessageSender.java | 7 +++----
2 files changed, 11 insertions(+), 17 deletions(-)
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
index b18cb3bfee..31acf40009 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
@@ -29,17 +29,16 @@ import org.apache.commons.collections4.MapUtils;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.PostConstruct;
-
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@Component
@@ -53,25 +52,21 @@ public class MessageRetryRunner extends BaseDaemonThread {
private static long MESSAGE_RETRY_WINDOW =
Duration.ofMinutes(5L).toMillis();
+ @Lazy
@Autowired
- private ApplicationContext applicationContext;
+ private List<MessageSender> messageSenders;
private Map<CommandType, MessageSender<BaseCommand>> messageSenderMap =
new HashMap<>();
private Map<Integer, Map<CommandType, BaseCommand>> needToRetryMessages =
new ConcurrentHashMap<>();
- @PostConstruct
- public void init() {
- Map<String, MessageSender> messageSenders =
applicationContext.getBeansOfType(MessageSender.class);
- messageSenders.values().forEach(messageSender -> {
- messageSenderMap.put(messageSender.getMessageType(),
messageSender);
- logger.info("Injected message sender: {}",
messageSender.getClass().getName());
- });
- }
-
@Override
public synchronized void start() {
logger.info("Message retry runner staring");
+ messageSenders.forEach(messageSender -> {
+ messageSenderMap.put(messageSender.getMessageType(),
messageSender);
+ logger.info("Injected message sender: {}",
messageSender.getClass().getName());
+ });
super.start();
logger.info("Message retry runner started");
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java
index ae2fba6c7b..393586b799 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.message.MessageSender;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
@@ -34,7 +35,6 @@ import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
@Component
@@ -46,14 +46,13 @@ public class WorkerMessageSender {
private MessageRetryRunner messageRetryRunner;
@Autowired
- private ApplicationContext applicationContext;
+ private List<MessageSender> messageSenders;
private Map<CommandType, MessageSender> messageSenderMap = new HashMap<>();
@PostConstruct
public void init() {
- Map<String, MessageSender> messageSenders =
applicationContext.getBeansOfType(MessageSender.class);
- messageSenders.values().forEach(messageSender ->
messageSenderMap.put(messageSender.getMessageType(),
+ messageSenders.forEach(messageSender ->
messageSenderMap.put(messageSender.getMessageType(),
messageSender));
}