This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 37f25ab [ISSUE #632] Fix NPE caused by using @ExtRocketMQTemplateConfiguration annotation extension to send messages in v5
37f25ab is described below
commit 37f25ab6343ab0a4d03f5177fbdcc4e699ba763c
Author: lilinjiang <[email protected]>
AuthorDate: Mon Mar 4 14:17:28 2024 +0800
[ISSUE #632] Fix NPE caused by using @ExtRocketMQTemplateConfiguration annotation extension to send messages in v5
Co-authored-by: lilinjiang <[email protected]>
---
.../RocketMQMessageListenerBeanPostProcessor.java | 33 +++++++++++++++++++++-
.../ListenerContainerConfiguration.java | 26 ++++++++++++-----
2 files changed, 51 insertions(+), 8 deletions(-)
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java
index 61f3e1d..53d5cab 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java
@@ -23,6 +23,7 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.SmartLifecycle;
import org.springframework.core.OrderComparator;
import org.springframework.core.annotation.AnnotationUtils;
@@ -32,7 +33,7 @@ import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
-public class RocketMQMessageListenerBeanPostProcessor implements
ApplicationContextAware, BeanPostProcessor, InitializingBean {
+public class RocketMQMessageListenerBeanPostProcessor implements
ApplicationContextAware, BeanPostProcessor, InitializingBean, SmartLifecycle {
private ApplicationContext applicationContext;
@@ -40,6 +41,8 @@ public class RocketMQMessageListenerBeanPostProcessor
implements ApplicationCont
private ListenerContainerConfiguration listenerContainerConfiguration;
+ private boolean running = false;
+
@Override
public Object postProcessBeforeInitialization(Object bean, String
beanName) throws BeansException {
return bean;
@@ -58,6 +61,34 @@ public class RocketMQMessageListenerBeanPostProcessor
implements ApplicationCont
return bean;
}
+ @Override
+ public int getPhase() {
+ return Integer.MAX_VALUE - 2000;
+ }
+
+ @Override
+ public void start() {
+ if (!isRunning()) {
+ this.setRunning(true);
+ listenerContainerConfiguration.startContainer();
+ }
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ public void setRunning(boolean running) {
+ this.running = running;
+ }
+
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
this.applicationContext = applicationContext;
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java
index 81c5b09..bfbb7f9 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java
@@ -32,6 +32,8 @@ import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.util.Assert;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@Configuration
@@ -48,6 +50,8 @@ public class ListenerContainerConfiguration implements
ApplicationContextAware {
private RocketMQMessageConverter rocketMQMessageConverter;
+ private final List<DefaultListenerContainer> containers = new
ArrayList<>();
+
public ListenerContainerConfiguration(RocketMQMessageConverter
rocketMQMessageConverter,
ConfigurableEnvironment environment,
RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
@@ -68,15 +72,23 @@ public class ListenerContainerConfiguration implements
ApplicationContextAware {
genericApplicationContext.registerBean(containerBeanName,
DefaultListenerContainer.class, () ->
createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultListenerContainer container =
genericApplicationContext.getBean(containerBeanName,
DefaultListenerContainer.class);
- if (!container.isRunning()) {
- try {
- container.start();
- } catch (Exception e) {
- log.error("Started container failed. {}", container, e);
- throw new RuntimeException(e);
+
+ containers.add(container);
+
+ log.info("Register the listener to container, listenerBeanName:{},
containerBeanName:{}", beanName, containerBeanName);
+ }
+
+ public void startContainer() {
+ for (DefaultListenerContainer container : containers) {
+ if (!container.isRunning()) {
+ try {
+ container.start();
+ } catch (Exception e) {
+ log.error("Started container failed. {}", container, e);
+ throw new RuntimeException(e);
+ }
}
}
- log.info("Register the listener to container, listenerBeanName:{},
containerBeanName:{}", beanName, containerBeanName);
}
private DefaultListenerContainer createRocketMQListenerContainer(String
name, Object bean, RocketMQMessageListener annotation) {