This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new f437f52 [ISSUE #654] Support namespace for
rocketmq-v5-client-spring-boot and rocketmq-spring-boot (#655)
f437f52 is described below
commit f437f52f87891b1efe0f62cb46cf783f4c6a006d
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Mon May 27 15:34:18 2024 +0800
[ISSUE #654] Support namespace for rocketmq-v5-client-spring-boot and
rocketmq-spring-boot (#655)
* Add namespace
* Add namespace in toString
---
.../ExtRocketMQConsumerConfiguration.java | 5 +++++
.../ExtRocketMQTemplateConfiguration.java | 6 +++++
.../spring/annotation/RocketMQMessageListener.java | 5 +++++
.../ExtConsumerResetConfiguration.java | 2 ++
.../ExtProducerResetConfiguration.java | 2 ++
.../autoconfigure/RocketMQAutoConfiguration.java | 6 +++++
.../spring/autoconfigure/RocketMQProperties.java | 26 ++++++++++++++++++++++
.../support/DefaultRocketMQListenerContainer.java | 12 ++++++++++
.../RocketMQMessageListenerContainerRegistrar.java | 4 ++++
.../DefaultRocketMQListenerContainerTest.java | 2 ++
.../annotation/ExtConsumerResetConfiguration.java | 4 ++++
.../annotation/ExtProducerResetConfiguration.java | 4 ++++
.../client/annotation/RocketMQMessageListener.java | 5 ++++-
.../ExtConsumerResetConfiguration.java | 3 ++-
.../ExtTemplateResetConfiguration.java | 5 ++++-
.../ListenerContainerConfiguration.java | 1 +
.../client/autoconfigure/RocketMQProperties.java | 22 ++++++++++++++++++
.../client/support/DefaultListenerContainer.java | 13 ++++++++++-
.../rocketmq/client/support/RocketMQUtil.java | 11 ++++++---
19 files changed, 131 insertions(+), 7 deletions(-)
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
index 64a261f..2d6b273 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
@@ -116,6 +116,11 @@ public @interface ExtRocketMQConsumerConfiguration {
*/
String namespace() default "";
+ /**
+ * The namespace v2 version of consumer, it can not be used in combination
with namespace.
+ */
+ String namespaceV2() default "";
+
/**
* The property of "instanceName".
*/
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
index 504d5c0..a8ae90d 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
@@ -89,11 +89,17 @@ public @interface ExtRocketMQTemplateConfiguration {
* The property of "tlsEnable" default false.
*/
String tlsEnable() default "false";
+
/**
* The namespace of producer.
*/
String namespace() default "";
+ /**
+ * The namespace v2 version of producer, it can not be used in combination
with namespace.
+ */
+ String namespaceV2() default "";
+
/**
* The property of "instanceName".
*/
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index 4921515..0c1638c 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -142,6 +142,11 @@ public @interface RocketMQMessageListener {
*/
String namespace() default "";
+ /**
+ * The namespace V2 version of listener, it can not be used in combination
with namespace.
+ */
+ String namespaceV2() default "";
+
/**
* Message consume retry strategy in concurrently mode.
*
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
index 178285d..efe69ec 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
@@ -132,6 +132,8 @@ public class ExtConsumerResetConfiguration implements
ApplicationContextAware, S
litePullConsumer.setCustomizedTraceTopic(resolvePlaceholders(annotation.customizedTraceTopic(),
consumerConfig.getCustomizedTraceTopic()));
String namespace =
environment.resolvePlaceholders(annotation.namespace());
litePullConsumer.setNamespace(RocketMQUtil.getNamespace(namespace,
consumerConfig.getNamespace()));
+ String namespaceV2 =
environment.resolvePlaceholders(annotation.namespaceV2());
+ litePullConsumer.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2,
consumerConfig.getNamespaceV2()));
litePullConsumer.setInstanceName(annotation.instanceName());
return litePullConsumer;
}
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
index 35aee2a..1db3ae2 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -129,6 +129,8 @@ public class ExtProducerResetConfiguration implements
ApplicationContextAware, S
producer.setUseTLS(useTLS);
String namespace =
environment.resolvePlaceholders(annotation.namespace());
producer.setNamespace(RocketMQUtil.getNamespace(namespace,
producerConfig.getNamespace()));
+ String namespaceV2 =
environment.resolvePlaceholders(annotation.namespaceV2());
+ producer.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2,
producerConfig.getNamespaceV2()));
producer.setInstanceName(annotation.instanceName());
return producer;
}
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index 1bf9764..45d2664 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -118,6 +118,9 @@ public class RocketMQAutoConfiguration implements
ApplicationContextAware {
if (StringUtils.hasText(producerConfig.getNamespace())) {
producer.setNamespace(producerConfig.getNamespace());
}
+ if (StringUtils.hasText(producerConfig.getNamespaceV2())) {
+ producer.setNamespaceV2(producerConfig.getNamespaceV2());
+ }
producer.setInstanceName(producerConfig.getInstanceName());
log.info("a producer ({}) init on namesrv {}", groupName, nameServer);
return producer;
@@ -152,6 +155,9 @@ public class RocketMQAutoConfiguration implements
ApplicationContextAware {
if (StringUtils.hasText(consumerConfig.getNamespace())) {
litePullConsumer.setNamespace(consumerConfig.getNamespace());
}
+ if (StringUtils.hasText(consumerConfig.getNamespaceV2())) {
+ litePullConsumer.setNamespaceV2(consumerConfig.getNamespaceV2());
+ }
litePullConsumer.setInstanceName(consumerConfig.getInstanceName());
log.info("a pull consumer({} sub {}) init on namesrv {}", groupName,
topicName, nameServer);
return litePullConsumer;
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index e1b26a7..41752ae 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -108,6 +108,11 @@ public class RocketMQProperties {
*/
private String namespace;
+ /**
+ * The namespace v2 version of producer, it can not be used in
combination with namespace.
+ */
+ private String namespaceV2;
+
/**
* Millis of send message timeout.
*/
@@ -274,6 +279,14 @@ public class RocketMQProperties {
this.namespace = namespace;
}
+ public String getNamespaceV2() {
+ return namespaceV2;
+ }
+
+ public void setNamespaceV2(String namespaceV2) {
+ this.namespaceV2 = namespaceV2;
+ }
+
public String getInstanceName() {
return instanceName;
}
@@ -294,6 +307,11 @@ public class RocketMQProperties {
*/
private String namespace;
+ /**
+ * The namespace v2 version of consumer, it can not be used in
combination with namespace.
+ */
+ private String namespaceV2;
+
/**
* Topic name of consumer.
*/
@@ -445,6 +463,14 @@ public class RocketMQProperties {
this.namespace = namespace;
}
+ public String getNamespaceV2() {
+ return namespaceV2;
+ }
+
+ public void setNamespaceV2(String namespaceV2) {
+ this.namespaceV2 = namespaceV2;
+ }
+
public String getInstanceName() {
return instanceName;
}
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index fb7762e..ae095d0 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -135,6 +135,7 @@ public class DefaultRocketMQListenerContainer implements
InitializingBean,
private int replyTimeout;
private String tlsEnable;
private String namespace;
+ private String namespaceV2;
private long awaitTerminationMillisWhenShutdown;
private String instanceName;
@@ -246,6 +247,7 @@ public class DefaultRocketMQListenerContainer implements
InitializingBean,
this.replyTimeout = anno.replyTimeout();
this.tlsEnable = anno.tlsEnable();
this.namespace = anno.namespace();
+ this.namespaceV2 = anno.namespaceV2();
this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume();
this.suspendCurrentQueueTimeMillis =
anno.suspendCurrentQueueTimeMillis();
this.awaitTerminationMillisWhenShutdown = Math.max(0,
anno.awaitTerminationMillisWhenShutdown());
@@ -288,6 +290,14 @@ public class DefaultRocketMQListenerContainer implements
InitializingBean,
this.namespace = namespace;
}
+ public String getNamespaceV2() {
+ return namespaceV2;
+ }
+
+ public void setNamespaceV2(String namespaceV2) {
+ this.namespaceV2 = namespaceV2;
+ }
+
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
@@ -394,6 +404,7 @@ public class DefaultRocketMQListenerContainer implements
InitializingBean,
return "DefaultRocketMQListenerContainer{" +
"consumerGroup='" + consumerGroup + '\'' +
", namespace='" + namespace + '\'' +
+ ", namespaceV2='" + namespaceV2 + '\'' +
", nameServer='" + nameServer + '\'' +
", topic='" + topic + '\'' +
", consumeMode=" + consumeMode +
@@ -631,6 +642,7 @@ public class DefaultRocketMQListenerContainer implements
InitializingBean,
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
consumer.setNamespace(namespace);
+ consumer.setNamespaceV2(namespaceV2);
String customizedNameServer =
this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
if (customizedNameServer != null) {
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java
index 11cdcd9..a27c187 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java
@@ -151,6 +151,10 @@ public class RocketMQMessageListenerContainerRegistrar
implements ApplicationCon
String namespace =
environment.resolvePlaceholders(annotation.namespace());
container.setNamespace(RocketMQUtil.getNamespace(namespace,
rocketMQProperties.getConsumer().getNamespace()));
+
+ String namespaceV2 =
environment.resolvePlaceholders(annotation.namespaceV2());
+ container.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2,
+ rocketMQProperties.getConsumer().getNamespaceV2()));
return container;
}
diff --git
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index de15fcd..182d2fa 100644
---
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -251,6 +251,7 @@ public class DefaultRocketMQListenerContainerTest {
assertEquals(anno.selectorExpression(),
container.getSelectorExpression());
assertEquals(anno.tlsEnable(), container.getTlsEnable());
assertEquals(anno.namespace(), container.getNamespace());
+ assertEquals(anno.namespaceV2(), container.getNamespaceV2());
assertEquals(anno.delayLevelWhenNextConsume(),
container.getDelayLevelWhenNextConsume());
assertEquals(anno.suspendCurrentQueueTimeMillis(),
container.getSuspendCurrentQueueTimeMillis());
assertEquals(anno.instanceName(), container.getInstanceName());
@@ -264,6 +265,7 @@ public class DefaultRocketMQListenerContainerTest {
selectorExpression = "selectorExpression",
tlsEnable = "tlsEnable",
namespace = "namespace",
+ namespaceV2 = "namespaceV2",
delayLevelWhenNextConsume = 1234,
suspendCurrentQueueTimeMillis = 2345,
instanceName = "instanceName"
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java
index 0bc9564..f0b942f 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java
@@ -87,4 +87,8 @@ public @interface ExtConsumerResetConfiguration {
*/
int awaitDuration() default 5;
+ /**
+ * The namespace of consumer.
+ */
+ String namespace() default "";
}
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtProducerResetConfiguration.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtProducerResetConfiguration.java
index 8849d0f..c6ab505 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtProducerResetConfiguration.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtProducerResetConfiguration.java
@@ -75,4 +75,8 @@ public @interface ExtProducerResetConfiguration {
*/
int maxAttempts() default 3;
+ /**
+ * The namespace of producer.
+ */
+ String namespace() default "";
}
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java
index 3b8e1fa..67796a9 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java
@@ -82,5 +82,8 @@ public @interface RocketMQMessageListener {
int consumptionThreadCount() default 20;
-
+ /**
+ * The namespace of listener.
+ */
+ String namespace() default "";
}
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java
index 23b70af..6e854a0 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java
@@ -113,13 +113,14 @@ public class ExtConsumerResetConfiguration implements
ApplicationContextAware, S
String accessKey = resolvePlaceholders(annotation.accessKey(),
simpleConsumer.getAccessKey());
String secretKey = resolvePlaceholders(annotation.secretKey(),
simpleConsumer.getSecretKey());
String endPoints = resolvePlaceholders(annotation.endpoints(),
simpleConsumer.getEndpoints());
+ String namespace = resolvePlaceholders(annotation.namespace(),
simpleConsumer.getNamespace());
String tag = resolvePlaceholders(annotation.tag(),
simpleConsumer.getTag());
String filterExpressionType =
resolvePlaceholders(annotation.filterExpressionType(),
simpleConsumer.getFilterExpressionType());
Duration requestTimeout =
Duration.ofSeconds(annotation.requestTimeout());
int awaitDuration = annotation.awaitDuration();
Boolean sslEnabled = simpleConsumer.isSslEnabled();
Assert.hasText(topicName, "[topic] must not be null");
- ClientConfiguration clientConfiguration =
RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints,
requestTimeout, sslEnabled);
+ ClientConfiguration clientConfiguration =
RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints,
requestTimeout, sslEnabled, namespace);
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
FilterExpression filterExpression =
RocketMQUtil.createFilterExpression(tag, filterExpressionType);
Duration duration = Duration.ofSeconds(awaitDuration);
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java
index 11ddf85..1973752 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java
@@ -110,9 +110,12 @@ public class ExtTemplateResetConfiguration implements
ApplicationContextAware, S
accessKey = StringUtils.hasLength(accessKey) ? accessKey :
producerConfig.getAccessKey();
String secretKey =
environment.resolvePlaceholders(annotation.secretKey());
secretKey = StringUtils.hasLength(secretKey) ? secretKey :
producerConfig.getSecretKey();
+ String namespace =
environment.resolvePlaceholders(annotation.namespace());
+ namespace = StringUtils.hasLength(namespace) ? namespace :
producerConfig.getNamespace();
int requestTimeout = annotation.requestTimeout();
Boolean sslEnabled = producerConfig.isSslEnabled();
- ClientConfiguration clientConfiguration =
RocketMQUtil.createClientConfiguration(accessKey, secretKey, endpoints,
Duration.ofSeconds(requestTimeout), sslEnabled);
+ ClientConfiguration clientConfiguration =
RocketMQUtil.createClientConfiguration(accessKey, secretKey,
+ endpoints, Duration.ofSeconds(requestTimeout), sslEnabled,
namespace);
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
ProducerBuilder producerBuilder = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration).setMaxAttempts(annotation.maxAttempts())
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java
index bfbb7f9..450d846 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java
@@ -102,6 +102,7 @@ public class ListenerContainerConfiguration implements
ApplicationContextAware {
container.setTag(environment.resolvePlaceholders(annotation.tag()));
container.setEndpoints(environment.resolvePlaceholders(annotation.endpoints()));
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
+
container.setNamespace(environment.resolvePlaceholders(annotation.namespace()));
container.setRequestTimeout(Duration.ofSeconds(annotation.requestTimeout()));
container.setMaxCachedMessageCount(annotation.maxCachedMessageCount());
container.setConsumptionThreadCount(annotation.consumptionThreadCount());
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java
index 18ed9d1..267b19a 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java
@@ -79,6 +79,8 @@ public class RocketMQProperties {
*/
private int maxAttempts = 3;
+ private String namespace;
+
public String getAccessKey() {
return accessKey;
}
@@ -135,6 +137,14 @@ public class RocketMQProperties {
this.maxAttempts = maxAttempts;
}
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
@Override
public String toString() {
return "Producer{" +
@@ -144,6 +154,7 @@ public class RocketMQProperties {
", topic='" + topic + '\'' +
", requestTimeout=" + requestTimeout +
", sslEnabled=" + sslEnabled +
+ ", namespace='" + namespace + '\'' +
'}';
}
}
@@ -200,6 +211,8 @@ public class RocketMQProperties {
*/
private boolean sslEnabled = true;
+ private String namespace = "";
+
public String getAccessKey() {
return accessKey;
}
@@ -280,6 +293,14 @@ public class RocketMQProperties {
this.filterExpressionType = filterExpressionType;
}
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
@Override
public String toString() {
return "SimpleConsumer{" +
@@ -293,6 +314,7 @@ public class RocketMQProperties {
", requestTimeout=" + requestTimeout +
", filterExpressionType='" + filterExpressionType + '\'' +
", sslEnabled=" + sslEnabled +
+ ", namespace='" + namespace + '\'' +
'}';
}
}
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
index 45cdf7e..69bbe60 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
@@ -87,6 +87,8 @@ public class DefaultListenerContainer implements
InitializingBean,
Boolean sslEnabled;
+ String namespace;
+
public String getName() {
return name;
}
@@ -240,6 +242,14 @@ public class DefaultListenerContainer implements
InitializingBean,
this.sslEnabled = sslEnabled;
}
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
private void initRocketMQPushConsumer() {
if (rocketMQMessageListener == null) {
throw new IllegalArgumentException("Property
'rocketMQMessageListener' is required");
@@ -253,7 +263,7 @@ public class DefaultListenerContainer implements
InitializingBean,
filterExpression =
RocketMQUtil.createFilterExpression(this.getTag(),this.getType());
}
ClientConfiguration clientConfiguration =
RocketMQUtil.createClientConfiguration(this.getAccessKey(), this.getSecretKey(),
- this.getEndpoints(), this.getRequestTimeout(),
this.sslEnabled);
+ this.getEndpoints(), this.getRequestTimeout(),
this.sslEnabled, this.namespace);
PushConsumerBuilder pushConsumerBuilder =
provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration);
@@ -354,6 +364,7 @@ public class DefaultListenerContainer implements
InitializingBean,
", maxCachedMessageCount=" + maxCachedMessageCount +
", maxCacheMessageSizeInBytes=" + maxCacheMessageSizeInBytes +
", consumptionThreadCount=" + consumptionThreadCount +
+ ", namespace='" + namespace + '\'' +
'}';
}
}
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
index 9104828..4e95bf4 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
@@ -122,7 +122,8 @@ public class RocketMQUtil {
String endPoints = rocketMQProducer.getEndpoints();
Duration requestTimeout =
Duration.ofSeconds(rocketMQProducer.getRequestTimeout());
boolean sslEnabled = rocketMQProducer.isSslEnabled();
- return createClientConfiguration(accessKey, secretKey, endPoints,
requestTimeout, sslEnabled);
+ String namespace = rocketMQProducer.getNamespace();
+ return createClientConfiguration(accessKey, secretKey, endPoints,
requestTimeout, sslEnabled, namespace);
}
public static ClientConfiguration
createConsumerClientConfiguration(RocketMQProperties.SimpleConsumer
simpleConsumer) {
@@ -131,12 +132,13 @@ public class RocketMQUtil {
String endPoints = simpleConsumer.getEndpoints();
Duration requestTimeout =
Duration.ofSeconds(simpleConsumer.getRequestTimeout());
boolean sslEnabled = simpleConsumer.isSslEnabled();
- return createClientConfiguration(accessKey, secretKey, endPoints,
requestTimeout, sslEnabled);
+ String namespace = simpleConsumer.getNamespace();
+ return createClientConfiguration(accessKey, secretKey, endPoints,
requestTimeout, sslEnabled, namespace);
}
public static ClientConfiguration createClientConfiguration(String
accessKey, String secretKey, String endPoints,
- Duration
requestTimeout, Boolean sslEnabled) {
+ Duration
requestTimeout, Boolean sslEnabled, String namespace) {
SessionCredentialsProvider sessionCredentialsProvider = null;
if (StringUtils.hasLength(accessKey) &&
StringUtils.hasLength(secretKey)) {
@@ -154,6 +156,9 @@ public class RocketMQUtil {
if (Objects.nonNull(sslEnabled)) {
clientConfigurationBuilder.enableSsl(sslEnabled);
}
+ if (StringUtils.hasLength(namespace)) {
+ clientConfigurationBuilder.setNamespace(namespace);
+ }
return clientConfigurationBuilder.build();
}