This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 62d814748d2 CAMEL-14831: Add camel-rocketmq component (#8820)
62d814748d2 is described below
commit 62d814748d2d8ca208bc93b3b17e24048ebf03d1
Author: 吴伟杰 <[email protected]>
AuthorDate: Wed Dec 7 16:43:48 2022 +0800
CAMEL-14831: Add camel-rocketmq component (#8820)
* CAMEL-14831: Initial commit of RocketMQ component
* CAMEL-14831: Add rocketmq-component.adoc
* CAMEL-14831: Add camel-rocketmq to camel-parent
* CAMEL-14831: Format RocketMQConstants and document
* CAMEL-14831: Generate files for camel-rocketmq
* CAMEL-14831: Fix indent of pom.xml
* CAMEL-14831: Rename header keys
* CAMEL-14831: Regenerate rocketmq.json
* CAMEL-14831: Revise MetaData and code cleanup
* CAMEL-14831: Code cleanup and refactor
* CAMEL-14831: Add log marker to ReplyTimeoutMap
* CAMEL-14831: Perform code cleanup and format
* CAMEL-14831: Regenerate codes
* CAMEL-14831: Revise log in ReplyTimeoutMap
* CAMEL-14831: Clarify RocketMQ version compatibility in doc
---
catalog/camel-allcomponents/pom.xml | 5 +
components/camel-rocketmq/pom.xml | 93 ++++++++
.../rocketmq/RocketMQComponentConfigurer.java | 133 ++++++++++++
.../rocketmq/RocketMQEndpointConfigurer.java | 139 ++++++++++++
.../rocketmq/RocketMQEndpointUriFactory.java | 87 ++++++++
.../services/org/apache/camel/component.properties | 7 +
.../services/org/apache/camel/component/rocketmq | 2 +
.../org/apache/camel/configurer/rocketmq-component | 2 +
.../org/apache/camel/configurer/rocketmq-endpoint | 2 +
.../org/apache/camel/urifactory/rocketmq-endpoint | 2 +
.../apache/camel/component/rocketmq/rocketmq.json | 82 +++++++
.../src/main/docs/rocketmq-component.adoc | 112 ++++++++++
.../camel/component/rocketmq/RocketMQAclUtils.java | 36 ++++
.../component/rocketmq/RocketMQComponent.java | 216 +++++++++++++++++++
.../component/rocketmq/RocketMQConstants.java | 75 +++++++
.../camel/component/rocketmq/RocketMQConsumer.java | 95 +++++++++
.../camel/component/rocketmq/RocketMQEndpoint.java | 237 +++++++++++++++++++++
.../rocketmq/RocketMQMessageConverter.java | 47 ++++
.../camel/component/rocketmq/RocketMQProducer.java | 235 ++++++++++++++++++++
.../component/rocketmq/SendFailedException.java | 25 +++
.../component/rocketmq/reply/ReplyHandler.java | 27 +++
.../component/rocketmq/reply/ReplyHolder.java | 78 +++++++
.../component/rocketmq/reply/ReplyManager.java | 40 ++++
.../component/rocketmq/reply/ReplyTimeoutMap.java | 66 ++++++
.../rocketmq/reply/RocketMQReplyHandler.java | 58 +++++
.../reply/RocketMQReplyManagerSupport.java | 199 +++++++++++++++++
.../rocketmq/RocketMQRequestReplyRouteTest.java | 148 +++++++++++++
.../component/rocketmq/RocketMQRouteTest.java | 100 +++++++++
.../rocketmq/infra/EmbeddedRocketMQServer.java | 101 +++++++++
.../src/test/resources/logback-test.xml | 32 +++
components/pom.xml | 1 +
parent/pom.xml | 6 +
32 files changed, 2488 insertions(+)
diff --git a/catalog/camel-allcomponents/pom.xml
b/catalog/camel-allcomponents/pom.xml
index df0989d51d5..da2229d144d 100644
--- a/catalog/camel-allcomponents/pom.xml
+++ b/catalog/camel-allcomponents/pom.xml
@@ -1493,6 +1493,11 @@
<artifactId>camel-robotframework</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-rocketmq</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-rss</artifactId>
diff --git a/components/camel-rocketmq/pom.xml
b/components/camel-rocketmq/pom.xml
new file mode 100644
index 00000000000..365e3b2eacd
--- /dev/null
+++ b/components/camel-rocketmq/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>3.20.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>camel-rocketmq</artifactId>
+ <packaging>jar</packaging>
+ <name>Camel :: RocketMQ</name>
+ <description>Camel RocketMQ Component</description>
+ <!-- Default-scope dependencies are listed first. Every RocketMQ artifact below uses the rocketmq-version property, which is not defined in this file (presumably in a parent POM; confirm). -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-support</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>${rocketmq-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-acl</artifactId>
+ <version>${rocketmq-version}</version>
+ </dependency>
+ <!-- Test-scoped dependencies: Camel JUnit 5 test support plus the RocketMQ name server, broker and test modules. -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-junit5</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-namesrv</artifactId>
+ <version>${rocketmq-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-broker</artifactId>
+ <version>${rocketmq-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-test</artifactId>
+ <version>${rocketmq-version}</version>
+ <exclusions> <!-- drops the junit:junit artifact from the transitive dependencies of rocketmq-test -->
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <!-- Surefire is given add-opens JVM options for java.base/java.nio and java.base/jdk.internal.ref (both opened to ALL-UNNAMED). NOTE(review): presumably required by the RocketMQ test modules on newer JDKs; confirm. -->
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <argLine>--add-opens java.base/java.nio=ALL-UNNAMED
--add-opens java.base/jdk.internal.ref=ALL-UNNAMED</argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQComponentConfigurer.java
b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQComponentConfigurer.java
new file mode 100644
index 00000000000..502e614c84f
--- /dev/null
+++
b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQComponentConfigurer.java
@@ -0,0 +1,133 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.rocketmq;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurerGetter;
+import org.apache.camel.spi.ConfigurerStrategy;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.support.component.PropertyConfigurerSupport;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ *
+ * Property configurer for {@link RocketMQComponent}: maps option names onto the component's setters and getters.
+ * An option is matched by its exact camelCase name or, when {@code ignoreCase} is set, by its lower-cased form.
+ *
+ * NOTE(review): option names are lower-cased with {@link java.util.Locale#ROOT} so that matching does not depend
+ * on the JVM default locale. Under a Turkish locale {@code "I"} lower-cases to a dotless i, which would stop
+ * {@code requestTimeoutCheckerIntervalMillis} from matching its lower-case label. Because this file is generated,
+ * the same change presumably has to be made in the generator, otherwise regeneration will revert it.
+ */
+@SuppressWarnings("unchecked")
+public class RocketMQComponentConfigurer extends PropertyConfigurerSupport implements GeneratedPropertyConfigurer, PropertyConfigurerGetter {
+
+    /**
+     * Sets a single option on the given component.
+     *
+     * @param camelContext passed through to {@code property(...)} together with the target type and the value
+     * @param obj          the target object; cast to {@link RocketMQComponent}
+     * @param name         the option name
+     * @param value        the raw option value
+     * @param ignoreCase   whether {@code name} is lower-cased (locale-independently) before matching
+     * @return {@code true} if the option is known and its setter was called, {@code false} otherwise
+     */
+    @Override
+    public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
+        RocketMQComponent target = (RocketMQComponent) obj;
+        // Locale.ROOT keeps the lower-casing stable regardless of the JVM default locale.
+        switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) {
+        case "accesskey":
+        case "accessKey": target.setAccessKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "autowiredenabled":
+        case "autowiredEnabled": target.setAutowiredEnabled(property(camelContext, boolean.class, value)); return true;
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
+        case "consumergroup":
+        case "consumerGroup": target.setConsumerGroup(property(camelContext, java.lang.String.class, value)); return true;
+        case "lazystartproducer":
+        case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+        case "namesrvaddr":
+        case "namesrvAddr": target.setNamesrvAddr(property(camelContext, java.lang.String.class, value)); return true;
+        case "producergroup":
+        case "producerGroup": target.setProducerGroup(property(camelContext, java.lang.String.class, value)); return true;
+        case "replytoconsumergroup":
+        case "replyToConsumerGroup": target.setReplyToConsumerGroup(property(camelContext, java.lang.String.class, value)); return true;
+        case "replytotopic":
+        case "replyToTopic": target.setReplyToTopic(property(camelContext, java.lang.String.class, value)); return true;
+        case "requesttimeoutcheckerintervalmillis":
+        case "requestTimeoutCheckerIntervalMillis": target.setRequestTimeoutCheckerIntervalMillis(property(camelContext, long.class, value)); return true;
+        case "requesttimeoutmillis":
+        case "requestTimeoutMillis": target.setRequestTimeoutMillis(property(camelContext, long.class, value)); return true;
+        case "secretkey":
+        case "secretKey": target.setSecretKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "sendtag":
+        case "sendTag": target.setSendTag(property(camelContext, java.lang.String.class, value)); return true;
+        case "subscribetags":
+        case "subscribeTags": target.setSubscribeTags(property(camelContext, java.lang.String.class, value)); return true;
+        case "waitforsendresult":
+        case "waitForSendResult": target.setWaitForSendResult(property(camelContext, boolean.class, value)); return true;
+        default: return false;
+        }
+    }
+
+    /**
+     * Returns the Java type of an option.
+     *
+     * @param name       the option name
+     * @param ignoreCase whether {@code name} is lower-cased (locale-independently) before matching
+     * @return the option's type, or {@code null} if the option is unknown
+     */
+    @Override
+    public Class<?> getOptionType(String name, boolean ignoreCase) {
+        switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) {
+        case "accesskey":
+        case "accessKey": return java.lang.String.class;
+        case "autowiredenabled":
+        case "autowiredEnabled": return boolean.class;
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": return boolean.class;
+        case "consumergroup":
+        case "consumerGroup": return java.lang.String.class;
+        case "lazystartproducer":
+        case "lazyStartProducer": return boolean.class;
+        case "namesrvaddr":
+        case "namesrvAddr": return java.lang.String.class;
+        case "producergroup":
+        case "producerGroup": return java.lang.String.class;
+        case "replytoconsumergroup":
+        case "replyToConsumerGroup": return java.lang.String.class;
+        case "replytotopic":
+        case "replyToTopic": return java.lang.String.class;
+        case "requesttimeoutcheckerintervalmillis":
+        case "requestTimeoutCheckerIntervalMillis": return long.class;
+        case "requesttimeoutmillis":
+        case "requestTimeoutMillis": return long.class;
+        case "secretkey":
+        case "secretKey": return java.lang.String.class;
+        case "sendtag":
+        case "sendTag": return java.lang.String.class;
+        case "subscribetags":
+        case "subscribeTags": return java.lang.String.class;
+        case "waitforsendresult":
+        case "waitForSendResult": return boolean.class;
+        default: return null;
+        }
+    }
+
+    /**
+     * Reads the current value of an option from the given component.
+     *
+     * @param obj        the target object; cast to {@link RocketMQComponent}
+     * @param name       the option name
+     * @param ignoreCase whether {@code name} is lower-cased (locale-independently) before matching
+     * @return the value returned by the matching getter, or {@code null} if the option is unknown
+     */
+    @Override
+    public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
+        RocketMQComponent target = (RocketMQComponent) obj;
+        switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) {
+        case "accesskey":
+        case "accessKey": return target.getAccessKey();
+        case "autowiredenabled":
+        case "autowiredEnabled": return target.isAutowiredEnabled();
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": return target.isBridgeErrorHandler();
+        case "consumergroup":
+        case "consumerGroup": return target.getConsumerGroup();
+        case "lazystartproducer":
+        case "lazyStartProducer": return target.isLazyStartProducer();
+        case "namesrvaddr":
+        case "namesrvAddr": return target.getNamesrvAddr();
+        case "producergroup":
+        case "producerGroup": return target.getProducerGroup();
+        case "replytoconsumergroup":
+        case "replyToConsumerGroup": return target.getReplyToConsumerGroup();
+        case "replytotopic":
+        case "replyToTopic": return target.getReplyToTopic();
+        case "requesttimeoutcheckerintervalmillis":
+        case "requestTimeoutCheckerIntervalMillis": return target.getRequestTimeoutCheckerIntervalMillis();
+        case "requesttimeoutmillis":
+        case "requestTimeoutMillis": return target.getRequestTimeoutMillis();
+        case "secretkey":
+        case "secretKey": return target.getSecretKey();
+        case "sendtag":
+        case "sendTag": return target.getSendTag();
+        case "subscribetags":
+        case "subscribeTags": return target.getSubscribeTags();
+        case "waitforsendresult":
+        case "waitForSendResult": return target.isWaitForSendResult();
+        default: return null;
+        }
+    }
+}
+
diff --git
a/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointConfigurer.java
b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointConfigurer.java
new file mode 100644
index 00000000000..51cb6e6dc67
--- /dev/null
+++
b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointConfigurer.java
@@ -0,0 +1,139 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.rocketmq;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurerGetter;
+import org.apache.camel.spi.ConfigurerStrategy;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.support.component.PropertyConfigurerSupport;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ *
+ * Property configurer for {@link RocketMQEndpoint}: maps option names onto the endpoint's setters and getters.
+ * An option is matched by its exact camelCase name or, when {@code ignoreCase} is set, by its lower-cased form.
+ *
+ * NOTE(review): option names are lower-cased with {@link java.util.Locale#ROOT} so that matching does not depend
+ * on the JVM default locale. Under a Turkish locale {@code "I"} lower-cases to a dotless i, which would stop
+ * {@code requestTimeoutCheckerIntervalMillis} from matching its lower-case label. Because this file is generated,
+ * the same change presumably has to be made in the generator, otherwise regeneration will revert it.
+ */
+@SuppressWarnings("unchecked")
+public class RocketMQEndpointConfigurer extends PropertyConfigurerSupport implements GeneratedPropertyConfigurer, PropertyConfigurerGetter {
+
+    /**
+     * Sets a single option on the given endpoint.
+     *
+     * @param camelContext passed through to {@code property(...)} together with the target type and the value
+     * @param obj          the target object; cast to {@link RocketMQEndpoint}
+     * @param name         the option name
+     * @param value        the raw option value
+     * @param ignoreCase   whether {@code name} is lower-cased (locale-independently) before matching
+     * @return {@code true} if the option is known and its setter was called, {@code false} otherwise
+     */
+    @Override
+    public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
+        RocketMQEndpoint target = (RocketMQEndpoint) obj;
+        // Locale.ROOT keeps the lower-casing stable regardless of the JVM default locale.
+        switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) {
+        case "accesskey":
+        case "accessKey": target.setAccessKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
+        case "consumergroup":
+        case "consumerGroup": target.setConsumerGroup(property(camelContext, java.lang.String.class, value)); return true;
+        case "exceptionhandler":
+        case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true;
+        case "exchangepattern":
+        case "exchangePattern": target.setExchangePattern(property(camelContext, org.apache.camel.ExchangePattern.class, value)); return true;
+        case "lazystartproducer":
+        case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+        case "namesrvaddr":
+        case "namesrvAddr": target.setNamesrvAddr(property(camelContext, java.lang.String.class, value)); return true;
+        case "producergroup":
+        case "producerGroup": target.setProducerGroup(property(camelContext, java.lang.String.class, value)); return true;
+        case "replytoconsumergroup":
+        case "replyToConsumerGroup": target.setReplyToConsumerGroup(property(camelContext, java.lang.String.class, value)); return true;
+        case "replytotopic":
+        case "replyToTopic": target.setReplyToTopic(property(camelContext, java.lang.String.class, value)); return true;
+        case "requesttimeoutcheckerintervalmillis":
+        case "requestTimeoutCheckerIntervalMillis": target.setRequestTimeoutCheckerIntervalMillis(property(camelContext, long.class, value)); return true;
+        case "requesttimeoutmillis":
+        case "requestTimeoutMillis": target.setRequestTimeoutMillis(property(camelContext, long.class, value)); return true;
+        case "secretkey":
+        case "secretKey": target.setSecretKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "sendtag":
+        case "sendTag": target.setSendTag(property(camelContext, java.lang.String.class, value)); return true;
+        case "subscribetags":
+        case "subscribeTags": target.setSubscribeTags(property(camelContext, java.lang.String.class, value)); return true;
+        case "waitforsendresult":
+        case "waitForSendResult": target.setWaitForSendResult(property(camelContext, boolean.class, value)); return true;
+        default: return false;
+        }
+    }
+
+    /**
+     * Returns the Java type of an option.
+     *
+     * @param name       the option name
+     * @param ignoreCase whether {@code name} is lower-cased (locale-independently) before matching
+     * @return the option's type, or {@code null} if the option is unknown
+     */
+    @Override
+    public Class<?> getOptionType(String name, boolean ignoreCase) {
+        switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) {
+        case "accesskey":
+        case "accessKey": return java.lang.String.class;
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": return boolean.class;
+        case "consumergroup":
+        case "consumerGroup": return java.lang.String.class;
+        case "exceptionhandler":
+        case "exceptionHandler": return org.apache.camel.spi.ExceptionHandler.class;
+        case "exchangepattern":
+        case "exchangePattern": return org.apache.camel.ExchangePattern.class;
+        case "lazystartproducer":
+        case "lazyStartProducer": return boolean.class;
+        case "namesrvaddr":
+        case "namesrvAddr": return java.lang.String.class;
+        case "producergroup":
+        case "producerGroup": return java.lang.String.class;
+        case "replytoconsumergroup":
+        case "replyToConsumerGroup": return java.lang.String.class;
+        case "replytotopic":
+        case "replyToTopic": return java.lang.String.class;
+        case "requesttimeoutcheckerintervalmillis":
+        case "requestTimeoutCheckerIntervalMillis": return long.class;
+        case "requesttimeoutmillis":
+        case "requestTimeoutMillis": return long.class;
+        case "secretkey":
+        case "secretKey": return java.lang.String.class;
+        case "sendtag":
+        case "sendTag": return java.lang.String.class;
+        case "subscribetags":
+        case "subscribeTags": return java.lang.String.class;
+        case "waitforsendresult":
+        case "waitForSendResult": return boolean.class;
+        default: return null;
+        }
+    }
+
+    /**
+     * Reads the current value of an option from the given endpoint.
+     *
+     * @param obj        the target object; cast to {@link RocketMQEndpoint}
+     * @param name       the option name
+     * @param ignoreCase whether {@code name} is lower-cased (locale-independently) before matching
+     * @return the value returned by the matching getter, or {@code null} if the option is unknown
+     */
+    @Override
+    public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
+        RocketMQEndpoint target = (RocketMQEndpoint) obj;
+        switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) {
+        case "accesskey":
+        case "accessKey": return target.getAccessKey();
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": return target.isBridgeErrorHandler();
+        case "consumergroup":
+        case "consumerGroup": return target.getConsumerGroup();
+        case "exceptionhandler":
+        case "exceptionHandler": return target.getExceptionHandler();
+        case "exchangepattern":
+        case "exchangePattern": return target.getExchangePattern();
+        case "lazystartproducer":
+        case "lazyStartProducer": return target.isLazyStartProducer();
+        case "namesrvaddr":
+        case "namesrvAddr": return target.getNamesrvAddr();
+        case "producergroup":
+        case "producerGroup": return target.getProducerGroup();
+        case "replytoconsumergroup":
+        case "replyToConsumerGroup": return target.getReplyToConsumerGroup();
+        case "replytotopic":
+        case "replyToTopic": return target.getReplyToTopic();
+        case "requesttimeoutcheckerintervalmillis":
+        case "requestTimeoutCheckerIntervalMillis": return target.getRequestTimeoutCheckerIntervalMillis();
+        case "requesttimeoutmillis":
+        case "requestTimeoutMillis": return target.getRequestTimeoutMillis();
+        case "secretkey":
+        case "secretKey": return target.getSecretKey();
+        case "sendtag":
+        case "sendTag": return target.getSendTag();
+        case "subscribetags":
+        case "subscribeTags": return target.getSubscribeTags();
+        case "waitforsendresult":
+        case "waitForSendResult": return target.isWaitForSendResult();
+        default: return null;
+        }
+    }
+}
+
diff --git
a/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointUriFactory.java
b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointUriFactory.java
new file mode 100644
index 00000000000..d79d54d1f11
--- /dev/null
+++
b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointUriFactory.java
@@ -0,0 +1,87 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.rocketmq;
+
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.camel.spi.EndpointUriFactory;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+public class RocketMQEndpointUriFactory extends
org.apache.camel.support.component.EndpointUriFactorySupport implements
EndpointUriFactory {
+
+ private static final String BASE = ":topicName";
+
+ private static final Set<String> PROPERTY_NAMES;
+ private static final Set<String> SECRET_PROPERTY_NAMES;
+ private static final Set<String> MULTI_VALUE_PREFIXES;
+ static {
+ Set<String> props = new HashSet<>(17);
+ props.add("accessKey");
+ props.add("bridgeErrorHandler");
+ props.add("consumerGroup");
+ props.add("exceptionHandler");
+ props.add("exchangePattern");
+ props.add("lazyStartProducer");
+ props.add("namesrvAddr");
+ props.add("producerGroup");
+ props.add("replyToConsumerGroup");
+ props.add("replyToTopic");
+ props.add("requestTimeoutCheckerIntervalMillis");
+ props.add("requestTimeoutMillis");
+ props.add("secretKey");
+ props.add("sendTag");
+ props.add("subscribeTags");
+ props.add("topicName");
+ props.add("waitForSendResult");
+ PROPERTY_NAMES = Collections.unmodifiableSet(props);
+ Set<String> secretProps = new HashSet<>(2);
+ secretProps.add("accessKey");
+ secretProps.add("secretKey");
+ SECRET_PROPERTY_NAMES = Collections.unmodifiableSet(secretProps);
+ MULTI_VALUE_PREFIXES = Collections.emptySet();
+ }
+
+ @Override
+ public boolean isEnabled(String scheme) {
+ return "rocketmq".equals(scheme);
+ }
+
+ @Override
+ public String buildUri(String scheme, Map<String, Object> properties,
boolean encode) throws URISyntaxException {
+ String syntax = scheme + BASE;
+ String uri = syntax;
+
+ Map<String, Object> copy = new HashMap<>(properties);
+
+ uri = buildPathParameter(syntax, uri, "topicName", null, true, copy);
+ uri = buildQueryParameters(uri, copy, encode);
+ return uri;
+ }
+
+ @Override
+ public Set<String> propertyNames() {
+ return PROPERTY_NAMES;
+ }
+
+ @Override
+ public Set<String> secretPropertyNames() {
+ return SECRET_PROPERTY_NAMES;
+ }
+
+ @Override
+ public Set<String> multiValuePrefixes() {
+ return MULTI_VALUE_PREFIXES;
+ }
+
+ @Override
+ public boolean isLenientProperties() {
+ return false;
+ }
+}
+
diff --git
a/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/component.properties
b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/component.properties
new file mode 100644
index 00000000000..3e47499498b
--- /dev/null
+++
b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/component.properties
@@ -0,0 +1,7 @@
+# Generated by camel build tools - do NOT edit this file!
+components=rocketmq
+groupId=org.apache.camel
+artifactId=camel-rocketmq
+version=3.20.0-SNAPSHOT
+projectName=Camel :: RocketMQ
+projectDescription=Camel RocketMQ Component
diff --git
a/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/component/rocketmq
b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/component/rocketmq
new file mode 100644
index 00000000000..6333b860582
--- /dev/null
+++
b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/component/rocketmq
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.rocketmq.RocketMQComponent
diff --git
a/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/configurer/rocketmq-component
b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/configurer/rocketmq-component
new file mode 100644
index 00000000000..418db6c2d00
--- /dev/null
+++
b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/configurer/rocketmq-component
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.rocketmq.RocketMQComponentConfigurer
diff --git
a/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/configurer/rocketmq-endpoint
b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/configurer/rocketmq-endpoint
new file mode 100644
index 00000000000..bbcf03ee7e6
--- /dev/null
+++
b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/configurer/rocketmq-endpoint
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.rocketmq.RocketMQEndpointConfigurer
diff --git
a/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/urifactory/rocketmq-endpoint
b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/urifactory/rocketmq-endpoint
new file mode 100644
index 00000000000..4fbd39b3303
--- /dev/null
+++
b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/urifactory/rocketmq-endpoint
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.rocketmq.RocketMQEndpointUriFactory
diff --git
a/components/camel-rocketmq/src/generated/resources/org/apache/camel/component/rocketmq/rocketmq.json
b/components/camel-rocketmq/src/generated/resources/org/apache/camel/component/rocketmq/rocketmq.json
new file mode 100644
index 00000000000..ed2629825c3
--- /dev/null
+++
b/components/camel-rocketmq/src/generated/resources/org/apache/camel/component/rocketmq/rocketmq.json
@@ -0,0 +1,82 @@
+{
+ "component": {
+ "kind": "component",
+ "name": "rocketmq",
+ "title": "RocketMQ",
+ "description": "Send and receive messages from RocketMQ cluster.",
+ "deprecated": false,
+ "firstVersion": "3.20.0",
+ "label": "messaging",
+ "javaType": "org.apache.camel.component.rocketmq.RocketMQComponent",
+ "supportLevel": "Preview",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-rocketmq",
+ "version": "3.20.0-SNAPSHOT",
+ "scheme": "rocketmq",
+ "extendsScheme": "",
+ "syntax": "rocketmq:topicName",
+ "async": true,
+ "api": false,
+ "consumerOnly": false,
+ "producerOnly": false,
+ "lenientProperties": false
+ },
+ "componentProperties": {
+ "namesrvAddr": { "kind": "property", "displayName": "Namesrv Addr",
"group": "common", "label": "common", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "localhost:9876", "description": "Name server
address of RocketMQ cluster." },
+ "sendTag": { "kind": "property", "displayName": "Send Tag", "group":
"common", "label": "common", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Each message would be sent with this tag." },
+ "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error
Handler", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Allows for bridging the
consumer to the Camel routing Error Handler, which mean any exceptions occurred
while the consumer is trying to pickup incoming messages, or the likes, will
now be processed as a me [...]
+ "consumerGroup": { "kind": "property", "displayName": "Consumer Group",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "Consumer group name." },
+ "subscribeTags": { "kind": "property", "displayName": "Subscribe Tags",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "*", "description": "Subscribe tags of
consumer. Multiple tags could be split by ||, such as TagA||TagB" },
+ "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start
Producer", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Whether the producer
should be started lazy (on the first message). By starting lazy you can use
this to allow CamelContext and routes to startup in situations where a producer
may otherwise fail during star [...]
+ "producerGroup": { "kind": "property", "displayName": "Producer Group",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "Producer group name." },
+ "replyToConsumerGroup": { "kind": "property", "displayName": "Reply To
Consumer Group", "group": "producer", "label": "producer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "Consumer group name used
for receiving response." },
+ "replyToTopic": { "kind": "property", "displayName": "Reply To Topic",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "Topic used for receiving response when using
in-out pattern." },
+ "waitForSendResult": { "kind": "property", "displayName": "Wait For Send
Result", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Whether waiting for
send result before routing to next endpoint." },
+ "autowiredEnabled": { "kind": "property", "displayName": "Autowired
Enabled", "group": "advanced", "label": "advanced", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": true, "description": "Whether autowiring is
enabled. This is used for automatic autowiring options (the option must be
marked as autowired) by looking up in the registry to find if there is a single
instance of matching type, which t [...]
+ "requestTimeoutCheckerIntervalMillis": { "kind": "property",
"displayName": "Request Timeout Checker Interval Millis", "group": "advanced",
"label": "advanced", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000,
"description": "Check interval milliseconds of request timeout." },
+ "requestTimeoutMillis": { "kind": "property", "displayName": "Request
Timeout Millis", "group": "advanced", "label": "advanced", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 10000, "description": "Timeout milliseconds of
receiving response when using in-out pattern." },
+ "accessKey": { "kind": "property", "displayName": "Access Key", "group":
"secret", "label": "secret", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": true,
"description": "Access key for RocketMQ ACL." },
+ "secretKey": { "kind": "property", "displayName": "Secret Key", "group":
"secret", "label": "secret", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": true,
"description": "Secret key for RocketMQ ACL." }
+ },
+ "headers": {
+ "CamelRockerMQTopic": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Topic of message", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#TOPIC" },
+ "CamelRockerMQTag": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Tag of message", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#TAG" },
+ "CamelRockerMQKey": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Key of message", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#KEY" },
+ "CamelRockerMQOverrideTopicName": { "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "If this header is set, the message will be
routed to the topic specified by this header instead of the origin topic in
endpoint.", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#OVERRIDE_TOPIC_NAME" },
+ "CamelRockerMQOverrideTag": { "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "If this header is set, the message's tag will
be set to value specified by this header instead of the sendTag defined in
endpoint.", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#OVERRIDE_TAG" },
+ "CamelRockerMQOverrideMessageKey": { "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Set keys for the message. When using in-out
pattern, the value will be prepended to the generated keys", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#OVERRIDE_MESSAGE_KEY" },
+ "CamelRockerMQBrokerName": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Broker name", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#BROKER_NAME" },
+ "CamelRockerMQQueueId": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "int",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Queue ID", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#QUEUE_ID" },
+ "CamelRockerMQStoreSize": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "int",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Store size", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#STORE_SIZE" },
+ "CamelRockerMQQueueOffset": { "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Queue offset", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#QUEUE_OFFSET" },
+ "CamelRockerMQSysFlag": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "int",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Sys flag", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#SYS_FLAG" },
+ "CamelRockerMQBornTimestamp": { "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Born timestamp", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#BORN_TIMESTAMP" },
+ "CamelRockerMQBornHost": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType":
"java.net.SocketAddress", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "Born host",
"constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#BORN_HOST" },
+ "CamelRockerMQStoreTimestamp": { "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Store timestamp", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#STORE_TIMESTAMP" },
+ "CamelRockerMQStoreHost": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType":
"java.net.SocketAddress", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "Store host",
"constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#STORE_HOST" },
+ "CamelRockerMQMsgId": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Msg ID", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#MSG_ID" },
+ "CamelRockerMQCommitLogOffset": { "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Commit log offset", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#COMMIT_LOG_OFFSET" },
+ "CamelRockerMQBodyCrc": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "int",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Body CRC", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#BODY_CRC" },
+ "CamelRockerMQReconsumeTimes": { "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType": "int",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Reconsume times", "constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#RECONSUME_TIMES" },
+ "CamelRockerMQPreparedTransactionOffset": { "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "long", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Prepared transaction offset",
"constantName":
"org.apache.camel.component.rocketmq.RocketMQConstants#PREPARED_TRANSACTION_OFFSET"
}
+ },
+ "properties": {
+ "topicName": { "kind": "path", "displayName": "Topic Name", "group":
"common", "label": "", "required": true, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Topic name of this endpoint." },
+ "namesrvAddr": { "kind": "parameter", "displayName": "Namesrv Addr",
"group": "common", "label": "common", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "localhost:9876", "description": "Name server
address of RocketMQ cluster." },
+ "consumerGroup": { "kind": "parameter", "displayName": "Consumer Group",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "Consumer group name." },
+ "subscribeTags": { "kind": "parameter", "displayName": "Subscribe Tags",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "*", "description": "Subscribe tags of
consumer. Multiple tags could be split by ||, such as TagA||TagB" },
+ "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error
Handler", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": false,
"description": "Allows for bridging the consumer to the Camel routing Error
Handler, which mean any exceptions occurred while the consumer is trying to
pickup incoming messages, or the likes, will now [...]
+ "exceptionHandler": { "kind": "parameter", "displayName": "Exception
Handler", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By default the
con [...]
+ "exchangePattern": { "kind": "parameter", "displayName": "Exchange
Pattern", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut",
"InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false,
"description": "Sets the exchange pattern when the consumer creates an
exchange." },
+ "producerGroup": { "kind": "parameter", "displayName": "Producer Group",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "Producer group name." },
+ "replyToConsumerGroup": { "kind": "parameter", "displayName": "Reply To
Consumer Group", "group": "producer", "label": "producer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "Consumer group name used
for receiving response." },
+ "replyToTopic": { "kind": "parameter", "displayName": "Reply To Topic",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "Topic used for receiving response when using
in-out pattern." },
+ "sendTag": { "kind": "parameter", "displayName": "Send Tag", "group":
"producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "Each message would be sent with this tag." },
+ "waitForSendResult": { "kind": "parameter", "displayName": "Wait For Send
Result", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Whether waiting for
send result before routing to next endpoint." },
+ "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start
Producer", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": false,
"description": "Whether the producer should be started lazy (on the first
message). By starting lazy you can use this to allow CamelContext and routes to
startup in situations where a producer may other [...]
+ "requestTimeoutCheckerIntervalMillis": { "kind": "parameter",
"displayName": "Request Timeout Checker Interval Millis", "group": "advanced",
"label": "advanced", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000,
"description": "Check interval milliseconds of request timeout." },
+ "requestTimeoutMillis": { "kind": "parameter", "displayName": "Request
Timeout Millis", "group": "advanced", "label": "advanced", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 10000, "description": "Timeout milliseconds of
receiving response when using in-out pattern." },
+ "accessKey": { "kind": "parameter", "displayName": "Access Key", "group":
"security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Access key for RocketMQ ACL." },
+ "secretKey": { "kind": "parameter", "displayName": "Secret Key", "group":
"security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Secret key for RocketMQ ACL." }
+ }
+}
diff --git a/components/camel-rocketmq/src/main/docs/rocketmq-component.adoc
b/components/camel-rocketmq/src/main/docs/rocketmq-component.adoc
new file mode 100644
index 00000000000..4822aa4b8ee
--- /dev/null
+++ b/components/camel-rocketmq/src/main/docs/rocketmq-component.adoc
@@ -0,0 +1,112 @@
+= RocketMQ Component
+:doctitle: RocketMQ
+:shortname: rocketmq
+:artifactid: camel-rocketmq
+:description: Send and receive messages from RocketMQ cluster.
+:since: 3.20
+:supportlevel: Preview
+:component-header: Both producer and consumer are supported
+
+*Since Camel {since}*
+
+*{component-header}*
+
+The RocketMQ component allows you to produce and consume messages from
+https://rocketmq.apache.org/[RocketMQ] instances.
+
+Maven users will need to add the following dependency to their `pom.xml`
+for this component:
+
+[source,xml]
+----
+<dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-rocketmq</artifactId>
+ <version>x.x.x</version>
+ <!-- use the same version as your Camel core version -->
+</dependency>
+----
+
+[NOTE]
+====
+Since RocketMQ 5.x API is compatible with 4.x, this component works with both
RocketMQ 4.x and 5.x.
+Users could change RocketMQ dependencies on their own.
+====
+
+== URI format
+
+----
+rocketmq:topicName?[options]
+----
+
+The topic name determines the topic to which the produced
+messages will be sent. In the case of consumers, the topic name
+determines the topic that will be subscribed to.
+This component uses RocketMQ push consumer by default.
+
+// component-configure options: START
+
+// component-configure options: END
+
+// component options: START
+include::partial$component-configure-options.adoc[]
+include::partial$component-endpoint-options.adoc[]
+// component options: END
+
+// endpoint options: START
+
+// endpoint options: END
+
+// component headers: START
+include::partial$component-endpoint-headers.adoc[]
+// component headers: END
+
+== InOut Pattern
+
+The InOut pattern is based on the message key. When the producer sends a message, a
messageKey is generated and appended to the message's key.
+
+After the message is sent, a consumer listens to the topic configured by the
parameter `replyToTopic`.
+
+When a message from `replyToTopic` contains the key, the reply is considered
received and routing continues.
+
+If `requestTimeoutMillis` elapses and no reply has been received, an exception is
thrown.
+
+[source,java]
+----
+from("rocketmq:START_TOPIC?producerGroup=p1&consumerGroup=c1")
+
+.to(ExchangePattern.InOut, "rocketmq:INTERMEDIATE_TOPIC" +
+ "?producerGroup=intermediaProducer" +
+ "&consumerGroup=intermediateConsumer" +
+ "&replyToTopic=REPLY_TO_TOPIC" +
+ "&replyToConsumerGroup=replyToConsumerGroup" +
+ "&requestTimeoutMillis=30000")
+
+.to("log:InOutRoute?showAll=true")
+----
+
+== Examples
+
+Receive messages from a topic named `FROM_TOPIC` and route them to `TO_TOPIC`.
+
+[source,java]
+----
+from("rocketmq:FROM_TOPIC?namesrvAddr=localhost:9876&consumerGroup=consumer")
+ .to("rocketmq:TO_TOPIC?namesrvAddr=localhost:9876&producerGroup=producer");
+----
+
+Setting specific headers can change routing behaviour. For example, if the header
`RocketMQConstants.OVERRIDE_TOPIC_NAME` is set,
+the message will be sent to `ACTUAL_TARGET` instead of `ORIGIN_TARGET`.
+
+[source,java]
+----
+from("rocketmq:FROM?consumerGroup=consumer")
+ .process(exchange -> {
+
exchange.getMessage().setHeader(RocketMQConstants.OVERRIDE_TOPIC_NAME,
"ACTUAL_TARGET");
+ exchange.getMessage().setHeader(RocketMQConstants.OVERRIDE_TAG,
"OVERRIDE_TAG");
+
exchange.getMessage().setHeader(RocketMQConstants.OVERRIDE_MESSAGE_KEY,
"OVERRIDE_MESSAGE_KEY");
+ }
+)
+.to("rocketmq:ORIGIN_TARGET?producerGroup=producer")
+.to("log:RocketRoute?showAll=true")
+----
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQAclUtils.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQAclUtils.java
new file mode 100644
index 00000000000..453c04e84e7
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQAclUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.remoting.RPCHook;
+
+public final class RocketMQAclUtils {
+
+ private RocketMQAclUtils() {
+ }
+
+ public static RPCHook getAclRPCHook(String accessKey, String secretKey) {
+ if (StringUtils.isNotBlank(accessKey) &&
StringUtils.isNotBlank(secretKey)) {
+ return new AclClientRPCHook(new SessionCredentials(accessKey,
secretKey));
+ }
+ return null;
+ }
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java
new file mode 100644
index 00000000000..08063c8a152
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import java.util.Map;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+
+@Component("rocketmq")
+public class RocketMQComponent extends DefaultComponent {
+
+ @Metadata(label = "producer")
+ private String producerGroup;
+
+ @Metadata(label = "consumer")
+ private String consumerGroup;
+
+ @Metadata(label = "consumer", defaultValue = "*")
+ private String subscribeTags = "*";
+
+ @Metadata(label = "common")
+ private String sendTag = "";
+
+ @Metadata(label = "common", defaultValue = "localhost:9876")
+ private String namesrvAddr = "localhost:9876";
+
+ @Metadata(label = "producer")
+ private String replyToTopic;
+
+ @Metadata(label = "producer")
+ private String replyToConsumerGroup;
+
+ @Metadata(label = "advanced", defaultValue = "10000")
+ private long requestTimeoutMillis = 10000L;
+
+ @Metadata(label = "advanced", defaultValue = "1000")
+ private long requestTimeoutCheckerIntervalMillis = 1000L;
+
+ @Metadata(label = "producer", defaultValue = "false")
+ private boolean waitForSendResult;
+
+ @Metadata(label = "secret", secret = true)
+ private String accessKey;
+
+ @Metadata(label = "secret", secret = true)
+ private String secretKey;
+
+ @Override
+ protected RocketMQEndpoint createEndpoint(String uri, String remaining,
Map<String, Object> parameters) throws Exception {
+ RocketMQEndpoint endpoint = new RocketMQEndpoint(uri, this);
+ endpoint.setProducerGroup(getProducerGroup());
+ endpoint.setConsumerGroup(getConsumerGroup());
+ endpoint.setSubscribeTags(getSubscribeTags());
+ endpoint.setNamesrvAddr(getNamesrvAddr());
+ endpoint.setSendTag(getSendTag());
+ endpoint.setReplyToTopic(getReplyToTopic());
+ endpoint.setReplyToConsumerGroup(getReplyToConsumerGroup());
+ endpoint.setRequestTimeoutMillis(getRequestTimeoutMillis());
+
endpoint.setRequestTimeoutCheckerIntervalMillis(getRequestTimeoutCheckerIntervalMillis());
+ endpoint.setWaitForSendResult(isWaitForSendResult());
+ endpoint.setAccessKey(getAccessKey());
+ endpoint.setSecretKey(getSecretKey());
+ setProperties(endpoint, parameters);
+ endpoint.setTopicName(remaining);
+ return endpoint;
+ }
+
+ public String getSubscribeTags() {
+ return subscribeTags;
+ }
+
+ /**
+ * Subscribe tags of consumer. Multiple tags could be split by "||", such
as "TagA||TagB"
+ */
+ public void setSubscribeTags(String subscribeTags) {
+ this.subscribeTags = subscribeTags;
+ }
+
+ public String getSendTag() {
+ return sendTag;
+ }
+
+ /**
+ * Each message would be sent with this tag.
+ */
+ public void setSendTag(String sendTag) {
+ this.sendTag = sendTag;
+ }
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+
+ /**
+ * Name server address of RocketMQ cluster.
+ */
+ public void setNamesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
+
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+ /**
+ * Producer group name.
+ */
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ /**
+ * Consumer group name.
+ */
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public String getReplyToTopic() {
+ return replyToTopic;
+ }
+
+ /**
+ * Topic used for receiving response when using in-out pattern.
+ */
+ public void setReplyToTopic(String replyToTopic) {
+ this.replyToTopic = replyToTopic;
+ }
+
+ public String getReplyToConsumerGroup() {
+ return replyToConsumerGroup;
+ }
+
+ /**
+ * Consumer group name used for receiving response.
+ */
+ public void setReplyToConsumerGroup(String replyToConsumerGroup) {
+ this.replyToConsumerGroup = replyToConsumerGroup;
+ }
+
+ public long getRequestTimeoutMillis() {
+ return requestTimeoutMillis;
+ }
+
+ /**
+ * Timeout milliseconds of receiving response when using in-out pattern.
+ */
+ public void setRequestTimeoutMillis(long requestTimeoutMillis) {
+ this.requestTimeoutMillis = requestTimeoutMillis;
+ }
+
+ public long getRequestTimeoutCheckerIntervalMillis() {
+ return requestTimeoutCheckerIntervalMillis;
+ }
+
+ /**
+ * Check interval milliseconds of request timeout.
+ */
+ public void setRequestTimeoutCheckerIntervalMillis(long
requestTimeoutCheckerIntervalMillis) {
+ this.requestTimeoutCheckerIntervalMillis =
requestTimeoutCheckerIntervalMillis;
+ }
+
+ public boolean isWaitForSendResult() {
+ return waitForSendResult;
+ }
+
+ /**
+ * Whether waiting for send result before routing to next endpoint.
+ */
+ public void setWaitForSendResult(boolean waitForSendResult) {
+ this.waitForSendResult = waitForSendResult;
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ /**
+ * Access key for RocketMQ ACL.
+ */
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ /**
+ * Secret key for RocketMQ ACL.
+ */
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConstants.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConstants.java
new file mode 100644
index 00000000000..6c0ab3140b4
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConstants.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.spi.Metadata;
+
+public final class RocketMQConstants {
+
+ @Metadata(label = "consumer", description = "Topic of message", javaType =
"String")
+ public static final String TOPIC = "CamelRockerMQTopic";
+ @Metadata(label = "consumer", description = "Tag of message", javaType =
"String")
+ public static final String TAG = "CamelRockerMQTag";
+ @Metadata(label = "consumer", description = "Key of message", javaType =
"String")
+ public static final String KEY = "CamelRockerMQKey";
+ @Metadata(label = "producer",
+ description = "If this header is set, the message will be routed
to the topic specified by this header\n" +
+ "instead of the origin topic in endpoint.",
+ javaType = "String")
+ public static final String OVERRIDE_TOPIC_NAME =
"CamelRockerMQOverrideTopicName";
+ @Metadata(label = "producer",
+ description = "If this header is set, the message's tag will be
set to value specified by this header\n" +
+ "instead of the sendTag defined in endpoint.",
+ javaType = "String")
+ public static final String OVERRIDE_TAG = "CamelRockerMQOverrideTag";
+ @Metadata(label = "producer", description = "Set keys for the message.
When using in-out pattern,\n" +
+ "the value will be prepended
to the generated keys",
+ javaType = "String")
+ public static final String OVERRIDE_MESSAGE_KEY =
"CamelRockerMQOverrideMessageKey";
+ @Metadata(label = "consumer", description = "Broker name", javaType =
"String")
+ public static final String BROKER_NAME = "CamelRockerMQBrokerName";
+ @Metadata(label = "consumer", description = "Queue ID", javaType = "int")
+ public static final String QUEUE_ID = "CamelRockerMQQueueId";
+ @Metadata(label = "consumer", description = "Store size", javaType = "int")
+ public static final String STORE_SIZE = "CamelRockerMQStoreSize";
+ @Metadata(label = "consumer", description = "Queue offset", javaType =
"long")
+ public static final String QUEUE_OFFSET = "CamelRockerMQQueueOffset";
+ @Metadata(label = "consumer", description = "Sys flag", javaType = "int")
+ public static final String SYS_FLAG = "CamelRockerMQSysFlag";
+ @Metadata(label = "consumer", description = "Born timestamp", javaType =
"long")
+ public static final String BORN_TIMESTAMP = "CamelRockerMQBornTimestamp";
+ @Metadata(label = "consumer", description = "Born host", javaType =
"java.net.SocketAddress")
+ public static final String BORN_HOST = "CamelRockerMQBornHost";
+ @Metadata(label = "consumer", description = "Store timestamp", javaType =
"long")
+ public static final String STORE_TIMESTAMP = "CamelRockerMQStoreTimestamp";
+ @Metadata(label = "consumer", description = "Store host", javaType =
"java.net.SocketAddress")
+ public static final String STORE_HOST = "CamelRockerMQStoreHost";
+ @Metadata(label = "consumer", description = "Msg ID", javaType = "String")
+ public static final String MSG_ID = "CamelRockerMQMsgId";
+ @Metadata(label = "consumer", description = "Commit log offset", javaType
= "long")
+ public static final String COMMIT_LOG_OFFSET =
"CamelRockerMQCommitLogOffset";
+ @Metadata(label = "consumer", description = "Body CRC", javaType = "int")
+ public static final String BODY_CRC = "CamelRockerMQBodyCrc";
+ @Metadata(label = "consumer", description = "Reconsume times", javaType =
"int")
+ public static final String RECONSUME_TIMES = "CamelRockerMQReconsumeTimes";
+ @Metadata(label = "consumer", description = "Prepard transaction offset",
javaType = "long")
+ public static final String PREPARED_TRANSACTION_OFFSET =
"CamelRockerMQPreparedTransactionOffset";
+
+ private RocketMQConstants() {
+ }
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java
new file mode 100644
index 00000000000..6cfa996b861
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Suspendable;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class RocketMQConsumer extends DefaultConsumer implements Suspendable {
+
+ private final RocketMQEndpoint endpoint;
+
+ private DefaultMQPushConsumer mqPushConsumer;
+
+ public RocketMQConsumer(RocketMQEndpoint endpoint, Processor processor) {
+ super(endpoint, processor);
+ this.endpoint = endpoint;
+ }
+
+ private void startConsumer() throws MQClientException {
+ mqPushConsumer = new DefaultMQPushConsumer(
+ null, endpoint.getConsumerGroup(),
+ RocketMQAclUtils.getAclRPCHook(getEndpoint().getAccessKey(),
getEndpoint().getSecretKey()));
+ mqPushConsumer.setNamesrvAddr(endpoint.getNamesrvAddr());
+ mqPushConsumer.subscribe(endpoint.getTopicName(),
endpoint.getSubscribeTags());
+ mqPushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
+ MessageExt messageExt = msgs.get(0);
+ Exchange exchange =
endpoint.createRocketExchange(messageExt.getBody());
+
RocketMQMessageConverter.populateHeadersByMessageExt(exchange.getIn(),
messageExt);
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException(e);
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ mqPushConsumer.start();
+ }
+
+ private void stopConsumer() {
+ if (mqPushConsumer != null) {
+ mqPushConsumer.shutdown();
+ mqPushConsumer = null;
+ }
+ }
+
+ @Override
+ public RocketMQEndpoint getEndpoint() {
+ return (RocketMQEndpoint) super.getEndpoint();
+ }
+
+ @Override
+ protected void doSuspend() {
+ stopConsumer();
+ }
+
+ @Override
+ protected void doResume() throws Exception {
+ startConsumer();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ startConsumer();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ stopConsumer();
+ }
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java
new file mode 100644
index 00000000000..a77ec99f018
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.AsyncEndpoint;
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.DefaultMessage;
+
+/**
+ * Send and receive messages from <a
href="https://rocketmq.apache.org/">RocketMQ</a> cluster.
+ */
+@UriEndpoint(firstVersion = "3.20.0", scheme = "rocketmq", syntax = "rocketmq:topicName", title = "RocketMQ",
+             category = Category.MESSAGING, headersClass = RocketMQConstants.class)
+public class RocketMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
+
+    @UriPath
+    @Metadata(required = true)
+    private String topicName;
+    @UriParam(label = "producer")
+    private String producerGroup;
+    @UriParam(label = "consumer")
+    private String consumerGroup;
+    @UriParam(label = "consumer", defaultValue = "*")
+    private String subscribeTags = "*";
+    @UriParam(label = "producer")
+    private String sendTag = "";
+    @UriParam(label = "producer")
+    private String replyToTopic;
+    @UriParam(label = "producer")
+    private String replyToConsumerGroup;
+    @UriParam(label = "common", defaultValue = "localhost:9876")
+    private String namesrvAddr = "localhost:9876";
+    @UriParam(label = "advanced", defaultValue = "10000")
+    private long requestTimeoutMillis = 10000L;
+    @UriParam(label = "advanced", defaultValue = "1000")
+    private long requestTimeoutCheckerIntervalMillis = 1000L;
+    @UriParam(label = "producer", defaultValue = "false")
+    private boolean waitForSendResult;
+    @UriParam(label = "security", secret = true)
+    private String accessKey;
+    @UriParam(label = "security", secret = true)
+    private String secretKey;
+
+    public RocketMQEndpoint() {
+    }
+
+    public RocketMQEndpoint(String endpointUri, RocketMQComponent component) {
+        super(endpointUri, component);
+    }
+
+    // Producer bound to this endpoint.
+    @Override
+    public Producer createProducer() {
+        final RocketMQProducer rocketProducer = new RocketMQProducer(this);
+        return rocketProducer;
+    }
+
+    // Consumer bound to this endpoint; configureConsumer(...) is applied before it is returned.
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        final RocketMQConsumer rocketConsumer = new RocketMQConsumer(this, processor);
+        configureConsumer(rocketConsumer);
+        return rocketConsumer;
+    }
+
+    // Builds a new exchange whose IN message is a fresh DefaultMessage carrying the given bytes as body.
+    public Exchange createRocketExchange(byte[] body) {
+        final Exchange created = super.createExchange();
+        final DefaultMessage inbound = new DefaultMessage(created.getContext());
+        inbound.setBody(body);
+        created.setIn(inbound);
+        return created;
+    }
+
+    public String getTopicName() {
+        return this.topicName;
+    }
+
+    /**
+     * Topic name of this endpoint.
+     */
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    public String getSubscribeTags() {
+        return this.subscribeTags;
+    }
+
+    /**
+     * Subscribe tags of consumer. Multiple tags could be split by "||", such as "TagA||TagB"
+     */
+    public void setSubscribeTags(String subscribeTags) {
+        this.subscribeTags = subscribeTags;
+    }
+
+    public String getSendTag() {
+        return this.sendTag;
+    }
+
+    /**
+     * Each message would be sent with this tag.
+     */
+    public void setSendTag(String sendTag) {
+        this.sendTag = sendTag;
+    }
+
+    public String getNamesrvAddr() {
+        return this.namesrvAddr;
+    }
+
+    /**
+     * Name server address of RocketMQ cluster.
+     */
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+    public String getProducerGroup() {
+        return this.producerGroup;
+    }
+
+    /**
+     * Producer group name.
+     */
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+    public String getConsumerGroup() {
+        return this.consumerGroup;
+    }
+
+    /**
+     * Consumer group name.
+     */
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public String getReplyToTopic() {
+        return this.replyToTopic;
+    }
+
+    /**
+     * Topic used for receiving response when using in-out pattern.
+     */
+    public void setReplyToTopic(String replyToTopic) {
+        this.replyToTopic = replyToTopic;
+    }
+
+    public String getReplyToConsumerGroup() {
+        return this.replyToConsumerGroup;
+    }
+
+    /**
+     * Consumer group name used for receiving response.
+     */
+    public void setReplyToConsumerGroup(String replyToConsumerGroup) {
+        this.replyToConsumerGroup = replyToConsumerGroup;
+    }
+
+    public long getRequestTimeoutMillis() {
+        return this.requestTimeoutMillis;
+    }
+
+    /**
+     * Timeout milliseconds of receiving response when using in-out pattern.
+     */
+    public void setRequestTimeoutMillis(long requestTimeoutMillis) {
+        this.requestTimeoutMillis = requestTimeoutMillis;
+    }
+
+    public long getRequestTimeoutCheckerIntervalMillis() {
+        return this.requestTimeoutCheckerIntervalMillis;
+    }
+
+    /**
+     * Check interval milliseconds of request timeout.
+     */
+    public void setRequestTimeoutCheckerIntervalMillis(long requestTimeoutCheckerIntervalMillis) {
+        this.requestTimeoutCheckerIntervalMillis = requestTimeoutCheckerIntervalMillis;
+    }
+
+    public boolean isWaitForSendResult() {
+        return this.waitForSendResult;
+    }
+
+    /**
+     * Whether waiting for send result before routing to next endpoint.
+     */
+    public void setWaitForSendResult(boolean waitForSendResult) {
+        this.waitForSendResult = waitForSendResult;
+    }
+
+    public String getAccessKey() {
+        return this.accessKey;
+    }
+
+    /**
+     * Access key for RocketMQ ACL.
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return this.secretKey;
+    }
+
+    /**
+     * Secret key for RocketMQ ACL.
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQMessageConverter.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQMessageConverter.java
new file mode 100644
index 00000000000..cb8e4a0e119
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQMessageConverter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * Static helper that copies the metadata of a received RocketMQ message onto a Camel message as headers.
+ */
+public final class RocketMQMessageConverter {
+
+    private RocketMQMessageConverter() {
+        // static helpers only
+    }
+
+    /**
+     * Sets one header per {@link RocketMQConstants} consumer header name, taking each value from the
+     * matching getter of the received message.
+     *
+     * @param message    the Camel message that receives the headers
+     * @param messageExt the received RocketMQ message the values are read from
+     */
+    public static void populateHeadersByMessageExt(final Message message, final MessageExt messageExt) {
+        put(message, RocketMQConstants.TOPIC, messageExt.getTopic());
+        put(message, RocketMQConstants.TAG, messageExt.getTags());
+        put(message, RocketMQConstants.KEY, messageExt.getKeys());
+        put(message, RocketMQConstants.BROKER_NAME, messageExt.getBrokerName());
+        put(message, RocketMQConstants.QUEUE_ID, messageExt.getQueueId());
+        put(message, RocketMQConstants.STORE_SIZE, messageExt.getStoreSize());
+        put(message, RocketMQConstants.QUEUE_OFFSET, messageExt.getQueueOffset());
+        put(message, RocketMQConstants.SYS_FLAG, messageExt.getSysFlag());
+        put(message, RocketMQConstants.BORN_TIMESTAMP, messageExt.getBornTimestamp());
+        put(message, RocketMQConstants.BORN_HOST, messageExt.getBornHost());
+        put(message, RocketMQConstants.STORE_TIMESTAMP, messageExt.getStoreTimestamp());
+        put(message, RocketMQConstants.STORE_HOST, messageExt.getStoreHost());
+        put(message, RocketMQConstants.MSG_ID, messageExt.getMsgId());
+        put(message, RocketMQConstants.COMMIT_LOG_OFFSET, messageExt.getCommitLogOffset());
+        put(message, RocketMQConstants.BODY_CRC, messageExt.getBodyCRC());
+        put(message, RocketMQConstants.RECONSUME_TIMES, messageExt.getReconsumeTimes());
+        put(message, RocketMQConstants.PREPARED_TRANSACTION_OFFSET, messageExt.getPreparedTransactionOffset());
+    }
+
+    // Single place where a header is written to the target message.
+    private static void put(Message target, String headerName, Object headerValue) {
+        target.setHeader(headerName, headerValue);
+    }
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
new file mode 100644
index 00000000000..2eaa76ffc1e
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.rocketmq.reply.ReplyManager;
+import org.apache.camel.component.rocketmq.reply.RocketMQReplyManagerSupport;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Asynchronous Camel producer that sends the exchange body to RocketMQ.
+ * <p>
+ * Out-capable exchanges go through {@link #processInOut}, which sends the message with a generated key and,
+ * when a reply manager exists, registers the exchange to wait for a reply. All other exchanges go through
+ * {@link #processInOnly}.
+ */
+public class RocketMQProducer extends DefaultAsyncProducer {
+
+    public static final String GENERATE_MESSAGE_KEY_PREFIX = "camel-rocketmq-";
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQProducer.class);
+
+    // Guards the one-time initialisation done by initReplyManager().
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private DefaultMQProducer mqProducer;
+
+    // Stays null when the endpoint has no replyToTopic configured.
+    private ReplyManager replyManager;
+
+    public RocketMQProducer(RocketMQEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public RocketMQEndpoint getEndpoint() {
+        return (RocketMQEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Dispatches the exchange to the in-out or in-only send path depending on its exchange pattern.
+     * Any throwable raised while preparing or sending is stored on the exchange and completes it synchronously.
+     */
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (!isRunAllowed()) {
+            if (exchange.getException() == null) {
+                exchange.setException(new RejectedExecutionException());
+            }
+            callback.done(true);
+            return true;
+        }
+        try {
+            LOG.trace("Exchange Pattern {}", exchange.getPattern());
+            if (exchange.getPattern().isOutCapable()) {
+                return processInOut(exchange, callback);
+            } else {
+                return processInOnly(exchange, callback);
+            }
+        } catch (Throwable e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+    }
+
+    /**
+     * Sends the message with a generated key appended to its keys and, once the send succeeded, registers the
+     * exchange with the reply manager under that key.
+     *
+     * @return always {@code false}; the callback is completed from the send callback or the reply manager
+     */
+    protected boolean processInOut(final Exchange exchange, final AsyncCallback callback)
+            throws RemotingException, MQClientException, InterruptedException, NoTypeConversionAvailableException {
+        org.apache.camel.Message in = exchange.getIn();
+        Message message = new Message();
+        message.setTopic(in.getHeader(RocketMQConstants.OVERRIDE_TOPIC_NAME, () -> getEndpoint().getTopicName(), String.class));
+        message.setTags(in.getHeader(RocketMQConstants.OVERRIDE_TAG, () -> getEndpoint().getSendTag(), String.class));
+        message.setBody(exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, in.getBody()));
+        message.setKeys(in.getHeader(RocketMQConstants.OVERRIDE_MESSAGE_KEY, "", String.class));
+        initReplyManager();
+        String generateKey = GENERATE_MESSAGE_KEY_PREFIX + getEndpoint().getCamelContext().getUuidGenerator().generateUuid();
+        message.setKeys(Arrays.asList(Optional.ofNullable(message.getKeys()).orElse(""), generateKey));
+        LOG.debug("RocketMQ Producer sending {}", message);
+        mqProducer.send(message, new SendCallback() {
+
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                if (!SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
+                    exchange.setException(new SendFailedException(sendResult.toString()));
+                    callback.done(false);
+                    return;
+                }
+                if (replyManager == null) {
+                    LOG.warn("replyToTopic not set! Will not wait for reply.");
+                    callback.done(false);
+                    return;
+                }
+                replyManager.registerReply(replyManager, exchange, callback, generateKey,
+                        getEndpoint().getRequestTimeoutMillis());
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                try {
+                    // Record the failure first so it is never lost if cancelling throws.
+                    exchange.setException(e);
+                    // No reply manager exists when replyToTopic is not configured; nothing to cancel then.
+                    if (replyManager != null) {
+                        replyManager.cancelMessageKey(generateKey);
+                    }
+                } finally {
+                    callback.done(false);
+                }
+            }
+        });
+        return false;
+    }
+
+    /**
+     * Creates the reply manager on first use when the endpoint has a replyToTopic. The context class loader is
+     * switched to the application context class loader (when available) for the duration of the creation.
+     */
+    protected void initReplyManager() {
+        if (!started.get()) {
+            synchronized (this) {
+                if (started.get()) {
+                    return;
+                }
+                LOG.debug("Starting reply manager");
+                ClassLoader current = Thread.currentThread().getContextClassLoader();
+                ClassLoader ac = getEndpoint().getCamelContext().getApplicationContextClassLoader();
+                try {
+                    if (ac != null) {
+                        Thread.currentThread().setContextClassLoader(ac);
+                    }
+                    if (getEndpoint().getReplyToTopic() != null) {
+                        replyManager = createReplyManager();
+                        LOG.debug("Using RocketMQReplyManager: {} to process replies from topic {}", replyManager,
+                                getEndpoint().getReplyToTopic());
+                    }
+                } catch (Exception e) {
+                    throw new FailedToCreateProducerException(getEndpoint(), e);
+                } finally {
+                    if (ac != null) {
+                        Thread.currentThread().setContextClassLoader(current);
+                    }
+                }
+                started.set(true);
+            }
+        }
+    }
+
+    /**
+     * Stops the reply manager, if any, and resets the started flag so it can be initialised again.
+     */
+    protected void unInitReplyManager() {
+        try {
+            if (replyManager != null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Stopping RocketMQReplyManager: {} from processing replies from : {}", replyManager,
+                            getEndpoint().getReplyToTopic());
+                }
+                ServiceHelper.stopService(replyManager);
+            }
+        } catch (Exception e) {
+            throw RuntimeCamelException.wrapRuntimeCamelException(e);
+        } finally {
+            started.set(false);
+        }
+    }
+
+    // Builds and starts a reply manager with its own single-threaded scheduled executor.
+    private ReplyManager createReplyManager() {
+        RocketMQReplyManagerSupport manager = new RocketMQReplyManagerSupport(getEndpoint().getCamelContext());
+        manager.setEndpoint(getEndpoint());
+        String name = "RocketMQReplyManagerTimeoutChecker[" + getEndpoint().getTopicName() + "]";
+        ScheduledExecutorService scheduledExecutorService
+                = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name);
+        manager.setScheduledExecutorService(scheduledExecutorService);
+        LOG.debug("Starting ReplyManager: {}", name);
+        ServiceHelper.startService(manager);
+        return manager;
+    }
+
+    /**
+     * Sends the message without waiting for a reply.
+     * <p>
+     * NOTE(review): when waitForSendResult is false this returns {@code true} although the callback is only
+     * invoked later from the send callback - confirm this matches the AsyncProcessor contract.
+     *
+     * @return {@code false} when waitForSendResult is enabled, otherwise {@code true}
+     */
+    protected boolean processInOnly(Exchange exchange, AsyncCallback callback)
+            throws NoTypeConversionAvailableException, InterruptedException, RemotingException, MQClientException {
+        org.apache.camel.Message in = exchange.getIn();
+        Message message = new Message();
+        message.setTopic(in.getHeader(RocketMQConstants.OVERRIDE_TOPIC_NAME, () -> getEndpoint().getTopicName(), String.class));
+        message.setTags(in.getHeader(RocketMQConstants.OVERRIDE_TAG, () -> getEndpoint().getSendTag(), String.class));
+        message.setBody(exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, in.getBody()));
+        message.setKeys(in.getHeader(RocketMQConstants.OVERRIDE_MESSAGE_KEY, "", String.class));
+        LOG.debug("RocketMQ Producer sending {}", message);
+        boolean waitForSendResult = getEndpoint().isWaitForSendResult();
+        mqProducer.send(message, new SendCallback() {
+
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                if (!SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
+                    exchange.setException(new SendFailedException(sendResult.toString()));
+                }
+                callback.done(!waitForSendResult);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                exchange.setException(e);
+                callback.done(!waitForSendResult);
+            }
+        });
+        // return false to wait send callback
+        return !waitForSendResult;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        this.mqProducer = new DefaultMQProducer(
+                null, getEndpoint().getProducerGroup(),
+                RocketMQAclUtils.getAclRPCHook(getEndpoint().getAccessKey(), getEndpoint().getSecretKey()));
+        this.mqProducer.setNamesrvAddr(getEndpoint().getNamesrvAddr());
+        this.mqProducer.start();
+    }
+
+    @Override
+    protected void doStop() {
+        try {
+            unInitReplyManager();
+        } finally {
+            // Always release the client, even if stopping the reply manager failed, and tolerate a
+            // producer that was never created.
+            if (this.mqProducer != null) {
+                this.mqProducer.shutdown();
+                this.mqProducer = null;
+            }
+        }
+    }
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/SendFailedException.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/SendFailedException.java
new file mode 100644
index 00000000000..12dd22ad5af
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/SendFailedException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+/**
+ * Unchecked exception signalling that sending a message failed; the detail text is supplied by the caller.
+ */
+public class SendFailedException extends RuntimeException {
+
+ public SendFailedException(String message) {
+ super(message);
+ }
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyHandler.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyHandler.java
new file mode 100644
index 00000000000..7a3eae37f2e
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyHandler.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * Callback interface with one method for a reply and one for a timeout, both identified by a message key.
+ * NOTE(review): when and by whom these callbacks are invoked is not visible here - confirm against the
+ * implementing classes.
+ */
+public interface ReplyHandler {
+
+ /**
+ * Called with the message key and the RocketMQ message associated with it (presumably the reply).
+ */
+ void onReply(String messageKey, MessageExt messageExt);
+
+ /**
+ * Called with the message key of a request that timed out (presumably no reply arrived in time).
+ */
+ void onTimeout(String messageKey);
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyHolder.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyHolder.java
new file mode 100644
index 00000000000..08728869960
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyHolder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * Immutable value object that bundles an exchange, its callback and a message key with either the received
+ * RocketMQ message or, when no message is present, a timeout value.
+ */
+public class ReplyHolder {
+
+    private final Exchange exchange;
+    private final AsyncCallback callback;
+    private final String messageKey;
+    private final MessageExt messageExt;
+    private final long timeout;
+
+    /**
+     * Holder for a received message; the timeout is left at zero.
+     */
+    public ReplyHolder(Exchange exchange, AsyncCallback callback, String messageKey, MessageExt messageExt) {
+        this(exchange, callback, messageKey, messageExt, 0L);
+    }
+
+    /**
+     * Holder without a message, carrying the given timeout instead.
+     */
+    public ReplyHolder(Exchange exchange, AsyncCallback callback, String messageKey, long timeout) {
+        this(exchange, callback, messageKey, null, timeout);
+    }
+
+    // Single place where all fields are assigned.
+    private ReplyHolder(Exchange exchange, AsyncCallback callback, String messageKey, MessageExt messageExt, long timeout) {
+        this.exchange = exchange;
+        this.callback = callback;
+        this.messageKey = messageKey;
+        this.messageExt = messageExt;
+        this.timeout = timeout;
+    }
+
+    /**
+     * A holder counts as a timeout exactly when it carries no message.
+     */
+    public boolean isTimeout() {
+        return null == messageExt;
+    }
+
+    public Exchange getExchange() {
+        return this.exchange;
+    }
+
+    public AsyncCallback getCallback() {
+        return this.callback;
+    }
+
+    public MessageExt getMessageExt() {
+        return this.messageExt;
+    }
+
+    public String getMessageKey() {
+        return this.messageKey;
+    }
+
+    public long getTimeout() {
+        return this.timeout;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("ReplyHolder{");
+        text.append("exchange=").append(exchange);
+        text.append(", callback=").append(callback);
+        text.append(", messageKey='").append(messageKey).append('\'');
+        text.append(", messageExt=").append(messageExt);
+        text.append(", timeout=").append(timeout);
+        text.append('}');
+        return text.toString();
+    }
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyManager.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyManager.java
new file mode 100644
index 00000000000..79153aa20cb
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.rocketmq.RocketMQEndpoint;
+
+/**
+ * Contract for the component part that tracks outstanding request/reply
+ * exchanges by message key and completes them when a reply arrives or the
+ * wait times out. {@link RocketMQReplyManagerSupport} is the implementation
+ * in this package.
+ */
+public interface ReplyManager {
+
+ /**
+  * Sets the endpoint this manager works for.
+  * {@link RocketMQReplyManagerSupport} reads the reply topic, the reply
+  * consumer group, the name server address and the timeout checker interval
+  * from it.
+  */
+ void setEndpoint(RocketMQEndpoint endpoint);
+
+ /**
+  * Sets the topic on which reply messages are expected.
+  */
+ void setReplyToTopic(String replyToTopic);
+
+ /**
+  * Registers a pending reply for the given message key.
+  *
+  * @param replyManager   the manager that will process the reply or timeout
+  * @param exchange       the exchange waiting for the reply
+  * @param callback       the callback to notify when the exchange is completed
+  * @param messageKey     the key correlating request and reply
+  * @param requestTimeout how long to wait for the reply, in milliseconds
+  * @return               the registered key; {@link RocketMQReplyManagerSupport}
+  *                       returns {@code messageKey} unchanged and throws
+  *                       {@link IllegalArgumentException} when the key is
+  *                       already registered
+  */
+ String registerReply(
+ ReplyManager replyManager, Exchange exchange, AsyncCallback
callback, String messageKey, long requestTimeout);
+
+ /**
+  * Sets the scheduled executor; {@link RocketMQReplyManagerSupport} hands it
+  * to the {@link ReplyTimeoutMap} it creates on start.
+  */
+ void setScheduledExecutorService(ScheduledExecutorService executorService);
+
+ /**
+  * Applies the outcome described by the holder (reply or timeout) to its
+  * exchange and notifies its callback.
+  */
+ void processReply(ReplyHolder holder);
+
+ /**
+  * Removes the pending reply registered under the given message key, if any.
+  */
+ void cancelMessageKey(String messageKey);
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyTimeoutMap.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyTimeoutMap.java
new file mode 100644
index 00000000000..0ddc479d259
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyTimeoutMap.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.support.DefaultTimeoutMap;
+
+public class ReplyTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
+
+ public ReplyTimeoutMap(ScheduledExecutorService executor, long
requestMapPollTimeMillis) {
+ super(executor, requestMapPollTimeMillis);
+ addListener(this::listener);
+ }
+
+ private static long encode(long timeoutMillis) {
+ return timeoutMillis > 0 ? timeoutMillis : Integer.MAX_VALUE;
+ }
+
+ private void listener(Listener.Type type, String key, ReplyHandler
handler) {
+ switch (type) {
+ case Put:
+ log.trace("Added messageKey: {}", key);
+ break;
+ case Remove:
+ log.trace("Removed messageKey: {}", key);
+ break;
+ case Evict:
+ try {
+ handler.onTimeout(key);
+ } catch (Throwable e) {
+ log.warn("Error processing onTimeout for messageKey: {}
due: {}. " +
+ "This exception is ignored.",
+ key, e.getLocalizedMessage(), e);
+ }
+ log.trace("Evicted messageKey: {}", key);
+ break;
+ default:
+ }
+ }
+
+ @Override
+ public ReplyHandler put(String key, ReplyHandler value, long
timeoutMillis) {
+ return super.put(key, value, encode(timeoutMillis));
+ }
+
+ @Override
+ public ReplyHandler putIfAbsent(String key, ReplyHandler value, long
timeoutMillis) {
+ return super.putIfAbsent(key, value, encode(timeoutMillis));
+ }
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyHandler.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyHandler.java
new file mode 100644
index 00000000000..93a2f48a334
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ReplyHandler} bound to a single pending exchange. Both outcomes
+ * (reply received, timeout) are wrapped in a {@link ReplyHolder} and passed
+ * to the {@link ReplyManager} given at construction time.
+ */
+public class RocketMQReplyHandler implements ReplyHandler {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(RocketMQReplyHandler.class);
+
+    protected final ReplyManager replyManager;
+    protected final Exchange exchange;
+    protected final AsyncCallback callback;
+    protected final String messageKey;
+    protected final long timeout;
+
+    /**
+     * @param replyManager the manager that processes the resulting holder
+     * @param exchange     the exchange waiting for the reply
+     * @param callback     the callback passed on with the holder
+     * @param messageKey   the key the request was registered under
+     * @param timeout      the timeout reported in the holder when no reply arrives
+     */
+    public RocketMQReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String messageKey,
+                                long timeout) {
+        this.replyManager = replyManager;
+        this.exchange = exchange;
+        this.callback = callback;
+        this.messageKey = messageKey;
+        this.timeout = timeout;
+    }
+
+    /**
+     * Hands the received reply to the reply manager. The holder carries the
+     * key passed to this method, not the key stored at construction.
+     */
+    @Override
+    public void onReply(String messageKey, MessageExt messageExt) {
+        LOG.debug("onReply with messageKey: {}", messageKey);
+        replyManager.processReply(new ReplyHolder(exchange, callback, messageKey, messageExt));
+    }
+
+    /**
+     * Reports a timeout to the reply manager, using the timeout value given
+     * at construction.
+     */
+    @Override
+    public void onTimeout(String messageKey) {
+        LOG.debug("onTimeout with messageKey: {}", messageKey);
+        replyManager.processReply(new ReplyHolder(exchange, callback, messageKey, timeout));
+    }
+}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyManagerSupport.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyManagerSupport.java
new file mode 100644
index 00000000000..ec2b025d5d8
--- /dev/null
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyManagerSupport.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.Message;
+import org.apache.camel.component.rocketmq.RocketMQEndpoint;
+import org.apache.camel.component.rocketmq.RocketMQMessageConverter;
+import org.apache.camel.component.rocketmq.RocketMQProducer;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQReplyManagerSupport extends ServiceSupport implements
ReplyManager {
+
+ private static final int CLOSE_TIMEOUT = 30 * 1000;
+
+ protected final Logger log =
LoggerFactory.getLogger(RocketMQReplyManagerSupport.class);
+ protected final CamelContext camelContext;
+ protected final CountDownLatch replyToLatch = new CountDownLatch(1);
+ protected ScheduledExecutorService executorService;
+ protected RocketMQEndpoint endpoint;
+ protected String replyToTopic;
+ protected DefaultMQPushConsumer mqPushConsumer;
+ protected ReplyTimeoutMap timeoutMap;
+
+ public RocketMQReplyManagerSupport(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ ObjectHelper.notNull(executorService, "executorService", this);
+ ObjectHelper.notNull(endpoint, "endpoint", this);
+
+ log.debug("Using timeout checker interval with {} millis",
endpoint.getRequestTimeoutCheckerIntervalMillis());
+ timeoutMap = new ReplyTimeoutMap(executorService,
endpoint.getRequestTimeoutCheckerIntervalMillis());
+ ServiceHelper.startService(timeoutMap);
+
+ mqPushConsumer = createConsumer();
+ mqPushConsumer.start();
+
+ log.debug("Using executor {}", executorService);
+ }
+
+ protected DefaultMQPushConsumer createConsumer() throws MQClientException {
+ setReplyToTopic(endpoint.getReplyToTopic());
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
+ consumer.setConsumerGroup(endpoint.getReplyToConsumerGroup());
+ consumer.setNamesrvAddr(endpoint.getNamesrvAddr());
+ consumer.subscribe(replyToTopic, "*");
+ consumer.registerMessageListener((MessageListenerConcurrently) (msgs,
context) -> {
+ MessageExt messageExt = msgs.get(0);
+ onMessage(messageExt);
+ log.trace("Consume message {}", messageExt);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ return consumer;
+ }
+
+ public void onMessage(MessageExt messageExt) {
+ String messageKey =
Arrays.stream(messageExt.getKeys().split(MessageConst.KEY_SEPARATOR))
+ .filter(s ->
s.startsWith(RocketMQProducer.GENERATE_MESSAGE_KEY_PREFIX)).findFirst().orElse(null);
+ if (messageKey == null) {
+ log.warn("Ignoring message with no messageKey: {}", messageExt);
+ return;
+ }
+
+ log.debug("Received reply message with messageKey [{}] -> {}",
messageKey, messageExt);
+ handleReplyMessage(messageKey, messageExt);
+ }
+
+ @Override
+ protected void doStop() {
+ ServiceHelper.stopService(timeoutMap);
+
+ if (mqPushConsumer != null) {
+ log.debug("Closing connection: {} with timeout: {} ms.",
mqPushConsumer, CLOSE_TIMEOUT);
+ mqPushConsumer.shutdown();
+ mqPushConsumer = null;
+ }
+
+ if (executorService != null) {
+
camelContext.getExecutorServiceManager().shutdownGraceful(executorService);
+ executorService = null;
+ }
+ }
+
+ @Override
+ public void setEndpoint(RocketMQEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public void setReplyToTopic(String replyToTopic) {
+ log.debug("ReplyToTopic: {}", replyToTopic);
+ this.replyToTopic = replyToTopic;
+ replyToLatch.countDown();
+ }
+
+ @Override
+ public String registerReply(
+ ReplyManager replyManager, Exchange exchange, AsyncCallback
callback, String messageKey, long requestTimeout) {
+ RocketMQReplyHandler handler = new RocketMQReplyHandler(replyManager,
exchange, callback, messageKey, requestTimeout);
+ ReplyHandler result = timeoutMap.putIfAbsent(messageKey, handler,
requestTimeout);
+ if (result != null) {
+ String logMessage = String.format("The messageKey [%s] is not
unique.", messageKey);
+ throw new IllegalArgumentException(logMessage);
+ }
+ return messageKey;
+ }
+
+ @Override
+ public void setScheduledExecutorService(ScheduledExecutorService
executorService) {
+ this.executorService = executorService;
+ }
+
+ @Override
+ public void processReply(ReplyHolder holder) {
+ if (!isRunAllowed()) {
+ return;
+ }
+ try {
+ Exchange exchange = holder.getExchange();
+ if (holder.isTimeout()) {
+ if (log.isWarnEnabled()) {
+ log.warn("Timeout occurred after {} millis waiting for
reply message with messageKey [{}] on topic {}." +
+ " Setting ExchangeTimedOutException on {} and
continue routing.",
+ holder.getTimeout(), holder.getMessageKey(),
replyToTopic, ExchangeHelper.logIds(exchange));
+ }
+ String msg = "reply message with messageKey: " +
holder.getMessageKey() + " not received on topic: "
+ + replyToTopic;
+ exchange.setException(new ExchangeTimedOutException(exchange,
holder.getTimeout(), msg));
+ } else {
+ processReceivedReply(holder);
+ }
+ } finally {
+ holder.getCallback().done(false);
+ }
+ }
+
+ private static void processReceivedReply(ReplyHolder holder) {
+ Message message = holder.getExchange().getOut();
+ MessageExt messageExt = holder.getMessageExt();
+ message.setBody(messageExt.getBody());
+ RocketMQMessageConverter.populateHeadersByMessageExt(message,
messageExt);
+ }
+
+ @Override
+ public void cancelMessageKey(String messageKey) {
+ if (null == timeoutMap.get(messageKey)) {
+ return;
+ }
+ log.warn("Cancelling messageKey: {}", messageKey);
+ timeoutMap.remove(messageKey);
+ }
+
+ protected void handleReplyMessage(String messageKey, MessageExt
messageExt) {
+ ReplyHandler handler = timeoutMap.get(messageKey);
+ if (handler != null) {
+ timeoutMap.remove(messageKey);
+ handler.onReply(messageKey, messageExt);
+ } else {
+ log.warn("Reply received for unknown messageKey [{}]. The message
will be ignored: {}", messageKey, messageExt);
+ }
+ }
+
+}
diff --git
a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java
b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java
new file mode 100644
index 00000000000..78f801834ae
--- /dev/null
+++
b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.rocketmq.infra.EmbeddedRocketMQServer;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class RocketMQRequestReplyRouteTest extends CamelTestSupport {
+
+ private static final int NAMESRV_PORT = 59877;
+
+ private static final String NAMESRV_ADDR = "127.0.0.1:" + NAMESRV_PORT;
+
+ private static final String START_ENDPOINT_URI =
"rocketmq:START_TOPIC?producerGroup=p1&consumerGroup=c1";
+
+ private static final String INTERMEDIATE_ENDPOINT_URI =
"rocketmq:INTERMEDIATE_TOPIC" +
+
"?producerGroup=intermediaProducer" +
+
"&consumerGroup=intermediateConsumer" +
+
"&replyToTopic=REPLY_TO_TOPIC" +
+
"&replyToConsumerGroup=replyToConsumerGroup" +
+
"&requestTimeoutMillis=30000";
+
+ private static final String RESULT_ENDPOINT_URI = "mock:result";
+
+ private static final String EXPECTED_MESSAGE = "Hi.";
+
+ private static NamesrvController namesrvController;
+
+ private static BrokerController brokerController;
+
+ private MockEndpoint resultEndpoint;
+
+ private DefaultMQPushConsumer replierConsumer;
+
+ private DefaultMQProducer replierProducer;
+
+ @BeforeAll
+ static void beforeAll() throws Exception {
+ namesrvController =
EmbeddedRocketMQServer.createAndStartNamesrv(NAMESRV_PORT);
+ brokerController =
EmbeddedRocketMQServer.createAndStartBroker(NAMESRV_ADDR);
+ EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster",
"START_TOPIC");
+ EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster",
"INTERMEDIATE_TOPIC");
+ EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster",
"REPLY_TO_TOPIC");
+ }
+
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ resultEndpoint = (MockEndpoint)
context.getEndpoint(RESULT_ENDPOINT_URI);
+ replierProducer = new DefaultMQProducer("replierProducer");
+ replierProducer.setNamesrvAddr(NAMESRV_ADDR);
+ replierProducer.start();
+ replierConsumer = new DefaultMQPushConsumer("replierConsumer");
+ replierConsumer.setNamesrvAddr(NAMESRV_ADDR);
+ replierConsumer.subscribe("INTERMEDIATE_TOPIC", "*");
+ replierConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, unused) -> {
+ MessageExt messageExt = msgs.get(0);
+ String key = messageExt.getKeys();
+ Message response = new Message("REPLY_TO_TOPIC", "", key,
EXPECTED_MESSAGE.getBytes(StandardCharsets.UTF_8));
+ try {
+ replierProducer.send(response);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ replierConsumer.start();
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+ RocketMQComponent rocketMQComponent = new RocketMQComponent();
+ rocketMQComponent.setNamesrvAddr(NAMESRV_ADDR);
+ camelContext.addComponent("rocketmq", rocketMQComponent);
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() {
+ from(START_ENDPOINT_URI)
+ .to(ExchangePattern.InOut, INTERMEDIATE_ENDPOINT_URI)
+ .to(RESULT_ENDPOINT_URI);
+ }
+ };
+ }
+
+ @Test
+ public void testRouteMessageInRequestReplyMode() throws Exception {
+ resultEndpoint.expectedBodiesReceived(EXPECTED_MESSAGE);
+
resultEndpoint.message(0).header(RocketMQConstants.TOPIC).isEqualTo("REPLY_TO_TOPIC");
+
+ template.sendBody(START_ENDPOINT_URI, "hello, RocketMQ.");
+
+ resultEndpoint.assertIsSatisfied();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ replierConsumer.shutdown();
+ replierProducer.shutdown();
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ brokerController.shutdown();
+ namesrvController.shutdown();
+ }
+}
diff --git
a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java
b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java
new file mode 100644
index 00000000000..8deba43ad8d
--- /dev/null
+++
b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.rocketmq.infra.EmbeddedRocketMQServer;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Round-trip test for the RocketMQ component: a message produced to
+ * START_TOPIC must be consumed from the same topic and arrive at the mock
+ * endpoint with the expected body, topic header and tag header.
+ */
+public class RocketMQRouteTest extends CamelTestSupport {
+
+    public static final String EXPECTED_MESSAGE = "hello, RocketMQ.";
+
+    private static final int NAMESRV_PORT = 59876;
+
+    private static final String NAMESRV_ADDR = "127.0.0.1:" + NAMESRV_PORT;
+
+    private static final String START_ENDPOINT_URI = "rocketmq:START_TOPIC?producerGroup=p1&consumerGroup=c1&sendTag=startTag";
+
+    private static final String RESULT_ENDPOINT_URI = "mock:result";
+
+    private static NamesrvController namesrvController;
+
+    private static BrokerController brokerController;
+
+    private MockEndpoint resultEndpoint;
+
+    /**
+     * Boots the embedded name server and broker, then creates the topic used
+     * by the route.
+     */
+    @BeforeAll
+    static void beforeAll() throws Exception {
+        namesrvController = EmbeddedRocketMQServer.createAndStartNamesrv(NAMESRV_PORT);
+        brokerController = EmbeddedRocketMQServer.createAndStartBroker(NAMESRV_ADDR);
+        EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "START_TOPIC");
+    }
+
+    @Override
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        resultEndpoint = context.getEndpoint(RESULT_ENDPOINT_URI, MockEndpoint.class);
+    }
+
+    /**
+     * Registers a RocketMQ component pointing at the embedded name server
+     * under the {@code rocketmq} scheme.
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        RocketMQComponent component = new RocketMQComponent();
+        component.setNamesrvAddr(NAMESRV_ADDR);
+
+        CamelContext ctx = super.createCamelContext();
+        ctx.addComponent("rocketmq", component);
+        return ctx;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(START_ENDPOINT_URI)
+                        .to(RESULT_ENDPOINT_URI);
+            }
+        };
+    }
+
+    @Test
+    public void testSimpleRoute() throws Exception {
+        // Expectations are declared before sending so the mock sees the message.
+        resultEndpoint.expectedBodiesReceived(EXPECTED_MESSAGE);
+        resultEndpoint.message(0).header(RocketMQConstants.TOPIC).isEqualTo("START_TOPIC");
+        resultEndpoint.message(0).header(RocketMQConstants.TAG).isEqualTo("startTag");
+
+        template.sendBody(START_ENDPOINT_URI, EXPECTED_MESSAGE);
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    @AfterAll
+    public static void afterAll() {
+        brokerController.shutdown();
+        namesrvController.shutdown();
+    }
+}
diff --git
a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java
b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java
new file mode 100644
index 00000000000..153f6c4ca47
--- /dev/null
+++
b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.infra;
+
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.test.util.MQAdmin;
+import org.apache.rocketmq.test.util.TestUtils;
+
+/**
+ * Test utility that starts RocketMQ name servers and brokers inside the test
+ * JVM and creates topics on them.
+ * <p>
+ * Broker names and broker ports are drawn from static counters, so every
+ * broker started through this class gets its own name, listen port and HA
+ * listen port.
+ */
+public final class EmbeddedRocketMQServer {
+
+    private static final AtomicInteger BROKER_INDEX = new AtomicInteger();
+
+    private static final AtomicInteger BROKER_PORTS = new AtomicInteger(61000);
+
+    private EmbeddedRocketMQServer() {
+    }
+
+    /**
+     * Creates, initializes and starts a name server listening on the given port.
+     *
+     * @param  port      the port the name server listens on
+     * @return           the started controller
+     * @throws Exception if the name server cannot be started
+     */
+    public static NamesrvController createAndStartNamesrv(int port) throws Exception {
+        NettyServerConfig nettyConfig = new NettyServerConfig();
+        nettyConfig.setListenPort(port);
+
+        NamesrvController namesrv = new NamesrvController(new NamesrvConfig(), nettyConfig);
+        namesrv.initialize();
+        namesrv.start();
+        return namesrv;
+    }
+
+    /**
+     * Creates, initializes and starts a broker registered with the given name server.
+     *
+     * @param  nsAddr    the name server address
+     * @return           the started controller
+     * @throws Exception if the broker cannot be started
+     */
+    public static BrokerController createAndStartBroker(String nsAddr) throws Exception {
+        // The listen port is taken from the counter before the store config takes the HA port.
+        NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(BROKER_PORTS.getAndIncrement());
+
+        BrokerConfig brokerConfig = prepareBrokerConfig(nsAddr);
+        NettyClientConfig clientConfig = new NettyClientConfig();
+        MessageStoreConfig storeConfig = prepareMessageStoreConfig();
+
+        BrokerController broker = new BrokerController(brokerConfig, serverConfig, clientConfig, storeConfig);
+        broker.initialize();
+        broker.start();
+        return broker;
+    }
+
+    /**
+     * Builds a broker configuration bound to 127.0.0.1 with every configured
+     * thread pool reduced to a single thread.
+     */
+    private static BrokerConfig prepareBrokerConfig(final String nsAddr) {
+        final BrokerConfig config = new BrokerConfig();
+        config.setNamesrvAddr(nsAddr);
+        config.setBrokerName("CamelRocketMQBroker" + BROKER_INDEX.getAndIncrement());
+        config.setBrokerIP1("127.0.0.1");
+
+        // One thread per pool.
+        config.setSendMessageThreadPoolNums(1);
+        config.setPutMessageFutureThreadPoolNums(1);
+        config.setPullMessageThreadPoolNums(1);
+        config.setProcessReplyMessageThreadPoolNums(1);
+        config.setQueryMessageThreadPoolNums(1);
+        config.setAdminBrokerThreadPoolNums(1);
+        config.setClientManageThreadPoolNums(1);
+        config.setConsumerManageThreadPoolNums(1);
+        config.setHeartbeatThreadPoolNums(1);
+        return config;
+    }
+
+    /**
+     * Builds a message store configuration rooted in a timestamped directory
+     * below {@code java.io.tmpdir}.
+     */
+    private static MessageStoreConfig prepareMessageStoreConfig() {
+        final String tmpDir = System.getProperty("java.io.tmpdir");
+        final String baseDir = tmpDir + "/embedded_rocketmq/" + System.currentTimeMillis();
+
+        final MessageStoreConfig config = new MessageStoreConfig();
+        config.setStorePathRootDir(baseDir);
+        config.setStorePathCommitLog(baseDir + "/commitlog");
+        config.setMappedFileSizeCommitLog(1024 * 1024 * 10);
+        config.setMaxIndexNum(100);
+        config.setMaxHashSlotNum(400);
+        config.setHaListenPort(BROKER_PORTS.getAndIncrement());
+        return config;
+    }
+
+    /**
+     * Creates the topic, retrying every 500 ms until creation succeeds.
+     *
+     * @param  namesrvAddr      the name server address
+     * @param  defaultCluster   the cluster to create the topic in
+     * @param  topic            the topic name
+     * @throws TimeoutException if an attempt fails and more than 30 seconds have passed since the first attempt
+     */
+    public static void createTopic(String namesrvAddr, String defaultCluster, String topic) throws TimeoutException {
+        final long startTime = System.currentTimeMillis();
+        for (;;) {
+            // NOTE(review): the trailing 4 and 3 are presumably queue count and wait seconds - confirm against MQAdmin.
+            if (MQAdmin.createTopic(namesrvAddr, defaultCluster, topic, 4, 3)) {
+                return;
+            }
+            if (System.currentTimeMillis() - startTime > 30 * 1000) {
+                throw new TimeoutException(
+                        String.format("Failed to create topic [%s] after %d ms", topic,
+                                System.currentTimeMillis() - startTime));
+            }
+            TestUtils.waitForMoment(500);
+        }
+    }
+}
diff --git a/components/camel-rocketmq/src/test/resources/logback-test.xml
b/components/camel-rocketmq/src/test/resources/logback-test.xml
new file mode 100644
index 00000000000..d45c9a37733
--- /dev/null
+++ b/components/camel-rocketmq/src/test/resources/logback-test.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<configuration>
+    <!-- Console appender referenced by the root logger below. -->
+    <appender name="DefaultAppender" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <!-- timestamp [level] [thread] - message -->
+            <pattern>%d{yyyy-MM-dd HH:mm:ss} [%p] [%t] - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <!-- Root level OFF silences all logging during tests; raise it when debugging. -->
+    <root>
+        <level value="OFF"/>
+        <appender-ref ref="DefaultAppender"/>
+    </root>
+</configuration>
diff --git a/components/pom.xml b/components/pom.xml
index 3bbe9690c1b..6e704a03504 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -272,6 +272,7 @@
<module>camel-resourceresolver-github</module>
<module>camel-resteasy</module>
<module>camel-robotframework</module>
+ <module>camel-rocketmq</module>
<module>camel-rss</module>
<module>camel-rxjava</module>
<module>camel-saga</module>
diff --git a/parent/pom.xml b/parent/pom.xml
index 0c29d0212a7..9466cb192b7 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -467,6 +467,7 @@
<resteasy-version>4.5.6.Final</resteasy-version>
<roaster-version>2.26.0.Final</roaster-version>
<robotframework-version>4.1.2</robotframework-version>
+ <rocketmq-version>4.9.4</rocketmq-version>
<rome-version>1.18.0</rome-version>
<rxjava2-version>2.2.21</rxjava2-version>
<saxon-version>11.4</saxon-version>
@@ -2202,6 +2203,11 @@
<artifactId>camel-robotframework</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-rocketmq</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-rss</artifactId>