This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new b03f552 [ISSUE #632 ] Fix NPE caused by using
ExtRocketMQTemplateConfiguration annotation extension to send messages
b03f552 is described below
commit b03f5525a8c756ac0ff53fd783b0772b265fbf5d
Author: lilinjiang <[email protected]>
AuthorDate: Sun Mar 3 10:07:52 2024 +0800
[ISSUE #632 ] Fix NPE caused by using ExtRocketMQTemplateConfiguration
annotation extension to send messages
---
.../RocketMQMessageListenerBeanPostProcessor.java | 33 +++++++++++++++++++++-
.../RocketMQMessageListenerContainerRegistrar.java | 27 ++++++++++++------
2 files changed, 51 insertions(+), 9 deletions(-)
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java
index bf53ecb..8dfb358 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java
@@ -23,6 +23,7 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.SmartLifecycle;
import org.springframework.core.OrderComparator;
import org.springframework.core.annotation.AnnotationUtils;
@@ -32,7 +33,7 @@ import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
-public class RocketMQMessageListenerBeanPostProcessor implements
ApplicationContextAware, BeanPostProcessor, InitializingBean {
+public class RocketMQMessageListenerBeanPostProcessor implements
ApplicationContextAware, BeanPostProcessor, InitializingBean, SmartLifecycle {
private ApplicationContext applicationContext;
@@ -40,6 +41,8 @@ public class RocketMQMessageListenerBeanPostProcessor
implements ApplicationCont
private RocketMQMessageListenerContainerRegistrar
listenerContainerRegistrar;
+ private boolean running = false;
+
@Override
public Object postProcessBeforeInitialization(Object bean, String
beanName) throws BeansException {
return bean;
@@ -58,6 +61,34 @@ public class RocketMQMessageListenerBeanPostProcessor
implements ApplicationCont
return bean;
}
+ @Override
+ public int getPhase() {
+ return Integer.MAX_VALUE - 2000;
+ }
+
+ @Override
+ public void start() {
+ if (!isRunning()) {
+ this.setRunning(true);
+ listenerContainerRegistrar.startContainer();
+ }
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ public void setRunning(boolean running) {
+ this.running = running;
+ }
+
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
this.applicationContext = applicationContext;
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java
index c15d168..11cdcd9 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java
@@ -36,7 +36,9 @@ import
org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.util.StringUtils;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class RocketMQMessageListenerContainerRegistrar implements
ApplicationContextAware {
@@ -52,6 +54,8 @@ public class RocketMQMessageListenerContainerRegistrar
implements ApplicationCon
private final RocketMQMessageConverter rocketMQMessageConverter;
+ private final List<DefaultRocketMQListenerContainer> containers = new
ArrayList<>();
+
public RocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter
rocketMQMessageConverter,
ConfigurableEnvironment
environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
@@ -97,18 +101,25 @@ public class RocketMQMessageListenerContainerRegistrar
implements ApplicationCon
genericApplicationContext.registerBean(containerBeanName,
DefaultRocketMQListenerContainer.class, () ->
createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container =
genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
- if (!container.isRunning()) {
- try {
- container.start();
- } catch (Exception e) {
- log.error("Started container failed. {}", container, e);
- throw new RuntimeException(e);
- }
- }
+
+ containers.add(container);
log.info("Register the listener to container, listenerBeanName:{},
containerBeanName:{}", beanName, containerBeanName);
}
+ public void startContainer() {
+ for (DefaultRocketMQListenerContainer container : containers) {
+ if (!container.isRunning()) {
+ try {
+ container.start();
+ } catch (Exception e) {
+ log.error("Started container failed. {}", container, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
private DefaultRocketMQListenerContainer
createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new
DefaultRocketMQListenerContainer();