This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 62d814748d2 CAMEL-14831: Add camel-rocketmq component (#8820)
62d814748d2 is described below

commit 62d814748d2d8ca208bc93b3b17e24048ebf03d1
Author: 吴伟杰 <[email protected]>
AuthorDate: Wed Dec 7 16:43:48 2022 +0800

    CAMEL-14831: Add camel-rocketmq component (#8820)
    
    * CAMEL-14831: Initial commit of RocketMQ component
    
    * CAMEL-14831: Add rocketmq-component.adoc
    
    * CAMEL-14831: Add camel-rocketmq to camel-parent
    
    * CAMEL-14831: Format RocketMQConstants and document
    
    * CAMEL-14831: Generate files for camel-rocketmq
    
    * CAMEL-14831: Fix indent of pom.xml
    
    * CAMEL-14831: Rename header keys
    
    * CAMEL-14831: Regenerate rocketmq.json
    
    * CAMEL-14831: Revise MetaData and code cleanup
    
    * CAMEL-14831: Code cleanup and refactor
    
    * CAMEL-14831: Add log marker to ReplyTimeoutMap
    
    * CAMEL-14831: Perform code cleanup and format
    
    * CAMEL-14831: Regenerate codes
    
    * CAMEL-14831: Revise log in ReplyTimeoutMap
    
    * CAMEL-14831: Clarify RocketMQ version compatibility in doc
---
 catalog/camel-allcomponents/pom.xml                |   5 +
 components/camel-rocketmq/pom.xml                  |  93 ++++++++
 .../rocketmq/RocketMQComponentConfigurer.java      | 133 ++++++++++++
 .../rocketmq/RocketMQEndpointConfigurer.java       | 139 ++++++++++++
 .../rocketmq/RocketMQEndpointUriFactory.java       |  87 ++++++++
 .../services/org/apache/camel/component.properties |   7 +
 .../services/org/apache/camel/component/rocketmq   |   2 +
 .../org/apache/camel/configurer/rocketmq-component |   2 +
 .../org/apache/camel/configurer/rocketmq-endpoint  |   2 +
 .../org/apache/camel/urifactory/rocketmq-endpoint  |   2 +
 .../apache/camel/component/rocketmq/rocketmq.json  |  82 +++++++
 .../src/main/docs/rocketmq-component.adoc          | 112 ++++++++++
 .../camel/component/rocketmq/RocketMQAclUtils.java |  36 ++++
 .../component/rocketmq/RocketMQComponent.java      | 216 +++++++++++++++++++
 .../component/rocketmq/RocketMQConstants.java      |  75 +++++++
 .../camel/component/rocketmq/RocketMQConsumer.java |  95 +++++++++
 .../camel/component/rocketmq/RocketMQEndpoint.java | 237 +++++++++++++++++++++
 .../rocketmq/RocketMQMessageConverter.java         |  47 ++++
 .../camel/component/rocketmq/RocketMQProducer.java | 235 ++++++++++++++++++++
 .../component/rocketmq/SendFailedException.java    |  25 +++
 .../component/rocketmq/reply/ReplyHandler.java     |  27 +++
 .../component/rocketmq/reply/ReplyHolder.java      |  78 +++++++
 .../component/rocketmq/reply/ReplyManager.java     |  40 ++++
 .../component/rocketmq/reply/ReplyTimeoutMap.java  |  66 ++++++
 .../rocketmq/reply/RocketMQReplyHandler.java       |  58 +++++
 .../reply/RocketMQReplyManagerSupport.java         | 199 +++++++++++++++++
 .../rocketmq/RocketMQRequestReplyRouteTest.java    | 148 +++++++++++++
 .../component/rocketmq/RocketMQRouteTest.java      | 100 +++++++++
 .../rocketmq/infra/EmbeddedRocketMQServer.java     | 101 +++++++++
 .../src/test/resources/logback-test.xml            |  32 +++
 components/pom.xml                                 |   1 +
 parent/pom.xml                                     |   6 +
 32 files changed, 2488 insertions(+)

diff --git a/catalog/camel-allcomponents/pom.xml b/catalog/camel-allcomponents/pom.xml
index df0989d51d5..da2229d144d 100644
--- a/catalog/camel-allcomponents/pom.xml
+++ b/catalog/camel-allcomponents/pom.xml
@@ -1493,6 +1493,11 @@
             <artifactId>camel-robotframework</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-rocketmq</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-rss</artifactId>
diff --git a/components/camel-rocketmq/pom.xml b/components/camel-rocketmq/pom.xml
new file mode 100644
index 00000000000..365e3b2eacd
--- /dev/null
+++ b/components/camel-rocketmq/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>components</artifactId>
+        <version>3.20.0-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>camel-rocketmq</artifactId>
+    <packaging>jar</packaging>
+    <name>Camel :: RocketMQ</name>
+    <description>Camel RocketMQ Component</description>
+    
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-support</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-acl</artifactId>
+            <version>${rocketmq-version}</version>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-junit5</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-namesrv</artifactId>
+            <version>${rocketmq-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-broker</artifactId>
+            <version>${rocketmq-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-test</artifactId>
+            <version>${rocketmq-version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <argLine>--add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED</argLine>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQComponentConfigurer.java b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQComponentConfigurer.java
new file mode 100644
index 00000000000..502e614c84f
--- /dev/null
+++ b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQComponentConfigurer.java
@@ -0,0 +1,133 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.rocketmq;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurerGetter;
+import org.apache.camel.spi.ConfigurerStrategy;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.support.component.PropertyConfigurerSupport;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@SuppressWarnings("unchecked")
+public class RocketMQComponentConfigurer extends PropertyConfigurerSupport implements GeneratedPropertyConfigurer, PropertyConfigurerGetter {
+
+    @Override
+    public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
+        RocketMQComponent target = (RocketMQComponent) obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesskey":
+        case "accessKey": target.setAccessKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "autowiredenabled":
+        case "autowiredEnabled": target.setAutowiredEnabled(property(camelContext, boolean.class, value)); return true;
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
+        case "consumergroup":
+        case "consumerGroup": target.setConsumerGroup(property(camelContext, java.lang.String.class, value)); return true;
+        case "lazystartproducer":
+        case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+        case "namesrvaddr":
+        case "namesrvAddr": target.setNamesrvAddr(property(camelContext, java.lang.String.class, value)); return true;
+        case "producergroup":
+        case "producerGroup": target.setProducerGroup(property(camelContext, java.lang.String.class, value)); return true;
+        case "replytoconsumergroup":
+        case "replyToConsumerGroup": target.setReplyToConsumerGroup(property(camelContext, java.lang.String.class, value)); return true;
+        case "replytotopic":
+        case "replyToTopic": target.setReplyToTopic(property(camelContext, java.lang.String.class, value)); return true;
+        case "requesttimeoutcheckerintervalmillis":
+        case "requestTimeoutCheckerIntervalMillis": target.setRequestTimeoutCheckerIntervalMillis(property(camelContext, long.class, value)); return true;
+        case "requesttimeoutmillis":
+        case "requestTimeoutMillis": target.setRequestTimeoutMillis(property(camelContext, long.class, value)); return true;
+        case "secretkey":
+        case "secretKey": target.setSecretKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "sendtag":
+        case "sendTag": target.setSendTag(property(camelContext, java.lang.String.class, value)); return true;
+        case "subscribetags":
+        case "subscribeTags": target.setSubscribeTags(property(camelContext, java.lang.String.class, value)); return true;
+        case "waitforsendresult":
+        case "waitForSendResult": target.setWaitForSendResult(property(camelContext, boolean.class, value)); return true;
+        default: return false;
+        }
+    }
+
+    @Override
+    public Class<?> getOptionType(String name, boolean ignoreCase) {
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesskey":
+        case "accessKey": return java.lang.String.class;
+        case "autowiredenabled":
+        case "autowiredEnabled": return boolean.class;
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": return boolean.class;
+        case "consumergroup":
+        case "consumerGroup": return java.lang.String.class;
+        case "lazystartproducer":
+        case "lazyStartProducer": return boolean.class;
+        case "namesrvaddr":
+        case "namesrvAddr": return java.lang.String.class;
+        case "producergroup":
+        case "producerGroup": return java.lang.String.class;
+        case "replytoconsumergroup":
+        case "replyToConsumerGroup": return java.lang.String.class;
+        case "replytotopic":
+        case "replyToTopic": return java.lang.String.class;
+        case "requesttimeoutcheckerintervalmillis":
+        case "requestTimeoutCheckerIntervalMillis": return long.class;
+        case "requesttimeoutmillis":
+        case "requestTimeoutMillis": return long.class;
+        case "secretkey":
+        case "secretKey": return java.lang.String.class;
+        case "sendtag":
+        case "sendTag": return java.lang.String.class;
+        case "subscribetags":
+        case "subscribeTags": return java.lang.String.class;
+        case "waitforsendresult":
+        case "waitForSendResult": return boolean.class;
+        default: return null;
+        }
+    }
+
+    @Override
+    public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
+        RocketMQComponent target = (RocketMQComponent) obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesskey":
+        case "accessKey": return target.getAccessKey();
+        case "autowiredenabled":
+        case "autowiredEnabled": return target.isAutowiredEnabled();
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": return target.isBridgeErrorHandler();
+        case "consumergroup":
+        case "consumerGroup": return target.getConsumerGroup();
+        case "lazystartproducer":
+        case "lazyStartProducer": return target.isLazyStartProducer();
+        case "namesrvaddr":
+        case "namesrvAddr": return target.getNamesrvAddr();
+        case "producergroup":
+        case "producerGroup": return target.getProducerGroup();
+        case "replytoconsumergroup":
+        case "replyToConsumerGroup": return target.getReplyToConsumerGroup();
+        case "replytotopic":
+        case "replyToTopic": return target.getReplyToTopic();
+        case "requesttimeoutcheckerintervalmillis":
+        case "requestTimeoutCheckerIntervalMillis": return target.getRequestTimeoutCheckerIntervalMillis();
+        case "requesttimeoutmillis":
+        case "requestTimeoutMillis": return target.getRequestTimeoutMillis();
+        case "secretkey":
+        case "secretKey": return target.getSecretKey();
+        case "sendtag":
+        case "sendTag": return target.getSendTag();
+        case "subscribetags":
+        case "subscribeTags": return target.getSubscribeTags();
+        case "waitforsendresult":
+        case "waitForSendResult": return target.isWaitForSendResult();
+        default: return null;
+        }
+    }
+}
+
diff --git a/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointConfigurer.java b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointConfigurer.java
new file mode 100644
index 00000000000..51cb6e6dc67
--- /dev/null
+++ b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointConfigurer.java
@@ -0,0 +1,139 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.rocketmq;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurerGetter;
+import org.apache.camel.spi.ConfigurerStrategy;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.support.component.PropertyConfigurerSupport;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@SuppressWarnings("unchecked")
+public class RocketMQEndpointConfigurer extends PropertyConfigurerSupport implements GeneratedPropertyConfigurer, PropertyConfigurerGetter {
+
+    @Override
+    public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
+        RocketMQEndpoint target = (RocketMQEndpoint) obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesskey":
+        case "accessKey": target.setAccessKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
+        case "consumergroup":
+        case "consumerGroup": target.setConsumerGroup(property(camelContext, java.lang.String.class, value)); return true;
+        case "exceptionhandler":
+        case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true;
+        case "exchangepattern":
+        case "exchangePattern": target.setExchangePattern(property(camelContext, org.apache.camel.ExchangePattern.class, value)); return true;
+        case "lazystartproducer":
+        case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+        case "namesrvaddr":
+        case "namesrvAddr": target.setNamesrvAddr(property(camelContext, java.lang.String.class, value)); return true;
+        case "producergroup":
+        case "producerGroup": target.setProducerGroup(property(camelContext, java.lang.String.class, value)); return true;
+        case "replytoconsumergroup":
+        case "replyToConsumerGroup": target.setReplyToConsumerGroup(property(camelContext, java.lang.String.class, value)); return true;
+        case "replytotopic":
+        case "replyToTopic": target.setReplyToTopic(property(camelContext, java.lang.String.class, value)); return true;
+        case "requesttimeoutcheckerintervalmillis":
+        case "requestTimeoutCheckerIntervalMillis": target.setRequestTimeoutCheckerIntervalMillis(property(camelContext, long.class, value)); return true;
+        case "requesttimeoutmillis":
+        case "requestTimeoutMillis": target.setRequestTimeoutMillis(property(camelContext, long.class, value)); return true;
+        case "secretkey":
+        case "secretKey": target.setSecretKey(property(camelContext, java.lang.String.class, value)); return true;
+        case "sendtag":
+        case "sendTag": target.setSendTag(property(camelContext, java.lang.String.class, value)); return true;
+        case "subscribetags":
+        case "subscribeTags": target.setSubscribeTags(property(camelContext, java.lang.String.class, value)); return true;
+        case "waitforsendresult":
+        case "waitForSendResult": target.setWaitForSendResult(property(camelContext, boolean.class, value)); return true;
+        default: return false;
+        }
+    }
+
+    @Override
+    public Class<?> getOptionType(String name, boolean ignoreCase) {
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesskey":
+        case "accessKey": return java.lang.String.class;
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": return boolean.class;
+        case "consumergroup":
+        case "consumerGroup": return java.lang.String.class;
+        case "exceptionhandler":
+        case "exceptionHandler": return org.apache.camel.spi.ExceptionHandler.class;
+        case "exchangepattern":
+        case "exchangePattern": return org.apache.camel.ExchangePattern.class;
+        case "lazystartproducer":
+        case "lazyStartProducer": return boolean.class;
+        case "namesrvaddr":
+        case "namesrvAddr": return java.lang.String.class;
+        case "producergroup":
+        case "producerGroup": return java.lang.String.class;
+        case "replytoconsumergroup":
+        case "replyToConsumerGroup": return java.lang.String.class;
+        case "replytotopic":
+        case "replyToTopic": return java.lang.String.class;
+        case "requesttimeoutcheckerintervalmillis":
+        case "requestTimeoutCheckerIntervalMillis": return long.class;
+        case "requesttimeoutmillis":
+        case "requestTimeoutMillis": return long.class;
+        case "secretkey":
+        case "secretKey": return java.lang.String.class;
+        case "sendtag":
+        case "sendTag": return java.lang.String.class;
+        case "subscribetags":
+        case "subscribeTags": return java.lang.String.class;
+        case "waitforsendresult":
+        case "waitForSendResult": return boolean.class;
+        default: return null;
+        }
+    }
+
+    @Override
+    public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
+        RocketMQEndpoint target = (RocketMQEndpoint) obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "accesskey":
+        case "accessKey": return target.getAccessKey();
+        case "bridgeerrorhandler":
+        case "bridgeErrorHandler": return target.isBridgeErrorHandler();
+        case "consumergroup":
+        case "consumerGroup": return target.getConsumerGroup();
+        case "exceptionhandler":
+        case "exceptionHandler": return target.getExceptionHandler();
+        case "exchangepattern":
+        case "exchangePattern": return target.getExchangePattern();
+        case "lazystartproducer":
+        case "lazyStartProducer": return target.isLazyStartProducer();
+        case "namesrvaddr":
+        case "namesrvAddr": return target.getNamesrvAddr();
+        case "producergroup":
+        case "producerGroup": return target.getProducerGroup();
+        case "replytoconsumergroup":
+        case "replyToConsumerGroup": return target.getReplyToConsumerGroup();
+        case "replytotopic":
+        case "replyToTopic": return target.getReplyToTopic();
+        case "requesttimeoutcheckerintervalmillis":
+        case "requestTimeoutCheckerIntervalMillis": return target.getRequestTimeoutCheckerIntervalMillis();
+        case "requesttimeoutmillis":
+        case "requestTimeoutMillis": return target.getRequestTimeoutMillis();
+        case "secretkey":
+        case "secretKey": return target.getSecretKey();
+        case "sendtag":
+        case "sendTag": return target.getSendTag();
+        case "subscribetags":
+        case "subscribeTags": return target.getSubscribeTags();
+        case "waitforsendresult":
+        case "waitForSendResult": return target.isWaitForSendResult();
+        default: return null;
+        }
+    }
+}
+
diff --git a/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointUriFactory.java b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointUriFactory.java
new file mode 100644
index 00000000000..d79d54d1f11
--- /dev/null
+++ b/components/camel-rocketmq/src/generated/java/org/apache/camel/component/rocketmq/RocketMQEndpointUriFactory.java
@@ -0,0 +1,87 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.rocketmq;
+
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.camel.spi.EndpointUriFactory;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+public class RocketMQEndpointUriFactory extends org.apache.camel.support.component.EndpointUriFactorySupport implements EndpointUriFactory {
+
+    private static final String BASE = ":topicName";
+
+    private static final Set<String> PROPERTY_NAMES;
+    private static final Set<String> SECRET_PROPERTY_NAMES;
+    private static final Set<String> MULTI_VALUE_PREFIXES;
+    static {
+        Set<String> props = new HashSet<>(17);
+        props.add("accessKey");
+        props.add("bridgeErrorHandler");
+        props.add("consumerGroup");
+        props.add("exceptionHandler");
+        props.add("exchangePattern");
+        props.add("lazyStartProducer");
+        props.add("namesrvAddr");
+        props.add("producerGroup");
+        props.add("replyToConsumerGroup");
+        props.add("replyToTopic");
+        props.add("requestTimeoutCheckerIntervalMillis");
+        props.add("requestTimeoutMillis");
+        props.add("secretKey");
+        props.add("sendTag");
+        props.add("subscribeTags");
+        props.add("topicName");
+        props.add("waitForSendResult");
+        PROPERTY_NAMES = Collections.unmodifiableSet(props);
+        Set<String> secretProps = new HashSet<>(2);
+        secretProps.add("accessKey");
+        secretProps.add("secretKey");
+        SECRET_PROPERTY_NAMES = Collections.unmodifiableSet(secretProps);
+        MULTI_VALUE_PREFIXES = Collections.emptySet();
+    }
+
+    @Override
+    public boolean isEnabled(String scheme) {
+        return "rocketmq".equals(scheme);
+    }
+
+    @Override
+    public String buildUri(String scheme, Map<String, Object> properties, boolean encode) throws URISyntaxException {
+        String syntax = scheme + BASE;
+        String uri = syntax;
+
+        Map<String, Object> copy = new HashMap<>(properties);
+
+        uri = buildPathParameter(syntax, uri, "topicName", null, true, copy);
+        uri = buildQueryParameters(uri, copy, encode);
+        return uri;
+    }
+
+    @Override
+    public Set<String> propertyNames() {
+        return PROPERTY_NAMES;
+    }
+
+    @Override
+    public Set<String> secretPropertyNames() {
+        return SECRET_PROPERTY_NAMES;
+    }
+
+    @Override
+    public Set<String> multiValuePrefixes() {
+        return MULTI_VALUE_PREFIXES;
+    }
+
+    @Override
+    public boolean isLenientProperties() {
+        return false;
+    }
+}
+
diff --git a/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/component.properties b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/component.properties
new file mode 100644
index 00000000000..3e47499498b
--- /dev/null
+++ b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/component.properties
@@ -0,0 +1,7 @@
+# Generated by camel build tools - do NOT edit this file!
+components=rocketmq
+groupId=org.apache.camel
+artifactId=camel-rocketmq
+version=3.20.0-SNAPSHOT
+projectName=Camel :: RocketMQ
+projectDescription=Camel RocketMQ Component
diff --git a/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/component/rocketmq b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/component/rocketmq
new file mode 100644
index 00000000000..6333b860582
--- /dev/null
+++ b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/component/rocketmq
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.rocketmq.RocketMQComponent
diff --git a/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/configurer/rocketmq-component b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/configurer/rocketmq-component
new file mode 100644
index 00000000000..418db6c2d00
--- /dev/null
+++ b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/configurer/rocketmq-component
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.rocketmq.RocketMQComponentConfigurer
diff --git a/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/configurer/rocketmq-endpoint b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/configurer/rocketmq-endpoint
new file mode 100644
index 00000000000..bbcf03ee7e6
--- /dev/null
+++ b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/configurer/rocketmq-endpoint
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.rocketmq.RocketMQEndpointConfigurer
diff --git a/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/urifactory/rocketmq-endpoint b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/urifactory/rocketmq-endpoint
new file mode 100644
index 00000000000..4fbd39b3303
--- /dev/null
+++ b/components/camel-rocketmq/src/generated/resources/META-INF/services/org/apache/camel/urifactory/rocketmq-endpoint
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.rocketmq.RocketMQEndpointUriFactory
diff --git a/components/camel-rocketmq/src/generated/resources/org/apache/camel/component/rocketmq/rocketmq.json b/components/camel-rocketmq/src/generated/resources/org/apache/camel/component/rocketmq/rocketmq.json
new file mode 100644
index 00000000000..ed2629825c3
--- /dev/null
+++ b/components/camel-rocketmq/src/generated/resources/org/apache/camel/component/rocketmq/rocketmq.json
@@ -0,0 +1,82 @@
+{
+  "component": {
+    "kind": "component",
+    "name": "rocketmq",
+    "title": "RocketMQ",
+    "description": "Send and receive messages from RocketMQ cluster.",
+    "deprecated": false,
+    "firstVersion": "3.20.0",
+    "label": "messaging",
+    "javaType": "org.apache.camel.component.rocketmq.RocketMQComponent",
+    "supportLevel": "Preview",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-rocketmq",
+    "version": "3.20.0-SNAPSHOT",
+    "scheme": "rocketmq",
+    "extendsScheme": "",
+    "syntax": "rocketmq:topicName",
+    "async": true,
+    "api": false,
+    "consumerOnly": false,
+    "producerOnly": false,
+    "lenientProperties": false
+  },
+  "componentProperties": {
+    "namesrvAddr": { "kind": "property", "displayName": "Namesrv Addr", 
"group": "common", "label": "common", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "localhost:9876", "description": "Name server 
address of RocketMQ cluster." },
+    "sendTag": { "kind": "property", "displayName": "Send Tag", "group": 
"common", "label": "common", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Each message would be sent with this tag." },
+    "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a me [...]
+    "consumerGroup": { "kind": "property", "displayName": "Consumer Group", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Consumer group name." },
+    "subscribeTags": { "kind": "property", "displayName": "Subscribe Tags", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "*", "description": "Subscribe tags of 
consumer. Multiple tags could be split by ||, such as TagA||TagB" },
+    "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start 
Producer", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Whether the producer 
should be started lazy (on the first message). By starting lazy you can use 
this to allow CamelContext and routes to startup in situations where a producer 
may otherwise fail during star [...]
+    "producerGroup": { "kind": "property", "displayName": "Producer Group", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Producer group name." },
+    "replyToConsumerGroup": { "kind": "property", "displayName": "Reply To 
Consumer Group", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Consumer group name used 
for receiving response." },
+    "replyToTopic": { "kind": "property", "displayName": "Reply To Topic", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Topic used for receiving response when using 
in-out pattern." },
+    "waitForSendResult": { "kind": "property", "displayName": "Wait For Send 
Result", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Whether waiting for 
send result before routing to next endpoint." },
+    "autowiredEnabled": { "kind": "property", "displayName": "Autowired 
Enabled", "group": "advanced", "label": "advanced", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": true, "description": "Whether autowiring is 
enabled. This is used for automatic autowiring options (the option must be 
marked as autowired) by looking up in the registry to find if there is a single 
instance of matching type, which t [...]
+    "requestTimeoutCheckerIntervalMillis": { "kind": "property", 
"displayName": "Request Timeout Checker Interval Millis", "group": "advanced", 
"label": "advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, 
"description": "Check interval milliseconds of request timeout." },
+    "requestTimeoutMillis": { "kind": "property", "displayName": "Request 
Timeout Millis", "group": "advanced", "label": "advanced", "required": false, 
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 10000, "description": "Timeout milliseconds of 
receiving response when using in-out pattern." },
+    "accessKey": { "kind": "property", "displayName": "Access Key", "group": 
"secret", "label": "secret", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": true, 
"description": "Access key for RocketMQ ACL." },
+    "secretKey": { "kind": "property", "displayName": "Secret Key", "group": 
"secret", "label": "secret", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": true, 
"description": "Secret key for RocketMQ ACL." }
+  },
+  "headers": {
+    "CamelRockerMQTopic": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Topic of message", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#TOPIC" },
+    "CamelRockerMQTag": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Tag of message", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#TAG" },
+    "CamelRockerMQKey": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Key of message", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#KEY" },
+    "CamelRockerMQOverrideTopicName": { "kind": "header", "displayName": "", 
"group": "producer", "label": "producer", "required": false, "javaType": 
"String", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "If this header is set, the message will be 
routed to the topic specified by this header instead of the origin topic in 
endpoint.", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#OVERRIDE_TOPIC_NAME" },
+    "CamelRockerMQOverrideTag": { "kind": "header", "displayName": "", 
"group": "producer", "label": "producer", "required": false, "javaType": 
"String", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "If this header is set, the message's tag will 
be set to value specified by this header instead of the sendTag defined in 
endpoint.", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#OVERRIDE_TAG" },
+    "CamelRockerMQOverrideMessageKey": { "kind": "header", "displayName": "", 
"group": "producer", "label": "producer", "required": false, "javaType": 
"String", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Set keys for the message. When using in-out 
pattern, the value will be prepended to the generated keys", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#OVERRIDE_MESSAGE_KEY" },
+    "CamelRockerMQBrokerName": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Broker name", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#BROKER_NAME" },
+    "CamelRockerMQQueueId": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "int", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Queue ID", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#QUEUE_ID" },
+    "CamelRockerMQStoreSize": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "int", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Store size", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#STORE_SIZE" },
+    "CamelRockerMQQueueOffset": { "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Queue offset", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#QUEUE_OFFSET" },
+    "CamelRockerMQSysFlag": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "int", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Sys flag", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#SYS_FLAG" },
+    "CamelRockerMQBornTimestamp": { "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Born timestamp", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#BORN_TIMESTAMP" },
+    "CamelRockerMQBornHost": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": 
"java.net.SocketAddress", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "Born host", 
"constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#BORN_HOST" },
+    "CamelRockerMQStoreTimestamp": { "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Store timestamp", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#STORE_TIMESTAMP" },
+    "CamelRockerMQStoreHost": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": 
"java.net.SocketAddress", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "Store host", 
"constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#STORE_HOST" },
+    "CamelRockerMQMsgId": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Msg ID", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#MSG_ID" },
+    "CamelRockerMQCommitLogOffset": { "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"long", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Commit log offset", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#COMMIT_LOG_OFFSET" },
+    "CamelRockerMQBodyCrc": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "int", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Body CRC", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#BODY_CRC" },
+    "CamelRockerMQReconsumeTimes": { "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": "int", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Reconsume times", "constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#RECONSUME_TIMES" },
+    "CamelRockerMQPreparedTransactionOffset": { "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Prepared transaction offset", 
"constantName": 
"org.apache.camel.component.rocketmq.RocketMQConstants#PREPARED_TRANSACTION_OFFSET"
 }
+  },
+  "properties": {
+    "topicName": { "kind": "path", "displayName": "Topic Name", "group": 
"common", "label": "", "required": true, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Topic name of this endpoint." },
+    "namesrvAddr": { "kind": "parameter", "displayName": "Namesrv Addr", 
"group": "common", "label": "common", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "localhost:9876", "description": "Name server 
address of RocketMQ cluster." },
+    "consumerGroup": { "kind": "parameter", "displayName": "Consumer Group", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Consumer group name." },
+    "subscribeTags": { "kind": "parameter", "displayName": "Subscribe Tags", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "*", "description": "Subscribe tags of 
consumer. Multiple tags could be split by ||, such as TagA||TagB" },
+    "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error 
Handler", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": false, 
"description": "Allows for bridging the consumer to the Camel routing Error 
Handler, which mean any exceptions occurred while the consumer is trying to 
pickup incoming messages, or the likes, will now [...]
+    "exceptionHandler": { "kind": "parameter", "displayName": "Exception 
Handler", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By default the 
con [...]
+    "exchangePattern": { "kind": "parameter", "displayName": "Exchange 
Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", 
"InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the exchange pattern when the consumer creates an 
exchange." },
+    "producerGroup": { "kind": "parameter", "displayName": "Producer Group", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Producer group name." },
+    "replyToConsumerGroup": { "kind": "parameter", "displayName": "Reply To 
Consumer Group", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Consumer group name used 
for receiving response." },
+    "replyToTopic": { "kind": "parameter", "displayName": "Reply To Topic", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Topic used for receiving response when using 
in-out pattern." },
+    "sendTag": { "kind": "parameter", "displayName": "Send Tag", "group": 
"producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Each message would be sent with this tag." },
+    "waitForSendResult": { "kind": "parameter", "displayName": "Wait For Send 
Result", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Whether waiting for 
send result before routing to next endpoint." },
+    "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start 
Producer", "group": "producer (advanced)", "label": "producer,advanced", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": false, 
"description": "Whether the producer should be started lazy (on the first 
message). By starting lazy you can use this to allow CamelContext and routes to 
startup in situations where a producer may other [...]
+    "requestTimeoutCheckerIntervalMillis": { "kind": "parameter", 
"displayName": "Request Timeout Checker Interval Millis", "group": "advanced", 
"label": "advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, 
"description": "Check interval milliseconds of request timeout." },
+    "requestTimeoutMillis": { "kind": "parameter", "displayName": "Request 
Timeout Millis", "group": "advanced", "label": "advanced", "required": false, 
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 10000, "description": "Timeout milliseconds of 
receiving response when using in-out pattern." },
+    "accessKey": { "kind": "parameter", "displayName": "Access Key", "group": 
"security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "description": "Access key for RocketMQ ACL." },
+    "secretKey": { "kind": "parameter", "displayName": "Secret Key", "group": 
"security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "description": "Secret key for RocketMQ ACL." }
+  }
+}
diff --git a/components/camel-rocketmq/src/main/docs/rocketmq-component.adoc 
b/components/camel-rocketmq/src/main/docs/rocketmq-component.adoc
new file mode 100644
index 00000000000..4822aa4b8ee
--- /dev/null
+++ b/components/camel-rocketmq/src/main/docs/rocketmq-component.adoc
@@ -0,0 +1,112 @@
+= RocketMQ Component
+:doctitle: RocketMQ
+:shortname: rocketmq
+:artifactid: camel-rocketmq
+:description: Send and receive messages from RocketMQ cluster.
+:since: 3.20
+:supportlevel: Preview
+:component-header: Both producer and consumer are supported
+
+*Since Camel {since}*
+
+*{component-header}*
+
+The RocketMQ component allows you to produce and consume messages from
+https://rocketmq.apache.org/[RocketMQ] instances.
+
+Maven users will need to add the following dependency to their `pom.xml`
+for this component:
+
+[source,xml]
+----
+<dependency>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>camel-rocketmq</artifactId>
+    <version>x.x.x</version>
+    <!-- use the same version as your Camel core version -->
+</dependency>
+----
+
+[NOTE]
+====
+Since RocketMQ 5.x API is compatible with 4.x, this component works with both 
RocketMQ 4.x and 5.x.
+Users could change RocketMQ dependencies on their own.
+====
+
+== URI format
+
+----
+rocketmq:topicName?[options]
+----
+
+The topic name determines the topic to which produced 
+messages will be sent. In the case of consumers, the topic name 
+determines the topic that will be subscribed to.
+This component uses RocketMQ push consumer by default.
+
+// component-configure options: START
+
+// component-configure options: END
+
+// component options: START
+include::partial$component-configure-options.adoc[]
+include::partial$component-endpoint-options.adoc[]
+// component options: END
+
+// endpoint options: START
+
+// endpoint options: END
+
+// component headers: START
+include::partial$component-endpoint-headers.adoc[]
+// component headers: END
+
+== InOut Pattern
+
+The InOut pattern is based on the message key. When the producer sends a message, a 
messageKey is generated and appended to the message's key.
+
+After the message is sent, a consumer listens on the topic configured by the 
parameter `replyToTopic`.
+
+When a message from `replyToTopic` contains the key, the reply has been 
received and routing continues.
+
+If `requestTimeoutMillis` elapses and no reply has been received, an exception is 
thrown.
+
+[source,java]
+----
+from("rocketmq:START_TOPIC?producerGroup=p1&consumerGroup=c1")
+
+.to(ExchangePattern.InOut, "rocketmq:INTERMEDIATE_TOPIC" +
+        "?producerGroup=intermediaProducer" +
+        "&consumerGroup=intermediateConsumer" +
+        "&replyToTopic=REPLY_TO_TOPIC" +
+        "&replyToConsumerGroup=replyToConsumerGroup" +
+        "&requestTimeoutMillis=30000")
+
+.to("log:InOutRoute?showAll=true")
+----
+
+== Examples
+
+Receive messages from a topic named `FROM_TOPIC` and route them to `TO_TOPIC`.
+
+[source,java]
+----
+from("rocketmq:FROM_TOPIC?namesrvAddr=localhost:9876&consumerGroup=consumer")
+    .to("rocketmq:TO_TOPIC?namesrvAddr=localhost:9876&producerGroup=producer");
+----
+
+Setting specific headers can change routing behaviour. For example, if the header 
`RocketMQConstants.OVERRIDE_TOPIC_NAME` is set,
+the message will be sent to `ACTUAL_TARGET` instead of `ORIGIN_TARGET`.
+
+[source,java]
+----
+from("rocketmq:FROM?consumerGroup=consumer")
+        .process(exchange -> {
+            
exchange.getMessage().setHeader(RocketMQConstants.OVERRIDE_TOPIC_NAME, 
"ACTUAL_TARGET");
+            exchange.getMessage().setHeader(RocketMQConstants.OVERRIDE_TAG, 
"OVERRIDE_TAG");
+            
exchange.getMessage().setHeader(RocketMQConstants.OVERRIDE_MESSAGE_KEY, 
"OVERRIDE_MESSAGE_KEY");
+        }
+)
+.to("rocketmq:ORIGIN_TARGET?producerGroup=producer")
+.to("log:RocketRoute?showAll=true")
+----
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQAclUtils.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQAclUtils.java
new file mode 100644
index 00000000000..453c04e84e7
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQAclUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.remoting.RPCHook;
+
+/**
+ * Static helper for building the RocketMQ ACL client hook from an access key / secret key pair.
+ */
+public final class RocketMQAclUtils {
+
+    // Utility class: not meant to be instantiated.
+    private RocketMQAclUtils() {
+    }
+
+    /**
+     * Creates an ACL {@link RPCHook} for the given credentials.
+     *
+     * @param  accessKey the access key; may be {@code null} or blank
+     * @param  secretKey the secret key; may be {@code null} or blank
+     * @return           an {@link AclClientRPCHook} wrapping the credentials when both keys are non-blank,
+     *                   otherwise {@code null}
+     */
+    public static RPCHook getAclRPCHook(String accessKey, String secretKey) {
+        if (StringUtils.isNotBlank(accessKey) && 
StringUtils.isNotBlank(secretKey)) {
+            return new AclClientRPCHook(new SessionCredentials(accessKey, 
secretKey));
+        }
+        // Either key is missing: no hook is returned.
+        return null;
+    }
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java
new file mode 100644
index 00000000000..08063c8a152
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import java.util.Map;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+
+/**
+ * Camel component registered under the {@code rocketmq} scheme.
+ * <p>
+ * The options declared here act as component-level defaults: {@link #createEndpoint} copies each of them onto
+ * every new {@link RocketMQEndpoint} before the endpoint's own URI parameters are bound.
+ */
+@Component("rocketmq")
+public class RocketMQComponent extends DefaultComponent {
+
+    @Metadata(label = "producer")
+    private String producerGroup;
+
+    @Metadata(label = "consumer")
+    private String consumerGroup;
+
+    @Metadata(label = "consumer", defaultValue = "*")
+    private String subscribeTags = "*";
+
+    @Metadata(label = "common")
+    private String sendTag = "";
+
+    @Metadata(label = "common", defaultValue = "localhost:9876")
+    private String namesrvAddr = "localhost:9876";
+
+    @Metadata(label = "producer")
+    private String replyToTopic;
+
+    @Metadata(label = "producer")
+    private String replyToConsumerGroup;
+
+    @Metadata(label = "advanced", defaultValue = "10000")
+    private long requestTimeoutMillis = 10000L;
+
+    @Metadata(label = "advanced", defaultValue = "1000")
+    private long requestTimeoutCheckerIntervalMillis = 1000L;
+
+    @Metadata(label = "producer", defaultValue = "false")
+    private boolean waitForSendResult;
+
+    // NOTE(review): the generated endpoint metadata groups accessKey/secretKey under "security", while the
+    // component uses the label "secret" here - confirm whether the two should be aligned.
+    @Metadata(label = "secret", secret = true)
+    private String accessKey;
+
+    @Metadata(label = "secret", secret = true)
+    private String secretKey;
+
+    /**
+     * Creates a {@link RocketMQEndpoint} for the given URI.
+     *
+     * @param  uri        the full endpoint URI
+     * @param  remaining  the URI part used as the topic name
+     * @param  parameters the URI parameters to bind onto the endpoint
+     * @return            the configured endpoint
+     */
+    @Override
+    protected RocketMQEndpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
+        RocketMQEndpoint endpoint = new RocketMQEndpoint(uri, this);
+        // Seed the endpoint with the component-level values first.
+        endpoint.setProducerGroup(getProducerGroup());
+        endpoint.setConsumerGroup(getConsumerGroup());
+        endpoint.setSubscribeTags(getSubscribeTags());
+        endpoint.setNamesrvAddr(getNamesrvAddr());
+        endpoint.setSendTag(getSendTag());
+        endpoint.setReplyToTopic(getReplyToTopic());
+        endpoint.setReplyToConsumerGroup(getReplyToConsumerGroup());
+        endpoint.setRequestTimeoutMillis(getRequestTimeoutMillis());
+        
endpoint.setRequestTimeoutCheckerIntervalMillis(getRequestTimeoutCheckerIntervalMillis());
+        endpoint.setWaitForSendResult(isWaitForSendResult());
+        endpoint.setAccessKey(getAccessKey());
+        endpoint.setSecretKey(getSecretKey());
+        // URI parameters are bound after the defaults, so they can override the values copied above.
+        setProperties(endpoint, parameters);
+        // The topic name always comes from the URI path, regardless of the parameters.
+        endpoint.setTopicName(remaining);
+        return endpoint;
+    }
+
+    public String getSubscribeTags() {
+        return subscribeTags;
+    }
+
+    /**
+     * Subscribe tags of consumer. Multiple tags could be split by "||", such 
as "TagA||TagB"
+     */
+    public void setSubscribeTags(String subscribeTags) {
+        this.subscribeTags = subscribeTags;
+    }
+
+    public String getSendTag() {
+        return sendTag;
+    }
+
+    /**
+     * Each message would be sent with this tag.
+     */
+    public void setSendTag(String sendTag) {
+        this.sendTag = sendTag;
+    }
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+    /**
+     * Name server address of RocketMQ cluster.
+     */
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+    public String getProducerGroup() {
+        return producerGroup;
+    }
+
+    /**
+     * Producer group name.
+     */
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    /**
+     * Consumer group name.
+     */
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public String getReplyToTopic() {
+        return replyToTopic;
+    }
+
+    /**
+     * Topic used for receiving response when using in-out pattern.
+     */
+    public void setReplyToTopic(String replyToTopic) {
+        this.replyToTopic = replyToTopic;
+    }
+
+    public String getReplyToConsumerGroup() {
+        return replyToConsumerGroup;
+    }
+
+    /**
+     * Consumer group name used for receiving response.
+     */
+    public void setReplyToConsumerGroup(String replyToConsumerGroup) {
+        this.replyToConsumerGroup = replyToConsumerGroup;
+    }
+
+    public long getRequestTimeoutMillis() {
+        return requestTimeoutMillis;
+    }
+
+    /**
+     * Timeout milliseconds of receiving response when using in-out pattern.
+     */
+    public void setRequestTimeoutMillis(long requestTimeoutMillis) {
+        this.requestTimeoutMillis = requestTimeoutMillis;
+    }
+
+    public long getRequestTimeoutCheckerIntervalMillis() {
+        return requestTimeoutCheckerIntervalMillis;
+    }
+
+    /**
+     * Check interval milliseconds of request timeout.
+     */
+    public void setRequestTimeoutCheckerIntervalMillis(long 
requestTimeoutCheckerIntervalMillis) {
+        this.requestTimeoutCheckerIntervalMillis = 
requestTimeoutCheckerIntervalMillis;
+    }
+
+    public boolean isWaitForSendResult() {
+        return waitForSendResult;
+    }
+
+    /**
+     * Whether waiting for send result before routing to next endpoint.
+     */
+    public void setWaitForSendResult(boolean waitForSendResult) {
+        this.waitForSendResult = waitForSendResult;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    /**
+     * Access key for RocketMQ ACL.
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    /**
+     * Secret key for RocketMQ ACL.
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConstants.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConstants.java
new file mode 100644
index 00000000000..6c0ab3140b4
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConstants.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.spi.Metadata;
+
+public final class RocketMQConstants {
+
+    @Metadata(label = "consumer", description = "Topic of message", javaType = 
"String")
+    public static final String TOPIC = "CamelRockerMQTopic";
+    @Metadata(label = "consumer", description = "Tag of message", javaType = 
"String")
+    public static final String TAG = "CamelRockerMQTag";
+    @Metadata(label = "consumer", description = "Key of message", javaType = 
"String")
+    public static final String KEY = "CamelRockerMQKey";
+    @Metadata(label = "producer",
+              description = "If this header is set, the message will be routed 
to the topic specified by this header\n" +
+                            "instead of the origin topic in endpoint.",
+              javaType = "String")
+    public static final String OVERRIDE_TOPIC_NAME = 
"CamelRockerMQOverrideTopicName";
+    @Metadata(label = "producer",
+              description = "If this header is set, the message's tag will be 
set to value specified by this header\n" +
+                            "instead of the sendTag defined in endpoint.",
+              javaType = "String")
+    public static final String OVERRIDE_TAG = "CamelRockerMQOverrideTag";
+    @Metadata(label = "producer", description = "Set keys for the message. 
When using in-out pattern,\n" +
+                                                "the value will be prepended 
to the generated keys",
+              javaType = "String")
+    public static final String OVERRIDE_MESSAGE_KEY = 
"CamelRockerMQOverrideMessageKey";
+    @Metadata(label = "consumer", description = "Broker name", javaType = 
"String")
+    public static final String BROKER_NAME = "CamelRockerMQBrokerName";
+    @Metadata(label = "consumer", description = "Queue ID", javaType = "int")
+    public static final String QUEUE_ID = "CamelRockerMQQueueId";
+    @Metadata(label = "consumer", description = "Store size", javaType = "int")
+    public static final String STORE_SIZE = "CamelRockerMQStoreSize";
+    @Metadata(label = "consumer", description = "Queue offset", javaType = 
"long")
+    public static final String QUEUE_OFFSET = "CamelRockerMQQueueOffset";
+    @Metadata(label = "consumer", description = "Sys flag", javaType = "int")
+    public static final String SYS_FLAG = "CamelRockerMQSysFlag";
+    @Metadata(label = "consumer", description = "Born timestamp", javaType = 
"long")
+    public static final String BORN_TIMESTAMP = "CamelRockerMQBornTimestamp";
+    @Metadata(label = "consumer", description = "Born host", javaType = 
"java.net.SocketAddress")
+    public static final String BORN_HOST = "CamelRockerMQBornHost";
+    @Metadata(label = "consumer", description = "Store timestamp", javaType = 
"long")
+    public static final String STORE_TIMESTAMP = "CamelRockerMQStoreTimestamp";
+    @Metadata(label = "consumer", description = "Store host", javaType = 
"java.net.SocketAddress")
+    public static final String STORE_HOST = "CamelRockerMQStoreHost";
+    @Metadata(label = "consumer", description = "Msg ID", javaType = "String")
+    public static final String MSG_ID = "CamelRockerMQMsgId";
+    @Metadata(label = "consumer", description = "Commit log offset", javaType 
= "long")
+    public static final String COMMIT_LOG_OFFSET = 
"CamelRockerMQCommitLogOffset";
+    @Metadata(label = "consumer", description = "Body CRC", javaType = "int")
+    public static final String BODY_CRC = "CamelRockerMQBodyCrc";
+    @Metadata(label = "consumer", description = "Reconsume times", javaType = 
"int")
+    public static final String RECONSUME_TIMES = "CamelRockerMQReconsumeTimes";
+    @Metadata(label = "consumer", description = "Prepard transaction offset", 
javaType = "long")
+    public static final String PREPARED_TRANSACTION_OFFSET = 
"CamelRockerMQPreparedTransactionOffset";
+
+    private RocketMQConstants() {
+    }
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java
new file mode 100644
index 00000000000..6cfa996b861
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Suspendable;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class RocketMQConsumer extends DefaultConsumer implements Suspendable {
+
+    private final RocketMQEndpoint endpoint;
+
+    private DefaultMQPushConsumer mqPushConsumer;
+
+    public RocketMQConsumer(RocketMQEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    private void startConsumer() throws MQClientException {
+        mqPushConsumer = new DefaultMQPushConsumer(
+                null, endpoint.getConsumerGroup(),
+                RocketMQAclUtils.getAclRPCHook(getEndpoint().getAccessKey(), 
getEndpoint().getSecretKey()));
+        mqPushConsumer.setNamesrvAddr(endpoint.getNamesrvAddr());
+        mqPushConsumer.subscribe(endpoint.getTopicName(), 
endpoint.getSubscribeTags());
+        mqPushConsumer.registerMessageListener((MessageListenerConcurrently) 
(msgs, context) -> {
+            MessageExt messageExt = msgs.get(0);
+            Exchange exchange = 
endpoint.createRocketExchange(messageExt.getBody());
+            
RocketMQMessageConverter.populateHeadersByMessageExt(exchange.getIn(), 
messageExt);
+            try {
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                getExceptionHandler().handleException(e);
+                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+            }
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        mqPushConsumer.start();
+    }
+
+    private void stopConsumer() {
+        if (mqPushConsumer != null) {
+            mqPushConsumer.shutdown();
+            mqPushConsumer = null;
+        }
+    }
+
+    @Override
+    public RocketMQEndpoint getEndpoint() {
+        return (RocketMQEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doSuspend() {
+        stopConsumer();
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        startConsumer();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        startConsumer();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        stopConsumer();
+    }
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java
new file mode 100644
index 00000000000..a77ec99f018
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.AsyncEndpoint;
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.DefaultMessage;
+
+/**
+ * Send and receive messages from <a href="https://rocketmq.apache.org/">RocketMQ</a> cluster.
+ */
+@UriEndpoint(firstVersion = "3.20.0", scheme = "rocketmq", syntax = 
"rocketmq:topicName", title = "RocketMQ",
+             category = Category.MESSAGING, headersClass = 
RocketMQConstants.class)
+public class RocketMQEndpoint extends DefaultEndpoint implements AsyncEndpoint 
{
+
+    // Endpoint options, exposed through the @UriPath/@UriParam annotations below.
+    // Each option is described by the Javadoc on its setter.
+    @UriPath
+    @Metadata(required = true)
+    private String topicName;
+    @UriParam(label = "producer")
+    private String producerGroup;
+    @UriParam(label = "consumer")
+    private String consumerGroup;
+    @UriParam(label = "consumer", defaultValue = "*")
+    private String subscribeTags = "*";
+    @UriParam(label = "producer")
+    private String sendTag = "";
+    @UriParam(label = "producer")
+    private String replyToTopic;
+    @UriParam(label = "producer")
+    private String replyToConsumerGroup;
+    @UriParam(label = "common", defaultValue = "localhost:9876")
+    private String namesrvAddr = "localhost:9876";
+    @UriParam(label = "advanced", defaultValue = "10000")
+    private long requestTimeoutMillis = 10000L;
+    @UriParam(label = "advanced", defaultValue = "1000")
+    private long requestTimeoutCheckerIntervalMillis = 1000L;
+    @UriParam(label = "producer", defaultValue = "false")
+    private boolean waitForSendResult;
+    @UriParam(label = "security", secret = true)
+    private String accessKey;
+    @UriParam(label = "security", secret = true)
+    private String secretKey;
+
+    public RocketMQEndpoint() {
+    }
+
+    public RocketMQEndpoint(String endpointUri, RocketMQComponent component) {
+        super(endpointUri, component);
+    }
+
+    /**
+     * Creates a new {@link RocketMQProducer} bound to this endpoint.
+     */
+    @Override
+    public Producer createProducer() {
+        return new RocketMQProducer(this);
+    }
+
+    /**
+     * Creates a new {@link RocketMQConsumer} for the given processor and passes it through
+     * {@code configureConsumer} before returning it.
+     */
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        RocketMQConsumer consumer = new RocketMQConsumer(this, processor);
+        configureConsumer(consumer);
+        return consumer;
+    }
+
+    /**
+     * Creates a new exchange whose IN message is a fresh {@link DefaultMessage} carrying the given body.
+     *
+     * @param  body raw message payload to use as the message body
+     * @return      the newly created exchange
+     */
+    public Exchange createRocketExchange(byte[] body) {
+        Exchange exchange = super.createExchange();
+        DefaultMessage message = new DefaultMessage(exchange.getContext());
+        message.setBody(body);
+        exchange.setIn(message);
+        return exchange;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    /**
+     * Topic name of this endpoint.
+     */
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    public String getSubscribeTags() {
+        return subscribeTags;
+    }
+
+    /**
+     * Subscribe tags of consumer. Multiple tags could be split by "||", such 
as "TagA||TagB"
+     */
+    public void setSubscribeTags(String subscribeTags) {
+        this.subscribeTags = subscribeTags;
+    }
+
+    public String getSendTag() {
+        return sendTag;
+    }
+
+    /**
+     * Each message would be sent with this tag.
+     */
+    public void setSendTag(String sendTag) {
+        this.sendTag = sendTag;
+    }
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+    /**
+     * Name server address of RocketMQ cluster.
+     */
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+    public String getProducerGroup() {
+        return producerGroup;
+    }
+
+    /**
+     * Producer group name.
+     */
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    /**
+     * Consumer group name.
+     */
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public String getReplyToTopic() {
+        return replyToTopic;
+    }
+
+    /**
+     * Topic used for receiving response when using in-out pattern.
+     */
+    public void setReplyToTopic(String replyToTopic) {
+        this.replyToTopic = replyToTopic;
+    }
+
+    public String getReplyToConsumerGroup() {
+        return replyToConsumerGroup;
+    }
+
+    /**
+     * Consumer group name used for receiving response.
+     */
+    public void setReplyToConsumerGroup(String replyToConsumerGroup) {
+        this.replyToConsumerGroup = replyToConsumerGroup;
+    }
+
+    public long getRequestTimeoutMillis() {
+        return requestTimeoutMillis;
+    }
+
+    /**
+     * Timeout milliseconds of receiving response when using in-out pattern.
+     */
+    public void setRequestTimeoutMillis(long requestTimeoutMillis) {
+        this.requestTimeoutMillis = requestTimeoutMillis;
+    }
+
+    public long getRequestTimeoutCheckerIntervalMillis() {
+        return requestTimeoutCheckerIntervalMillis;
+    }
+
+    /**
+     * Check interval milliseconds of request timeout.
+     */
+    public void setRequestTimeoutCheckerIntervalMillis(long 
requestTimeoutCheckerIntervalMillis) {
+        this.requestTimeoutCheckerIntervalMillis = 
requestTimeoutCheckerIntervalMillis;
+    }
+
+    public boolean isWaitForSendResult() {
+        return waitForSendResult;
+    }
+
+    /**
+     * Whether waiting for send result before routing to next endpoint.
+     */
+    public void setWaitForSendResult(boolean waitForSendResult) {
+        this.waitForSendResult = waitForSendResult;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    /**
+     * Access key for RocketMQ ACL.
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    /**
+     * Secret key for RocketMQ ACL.
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQMessageConverter.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQMessageConverter.java
new file mode 100644
index 00000000000..cb8e4a0e119
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQMessageConverter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * Static helper that copies metadata of a received RocketMQ message onto a Camel message as headers.
+ */
+public final class RocketMQMessageConverter {
+
+    // Utility class; not instantiable.
+    private RocketMQMessageConverter() {
+    }
+
+    /**
+     * Sets one Camel header per {@link RocketMQConstants} consumer header name, using the value of the
+     * corresponding {@link MessageExt} getter. The message body is not touched.
+     *
+     * @param message    Camel message that receives the headers
+     * @param messageExt RocketMQ message the header values are read from
+     */
+    public static void populateHeadersByMessageExt(final Message message, 
final MessageExt messageExt) {
+        message.setHeader(RocketMQConstants.TOPIC, messageExt.getTopic());
+        message.setHeader(RocketMQConstants.TAG, messageExt.getTags());
+        message.setHeader(RocketMQConstants.KEY, messageExt.getKeys());
+        message.setHeader(RocketMQConstants.BROKER_NAME, 
messageExt.getBrokerName());
+        message.setHeader(RocketMQConstants.QUEUE_ID, messageExt.getQueueId());
+        message.setHeader(RocketMQConstants.STORE_SIZE, 
messageExt.getStoreSize());
+        message.setHeader(RocketMQConstants.QUEUE_OFFSET, 
messageExt.getQueueOffset());
+        message.setHeader(RocketMQConstants.SYS_FLAG, messageExt.getSysFlag());
+        message.setHeader(RocketMQConstants.BORN_TIMESTAMP, 
messageExt.getBornTimestamp());
+        message.setHeader(RocketMQConstants.BORN_HOST, 
messageExt.getBornHost());
+        message.setHeader(RocketMQConstants.STORE_TIMESTAMP, 
messageExt.getStoreTimestamp());
+        message.setHeader(RocketMQConstants.STORE_HOST, 
messageExt.getStoreHost());
+        message.setHeader(RocketMQConstants.MSG_ID, messageExt.getMsgId());
+        message.setHeader(RocketMQConstants.COMMIT_LOG_OFFSET, 
messageExt.getCommitLogOffset());
+        message.setHeader(RocketMQConstants.BODY_CRC, messageExt.getBodyCRC());
+        message.setHeader(RocketMQConstants.RECONSUME_TIMES, 
messageExt.getReconsumeTimes());
+        message.setHeader(RocketMQConstants.PREPARED_TRANSACTION_OFFSET, 
messageExt.getPreparedTransactionOffset());
+    }
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
new file mode 100644
index 00000000000..2eaa76ffc1e
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.rocketmq.reply.ReplyManager;
+import org.apache.camel.component.rocketmq.reply.RocketMQReplyManagerSupport;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQProducer extends DefaultAsyncProducer {
+
+    public static final String GENERATE_MESSAGE_KEY_PREFIX = "camel-rocketmq-";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocketMQProducer.class);
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private DefaultMQProducer mqProducer;
+
+    private ReplyManager replyManager;
+
+    /**
+     * Creates a producer for the given endpoint. The RocketMQ client itself is created later, in
+     * {@code doStart()}.
+     */
+    public RocketMQProducer(RocketMQEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Returns the owning endpoint, narrowed to {@link RocketMQEndpoint}.
+     */
+    @Override
+    public RocketMQEndpoint getEndpoint() {
+        return (RocketMQEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Entry point of the asynchronous producer: dispatches to {@link #processInOut} for out-capable exchange
+     * patterns and to {@link #processInOnly} otherwise.
+     * <p>
+     * When the producer is not allowed to run, or when dispatching throws, the exchange is completed
+     * synchronously with an exception set on it.
+     *
+     * @param  exchange the exchange to send
+     * @param  callback notified when processing of the exchange is done
+     * @return          true if the exchange was completed synchronously, false if the callback will be invoked
+     *                  later
+     */
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (isRunAllowed()) {
+            try {
+                LOG.trace("Exchange Pattern {}", exchange.getPattern());
+                return exchange.getPattern().isOutCapable()
+                        ? processInOut(exchange, callback)
+                        : processInOnly(exchange, callback);
+            } catch (Throwable failure) {
+                exchange.setException(failure);
+                callback.done(true);
+                return true;
+            }
+        }
+        // Not allowed to run: reject, but keep any exception that is already present on the exchange.
+        if (exchange.getException() == null) {
+            exchange.setException(new RejectedExecutionException());
+        }
+        callback.done(true);
+        return true;
+    }
+
+    /**
+     * Sends the exchange as a RocketMQ request message and, once the send succeeded, registers the exchange with
+     * the reply manager under a generated message key.
+     * <p>
+     * Topic, tag and keys are taken from the override headers when present, otherwise from the endpoint. The
+     * generated key ({@link #GENERATE_MESSAGE_KEY_PREFIX} plus a UUID) is added to the message keys.
+     * <p>
+     * When no reply manager exists (no replyToTopic configured) the exchange is completed right after a
+     * successful send without waiting for a reply.
+     * <p>
+     * NOTE(review): the reply is registered only inside the send-success callback; whether a reply arriving
+     * before that registration can be missed depends on the ReplyManager implementation, which is not visible
+     * here. Confirm.
+     *
+     * @param  exchange the exchange to send
+     * @param  callback completed asynchronously once the send failed, or as decided by the reply manager
+     * @return          always false: completion is signalled through the callback
+     */
+    protected boolean processInOut(final Exchange exchange, final AsyncCallback callback)
+            throws RemotingException, MQClientException, InterruptedException, NoTypeConversionAvailableException {
+        org.apache.camel.Message in = exchange.getIn();
+        Message message = new Message();
+        message.setTopic(in.getHeader(RocketMQConstants.OVERRIDE_TOPIC_NAME, () -> getEndpoint().getTopicName(), String.class));
+        message.setTags(in.getHeader(RocketMQConstants.OVERRIDE_TAG, () -> getEndpoint().getSendTag(), String.class));
+        message.setBody(exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, in.getBody()));
+        message.setKeys(in.getHeader(RocketMQConstants.OVERRIDE_MESSAGE_KEY, "", String.class));
+        initReplyManager();
+        String generateKey = GENERATE_MESSAGE_KEY_PREFIX + getEndpoint().getCamelContext().getUuidGenerator().generateUuid();
+        // Keep the user supplied key (if any) and add the generated key.
+        message.setKeys(Arrays.asList(Optional.ofNullable(message.getKeys()).orElse(""), generateKey));
+        LOG.debug("RocketMQ Producer sending {}", message);
+        mqProducer.send(message, new SendCallback() {
+
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                if (!SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
+                    exchange.setException(new SendFailedException(sendResult.toString()));
+                    callback.done(false);
+                    return;
+                }
+                if (replyManager == null) {
+                    LOG.warn("replyToTopic not set! Will not wait for reply.");
+                    callback.done(false);
+                    return;
+                }
+                replyManager.registerReply(replyManager, exchange, callback, generateKey,
+                        getEndpoint().getRequestTimeoutMillis());
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                try {
+                    // replyManager is null when no replyToTopic is configured; there is nothing to cancel then.
+                    if (replyManager != null) {
+                        replyManager.cancelMessageKey(generateKey);
+                    }
+                } finally {
+                    // Always record the send failure and complete the exchange, even if cancelling failed.
+                    exchange.setException(e);
+                    callback.done(false);
+                }
+            }
+        });
+        return false;
+    }
+
+    protected void initReplyManager() {
+        if (!started.get()) {
+            synchronized (this) {
+                if (started.get()) {
+                    return;
+                }
+                LOG.debug("Starting reply manager");
+                ClassLoader current = 
Thread.currentThread().getContextClassLoader();
+                ClassLoader ac = 
getEndpoint().getCamelContext().getApplicationContextClassLoader();
+                try {
+                    if (ac != null) {
+                        Thread.currentThread().setContextClassLoader(ac);
+                    }
+                    if (getEndpoint().getReplyToTopic() != null) {
+                        replyManager = createReplyManager();
+                        LOG.debug("Using RocketMQReplyManager: {} to process 
replies from topic {}", replyManager,
+                                getEndpoint().getReplyToTopic());
+                    }
+                } catch (Exception e) {
+                    throw new FailedToCreateProducerException(getEndpoint(), 
e);
+                } finally {
+                    if (ac != null) {
+                        Thread.currentThread().setContextClassLoader(current);
+                    }
+                }
+                started.set(true);
+            }
+        }
+    }
+
+    /**
+     * Stops the reply manager, if one was created, and resets the {@code started} flag so that
+     * {@link #initReplyManager()} can run again.
+     * <p>
+     * The flag is reset even when stopping fails; the failure is rethrown as a runtime Camel exception.
+     */
+    protected void unInitReplyManager() {
+        try {
+            if (replyManager == null) {
+                return;
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping RocketMQReplyManager: {} from processing replies from : {}", replyManager,
+                        getEndpoint().getReplyToTopic());
+            }
+            ServiceHelper.stopService(replyManager);
+        } catch (Exception cause) {
+            throw RuntimeCamelException.wrapRuntimeCamelException(cause);
+        } finally {
+            started.set(false);
+        }
+    }
+
+    private ReplyManager createReplyManager() {
+        RocketMQReplyManagerSupport replyManager = new 
RocketMQReplyManagerSupport(getEndpoint().getCamelContext());
+        replyManager.setEndpoint(getEndpoint());
+        String name = "RocketMQReplyManagerTimeoutChecker[" + 
getEndpoint().getTopicName() + "]";
+        ScheduledExecutorService scheduledExecutorService
+                = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
+        replyManager.setScheduledExecutorService(scheduledExecutorService);
+        LOG.debug("Starting ReplyManager: {}", name);
+        ServiceHelper.startService(replyManager);
+        return replyManager;
+    }
+
+    protected boolean processInOnly(Exchange exchange, AsyncCallback callback)
+            throws NoTypeConversionAvailableException, InterruptedException, 
RemotingException, MQClientException {
+        org.apache.camel.Message in = exchange.getIn();
+        Message message = new Message();
+        message.setTopic(in.getHeader(RocketMQConstants.OVERRIDE_TOPIC_NAME, 
() -> getEndpoint().getTopicName(), String.class));
+        message.setTags(in.getHeader(RocketMQConstants.OVERRIDE_TAG, () -> 
getEndpoint().getSendTag(), String.class));
+        
message.setBody(exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class,
 exchange, in.getBody()));
+        message.setKeys(in.getHeader(RocketMQConstants.OVERRIDE_MESSAGE_KEY, 
"", String.class));
+        LOG.debug("RocketMQ Producer sending {}", message);
+        boolean waitForSendResult = getEndpoint().isWaitForSendResult();
+        mqProducer.send(message, new SendCallback() {
+
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                if (!SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
+                    exchange.setException(new 
SendFailedException(sendResult.toString()));
+                }
+                callback.done(!waitForSendResult);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                exchange.setException(e);
+                callback.done(!waitForSendResult);
+            }
+        });
+        // return false to wait send callback
+        return !waitForSendResult;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        this.mqProducer = new DefaultMQProducer(
+                null, getEndpoint().getProducerGroup(),
+                RocketMQAclUtils.getAclRPCHook(getEndpoint().getAccessKey(), 
getEndpoint().getSecretKey()));
+        this.mqProducer.setNamesrvAddr(getEndpoint().getNamesrvAddr());
+        this.mqProducer.start();
+    }
+
+    /**
+     * Stops the reply manager and shuts down the RocketMQ producer.
+     * <p>
+     * The producer is shut down even when stopping the reply manager throws, and nothing is done for the
+     * producer when it was never created (for example because start-up failed early).
+     */
+    @Override
+    protected void doStop() {
+        try {
+            unInitReplyManager();
+        } finally {
+            if (mqProducer != null) {
+                mqProducer.shutdown();
+                mqProducer = null;
+            }
+        }
+    }
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/SendFailedException.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/SendFailedException.java
new file mode 100644
index 00000000000..12dd22ad5af
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/SendFailedException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+/**
+ * Unchecked exception describing a RocketMQ send that did not succeed.
+ */
+public class SendFailedException extends RuntimeException {
+
+    /**
+     * @param message description of the failed send
+     */
+    public SendFailedException(String message) {
+        super(message);
+    }
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyHandler.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyHandler.java
new file mode 100644
index 00000000000..7a3eae37f2e
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyHandler.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * Callback for a pending request that is waiting for its reply. Exactly
+ * one of the two methods is expected to fire per request: either the
+ * reply arrives or the wait times out.
+ */
+public interface ReplyHandler {
+
+    /**
+     * Called when a reply message for the request has been received.
+     *
+     * @param messageKey the key that correlates request and reply
+     * @param messageExt the received RocketMQ reply message
+     */
+    void onReply(String messageKey, MessageExt messageExt);
+
+    /**
+     * Called when the request expired before a reply was received.
+     *
+     * @param messageKey the key of the request that timed out
+     */
+    void onTimeout(String messageKey);
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyHolder.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyHolder.java
new file mode 100644
index 00000000000..08728869960
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyHolder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * Immutable carrier for the outcome of a request/reply exchange: either
+ * the received reply message, or (when no message is set) the timeout
+ * that elapsed.
+ */
+public class ReplyHolder {
+
+    private final Exchange exchange;
+    private final AsyncCallback callback;
+    private final String messageKey;
+    private final MessageExt messageExt;
+    private final long timeout;
+
+    /**
+     * Creates a holder for a received reply. The timeout is left at 0.
+     */
+    public ReplyHolder(
+            Exchange exchange, AsyncCallback callback, String messageKey,
+            MessageExt messageExt) {
+        this(exchange, callback, messageKey, messageExt, 0L);
+    }
+
+    /**
+     * Creates a holder for a timed out request. No reply message is set,
+     * so {@link #isTimeout()} returns {@code true}.
+     */
+    public ReplyHolder(
+            Exchange exchange, AsyncCallback callback, String messageKey,
+            long timeout) {
+        this(exchange, callback, messageKey, null, timeout);
+    }
+
+    // Single place where all fields are assigned.
+    private ReplyHolder(
+            Exchange exchange, AsyncCallback callback, String messageKey,
+            MessageExt messageExt, long timeout) {
+        this.exchange = exchange;
+        this.callback = callback;
+        this.messageKey = messageKey;
+        this.messageExt = messageExt;
+        this.timeout = timeout;
+    }
+
+    /**
+     * @return {@code true} when this holder carries no reply message
+     */
+    public boolean isTimeout() {
+        return null == messageExt;
+    }
+
+    public Exchange getExchange() {
+        return exchange;
+    }
+
+    public AsyncCallback getCallback() {
+        return callback;
+    }
+
+    public MessageExt getMessageExt() {
+        return messageExt;
+    }
+
+    public String getMessageKey() {
+        return messageKey;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("ReplyHolder{");
+        text.append("exchange=").append(exchange);
+        text.append(", callback=").append(callback);
+        text.append(", messageKey='").append(messageKey).append('\'');
+        text.append(", messageExt=").append(messageExt);
+        text.append(", timeout=").append(timeout);
+        return text.append('}').toString();
+    }
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyManager.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyManager.java
new file mode 100644
index 00000000000..79153aa20cb
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.rocketmq.RocketMQEndpoint;
+
+/**
+ * Contract of the component that tracks pending requests by message key
+ * and completes the waiting exchange once a reply arrives or the request
+ * times out.
+ */
+public interface ReplyManager {
+
+    /**
+     * Sets the endpoint this manager works for.
+     */
+    void setEndpoint(RocketMQEndpoint endpoint);
+
+    /**
+     * Sets the topic on which reply messages are expected.
+     */
+    void setReplyToTopic(String replyToTopic);
+
+    /**
+     * Registers a pending request so that a later reply or timeout can be
+     * matched to it.
+     *
+     * @param  replyManager   the manager that the created handler reports
+     *                        back to via {@link #processReply(ReplyHolder)}
+     * @param  exchange       the exchange waiting for the reply
+     * @param  callback       the callback to signal when the exchange is
+     *                        completed
+     * @param  messageKey     the key that correlates request and reply
+     * @param  requestTimeout how long to wait for the reply, in millis
+     * @return                the message key of the registered request
+     */
+    String registerReply(
+            ReplyManager replyManager, Exchange exchange, AsyncCallback 
callback, String messageKey, long requestTimeout);
+
+    /**
+     * Sets the executor used for the periodic timeout checks.
+     */
+    void setScheduledExecutorService(ScheduledExecutorService executorService);
+
+    /**
+     * Completes the exchange held by the given holder, either with the
+     * received reply or with a timeout.
+     */
+    void processReply(ReplyHolder holder);
+
+    /**
+     * Stops waiting for a reply to the request with the given key.
+     */
+    void cancelMessageKey(String messageKey);
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyTimeoutMap.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyTimeoutMap.java
new file mode 100644
index 00000000000..0ddc479d259
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyTimeoutMap.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.support.DefaultTimeoutMap;
+
+/**
+ * Timeout map holding the {@link ReplyHandler} of every pending request,
+ * keyed by message key. When an entry is evicted the handler's
+ * {@link ReplyHandler#onTimeout(String)} is invoked.
+ */
+public class ReplyTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
+
+    public ReplyTimeoutMap(
+            ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+        super(executor, requestMapPollTimeMillis);
+        addListener(this::listener);
+    }
+
+    /**
+     * Maps a non-positive timeout to {@link Integer#MAX_VALUE} millis;
+     * positive values are passed through unchanged.
+     */
+    private static long encode(long timeoutMillis) {
+        if (timeoutMillis <= 0) {
+            return Integer.MAX_VALUE;
+        }
+        return timeoutMillis;
+    }
+
+    // Receives every map event; only evictions trigger handler logic.
+    private void listener(
+            Listener.Type type, String key, ReplyHandler handler) {
+        switch (type) {
+            case Evict:
+                notifyTimeout(key, handler);
+                log.trace("Evicted messageKey: {}", key);
+                break;
+            case Remove:
+                log.trace("Removed messageKey: {}", key);
+                break;
+            case Put:
+                log.trace("Added messageKey: {}", key);
+                break;
+            default:
+        }
+    }
+
+    // Runs the timeout callback; a failing callback is logged and ignored.
+    private void notifyTimeout(String key, ReplyHandler handler) {
+        try {
+            handler.onTimeout(key);
+        } catch (Throwable e) {
+            log.warn("Error processing onTimeout for messageKey: {} "
+                     + "due: {}. This exception is ignored.",
+                    key, e.getLocalizedMessage(), e);
+        }
+    }
+
+    @Override
+    public ReplyHandler put(
+            String key, ReplyHandler value, long timeoutMillis) {
+        long effectiveTimeout = encode(timeoutMillis);
+        return super.put(key, value, effectiveTimeout);
+    }
+
+    @Override
+    public ReplyHandler putIfAbsent(
+            String key, ReplyHandler value, long timeoutMillis) {
+        long effectiveTimeout = encode(timeoutMillis);
+        return super.putIfAbsent(key, value, effectiveTimeout);
+    }
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyHandler.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyHandler.java
new file mode 100644
index 00000000000..93a2f48a334
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ReplyHandler} that wraps the outcome (reply or timeout) of one
+ * request into a {@link ReplyHolder} and passes it to the owning
+ * {@link ReplyManager}.
+ */
+public class RocketMQReplyHandler implements ReplyHandler {
+
+    protected static final Logger LOG
+            = LoggerFactory.getLogger(RocketMQReplyHandler.class);
+
+    protected final ReplyManager replyManager;
+    protected final Exchange exchange;
+    protected final AsyncCallback callback;
+    protected final String messageKey;
+    protected final long timeout;
+
+    public RocketMQReplyHandler(
+            ReplyManager replyManager, Exchange exchange,
+            AsyncCallback callback, String messageKey, long timeout) {
+        this.replyManager = replyManager;
+        this.exchange = exchange;
+        this.callback = callback;
+        this.messageKey = messageKey;
+        this.timeout = timeout;
+    }
+
+    /**
+     * Hands the received reply to the reply manager. The key passed in,
+     * not the one stored in this handler, ends up in the holder.
+     */
+    @Override
+    public void onReply(String messageKey, MessageExt messageExt) {
+        LOG.debug("onReply with messageKey: {}", messageKey);
+        replyManager.processReply(
+                new ReplyHolder(exchange, callback, messageKey, messageExt));
+    }
+
+    /**
+     * Reports the timeout, together with the configured timeout value,
+     * to the reply manager.
+     */
+    @Override
+    public void onTimeout(String messageKey) {
+        LOG.debug("onTimeout with messageKey: {}", messageKey);
+        replyManager.processReply(
+                new ReplyHolder(exchange, callback, messageKey, timeout));
+    }
+}
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyManagerSupport.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyManagerSupport.java
new file mode 100644
index 00000000000..ec2b025d5d8
--- /dev/null
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyManagerSupport.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.Message;
+import org.apache.camel.component.rocketmq.RocketMQEndpoint;
+import org.apache.camel.component.rocketmq.RocketMQMessageConverter;
+import org.apache.camel.component.rocketmq.RocketMQProducer;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ReplyManager} implementation that listens on the reply topic
+ * with a push consumer and matches incoming messages to pending requests
+ * by message key. Pending requests live in a {@link ReplyTimeoutMap}, so
+ * a request either completes with its reply or with an
+ * {@link ExchangeTimedOutException}.
+ */
+public class RocketMQReplyManagerSupport extends ServiceSupport
+        implements ReplyManager {
+
+    private static final int CLOSE_TIMEOUT = 30 * 1000;
+
+    protected final Logger log
+            = LoggerFactory.getLogger(RocketMQReplyManagerSupport.class);
+    protected final CamelContext camelContext;
+    protected final CountDownLatch replyToLatch = new CountDownLatch(1);
+    protected ScheduledExecutorService executorService;
+    protected RocketMQEndpoint endpoint;
+    protected String replyToTopic;
+    protected DefaultMQPushConsumer mqPushConsumer;
+    protected ReplyTimeoutMap timeoutMap;
+
+    public RocketMQReplyManagerSupport(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    /**
+     * Starts the timeout map and the consumer of the reply topic. The
+     * executor service and the endpoint must have been set before.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(executorService, "executorService", this);
+        ObjectHelper.notNull(endpoint, "endpoint", this);
+
+        log.debug("Using timeout checker interval with {} millis",
+                endpoint.getRequestTimeoutCheckerIntervalMillis());
+        timeoutMap = new ReplyTimeoutMap(
+                executorService,
+                endpoint.getRequestTimeoutCheckerIntervalMillis());
+        ServiceHelper.startService(timeoutMap);
+
+        mqPushConsumer = createConsumer();
+        mqPushConsumer.start();
+
+        log.debug("Using executor {}", executorService);
+    }
+
+    /**
+     * Creates (but does not start) the push consumer that subscribes to
+     * every tag of the endpoint's reply topic.
+     */
+    protected DefaultMQPushConsumer createConsumer() throws MQClientException {
+        setReplyToTopic(endpoint.getReplyToTopic());
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
+        consumer.setConsumerGroup(endpoint.getReplyToConsumerGroup());
+        consumer.setNamesrvAddr(endpoint.getNamesrvAddr());
+        consumer.subscribe(replyToTopic, "*");
+        consumer.registerMessageListener(
+                (MessageListenerConcurrently) (msgs, context) -> {
+                    // The whole batch is acknowledged below, so every
+                    // message in it has to be dispatched, not only the
+                    // first one.
+                    for (MessageExt messageExt : msgs) {
+                        onMessage(messageExt);
+                        log.trace("Consume message {}", messageExt);
+                    }
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                });
+        return consumer;
+    }
+
+    /**
+     * Extracts the generated message key from the keys of the received
+     * message and dispatches the message to the waiting handler. A
+     * message without keys, or without a key carrying the producer's
+     * prefix, is logged and ignored.
+     */
+    public void onMessage(MessageExt messageExt) {
+        // getKeys() is null when the replier did not set any key.
+        String keys = messageExt.getKeys();
+        String messageKey = keys == null
+                ? null
+                : Arrays.stream(keys.split(MessageConst.KEY_SEPARATOR))
+                        .filter(s -> s.startsWith(
+                                RocketMQProducer.GENERATE_MESSAGE_KEY_PREFIX))
+                        .findFirst()
+                        .orElse(null);
+        if (messageKey == null) {
+            log.warn("Ignoring message with no messageKey: {}", messageExt);
+            return;
+        }
+
+        log.debug("Received reply message with messageKey [{}] -> {}",
+                messageKey, messageExt);
+        handleReplyMessage(messageKey, messageExt);
+    }
+
+    /**
+     * Stops the timeout map, shuts down the reply consumer and the
+     * executor service.
+     */
+    @Override
+    protected void doStop() {
+        ServiceHelper.stopService(timeoutMap);
+
+        if (mqPushConsumer != null) {
+            log.debug("Closing connection: {} with timeout: {} ms.",
+                    mqPushConsumer, CLOSE_TIMEOUT);
+            mqPushConsumer.shutdown();
+            mqPushConsumer = null;
+        }
+
+        if (executorService != null) {
+            camelContext.getExecutorServiceManager()
+                    .shutdownGraceful(executorService);
+            executorService = null;
+        }
+    }
+
+    @Override
+    public void setEndpoint(RocketMQEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public void setReplyToTopic(String replyToTopic) {
+        log.debug("ReplyToTopic: {}", replyToTopic);
+        this.replyToTopic = replyToTopic;
+        replyToLatch.countDown();
+    }
+
+    /**
+     * Registers a handler for the given message key.
+     *
+     * @throws IllegalArgumentException if a request with the same key is
+     *                                  already pending
+     */
+    @Override
+    public String registerReply(
+            ReplyManager replyManager, Exchange exchange,
+            AsyncCallback callback, String messageKey, long requestTimeout) {
+        RocketMQReplyHandler handler = new RocketMQReplyHandler(
+                replyManager, exchange, callback, messageKey, requestTimeout);
+        ReplyHandler result
+                = timeoutMap.putIfAbsent(messageKey, handler, requestTimeout);
+        if (result != null) {
+            String logMessage = String.format(
+                    "The messageKey [%s] is not unique.", messageKey);
+            throw new IllegalArgumentException(logMessage);
+        }
+        return messageKey;
+    }
+
+    @Override
+    public void setScheduledExecutorService(
+            ScheduledExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    /**
+     * Completes the exchange of the holder: sets an
+     * {@link ExchangeTimedOutException} for a timeout, otherwise copies
+     * the reply into the exchange. The callback is always signalled,
+     * unless this service is not allowed to run.
+     */
+    @Override
+    public void processReply(ReplyHolder holder) {
+        if (!isRunAllowed()) {
+            return;
+        }
+        try {
+            Exchange exchange = holder.getExchange();
+            if (holder.isTimeout()) {
+                if (log.isWarnEnabled()) {
+                    log.warn("Timeout occurred after {} millis waiting for "
+                             + "reply message with messageKey [{}] on "
+                             + "topic {}. Setting "
+                             + "ExchangeTimedOutException on {} and "
+                             + "continue routing.",
+                            holder.getTimeout(), holder.getMessageKey(),
+                            replyToTopic, ExchangeHelper.logIds(exchange));
+                }
+                String msg = "reply message with messageKey: "
+                             + holder.getMessageKey()
+                             + " not received on topic: " + replyToTopic;
+                exchange.setException(new ExchangeTimedOutException(
+                        exchange, holder.getTimeout(), msg));
+            } else {
+                processReceivedReply(holder);
+            }
+        } finally {
+            holder.getCallback().done(false);
+        }
+    }
+
+    // Copies body and headers of the reply into the exchange's out message.
+    private static void processReceivedReply(ReplyHolder holder) {
+        Message message = holder.getExchange().getOut();
+        MessageExt messageExt = holder.getMessageExt();
+        message.setBody(messageExt.getBody());
+        RocketMQMessageConverter.populateHeadersByMessageExt(
+                message, messageExt);
+    }
+
+    /**
+     * Removes the pending request with the given key, if there is one.
+     */
+    @Override
+    public void cancelMessageKey(String messageKey) {
+        // Single remove instead of get-then-remove, so the check and the
+        // removal cannot be separated by a concurrent reply or eviction.
+        if (timeoutMap.remove(messageKey) != null) {
+            log.warn("Cancelling messageKey: {}", messageKey);
+        }
+    }
+
+    /**
+     * Passes the reply to the handler registered for the key. The handler
+     * is taken out of the map atomically, so a reply racing with a
+     * timeout eviction (or a duplicate reply) cannot complete the same
+     * exchange twice.
+     */
+    protected void handleReplyMessage(
+            String messageKey, MessageExt messageExt) {
+        ReplyHandler handler = timeoutMap.remove(messageKey);
+        if (handler != null) {
+            handler.onReply(messageKey, messageExt);
+        } else {
+            log.warn("Reply received for unknown messageKey [{}]. "
+                     + "The message will be ignored: {}",
+                    messageKey, messageExt);
+        }
+    }
+
+}
diff --git 
a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java
 
b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java
new file mode 100644
index 00000000000..78f801834ae
--- /dev/null
+++ 
b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.rocketmq.infra.EmbeddedRocketMQServer;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Request/reply round trip against an embedded RocketMQ name server and
+ * broker: a message sent to START_TOPIC is forwarded in InOut mode to
+ * INTERMEDIATE_TOPIC, a plain RocketMQ replier answers on REPLY_TO_TOPIC
+ * and the reply must reach the mock endpoint.
+ */
+public class RocketMQRequestReplyRouteTest extends CamelTestSupport {
+
+    private static final int NAMESRV_PORT = 59877;
+
+    private static final String NAMESRV_ADDR = "127.0.0.1:" + NAMESRV_PORT;
+
+    private static final String START_ENDPOINT_URI
+            = "rocketmq:START_TOPIC?producerGroup=p1&consumerGroup=c1";
+
+    private static final String INTERMEDIATE_ENDPOINT_URI
+            = "rocketmq:INTERMEDIATE_TOPIC"
+              + "?producerGroup=intermediaProducer"
+              + "&consumerGroup=intermediateConsumer"
+              + "&replyToTopic=REPLY_TO_TOPIC"
+              + "&replyToConsumerGroup=replyToConsumerGroup"
+              + "&requestTimeoutMillis=30000";
+
+    private static final String RESULT_ENDPOINT_URI = "mock:result";
+
+    private static final String EXPECTED_MESSAGE = "Hi.";
+
+    private static NamesrvController namesrvController;
+
+    private static BrokerController brokerController;
+
+    private MockEndpoint resultEndpoint;
+
+    private DefaultMQPushConsumer replierConsumer;
+
+    private DefaultMQProducer replierProducer;
+
+    @BeforeAll
+    static void beforeAll() throws Exception {
+        namesrvController
+                = EmbeddedRocketMQServer.createAndStartNamesrv(NAMESRV_PORT);
+        brokerController
+                = EmbeddedRocketMQServer.createAndStartBroker(NAMESRV_ADDR);
+        EmbeddedRocketMQServer.createTopic(
+                NAMESRV_ADDR, "DefaultCluster", "START_TOPIC");
+        EmbeddedRocketMQServer.createTopic(
+                NAMESRV_ADDR, "DefaultCluster", "INTERMEDIATE_TOPIC");
+        EmbeddedRocketMQServer.createTopic(
+                NAMESRV_ADDR, "DefaultCluster", "REPLY_TO_TOPIC");
+    }
+
+    /**
+     * Starts the Camel context and then the replier, which answers every
+     * message on INTERMEDIATE_TOPIC with {@link #EXPECTED_MESSAGE} on
+     * REPLY_TO_TOPIC, reusing the keys of the request.
+     */
+    @Override
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        resultEndpoint
+                = (MockEndpoint) context.getEndpoint(RESULT_ENDPOINT_URI);
+        replierProducer = new DefaultMQProducer("replierProducer");
+        replierProducer.setNamesrvAddr(NAMESRV_ADDR);
+        replierProducer.start();
+        replierConsumer = new DefaultMQPushConsumer("replierConsumer");
+        replierConsumer.setNamesrvAddr(NAMESRV_ADDR);
+        replierConsumer.subscribe("INTERMEDIATE_TOPIC", "*");
+        replierConsumer.registerMessageListener(
+                (MessageListenerConcurrently) (msgs, unused) -> {
+                    MessageExt messageExt = msgs.get(0);
+                    String key = messageExt.getKeys();
+                    Message response = new Message(
+                            "REPLY_TO_TOPIC", "", key,
+                            EXPECTED_MESSAGE.getBytes(StandardCharsets.UTF_8));
+                    try {
+                        replierProducer.send(response);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                });
+        replierConsumer.start();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        RocketMQComponent rocketMQComponent = new RocketMQComponent();
+        rocketMQComponent.setNamesrvAddr(NAMESRV_ADDR);
+        camelContext.addComponent("rocketmq", rocketMQComponent);
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(START_ENDPOINT_URI)
+                        .to(ExchangePattern.InOut, INTERMEDIATE_ENDPOINT_URI)
+                        .to(RESULT_ENDPOINT_URI);
+            }
+        };
+    }
+
+    @Test
+    public void testRouteMessageInRequestReplyMode() throws Exception {
+        resultEndpoint.expectedBodiesReceived(EXPECTED_MESSAGE);
+        resultEndpoint.message(0).header(RocketMQConstants.TOPIC)
+                .isEqualTo("REPLY_TO_TOPIC");
+
+        template.sendBody(START_ENDPOINT_URI, "hello, RocketMQ.");
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    /**
+     * Stops the Camel context through the base class and then shuts down
+     * the replier. This method overrides the base class tear down, so
+     * without the call to {@code super} the context would never be
+     * stopped. The replier fields are null-checked because setUp may have
+     * failed before creating them.
+     */
+    @Override
+    @AfterEach
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            if (replierConsumer != null) {
+                replierConsumer.shutdown();
+            }
+            if (replierProducer != null) {
+                replierProducer.shutdown();
+            }
+        }
+    }
+
+    @AfterAll
+    public static void afterAll() {
+        brokerController.shutdown();
+        namesrvController.shutdown();
+    }
+}
diff --git 
a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java
 
b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java
new file mode 100644
index 00000000000..8deba43ad8d
--- /dev/null
+++ 
b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.rocketmq.infra.EmbeddedRocketMQServer;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Simple round trip against an embedded RocketMQ name server and broker:
+ * a message produced to START_TOPIC is consumed again by the route and
+ * must reach the mock endpoint with the topic and tag headers set.
+ */
+public class RocketMQRouteTest extends CamelTestSupport {
+
+    public static final String EXPECTED_MESSAGE = "hello, RocketMQ.";
+
+    private static final int NAMESRV_PORT = 59876;
+
+    private static final String NAMESRV_ADDR = "127.0.0.1:" + NAMESRV_PORT;
+
+    private static final String START_ENDPOINT_URI
+            = "rocketmq:START_TOPIC"
+              + "?producerGroup=p1&consumerGroup=c1&sendTag=startTag";
+
+    private static final String RESULT_ENDPOINT_URI = "mock:result";
+
+    private static NamesrvController namesrvController;
+
+    private static BrokerController brokerController;
+
+    private MockEndpoint resultEndpoint;
+
+    // Boots name server and broker once and creates the topic under test.
+    @BeforeAll
+    static void beforeAll() throws Exception {
+        namesrvController
+                = EmbeddedRocketMQServer.createAndStartNamesrv(NAMESRV_PORT);
+        brokerController
+                = EmbeddedRocketMQServer.createAndStartBroker(NAMESRV_ADDR);
+        EmbeddedRocketMQServer.createTopic(
+                NAMESRV_ADDR, "DefaultCluster", "START_TOPIC");
+    }
+
+    @Override
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        Object mock = context.getEndpoint(RESULT_ENDPOINT_URI);
+        resultEndpoint = (MockEndpoint) mock;
+    }
+
+    // Registers a RocketMQ component that points at the embedded server.
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        RocketMQComponent component = new RocketMQComponent();
+        component.setNamesrvAddr(NAMESRV_ADDR);
+        CamelContext created = super.createCamelContext();
+        created.addComponent("rocketmq", component);
+        return created;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(START_ENDPOINT_URI)
+                        .to(RESULT_ENDPOINT_URI);
+            }
+        };
+    }
+
+    @Test
+    public void testSimpleRoute() throws Exception {
+        resultEndpoint.expectedBodiesReceived(EXPECTED_MESSAGE);
+        resultEndpoint.message(0)
+                .header(RocketMQConstants.TOPIC).isEqualTo("START_TOPIC");
+        resultEndpoint.message(0)
+                .header(RocketMQConstants.TAG).isEqualTo("startTag");
+
+        template.sendBody(START_ENDPOINT_URI, EXPECTED_MESSAGE);
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    @AfterAll
+    public static void afterAll() {
+        brokerController.shutdown();
+        namesrvController.shutdown();
+    }
+}
diff --git 
a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java
 
b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java
new file mode 100644
index 00000000000..153f6c4ca47
--- /dev/null
+++ 
b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.infra;
+
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.test.util.MQAdmin;
+import org.apache.rocketmq.test.util.TestUtils;
+
+/**
+ * Test helper that starts an in-process RocketMQ name server and broker and creates topics on them.
+ * <p>
+ * Broker names and ports are handed out from static counters, so every broker started through this class
+ * within one JVM gets its own name, listen port and HA port.
+ */
+public final class EmbeddedRocketMQServer {
+
+    /** Upper bound, in milliseconds, on how long {@link #createTopic} keeps retrying. */
+    private static final long CREATE_TOPIC_TIMEOUT_MILLIS = 30 * 1000L;
+
+    /** Pause, in milliseconds, between two topic creation attempts. */
+    private static final int CREATE_TOPIC_RETRY_MILLIS = 500;
+
+    /** Suffix counter used to give each broker a distinct name. */
+    private static final AtomicInteger BROKER_INDEX = new AtomicInteger();
+
+    /** Next free port; each broker takes one value for its listen port and one for its HA port. */
+    private static final AtomicInteger BROKER_PORTS = new AtomicInteger(61000);
+
+    private EmbeddedRocketMQServer() {
+    }
+
+    /**
+     * Creates, initializes and starts a name server listening on the given port.
+     *
+     * @param  port      the port the name server listens on
+     * @return           the started controller; the caller is responsible for shutting it down
+     * @throws Exception if the name server reports an initialization failure or cannot be started
+     */
+    public static NamesrvController createAndStartNamesrv(int port) throws Exception {
+        NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(port);
+        NamesrvController result = new NamesrvController(new NamesrvConfig(), serverConfig);
+        // initialize() reports failure through its return value; do not start a half-initialized server.
+        if (!result.initialize()) {
+            result.shutdown();
+            throw new IllegalStateException("Failed to initialize embedded RocketMQ name server on port " + port);
+        }
+        result.start();
+        return result;
+    }
+
+    /**
+     * Creates, initializes and starts a broker that registers with the given name server.
+     *
+     * @param  nsAddr    the name server address handed to the broker configuration
+     * @return           the started controller; the caller is responsible for shutting it down
+     * @throws Exception if the broker reports an initialization failure or cannot be started
+     */
+    public static BrokerController createAndStartBroker(String nsAddr) throws Exception {
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyServerConfig.setListenPort(BROKER_PORTS.getAndIncrement());
+        BrokerConfig brokerConfig = prepareBrokerConfig(nsAddr);
+        BrokerController result = new BrokerController(
+                brokerConfig, nettyServerConfig, new NettyClientConfig(), prepareMessageStoreConfig());
+        // initialize() reports failure through its return value; do not start a half-initialized broker.
+        if (!result.initialize()) {
+            result.shutdown();
+            throw new IllegalStateException(
+                    "Failed to initialize embedded RocketMQ broker " + brokerConfig.getBrokerName());
+        }
+        result.start();
+        return result;
+    }
+
+    /**
+     * Builds the configuration for a broker bound to 127.0.0.1 with a unique name and with every
+     * configurable thread pool set to a single thread.
+     */
+    private static BrokerConfig prepareBrokerConfig(final String nsAddr) {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setNamesrvAddr(nsAddr);
+        brokerConfig.setBrokerName("CamelRocketMQBroker" + BROKER_INDEX.getAndIncrement());
+        brokerConfig.setBrokerIP1("127.0.0.1");
+        brokerConfig.setSendMessageThreadPoolNums(1);
+        brokerConfig.setPutMessageFutureThreadPoolNums(1);
+        brokerConfig.setPullMessageThreadPoolNums(1);
+        brokerConfig.setProcessReplyMessageThreadPoolNums(1);
+        brokerConfig.setQueryMessageThreadPoolNums(1);
+        brokerConfig.setAdminBrokerThreadPoolNums(1);
+        brokerConfig.setClientManageThreadPoolNums(1);
+        brokerConfig.setConsumerManageThreadPoolNums(1);
+        brokerConfig.setHeartbeatThreadPoolNums(1);
+        return brokerConfig;
+    }
+
+    /**
+     * Builds the message store configuration, placing all store files below a per-broker directory
+     * under {@code java.io.tmpdir}.
+     */
+    private static MessageStoreConfig prepareMessageStoreConfig() {
+        int haPort = BROKER_PORTS.getAndIncrement();
+        // The timestamp alone is not unique when two brokers are prepared within the same millisecond;
+        // the HA port is unique per broker in this JVM, so it is appended to keep the directories apart.
+        String baseDir = System.getProperty("java.io.tmpdir")
+                         + "/embedded_rocketmq/" + System.currentTimeMillis() + "-" + haPort;
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        storeConfig.setStorePathRootDir(baseDir);
+        storeConfig.setStorePathCommitLog(baseDir + "/commitlog");
+        storeConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10);
+        storeConfig.setMaxIndexNum(100);
+        storeConfig.setMaxHashSlotNum(400);
+        storeConfig.setHaListenPort(haPort);
+        return storeConfig;
+    }
+
+    /**
+     * Creates a topic, retrying until the admin call succeeds or the timeout has elapsed.
+     *
+     * @param  namesrvAddr      the name server address passed to the admin call
+     * @param  defaultCluster   the cluster name passed to the admin call
+     * @param  topic            the topic to create
+     * @throws TimeoutException if the topic could not be created within the timeout
+     */
+    public static void createTopic(String namesrvAddr, String defaultCluster, String topic) throws TimeoutException {
+        long startTime = System.currentTimeMillis();
+        // NOTE(review): 4 and 3 are passed through as in the original call; presumably the queue count and
+        // the wait time in seconds - confirm against MQAdmin.createTopic.
+        while (!MQAdmin.createTopic(namesrvAddr, defaultCluster, topic, 4, 3)) {
+            long elapsed = System.currentTimeMillis() - startTime;
+            if (elapsed > CREATE_TOPIC_TIMEOUT_MILLIS) {
+                throw new TimeoutException(
+                        String.format("Failed to create topic [%s] after %d ms", topic, elapsed));
+            }
+            TestUtils.waitForMoment(CREATE_TOPIC_RETRY_MILLIS);
+        }
+    }
+}
diff --git a/components/camel-rocketmq/src/test/resources/logback-test.xml b/components/camel-rocketmq/src/test/resources/logback-test.xml
new file mode 100644
index 00000000000..d45c9a37733
--- /dev/null
+++ b/components/camel-rocketmq/src/test/resources/logback-test.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<configuration>
+    <!-- Console appender for the tests; it only produces output once the root level below is raised. -->
+    <appender name="DefaultAppender" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss} [%p] [%t] - %m%n</pattern>
+            <charset>UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <!-- Logging is switched off by default; raise the level here when debugging a test. -->
+    <root level="OFF">
+        <appender-ref ref="DefaultAppender"/>
+    </root>
+</configuration>
diff --git a/components/pom.xml b/components/pom.xml
index 3bbe9690c1b..6e704a03504 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -272,6 +272,7 @@
         <module>camel-resourceresolver-github</module>
         <module>camel-resteasy</module>
         <module>camel-robotframework</module>
+        <module>camel-rocketmq</module>
         <module>camel-rss</module>
         <module>camel-rxjava</module>
         <module>camel-saga</module>
diff --git a/parent/pom.xml b/parent/pom.xml
index 0c29d0212a7..9466cb192b7 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -467,6 +467,7 @@
         <resteasy-version>4.5.6.Final</resteasy-version>
         <roaster-version>2.26.0.Final</roaster-version>
         <robotframework-version>4.1.2</robotframework-version>
+        <rocketmq-version>4.9.4</rocketmq-version>
         <rome-version>1.18.0</rome-version>
         <rxjava2-version>2.2.21</rxjava2-version>
         <saxon-version>11.4</saxon-version>
@@ -2202,6 +2203,11 @@
                 <artifactId>camel-robotframework</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.camel</groupId>
+                <artifactId>camel-rocketmq</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.camel</groupId>
                 <artifactId>camel-rss</artifactId>

Reply via email to