This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new af9f20e [ISSUE #306] Support real LitePullMessage in RocketMQ-Spring
(#307)
af9f20e is described below
commit af9f20eabd2c4ae1245bed2c0b816592128b1388
Author: CharliePu <[email protected]>
AuthorDate: Fri Dec 4 14:06:53 2020 +0800
[ISSUE #306] Support real LitePullMessage in RocketMQ-Spring (#307)
* Support LitePullMessage in RocketMQ-Spring
* Add a property in litePullConnsumer named pullBatchSize.
* Add necessary comments.
---
.../samples/springboot/ConsumerACLApplication.java | 17 ++-
.../src/main/resources/application.properties | 2 +
.../samples/springboot/ConsumerApplication.java | 24 +++-
...erApplication.java => ExtRocketMQTemplate.java} | 19 +--
.../src/main/resources/application.properties | 2 +
.../ExtRocketMQConsumerConfiguration.java | 97 ++++++++++++++
.../ExtConsumerResetConfiguration.java | 149 +++++++++++++++++++++
.../autoconfigure/RocketMQAutoConfiguration.java | 80 +++++++++--
.../spring/autoconfigure/RocketMQProperties.java | 105 +++++++++++++++
.../rocketmq/spring/core/RocketMQTemplate.java | 88 ++++++++++--
.../rocketmq/spring/support/RocketMQUtil.java | 56 +++++++-
.../RocketMQAutoConfigurationTest.java | 66 +++++++++
.../spring/core/ExtRocketMQTemplateTest.java | 18 ++-
.../rocketmq/spring/core/RocketMQTemplateTest.java | 16 ++-
14 files changed, 699 insertions(+), 40 deletions(-)
diff --git
a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java
b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java
index 3bf266b..f3d7578 100644
---
a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java
+++
b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java
@@ -17,17 +17,32 @@
package org.apache.rocketmq.samples.springboot;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import javax.annotation.Resource;
+import java.util.List;
+
/**
* ConsumerApplication
*/
@SpringBootApplication
-public class ConsumerACLApplication {
+public class ConsumerACLApplication implements CommandLineRunner {
+
+ @Resource
+ private RocketMQTemplate rocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(ConsumerACLApplication.class, args);
}
+
+ @Override
+ public void run(String... args) throws Exception {
+ ////This is an example of pull consumer with access-key and secret-key.
+ List<String> messages = rocketMQTemplate.receive(String.class);
+ System.out.printf("receive from rocketMQTemplate, messages=%s %n",
messages);
+ }
}
diff --git
a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties
b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties
index 057edeb..3fe6abb 100644
---
a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties
+++
b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties
@@ -17,6 +17,8 @@
spring.application.name=rocketmq-consume-acl-demo
rocketmq.name-server=Endpoint_of_Aliware_MQ
+rocketmq.consumer.group=my-group1
+rocketmq.consumer.topic=test
rocketmq.topic=normal_topic_define_in_Aliware_MQ
# properties used in application code
diff --git
a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java
b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java
index e4fbc8c..6435d47 100644
---
a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java
+++
b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java
@@ -17,17 +17,39 @@
package org.apache.rocketmq.samples.springboot;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import javax.annotation.Resource;
+import java.util.List;
+
/**
* ConsumerApplication
*/
@SpringBootApplication
-public class ConsumerApplication {
+public class ConsumerApplication implements CommandLineRunner {
+
+ @Resource
+ private RocketMQTemplate rocketMQTemplate;
+
+ @Resource(name = "extRocketMQTemplate")
+ private RocketMQTemplate extRocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
+
+ @Override
+ public void run(String... args) throws Exception {
+ //This is an example of pull consumer using rocketMQTemplate.
+ List<String> messages = rocketMQTemplate.receive(String.class);
+ System.out.printf("receive from rocketMQTemplate, messages=%s %n",
messages);
+
+ //This is an example of pull consumer using extRocketMQTemplate.
+ messages = extRocketMQTemplate.receive(String.class);
+ System.out.printf("receive from extRocketMQTemplate, messages=%s %n",
messages);
+ }
}
diff --git
a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java
b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
similarity index 72%
copy from
rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java
copy to
rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
index e4fbc8c..5499ac1 100644
---
a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java
+++
b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
@@ -14,20 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.rocketmq.samples.springboot;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-/**
- * ConsumerApplication
- */
-@SpringBootApplication
-public class ConsumerApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(ConsumerApplication.class, args);
- }
-}
+import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group =
"string_consumer")
+public class ExtRocketMQTemplate extends RocketMQTemplate {
+}
\ No newline at end of file
diff --git
a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
index 096cbb5..5953e48 100644
---
a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
+++
b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
@@ -16,6 +16,8 @@
spring.application.name=rocketmq-consume-demo
rocketmq.name-server=localhost:9876
+rocketmq.consumer.group=my-group1
+rocketmq.consumer.topic=test
# properties used in application code
demo.rocketmq.topic=string-topic
demo.rocketmq.bytesRequestTopic=bytesRequestTopic
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
new file mode 100644
index 0000000..63e4e37
--- /dev/null
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.annotation;
+
+import org.springframework.stereotype.Component;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Component
+public @interface ExtRocketMQConsumerConfiguration {
+
+ String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
+ String GROUP_PLACEHOLDER = "${rocketmq.consumer.group:}";
+ String TOPIC_PLACEHOLDER = "${rocketmq.consumer.topic:}";
+ String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
+ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
+ String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
+
+ /**
+ * The component name of the Producer configuration.
+ */
+ String value() default "";
+
+ /**
+ * The property of "name-server".
+ */
+ String nameServer() default NAME_SERVER_PLACEHOLDER;
+
+ /**
+ * The property of "access-channel".
+ */
+ String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
+
+ /**
+ * Group name of consumer.
+ */
+ String group() default GROUP_PLACEHOLDER;
+
+ /**
+ * Topic name of consumer.
+ */
+ String topic() default TOPIC_PLACEHOLDER;
+
+ /**
+ * Control message mode, if you want all subscribers receive message all
message, broadcasting is a good choice.
+ */
+ MessageModel messageModel() default MessageModel.CLUSTERING;
+
+ /**
+ * Control how to selector message.
+ *
+ * @see SelectorType
+ */
+ SelectorType selectorType() default SelectorType.TAG;
+
+ /**
+ * Control which message can be select. Grammar please see {@link
SelectorType#TAG} and {@link SelectorType#SQL92}
+ */
+ String selectorExpression() default "*";
+
+ /**
+ * The property of "access-key".
+ */
+ String accessKey() default ACCESS_KEY_PLACEHOLDER;
+
+ /**
+ * The property of "secret-key".
+ */
+ String secretKey() default SECRET_KEY_PLACEHOLDER;
+
+ /**
+ * Maximum number of messages pulled each time.
+ */
+ int pullBatchSize() default 10;
+}
\ No newline at end of file
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
new file mode 100644
index 0000000..7e81b5b
--- /dev/null
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.autoconfigure;
+
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.SelectorType;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
+import org.apache.rocketmq.spring.support.RocketMQUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.aop.framework.AopProxyUtils;
+import org.springframework.aop.scope.ScopedProxyUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.SmartInitializingSingleton;
+import
org.springframework.beans.factory.support.BeanDefinitionValidationException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.core.env.StandardEnvironment;
+import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Configuration
+public class ExtConsumerResetConfiguration implements ApplicationContextAware,
SmartInitializingSingleton {
+
+ private final static Logger log =
LoggerFactory.getLogger(ExtConsumerResetConfiguration.class);
+
+ private ConfigurableApplicationContext applicationContext;
+
+ private StandardEnvironment environment;
+
+ private RocketMQProperties rocketMQProperties;
+
+ private RocketMQMessageConverter rocketMQMessageConverter;
+
+ public ExtConsumerResetConfiguration(RocketMQMessageConverter
rocketMQMessageConverter,
+ StandardEnvironment environment, RocketMQProperties
rocketMQProperties) {
+ this.rocketMQMessageConverter = rocketMQMessageConverter;
+ this.environment = environment;
+ this.rocketMQProperties = rocketMQProperties;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
+ this.applicationContext = (ConfigurableApplicationContext)
applicationContext;
+ }
+
+ @Override
+ public void afterSingletonsInstantiated() {
+ Map<String, Object> beans = this.applicationContext
+ .getBeansWithAnnotation(ExtRocketMQConsumerConfiguration.class)
+ .entrySet().stream().filter(entry ->
!ScopedProxyUtils.isScopedTarget(entry.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+
+ beans.forEach(this::registerTemplate);
+ }
+
+ private void registerTemplate(String beanName, Object bean) {
+ Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
+
+ if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
+ throw new IllegalStateException(clazz + " is not instance of " +
RocketMQTemplate.class.getName());
+ }
+
+ ExtRocketMQConsumerConfiguration annotation =
clazz.getAnnotation(ExtRocketMQConsumerConfiguration.class);
+ GenericApplicationContext genericApplicationContext =
(GenericApplicationContext) applicationContext;
+ validate(annotation, genericApplicationContext);
+
+ DefaultLitePullConsumer consumer = null;
+ try {
+ consumer = createConsumer(annotation);
+ // Set instanceName same as the beanName
+ consumer.setInstanceName(beanName);
+ consumer.start();
+ } catch (Exception e) {
+ log.error("Failed to startup PullConsumer for RocketMQTemplate
{}", beanName, e);
+ }
+ RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
+ rocketMQTemplate.setConsumer(consumer);
+
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
+ log.info("Set real consumer to :{} {}", beanName, annotation.value());
+ }
+
+ private DefaultLitePullConsumer
createConsumer(ExtRocketMQConsumerConfiguration annotation)
+ throws MQClientException {
+
+ RocketMQProperties.Consumer consumerConfig =
rocketMQProperties.getConsumer();
+ if (consumerConfig == null) {
+ consumerConfig = new RocketMQProperties.Consumer();
+ }
+ String nameServer = resolvePlaceholders(annotation.nameServer(),
rocketMQProperties.getNameServer());
+ String groupName = resolvePlaceholders(annotation.group(),
consumerConfig.getGroup());
+ String topicName = resolvePlaceholders(annotation.topic(),
consumerConfig.getTopic());
+ Assert.hasText(nameServer, "[nameServer] must not be null");
+ Assert.hasText(groupName, "[group] must not be null");
+ Assert.hasText(topicName, "[topic] must not be null");
+
+ String accessChannel = resolvePlaceholders(annotation.accessChannel(),
rocketMQProperties.getAccessChannel());
+ MessageModel messageModel = annotation.messageModel();
+ SelectorType selectorType = annotation.selectorType();
+ String selectorExpression = annotation.selectorExpression();
+ String ak = resolvePlaceholders(annotation.accessKey(),
consumerConfig.getAccessKey());
+ String sk = resolvePlaceholders(annotation.secretKey(),
consumerConfig.getSecretKey());
+ int pullBatchSize = annotation.pullBatchSize();
+
+ DefaultLitePullConsumer litePullConsumer =
RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
+ groupName, topicName, messageModel, selectorType,
selectorExpression, ak, sk, pullBatchSize);
+ return litePullConsumer;
+ }
+
+ private String resolvePlaceholders(String text, String defaultValue) {
+ String value = environment.resolvePlaceholders(text);
+ return StringUtils.isEmpty(value) ? defaultValue : value;
+ }
+
+ private void validate(ExtRocketMQConsumerConfiguration annotation,
+ GenericApplicationContext genericApplicationContext) {
+ if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
+ throw new BeanDefinitionValidationException(
+ String.format("Bean {} has been used in Spring Application
Context, " +
+ "please check the
@ExtRocketMQConsumerConfiguration",
+ annotation.value()));
+ }
+ }
+}
\ No newline at end of file
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index e6131e7..b9ec643 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -17,47 +17,66 @@
package org.apache.rocketmq.spring.autoconfigure;
-import javax.annotation.PostConstruct;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.MQAdmin;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
+import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import
org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
+import javax.annotation.PostConstruct;
+
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({MQAdmin.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server",
matchIfMissing = true)
-@Import({MessageConverterConfiguration.class,
ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class,
RocketMQTransactionConfiguration.class})
+@Import({MessageConverterConfiguration.class,
ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class,
ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
-public class RocketMQAutoConfiguration {
+public class RocketMQAutoConfiguration implements ApplicationContextAware {
private static final Logger log =
LoggerFactory.getLogger(RocketMQAutoConfiguration.class);
public static final String ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME =
"rocketMQTemplate";
+ public static final String PRODUCER_BEAN_NAME = "defaultMQProducer";
+ public static final String CONSUMER_BEAN_NAME = "defaultLitePullConsumer";
@Autowired
private Environment environment;
+ private ApplicationContext applicationContext;
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
@PostConstruct
public void checkProperties() {
String nameServer = environment.getProperty("rocketmq.name-server",
String.class);
@@ -67,7 +86,7 @@ public class RocketMQAutoConfiguration {
}
}
- @Bean
+ @Bean(PRODUCER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultMQProducer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server",
"producer.group"})
public DefaultMQProducer defaultMQProducer(RocketMQProperties
rocketMQProperties) {
@@ -100,14 +119,59 @@ public class RocketMQAutoConfiguration {
return producer;
}
+ @Bean(CONSUMER_BEAN_NAME)
+ @ConditionalOnMissingBean(DefaultLitePullConsumer.class)
+ @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server",
"consumer.group", "consumer.topic"})
+ public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties
rocketMQProperties)
+ throws MQClientException {
+ RocketMQProperties.Consumer consumerConfig =
rocketMQProperties.getConsumer();
+ String nameServer = rocketMQProperties.getNameServer();
+ String groupName = consumerConfig.getGroup();
+ String topicName = consumerConfig.getTopic();
+ Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
+ Assert.hasText(groupName, "[rocketmq.consumer.group] must not be
null");
+ Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be
null");
+
+ String accessChannel = rocketMQProperties.getAccessChannel();
+ MessageModel messageModel =
MessageModel.valueOf(consumerConfig.getMessageModel());
+ SelectorType selectorType =
SelectorType.valueOf(consumerConfig.getSelectorType());
+ String selectorExpression = consumerConfig.getSelectorExpression();
+ String ak = consumerConfig.getAccessKey();
+ String sk = consumerConfig.getSecretKey();
+ int pullBatchSize = consumerConfig.getPullBatchSize();
+
+ DefaultLitePullConsumer litePullConsumer =
RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
+ groupName, topicName, messageModel, selectorType,
selectorExpression, ak, sk, pullBatchSize);
+ return litePullConsumer;
+ }
+
@Bean(destroyMethod = "destroy")
- @ConditionalOnBean(DefaultMQProducer.class)
+ @Conditional(ProducerOrConsumerPropertyCondition.class)
@ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
- public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
- RocketMQMessageConverter rocketMQMessageConverter) {
+ public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter
rocketMQMessageConverter) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
- rocketMQTemplate.setProducer(mqProducer);
+ if (applicationContext.containsBean(PRODUCER_BEAN_NAME)) {
+ rocketMQTemplate.setProducer((DefaultMQProducer)
applicationContext.getBean(PRODUCER_BEAN_NAME));
+ }
+ if (applicationContext.containsBean(CONSUMER_BEAN_NAME)) {
+ rocketMQTemplate.setConsumer((DefaultLitePullConsumer)
applicationContext.getBean(CONSUMER_BEAN_NAME));
+ }
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
return rocketMQTemplate;
}
+
+ static class ProducerOrConsumerPropertyCondition extends
AnyNestedCondition {
+
+ public ProducerOrConsumerPropertyCondition() {
+ super(ConfigurationPhase.REGISTER_BEAN);
+ }
+
+ @ConditionalOnBean(DefaultMQProducer.class)
+ static class DefaultMQProducerExistsCondition {
+ }
+
+ @ConditionalOnBean(DefaultLitePullConsumer.class)
+ static class DefaultLitePullConsumerExistsCondition {
+ }
+ }
}
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index c08b377..98a88f7 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -231,6 +231,47 @@ public class RocketMQProperties {
public static final class Consumer {
/**
+ * Group name of consumer.
+ */
+ private String group;
+
+ /**
+ * Topic name of consumer.
+ */
+ private String topic;
+
+ /**
+ * Control message mode, if you want all subscribers receive message
all message, broadcasting is a good choice.
+ */
+ private String messageModel = "CLUSTERING";
+
+ /**
+ * Control how to selector message.
+ *
+ */
+ private String selectorType = "TAG";
+
+ /**
+ * Control which message can be select.
+ */
+ private String selectorExpression = "*";
+
+ /**
+ * The property of "access-key".
+ */
+ private String accessKey;
+
+ /**
+ * The property of "secret-key".
+ */
+ private String secretKey;
+
+ /**
+ * Maximum number of messages pulled each time.
+ */
+ private int pullBatchSize = 10;
+
+ /**
* listener configuration container
* the pattern is like this:
* group1.topic1 = false
@@ -239,6 +280,70 @@ public class RocketMQProperties {
*/
private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getMessageModel() {
+ return messageModel;
+ }
+
+ public void setMessageModel(String messageModel) {
+ this.messageModel = messageModel;
+ }
+
+ public String getSelectorType() {
+ return selectorType;
+ }
+
+ public void setSelectorType(String selectorType) {
+ this.selectorType = selectorType;
+ }
+
+ public String getSelectorExpression() {
+ return selectorExpression;
+ }
+
+ public void setSelectorExpression(String selectorExpression) {
+ this.selectorExpression = selectorExpression;
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public int getPullBatchSize() {
+ return pullBatchSize;
+ }
+
+ public void setPullBatchSize(int pullBatchSize) {
+ this.pullBatchSize = pullBatchSize;
+ }
+
public Map<String, Map<String, Boolean>> getListeners() {
return listeners;
}
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 78f0b2d..a18e781 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -17,15 +17,7 @@
package org.apache.rocketmq.spring.core;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
@@ -52,12 +44,24 @@ import
org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
@SuppressWarnings({"WeakerAccess", "unused"})
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String>
implements InitializingBean, DisposableBean {
private static final Logger log =
LoggerFactory.getLogger(RocketMQTemplate.class);
private DefaultMQProducer producer;
+ private DefaultLitePullConsumer consumer;
+
private String charset = "UTF-8";
private MessageQueueSelector messageQueueSelector = new
SelectMessageQueueByHash();
@@ -72,6 +76,14 @@ public class RocketMQTemplate extends
AbstractMessageSendingTemplate<String> imp
this.producer = producer;
}
+ public DefaultLitePullConsumer getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(DefaultLitePullConsumer consumer) {
+ this.consumer = consumer;
+ }
+
public String getCharset() {
return charset;
}
@@ -860,6 +872,13 @@ public class RocketMQTemplate extends
AbstractMessageSendingTemplate<String> imp
if (producer != null) {
producer.start();
}
+ if (Objects.nonNull(consumer)) {
+ try {
+ consumer.start();
+ } catch (Exception e) {
+ log.error("Failed to startup PullConsumer for
RocketMQTemplate", e);
+ }
+ }
}
@Override
@@ -883,6 +902,9 @@ public class RocketMQTemplate extends
AbstractMessageSendingTemplate<String> imp
if (Objects.nonNull(producer)) {
producer.shutdown();
}
+ if (Objects.nonNull(consumer)) {
+ consumer.shutdown();
+ }
}
/**
@@ -967,4 +989,52 @@ public class RocketMQTemplate extends
AbstractMessageSendingTemplate<String> imp
}
return Object.class;
}
+
+ /**
+ * receive message in pull mode.
+ *
+ * @param clazz message object type
+ * @param <T>
+ * @return message list
+ */
+ public <T> List<T> receive(Class<T> clazz) {
+ return receive(clazz, this.consumer.getPollTimeoutMillis());
+ }
+
+ /**
+ * Same to {@link #receive(Class<T>)} with receive timeout specified in
addition.
+ *
+ * @param clazz message object type
+ * @param timeout receive timeout with millis
+ * @param <T>
+ * @return message list
+ */
+ public <T> List<T> receive(Class<T> clazz, long timeout) {
+ List<MessageExt> messageExts = this.consumer.poll(timeout);
+ List<T> list = new ArrayList<>(messageExts.size());
+ for (MessageExt messageExt : messageExts) {
+ list.add(doConvertMessage(messageExt, clazz));
+ }
+ return list;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> T doConvertMessage(MessageExt messageExt, Class<T>
messageType) {
+ if (Objects.equals(messageType, MessageExt.class)) {
+ return (T) messageExt;
+ } else {
+ String str = new String(messageExt.getBody(),
Charset.forName(charset));
+ if (Objects.equals(messageType, String.class)) {
+ return (T) str;
+ } else {
+ // If msgType not string, use objectMapper change it.
+ try {
+ return (T)
this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(),
messageType);
+ } catch (Exception e) {
+ log.info("convert failed. str:{}, msgType:{}", str,
messageType);
+ throw new RuntimeException("cannot convert message to " +
messageType, e);
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index 082b0f6..a891fa7 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -17,12 +17,11 @@
package org.apache.rocketmq.spring.support;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.lang.reflect.Field;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.Objects;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.AccessChannel;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
@@ -36,6 +35,8 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.slf4j.Logger;
@@ -48,6 +49,11 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
+import java.lang.reflect.Field;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.Objects;
+
public class RocketMQUtil {
private final static Logger log =
LoggerFactory.getLogger(RocketMQUtil.class);
@@ -285,4 +291,46 @@ public class RocketMQUtil {
return instanceName.toString();
}
+ public static DefaultLitePullConsumer createDefaultLitePullConsumer(String
nameServer, String accessChannel,
+ String groupName, String topicName, MessageModel messageModel,
SelectorType selectorType,
+ String selectorExpression, String ak, String sk, int pullBatchSize)
+ throws MQClientException {
+ DefaultLitePullConsumer litePullConsumer = null;
+ if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
+ litePullConsumer = new DefaultLitePullConsumer(groupName, new
AclClientRPCHook(new SessionCredentials(ak, sk)));
+ litePullConsumer.setVipChannelEnabled(false);
+ } else {
+ litePullConsumer = new DefaultLitePullConsumer(groupName);
+ }
+ litePullConsumer.setNamesrvAddr(nameServer);
+
litePullConsumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
+ litePullConsumer.setPullBatchSize(pullBatchSize);
+ if (accessChannel != null) {
+
litePullConsumer.setAccessChannel(AccessChannel.valueOf(accessChannel));
+ }
+
+ switch (messageModel) {
+ case BROADCASTING:
+
litePullConsumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
+ break;
+ case CLUSTERING:
+
litePullConsumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
+ break;
+ default:
+ throw new IllegalArgumentException("Property 'messageModel'
was wrong.");
+ }
+
+ switch (selectorType) {
+ case SQL92:
+ litePullConsumer.subscribe(topicName,
MessageSelector.bySql(selectorExpression));
+ break;
+ case TAG:
+ litePullConsumer.subscribe(topicName, selectorExpression);
+ break;
+ default:
+ throw new IllegalArgumentException("Property 'selectorType'
was wrong.");
+ }
+
+ return litePullConsumer;
+ }
}
diff --git
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 514c138..9f31b09 100644
---
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -19,8 +19,11 @@ package org.apache.rocketmq.spring.autoconfigure;
import java.util.ArrayList;
import java.util.List;
+
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
@@ -52,6 +55,12 @@ public class RocketMQAutoConfigurationTest {
runner.run(context -> context.getBean(DefaultMQProducer.class));
}
+ @Test(expected = NoSuchBeanDefinitionException.class)
+ public void testDefaultLitePullConsumerNotCreatedByDefault() {
+ // You will see the WARN log message about missing
rocketmq.name-server spring property when running this test case.
+ runner.run(context -> context.getBean(DefaultLitePullConsumer.class));
+ }
+
@Test
public void testDefaultMQProducerWithRelaxPropertyName() {
runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
@@ -65,6 +74,19 @@ public class RocketMQAutoConfigurationTest {
}
@Test
+ public void testDefaultLitePullConsumerWithRelaxPropertyName() {
+ runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
+ "rocketmq.consumer.group=spring_rocketmq",
+ "rocketmq.consumer.topic=test",
+ "rocketmq.accessChannel=LOCAL").
+ run((context) -> {
+
assertThat(context).hasSingleBean(DefaultLitePullConsumer.class);
+
assertThat(context).hasSingleBean(RocketMQProperties.class);
+ });
+
+ }
+
+ @Test
public void testBadAccessChannelProperty() {
runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
"rocketmq.producer.group=spring_rocketmq",
@@ -73,6 +95,15 @@ public class RocketMQAutoConfigurationTest {
//Should throw exception for bad accessChannel property
assertThat(context).getFailure();
});
+
+ runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
+ "rocketmq.consumer.group=spring_rocketmq",
+ "rocketmq.consumer.topic=test",
+ "rocketmq.accessChannel=LOCAL123").
+ run((context) -> {
+ //Should throw exception for bad accessChannel property
+ assertThat(context).getFailure();
+ });
}
@Test
@@ -85,6 +116,16 @@ public class RocketMQAutoConfigurationTest {
}
@Test
+ public void testDefaultLitePullConsumer() {
+ runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
+ "rocketmq.consumer.group=spring_rocketmq",
+ "rocketmq.consumer.topic=test").
+ run((context) -> {
+
assertThat(context).hasSingleBean(DefaultLitePullConsumer.class);
+ });
+ }
+
+ @Test
public void testExtRocketMQTemplate() {
runner.withPropertyValues("rocketmq.name-server=127.0.1.1:9876").
withUserConfiguration(TestExtRocketMQTemplateConfig.class,
CustomObjectMappersConfig.class).
@@ -95,6 +136,16 @@ public class RocketMQAutoConfigurationTest {
});
}
+
+ @Test
+ public void testExtRocketMQConsumer() {
+ runner.withPropertyValues("rocketmq.name-server=127.0.1.1:9876").
+ withUserConfiguration(TestExtRocketMQConsumerConfig.class,
CustomObjectMappersConfig.class).
+ run((context) -> {
+
assertThat(context).getBean("extRocketMQTemplate").hasFieldOrProperty("consumer");
+ });
+ }
+
@Test
public void testConsumerListener() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
@@ -347,5 +398,20 @@ public class RocketMQAutoConfigurationTest {
}
}
+
+ @Configuration
+ static class TestExtRocketMQConsumerConfig {
+
+ @Bean
+ public RocketMQTemplate extRocketMQTemplate() {
+ return new TestExtRocketMQConsumer();
+ }
+
+ }
+
+ @ExtRocketMQConsumerConfiguration(topic = "test", group = "test",
nameServer = "127.0.0.1:9876")
+ static class TestExtRocketMQConsumer extends RocketMQTemplate {
+
+ }
}
diff --git
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/ExtRocketMQTemplateTest.java
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/ExtRocketMQTemplateTest.java
index 7430ff5..dc88253 100644
---
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/ExtRocketMQTemplateTest.java
+++
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/ExtRocketMQTemplateTest.java
@@ -16,8 +16,8 @@
*/
package org.apache.rocketmq.spring.core;
-import javax.annotation.Resource;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
@@ -29,17 +29,22 @@ import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import javax.annotation.Resource;
+
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(properties = {
- "rocketmq.nameServer=127.0.0.1:9876",
"rocketmq.producer.group=extRocketMQTemplate-test-producer_group"}, classes =
{RocketMQAutoConfiguration.class, ExtRocketMQTemplate.class,
ExtTransactionListenerImpl.class})
+ "rocketmq.nameServer=127.0.0.1:9876",
"rocketmq.producer.group=extRocketMQTemplate-test-producer_group"}, classes =
{RocketMQAutoConfiguration.class, ExtRocketMQTemplate.class,
ExtTransactionListenerImpl.class, ExtRocketMQConsumer.class})
public class ExtRocketMQTemplateTest {
@Resource(name = "extRocketMQTemplate")
private RocketMQTemplate extRocketMQTemplate;
+ @Resource(name = "extRocketMQConsumer")
+ private ExtRocketMQConsumer extRocketMQConsumer;
+
@Resource
private RocketMQTemplate rocketMQTemplate;
@@ -49,6 +54,10 @@ public class ExtRocketMQTemplateTest {
assertThat(extRocketMQTemplate.getProducer().getProducerGroup()).isEqualTo("extRocketMQTemplate-test-group");
assertThat(extRocketMQTemplate.getProducer().getSendMsgTimeout()).isEqualTo(3000);
assertThat(extRocketMQTemplate.getProducer().getMaxMessageSize()).isEqualTo(4 *
1024);
+
+
assertThat(extRocketMQConsumer.getConsumer().getNamesrvAddr()).isEqualTo("172.0.0.1:9876");
+
assertThat(extRocketMQConsumer.getConsumer().getConsumerGroup()).isEqualTo("extRocketMQTemplate-test-group");
+
assertThat(extRocketMQConsumer.getConsumer().getPullBatchSize()).isEqualTo(3);
}
@Test
@@ -93,6 +102,11 @@ class ExtTransactionListenerImpl implements
RocketMQLocalTransactionListener {
}
}
+@ExtRocketMQConsumerConfiguration(nameServer = "172.0.0.1:9876", group =
"extRocketMQTemplate-test-group", topic = "test", pullBatchSize = 3)
+class ExtRocketMQConsumer extends RocketMQTemplate {
+
+}
+
diff --git
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index 56a7de6..2103da2 100644
---
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -50,7 +50,8 @@ import static org.mockito.ArgumentMatchers.any;
"test.rocketmq.topic=test", "rocketmq.producer.access-key=test-ak",
"rocketmq.producer.secret-key=test-sk", "rocketmq.accessChannel=LOCAL",
"rocketmq.producer.sendMessageTimeout= 3500",
"rocketmq.producer.retryTimesWhenSendFailed=3",
- "rocketmq.producer.retryTimesWhenSendAsyncFailed=3"}, classes =
{RocketMQAutoConfiguration.class, TransactionListenerImpl.class})
+ "rocketmq.producer.retryTimesWhenSendAsyncFailed=3",
+ "rocketmq.consumer.group=spring_rocketmq",
"rocketmq.consumer.topic=test"}, classes = {RocketMQAutoConfiguration.class,
TransactionListenerImpl.class})
public class RocketMQTemplateTest {
@Resource
@@ -95,6 +96,15 @@ public class RocketMQTemplateTest {
}
@Test
+ public void testReceiveMessage() {
+ try {
+ rocketMQTemplate.receive(String.class);
+ } catch (MessagingException e) {
+
assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException:
connect to [127.0.0.1:9876] failed");
+ }
+ }
+
+ @Test
public void testSendMessage_withCustomAsyncSenderExecutor() {
ExecutorService executorService = new ThreadPoolExecutor(
2,
@@ -234,6 +244,10 @@ public class RocketMQTemplateTest {
assertThat(rocketMQTemplate.getProducer().getRetryTimesWhenSendAsyncFailed()).isEqualTo(3);
assertThat(rocketMQTemplate.getProducer().getRetryTimesWhenSendFailed()).isEqualTo(3);
assertThat(rocketMQTemplate.getProducer().getCompressMsgBodyOverHowmuch()).isEqualTo(1024
* 4);
+
+
assertThat(rocketMQTemplate.getConsumer().getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
+
assertThat(rocketMQTemplate.getConsumer().getConsumerGroup()).isEqualTo("spring_rocketmq");
+
assertThat(rocketMQTemplate.getConsumer().getAccessChannel()).isEqualTo(AccessChannel.LOCAL);
}
@Test