This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 89896c5 [ISSUE #622] Add config enableSsl (#623)
89896c5 is described below
commit 89896c5926b6bd007cbdd48773dc0b5eca333422
Author: panzhi <[email protected]>
AuthorDate: Thu Feb 8 10:51:41 2024 +0800
[ISSUE #622] Add config enableSsl (#623)
---
.../autoconfigure/ExtConsumerResetConfiguration.java | 3 ++-
.../autoconfigure/ExtTemplateResetConfiguration.java | 3 ++-
.../client/autoconfigure/RocketMQAutoConfiguration.java | 8 +++++---
.../rocketmq/client/support/DefaultListenerContainer.java | 13 ++++++++++++-
.../org/apache/rocketmq/client/support/RocketMQUtil.java | 14 +++++++++-----
5 files changed, 30 insertions(+), 11 deletions(-)
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java
index 3545dae..8a758b5 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java
@@ -117,8 +117,9 @@ public class ExtConsumerResetConfiguration implements
ApplicationContextAware, S
String filterExpressionType =
resolvePlaceholders(annotation.filterExpressionType(),
simpleConsumer.getFilterExpressionType());
Duration requestTimeout = Duration.ofDays(annotation.requestTimeout());
int awaitDuration = annotation.awaitDuration();
+ Boolean sslEnabled = simpleConsumer.isSslEnabled();
Assert.hasText(topicName, "[topic] must not be null");
- ClientConfiguration clientConfiguration =
RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints,
requestTimeout);
+ ClientConfiguration clientConfiguration =
RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints,
requestTimeout, sslEnabled);
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
FilterExpression filterExpression =
RocketMQUtil.createFilterExpression(tag, filterExpressionType);
Duration duration = Duration.ofSeconds(awaitDuration);
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java
index 75bd449..bd301c7 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java
@@ -111,7 +111,8 @@ public class ExtTemplateResetConfiguration implements
ApplicationContextAware, S
String secretKey =
environment.resolvePlaceholders(annotation.secretKey());
secretKey = StringUtils.hasLength(secretKey) ? secretKey :
producerConfig.getSecretKey();
int requestTimeout = annotation.requestTimeout();
- ClientConfiguration clientConfiguration =
RocketMQUtil.createClientConfiguration(accessKey, secretKey, endpoints,
Duration.ofDays(requestTimeout));
+ Boolean sslEnabled = producerConfig.isSslEnabled();
+ ClientConfiguration clientConfiguration =
RocketMQUtil.createClientConfiguration(accessKey, secretKey, endpoints,
Duration.ofDays(requestTimeout), sslEnabled);
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
ProducerBuilder producerBuilder = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration).setMaxAttempts(annotation.maxAttempts())
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java
index 11763c8..e9bd8be 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java
@@ -83,10 +83,12 @@ public class RocketMQAutoConfiguration implements
ApplicationContextAware {
ProducerBuilder producerBuilder;
producerBuilder = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
- // Set the topic name(s), which is optional but recommended.
It makes producer could prefetch the topic
- // route before message publishing.
- .setTopics(rocketMQProducer.getTopic())
.setMaxAttempts(rocketMQProducer.getMaxAttempts());
+ if (StringUtils.hasLength(topic)) {
+ // Set the topic name(s), which is optional but recommended. It
makes producer could prefetch the topic
+ // route before message publishing.
+ producerBuilder.setTopics(rocketMQProducer.getTopic());
+ }
log.info(String.format("a producer init on proxy %s", endPoints));
return producerBuilder;
}
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
index c2466c0..45cdf7e 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
@@ -85,6 +85,8 @@ public class DefaultListenerContainer implements
InitializingBean,
int consumptionThreadCount = 20;
+ Boolean sslEnabled;
+
public String getName() {
return name;
}
@@ -230,6 +232,14 @@ public class DefaultListenerContainer implements
InitializingBean,
this.type = type;
}
+ public Boolean getSslEnabled() {
+ return sslEnabled;
+ }
+
+ public void setSslEnabled(Boolean sslEnabled) {
+ this.sslEnabled = sslEnabled;
+ }
+
private void initRocketMQPushConsumer() {
if (rocketMQMessageListener == null) {
throw new IllegalArgumentException("Property
'rocketMQMessageListener' is required");
@@ -242,7 +252,8 @@ public class DefaultListenerContainer implements
InitializingBean,
if (StringUtils.hasLength(this.getTag())) {
filterExpression =
RocketMQUtil.createFilterExpression(this.getTag(),this.getType());
}
- ClientConfiguration clientConfiguration =
RocketMQUtil.createClientConfiguration(this.getAccessKey(),
this.getSecretKey(), this.getEndpoints(), this.getRequestTimeout());
+ ClientConfiguration clientConfiguration =
RocketMQUtil.createClientConfiguration(this.getAccessKey(), this.getSecretKey(),
+ this.getEndpoints(), this.getRequestTimeout(),
this.sslEnabled);
PushConsumerBuilder pushConsumerBuilder =
provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration);
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
index 10d977a..fab2d46 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
@@ -121,8 +121,8 @@ public class RocketMQUtil {
String secretKey = rocketMQProducer.getSecretKey();
String endPoints = rocketMQProducer.getEndpoints();
Duration requestTimeout =
Duration.ofDays(rocketMQProducer.getRequestTimeout());
- // boolean sslEnabled = rocketMQProducer.isSslEnabled();
- return createClientConfiguration(accessKey, secretKey, endPoints,
requestTimeout);
+ boolean sslEnabled = rocketMQProducer.isSslEnabled();
+ return createClientConfiguration(accessKey, secretKey, endPoints,
requestTimeout, sslEnabled);
}
public static ClientConfiguration
createConsumerClientConfiguration(RocketMQProperties.SimpleConsumer
simpleConsumer) {
@@ -130,12 +130,13 @@ public class RocketMQUtil {
String secretKey = simpleConsumer.getSecretKey();
String endPoints = simpleConsumer.getEndpoints();
Duration requestTimeout =
Duration.ofDays(simpleConsumer.getRequestTimeout());
- // boolean sslEnabled = rocketMQProducer.isSslEnabled();
- return createClientConfiguration(accessKey, secretKey, endPoints,
requestTimeout);
+ boolean sslEnabled = simpleConsumer.isSslEnabled();
+ return createClientConfiguration(accessKey, secretKey, endPoints,
requestTimeout, sslEnabled);
}
- public static ClientConfiguration createClientConfiguration(String
accessKey, String secretKey, String endPoints, Duration requestTimeout) {
+ public static ClientConfiguration createClientConfiguration(String
accessKey, String secretKey, String endPoints,
+ Duration
requestTimeout, Boolean sslEnabled) {
SessionCredentialsProvider sessionCredentialsProvider = null;
if (StringUtils.hasLength(accessKey) &&
StringUtils.hasLength(secretKey)) {
@@ -150,6 +151,9 @@ public class RocketMQUtil {
if (Objects.nonNull(requestTimeout)) {
clientConfigurationBuilder.setRequestTimeout(requestTimeout);
}
+ if (Objects.nonNull(sslEnabled)) {
+ clientConfigurationBuilder.enableSsl(sslEnabled);
+ }
return clientConfigurationBuilder.build();
}