This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4b12691a8  [Feature][Connector-V2][RabbitMQ] Add RabbitMQ source & 
sink connector (#3312)
4b12691a8 is described below

commit 4b12691a8d495d4c91a54a3612a4ef99243fdd7a
Author: Bibo <[email protected]>
AuthorDate: Thu Nov 24 11:57:09 2022 +0800

     [Feature][Connector-V2][RabbitMQ] Add RabbitMQ source & sink connector 
(#3312)
    
    * add rabbitmq connector
    
    * add rabbitmq connector
    
    * [Feature][Connector-V2]  add rabbitmq connector
    
    * [Feature][Connector-V2]  add rabbitmq connector
    
    * [Feature][Connector-V2]  add rabbitmq connector
    
    * [Feature][Connector-V2] add rabbitmq connector
    
    * fix code style
    
    * fix code style
    
    * merge dev
    
    * fix code style
    
    * add e2e switch in rabbitmq config
    
    * improve RabbitmqSourceReader.pollNext IN E2E
    
    * add Rabbitmq Connector Error Codes in doc
    
    * add RabbitConnectorException and fix doc style
    
    * fix dist pom conflict
    
    Co-authored-by: 毕博 <[email protected]>
    Co-authored-by: Eric <[email protected]>
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |  14 ++
 docs/en/connector-v2/sink/Rabbitmq.md              | 106 +++++++++
 docs/en/connector-v2/source/Rabbitmq.md            | 158 ++++++++++++++
 plugin-mapping.properties                          |   2 +
 .../connector-rabbitmq}/pom.xml                    |  41 ++--
 .../rabbitmq/client/QueueingConsumer.java          |  90 ++++++++
 .../seatunnel/rabbitmq/client/RabbitmqClient.java  | 186 ++++++++++++++++
 .../seatunnel/rabbitmq/config/RabbitmqConfig.java  | 227 +++++++++++++++++++
 .../exception/RabbitmqConnectorErrorCode.java      |  50 +++++
 .../exception/RabbitmqConnectorException.java      |  35 +++
 .../seatunnel/rabbitmq/sink/RabbitmqSink.java      |  86 ++++++++
 .../rabbitmq/sink/RabbitmqSinkFactory.java         |  69 ++++++
 .../rabbitmq/sink/RabbitmqSinkWriter.java          |  54 +++++
 .../seatunnel/rabbitmq/source/DeliveryMessage.java |  30 +++
 .../seatunnel/rabbitmq/source/RabbitmqSource.java  | 119 ++++++++++
 .../rabbitmq/source/RabbitmqSourceFactory.java     |  82 +++++++
 .../rabbitmq/source/RabbitmqSourceReader.java      | 216 ++++++++++++++++++
 .../rabbitmq/source/RabbitmqSplitEnumerator.java   |  71 ++++++
 .../seatunnel/rabbitmq/split/RabbitmqSplit.java    |  40 ++++
 .../split/RabbitmqSplitEnumeratorState.java        |  23 ++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   6 +
 .../{ => connector-rabbitmq-e2e}/pom.xml           |  34 +--
 .../e2e/connector/rabbitmq/RabbitmqIT.java         | 243 +++++++++++++++++++++
 .../src/test/resources/rabbitmq-to-rabbitmq.conf   |  66 ++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 26 files changed, 1995 insertions(+), 55 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 17f45742a..99d7027c3 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -54,6 +54,20 @@ This document records some common error codes and 
corresponding solutions of Sea
 | SLACK-01  | Conversation can not be founded in channels | When users 
encounter this error code, it means that the channel is not existed in slack 
workspace, please check it |
 | SLACK-02  | Write to slack channel failed               | When users 
encounter this error code, it means that slack has some problems, please check 
it whether is work       |
 
+## Rabbitmq Connector Error Codes
+
+| code        | description                                                   
| solution                                                                      
                                  |
+|-------------|---------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------|
+| RABBITMQ-01 | handle queue consumer shutdown signal failed                  
| When users encounter this error code, it means that job has some problems, 
please check it whether is work well |
+| RABBITMQ-02 | create rabbitmq client failed                                 
| When users encounter this error code, it means that rabbitmq has some 
problems, please check it whether is work |
+| RABBITMQ-03 | close connection failed                                       
| When users encounter this error code, it means that rabbitmq has some 
problems, please check it whether is work |
+| RABBITMQ-04 | send messages failed                                          
| When users encounter this error code, it means that rabbitmq has some 
problems, please check it whether is work |
+| RABBITMQ-05 | messages could not be acknowledged during checkpoint creation 
| When users encounter this error code, it means that job has some problems, 
please check it whether is work well |
+| RABBITMQ-06 | messages could not be acknowledged with basicReject           
| When users encounter this error code, it means that job has some problems, 
please check it whether is work well |
+| RABBITMQ-07 | parse uri failed                                              
| When users encounter this error code, it means that rabbitmq connect uri 
incorrect, please check it             |
+| RABBITMQ-08 | initialize ssl context failed                                 
| When users encounter this error code, it means that rabbitmq has some 
problems, please check it whether is work |
+| RABBITMQ-09 | setup ssl factory failed                                      
| When users encounter this error code, it means that rabbitmq has some 
problems, please check it whether is work |
+
 ## Socket Connector Error Codes
 
 | code      | description                                              | 
solution                                                                        
                                               |
diff --git a/docs/en/connector-v2/sink/Rabbitmq.md 
b/docs/en/connector-v2/sink/Rabbitmq.md
new file mode 100644
index 000000000..5a6c9a7de
--- /dev/null
+++ b/docs/en/connector-v2/sink/Rabbitmq.md
@@ -0,0 +1,106 @@
+# Rabbitmq
+
+> Rabbitmq sink connector
+
+## Description
+
+Used to write data to Rabbitmq.
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+##  Options
+
+| name                        | type    | required  | default value |
+|-----------------------------|---------|-----------|---------------|
+| host                        | string  | yes       | -             |
+| port                        | int     | yes       | -             |
+| virtual_host                | string  | yes       | -             |
+| username                    | string  | yes       | -             |
+| password                    | string  | yes       | -             |
+| queue_name                  | string  | yes       | -             |
+| url                         | string  | no        | -             |
+| network_recovery_interval   | int     | no        | -             |
+| topology_recovery_enabled   | boolean | no        | -             |
+| automatic_recovery_enabled  | boolean | no        | -             |
+| connection_timeout          | int     | no        | -             |
+| common-options              |         | no        | -             |
+
+### host [string]
+
+the default host to use for connections
+
+### port [int]
+
+the default port to use for connections
+
+### virtual_host [string]
+
+virtual host – the virtual host to use when connecting to the broker
+
+### username [string]
+
+the AMQP user name to use when connecting to the broker
+
+### password [string]
+
+the password to use when connecting to the broker
+
+### url [string]
+
+convenience method for setting the fields in an AMQP URI: host, port, 
username, password and virtual host
+
+### queue_name [string]
+
+the queue to write the message to
+
+### schema [Config]
+
+#### fields [Config]
+
+the schema fields of upstream data.
+
+### network_recovery_interval [int]
+
+how long will automatic recovery wait before attempting to reconnect, in ms
+
+### topology_recovery [string]
+
+if true, enables topology recovery
+
+### automatic_recovery [string]
+
+if true, enables connection recovery
+
+### connection_timeout [int]
+
+connection TCP establishment timeout in milliseconds; zero for infinite
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
+
+## Example
+
+simple:
+
+```hocon
+sink {
+      RabbitMQ {
+          host = "rabbitmq-e2e"
+          port = 5672
+          virtual_host = "/"
+          username = "guest"
+          password = "guest"
+          queue_name = "test1"
+      }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Rabbitmq Sink Connector
diff --git a/docs/en/connector-v2/source/Rabbitmq.md 
b/docs/en/connector-v2/source/Rabbitmq.md
new file mode 100644
index 000000000..47030cd46
--- /dev/null
+++ b/docs/en/connector-v2/source/Rabbitmq.md
@@ -0,0 +1,158 @@
+# Rabbitmq
+
+> Rabbitmq source connector
+
+## Description
+
+Used to read data from Rabbitmq.
+
+## Key features
+
+- [ ] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+:::tip
+The source must be non-parallel (parallelism set to 1) in order to achieve 
exactly-once. This limitation is mainly due to RabbitMQ’s approach to 
dispatching messages from a single queue to multiple consumers.
+
+##  Options
+
+| name                        | type    | required | default value |
+|-----------------------------|---------|----------|---------------|
+| host                        | string  | yes      | -             |
+| port                        | int     | yes      | -             |
+| virtual_host                | string  | yes      | -             |
+| username                    | string  | yes      | -             |
+| password                    | string  | yes      | -             |
+| queue_name                  | string  | yes      | -             |
+| schema                      | config  | yes      | -             |
+| url                         | string  | no       | -             |
+| routing_key                 | string  | no       | -             |
+| exchange                    | string  | no       | -             |
+| network_recovery_interval   | int     | no       | -             |
+| topology_recovery_enabled   | boolean | no       | -             |
+| automatic_recovery_enabled  | boolean | no       | -             |
+| connection_timeout          | int     | no       | -             |
+| requested_channel_max       | int     | no       | -             |
+| requested_frame_max         | int     | no       | -             |
+| requested_heartbeat         | int     | no       | -             |
+| prefetch_count              | int     | no       | -             |
+| delivery_timeout            | long    | no       | -             |
+| common-options              |         | no       | -             |
+
+### host [string]
+
+the default host to use for connections
+
+### port [int]
+
+the default port to use for connections
+
+### virtual_host [string]
+
+virtual host – the virtual host to use when connecting to the broker
+
+### username [string]
+
+the AMQP user name to use when connecting to the broker
+
+### password [string]
+
+the password to use when connecting to the broker
+
+### url [string]
+
+convenience method for setting the fields in an AMQP URI: host, port, 
username, password and virtual host
+
+### queue_name [string]
+
+the queue to publish the message to
+
+### routing_key [string]
+
+the routing key to publish the message to
+
+### exchange [string]
+
+the exchange to publish the message to
+
+### schema [Config]
+
+#### fields [Config]
+
+the schema fields of upstream data.
+
+### network_recovery_interval [int]
+
+how long will automatic recovery wait before attempting to reconnect, in ms
+
+### topology_recovery [string]
+
+if true, enables topology recovery
+
+### automatic_recovery [string]
+
+if true, enables connection recovery
+
+### connection_timeout [int]
+
+connection tcp establishment timeout in milliseconds; zero for infinite
+
+### requested_channel_max [int]
+
+initially requested maximum channel number; zero for unlimited
+**Note: Note the value must be between 0 and 65535 (unsigned short in AMQP 
0-9-1).
+
+### requested_frame_max [int]
+
+the requested maximum frame size
+
+### requested_heartbeat [int]
+
+Set the requested heartbeat timeout
+**Note: Note the value must be between 0 and 65535 (unsigned short in AMQP 
0-9-1).
+
+### prefetch_count [int]
+
+prefetchCount the max number of messages to receive without acknowledgement
+
+### delivery_timeout [long]
+
+deliveryTimeout maximum wait time, in milliseconds, for the next message 
delivery
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details
+
+## Example
+
+simple:
+
+```hocon
+source {
+    RabbitMQ {
+        host = "rabbitmq-e2e"
+        port = 5672
+        virtual_host = "/"
+        username = "guest"
+        password = "guest"
+        queue_name = "test"
+        schema = {
+            fields {
+                id = bigint
+                c_map = "map<string, smallint>"
+                c_array = "array<tinyint>"
+            }
+        }
+    }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Rabbitmq source Connector
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index e42a472de..2962c75b5 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -154,3 +154,5 @@ seatunnel.sink.Slack = connector-slack
 seatunnel.source.OneSignal = connector-http-onesignal
 seatunnel.source.Jira = connector-http-jira
 seatunnel.source.Gitlab = connector-http-gitlab
+seatunnel.sink.RabbitMQ = connector-rabbitmq
+seatunnel.source.RabbitMQ = connector-rabbitmq
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-connectors-v2/connector-rabbitmq/pom.xml
similarity index 58%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
copy to seatunnel-connectors-v2/connector-rabbitmq/pom.xml
index 3faa4c868..c198a54a4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-connectors-v2/connector-rabbitmq/pom.xml
@@ -1,65 +1,52 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
+
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
+
        http://www.apache.org/licenses/LICENSE-2.0
+
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
+
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0";
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
-        <artifactId>seatunnel-e2e</artifactId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
-    <modules>
-        <module>connector-assert-e2e</module>
-        <module>connector-jdbc-e2e</module>
-        <module>connector-redis-e2e</module>
-        <module>connector-clickhouse-e2e</module>
-        <module>connector-starrocks-e2e</module>
-        <module>connector-influxdb-e2e</module>
-        <module>connector-amazondynamodb-e2e</module>
-        <module>connector-file-local-e2e</module>
-        <module>connector-cassandra-e2e</module>
-        <module>connector-neo4j-e2e</module>
-        <module>connector-http-e2e</module>
-        <module>connector-kafka-e2e</module>
-    </modules>
 
-    <artifactId>seatunnel-connector-v2-e2e</artifactId>
+    <artifactId>connector-rabbitmq</artifactId>
 
+    <properties>
+        <rabbitmq.version>5.9.0</rabbitmq.version>
+    </properties>
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-e2e-common</artifactId>
+            <artifactId>connector-common</artifactId>
             <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-starter</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>${rabbitmq.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-spark-starter</artifactId>
+            <artifactId>seatunnel-format-json</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
-
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/QueueingConsumer.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/QueueingConsumer.java
new file mode 100644
index 000000000..6257ca2e9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/QueueingConsumer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.client;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorErrorCode.HANDLE_SHUTDOWN_SIGNAL_FAILED;
+
+import org.apache.seatunnel.common.Handover;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConsumerCancelledException;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+import com.rabbitmq.utility.Utility;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+@Slf4j
+public class QueueingConsumer extends DefaultConsumer {
+    private final Handover<Delivery> handover;
+
+    // When this is non-null the queue is in shutdown mode and nextDelivery 
should
+    // throw a shutdown signal exception.
+    private volatile ShutdownSignalException shutdown;
+    private volatile ConsumerCancelledException cancelled;
+
+    private static final Delivery POISON = new Delivery(null, null, null);
+
+    public QueueingConsumer(Channel channel, Handover<Delivery> handover) {
+        this(channel, Integer.MAX_VALUE, handover);
+    }
+
+    public QueueingConsumer(Channel channel, int capacity, Handover<Delivery> 
handover) {
+        super(channel);
+        this.handover = handover;
+    }
+
+    private void checkShutdown() {
+        if (shutdown != null) {
+            throw Utility.fixStackTrace(shutdown);
+        }
+    }
+
+    @Override
+    public void handleShutdownSignal(String consumerTag, 
ShutdownSignalException sig) {
+        shutdown = sig;
+        try {
+            handover.produce(POISON);
+        } catch (InterruptedException | Handover.ClosedException e) {
+            throw new 
RabbitmqConnectorException(HANDLE_SHUTDOWN_SIGNAL_FAILED, e);
+        }
+    }
+
+    @SneakyThrows
+    @Override
+    public void handleCancel(String consumerTag) throws IOException {
+        cancelled = new ConsumerCancelledException();
+        handover.produce(POISON);
+    }
+
+    @SneakyThrows
+    @Override
+    public void handleDelivery(
+            String consumerTag, Envelope envelope, AMQP.BasicProperties 
properties, byte[] body)
+            throws IOException {
+        checkShutdown();
+        log.info(new String(body));
+        handover.produce(new Delivery(envelope, properties, body));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java
new file mode 100644
index 000000000..0e7138110
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.client;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorErrorCode.CLOSE_CONNECTION_FAILED;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorErrorCode.CREATE_RABBITMQ_CLIENT_FAILED;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorErrorCode.INIT_SSL_CONTEXT_FAILED;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorErrorCode.PARSE_URI_FAILED;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorErrorCode.SEND_MESSAGE_FAILED;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorErrorCode.SETUP_SSL_FACTORY_FAILED;
+
+import org.apache.seatunnel.common.Handover;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Delivery;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.TimeoutException;
+
+@Slf4j
+@AllArgsConstructor
+public class RabbitmqClient {
+    private final RabbitmqConfig config;
+    private final ConnectionFactory connectionFactory;
+    private final Connection connection;
+    private final Channel channel;
+
+    public RabbitmqClient(RabbitmqConfig config) {
+        this.config = config;
+        try {
+            this.connectionFactory = getConnectionFactory();
+            this.connection = connectionFactory.newConnection();
+            this.channel = connection.createChannel();
+            //set channel prefetch count
+            if (config.getPrefetchCount() != null) {
+                channel.basicQos(config.getPrefetchCount(), true);
+            }
+            setupQueue();
+        } catch (Exception e) {
+            throw new 
RabbitmqConnectorException(CREATE_RABBITMQ_CLIENT_FAILED, String.format("Error 
while create RMQ client with %s at %s", config.getQueueName(), 
config.getHost()), e);
+        }
+
+    }
+
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public DefaultConsumer getQueueingConsumer(Handover<Delivery> handover) {
+        DefaultConsumer consumer = new QueueingConsumer(channel, handover);
+        return consumer;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        ConnectionFactory factory = new ConnectionFactory();
+        if (!StringUtils.isEmpty(config.getUri())) {
+            try {
+                factory.setUri(config.getUri());
+            } catch (URISyntaxException e) {
+                throw new RabbitmqConnectorException(PARSE_URI_FAILED, e);
+            } catch (KeyManagementException e) {
+                // this should never happen
+                throw new RabbitmqConnectorException(INIT_SSL_CONTEXT_FAILED, 
e);
+            } catch (NoSuchAlgorithmException e) {
+                // this should never happen
+                throw new RabbitmqConnectorException(SETUP_SSL_FACTORY_FAILED, 
e);
+            }
+        } else {
+            factory.setHost(config.getHost());
+            factory.setPort(config.getPort());
+            factory.setVirtualHost(config.getVirtualHost());
+            factory.setUsername(config.getUsername());
+            factory.setPassword(config.getPassword());
+        }
+
+        if (config.getAutomaticRecovery() != null) {
+            factory.setAutomaticRecoveryEnabled(config.getAutomaticRecovery());
+        }
+        if (config.getConnectionTimeout() != null) {
+            factory.setConnectionTimeout(config.getConnectionTimeout());
+        }
+        if (config.getNetworkRecoveryInterval() != null) {
+            
factory.setNetworkRecoveryInterval(config.getNetworkRecoveryInterval());
+        }
+        if (config.getRequestedHeartbeat() != null) {
+            factory.setRequestedHeartbeat(config.getRequestedHeartbeat());
+        }
+        if (config.getTopologyRecovery() != null) {
+            factory.setTopologyRecoveryEnabled(config.getTopologyRecovery());
+        }
+        if (config.getRequestedChannelMax() != null) {
+            factory.setRequestedChannelMax(config.getRequestedChannelMax());
+        }
+        if (config.getRequestedFrameMax() != null) {
+            factory.setRequestedFrameMax(config.getRequestedFrameMax());
+        }
+        return factory;
+    }
+
+    public void write(byte[] msg) {
+        try {
+            if (StringUtils.isEmpty(config.getRoutingKey())) {
+                channel.basicPublish("", config.getQueueName(), null, msg);
+            } else {
+                //not support set returnListener
+                channel.basicPublish(
+                        config.getExchange(),
+                        config.getRoutingKey(),
+                        false,
+                        false,
+                        null,
+                        msg);
+            }
+        } catch (IOException e) {
+            if (config.isLogFailuresOnly()) {
+                log.error("Cannot send RMQ message {} at {}", 
config.getQueueName(), config.getHost(), e);
+            } else {
+                throw new RabbitmqConnectorException(SEND_MESSAGE_FAILED, 
String.format("Cannot send RMQ message %s at %s", config.getQueueName(), 
config.getHost()), e);
+            }
+        }
+    }
+
+    public void close() {
+        Exception t = null;
+        try {
+            if (channel != null) {
+                channel.close();
+            }
+        } catch (IOException | TimeoutException e) {
+            t = e;
+        }
+
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (IOException e) {
+            if (t != null) {
+                log.warn(
+                        "Both channel and connection closing failed. Logging 
channel exception and failing with connection exception",
+                        t);
+            }
+            t = e;
+        }
+        if (t != null) {
+            throw new RabbitmqConnectorException(CLOSE_CONNECTION_FAILED, 
String.format("Error while closing RMQ connection with  %s at %s", 
config.getQueueName(), config.getHost()), t);
+        }
+    }
+
+    protected void setupQueue() throws IOException {
+        if (config.getQueueName() != null) {
+            declareQueueDefaults(channel, config.getQueueName());
+        }
+    }
+
+    private void declareQueueDefaults(Channel channel, String queueName) 
throws IOException {
+        channel.queueDeclare(queueName, true, false, false, null);
+    }
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
new file mode 100644
index 000000000..ff3051b0c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+@Setter
+@Getter
+@AllArgsConstructor
+public class RabbitmqConfig implements Serializable {
+    private String host;
+    private Integer port;
+    private String virtualHost;
+    private String username;
+    private String password;
+    private String uri;
+    private Integer networkRecoveryInterval;
+    private Boolean automaticRecovery;
+    private Boolean topologyRecovery;
+    private Integer connectionTimeout;
+    private Integer requestedChannelMax;
+    private Integer requestedFrameMax;
+    private Integer requestedHeartbeat;
+    private Integer prefetchCount;
+    private  long deliveryTimeout;
+    private String queueName;
+    private String routingKey;
+    private boolean logFailuresOnly = false;
+    private String exchange = "";
+
+    private boolean forE2ETesting = false;
+
+    public static final String RABBITMQ_SINK_CONFIG_PREFIX = 
"rabbitmq.properties.";
+
+    private final Map<String, Object> sinkOptionProps = new HashMap<>();
+
+    public static final Option<String> HOST = Options.key("host")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("the default host to use for connections");
+
+    public static final Option<Integer> PORT = Options.key("port")
+            .intType()
+            .noDefaultValue()
+            .withDescription("the default port to use for connections");
+
+    public static final Option<String> VIRTUAL_HOST = 
Options.key("virtual_host")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("the virtual host to use when connecting to the 
broker");
+
+    public static final Option<String> USERNAME = Options.key("username")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("the AMQP user name to use when connecting to the 
broker");
+
+    public static final Option<String> PASSWORD = Options.key("password")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("the password to use when connecting to the 
broker");
+
+
+    public static final Option<String> QUEUE_NAME = Options.key("queue_name")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("the queue to write the message to");
+
+    public static final Option<String> URL = Options.key("url")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("convenience method for setting the fields in an 
AMQP URI: host, port, username, password and virtual host");
+
+    public static final Option<Integer> NETWORK_RECOVERY_INTERVAL = 
Options.key("network_recovery_interval")
+            .intType()
+            .noDefaultValue()
+            .withDescription("how long will automatic recovery wait before 
attempting to reconnect, in ms");
+
+    public static final Option<Boolean> AUTOMATIC_RECOVERY_ENABLED = 
Options.key("AUTOMATIC_RECOVERY_ENABLED")
+            .booleanType()
+            .noDefaultValue()
+            .withDescription("if true, enables connection recovery");
+
+    public static final Option<Boolean> TOPOLOGY_RECOVERY_ENABLED = 
Options.key("topology_recovery_enabled")
+            .booleanType()
+            .noDefaultValue()
+            .withDescription("if true, enables topology recovery");
+
+    public static final Option<Integer> CONNECTION_TIMEOUT = 
Options.key("connection_timeout")
+            .intType()
+            .noDefaultValue()
+            .withDescription("connection TCP establishment timeout in 
milliseconds");
+
+
+    public static final Option<Integer> REQUESTED_CHANNEL_MAX = 
Options.key("requested_channel_max")
+            .intType()
+            .noDefaultValue()
+            .withDescription("initially requested maximum channel number");
+
+    public static final Option<Integer> REQUESTED_FRAME_MAX = 
Options.key("requested_frame_max")
+            .intType()
+            .noDefaultValue()
+            .withDescription("the requested maximum frame size");
+
+    public static final Option<Integer> REQUESTED_HEARTBEAT = 
Options.key("requested_heartbeat")
+            .intType()
+            .noDefaultValue()
+            .withDescription("the requested heartbeat timeout");
+
+    public static final Option<Long> PREFETCH_COUNT = 
Options.key("prefetch_count")
+            .longType()
+            .noDefaultValue()
+            .withDescription("prefetchCount the max number of messages to 
receive without acknowledgement\n");
+
+    public static final Option<Integer> DELIVERY_TIMEOUT = 
Options.key("delivery_timeout")
+            .intType()
+            .noDefaultValue()
+            .withDescription("deliveryTimeout maximum wait time");
+
+    public static final Option<String> ROUTING_KEY = Options.key("routing_key")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("the routing key to publish the message to");
+
+    public static final Option<String> EXCHANGE = Options.key("exchange")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("the exchange to publish the message to");
+
+    public static final Option<Boolean> FOR_E2E_TESTING = 
Options.key("for_e2e_testing")
+            .booleanType()
+            .noDefaultValue()
+            .withDescription("use to recognize E2E mode");
+
+    private void parseSinkOptionProperties(Config pluginConfig) {
+        Config sinkOptionConfig = 
TypesafeConfigUtils.extractSubConfig(pluginConfig,
+                RABBITMQ_SINK_CONFIG_PREFIX, false);
+        sinkOptionConfig.entrySet().forEach(entry -> {
+            final String configKey = entry.getKey().toLowerCase();
+            this.sinkOptionProps.put(configKey, entry.getValue().unwrapped());
+        });
+    }
+
+    public RabbitmqConfig(Config config) {
+        this.host = config.getString(HOST.key());
+        this.port = config.getInt(PORT.key());
+        this.queueName = config.getString(QUEUE_NAME.key());
+        if (config.hasPath(USERNAME.key())) {
+            this.username = config.getString(USERNAME.key());
+        }
+        if (config.hasPath(PASSWORD.key())) {
+            this.password = config.getString(PASSWORD.key());
+        }
+        if (config.hasPath(VIRTUAL_HOST.key())) {
+            this.virtualHost = config.getString(VIRTUAL_HOST.key());
+        }
+        if (config.hasPath(NETWORK_RECOVERY_INTERVAL.key())) {
+            this.networkRecoveryInterval = 
config.getInt(NETWORK_RECOVERY_INTERVAL.key());
+        }
+        if (config.hasPath(AUTOMATIC_RECOVERY_ENABLED.key())) {
+            this.automaticRecovery = 
config.getBoolean(AUTOMATIC_RECOVERY_ENABLED.key());
+        }
+        if (config.hasPath(TOPOLOGY_RECOVERY_ENABLED.key())) {
+            this.topologyRecovery = 
config.getBoolean(TOPOLOGY_RECOVERY_ENABLED.key());
+        }
+        if (config.hasPath(CONNECTION_TIMEOUT.key())) {
+            this.connectionTimeout = config.getInt(CONNECTION_TIMEOUT.key());
+        }
+        if (config.hasPath(REQUESTED_CHANNEL_MAX.key())) {
+            this.requestedChannelMax = 
config.getInt(REQUESTED_CHANNEL_MAX.key());
+        }
+        if (config.hasPath(REQUESTED_FRAME_MAX.key())) {
+            this.requestedFrameMax = config.getInt(REQUESTED_FRAME_MAX.key());
+        }
+        if (config.hasPath(REQUESTED_HEARTBEAT.key())) {
+            this.requestedHeartbeat = config.getInt(REQUESTED_HEARTBEAT.key());
+        }
+        if (config.hasPath(PREFETCH_COUNT.key())) {
+            this.prefetchCount = config.getInt(PREFETCH_COUNT.key());
+        }
+        if (config.hasPath(DELIVERY_TIMEOUT.key())) {
+            this.deliveryTimeout = config.getInt(DELIVERY_TIMEOUT.key());
+        }
+        if (config.hasPath(ROUTING_KEY.key())) {
+            this.routingKey = config.getString(ROUTING_KEY.key());
+        }
+        if (config.hasPath(EXCHANGE.key())) {
+            this.exchange = config.getString(EXCHANGE.key());
+        }
+        if (config.hasPath(FOR_E2E_TESTING.key())) {
+            this.forE2ETesting = config.getBoolean(FOR_E2E_TESTING.key());
+        }
+        parseSinkOptionProperties(config);
+    }
+
+    @VisibleForTesting
+    public RabbitmqConfig() {
+
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/exception/RabbitmqConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/exception/RabbitmqConnectorErrorCode.java
new file mode 100644
index 000000000..e1abf85ef
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/exception/RabbitmqConnectorErrorCode.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum RabbitmqConnectorErrorCode implements SeaTunnelErrorCode {
+    HANDLE_SHUTDOWN_SIGNAL_FAILED("RABBITMQ-01", "handle queue consumer 
shutdown signal failed"),
+    CREATE_RABBITMQ_CLIENT_FAILED("RABBITMQ-02", "create rabbitmq client 
failed"),
+    CLOSE_CONNECTION_FAILED("RABBITMQ-03", "close connection failed"),
+    SEND_MESSAGE_FAILED("RABBITMQ-04", "send messages failed"),
+    MESSAGE_ACK_FAILED("RABBITMQ-05", "messages could not be acknowledged 
during checkpoint creation"),
+    MESSAGE_ACK_REJECTED("RABBITMQ-06", "messages could not be acknowledged 
with basicReject"),
+    PARSE_URI_FAILED("RABBITMQ-07", "parse uri failed"),
+    INIT_SSL_CONTEXT_FAILED("RABBITMQ-08", "initialize ssl context failed"),
+    SETUP_SSL_FACTORY_FAILED("RABBITMQ-09", "setup ssl factory failed");
+
+    private final String code;
+    private final String description;
+
+    RabbitmqConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/exception/RabbitmqConnectorException.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/exception/RabbitmqConnectorException.java
new file mode 100644
index 000000000..68b037939
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/exception/RabbitmqConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class RabbitmqConnectorException extends SeaTunnelRuntimeException {
+    public RabbitmqConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public RabbitmqConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public RabbitmqConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
new file mode 100644
index 000000000..971397da3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+@AutoService(SeaTunnelSink.class)
+public class RabbitmqSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+    private SeaTunnelRowType seaTunnelRowType;
+    private Config pluginConfig;
+    private RabbitmqConfig rabbitMQConfig;
+
+    @Override
+    public String getPluginName() {
+        return "RabbitMQ";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.pluginConfig = pluginConfig;
+
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HOST.key(), PORT.key(), VIRTUAL_HOST.key(), USERNAME.key(), PASSWORD.key(), 
QUEUE_NAME.key());
+        if (!result.isSuccess()) {
+            throw new 
RabbitmqConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                            getPluginName(), PluginType.SINK, 
result.getMsg()));
+        }
+        rabbitMQConfig = new RabbitmqConfig(pluginConfig);
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType getConsumedType() {
+        return seaTunnelRowType;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) throws IOException {
+        return new RabbitmqSinkWriter(rabbitMQConfig, seaTunnelRowType);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
new file mode 100644
index 000000000..12ee197c2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
@@ -0,0 +1,69 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *     contributor license agreements.  See the NOTICE file distributed with
+ *     this work for additional information regarding copyright ownership.
+ *     The ASF licenses this file to You under the Apache License, Version 2.0
+ *     (the "License"); you may not use this file except in compliance with
+ *     the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     Unless required by applicable law or agreed to in writing, software
+ *     distributed under the License is distributed on an "AS IS" BASIS,
+ *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *     See the License for the specific language governing permissions and
+ *     limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.AUTOMATIC_RECOVERY_ENABLED;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.CONNECTION_TIMEOUT;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.EXCHANGE;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.NETWORK_RECOVERY_INTERVAL;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class RabbitmqSinkFactory implements TableSourceFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "RabbitMQ";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(
+                HOST,
+                PORT,
+                VIRTUAL_HOST,
+                QUEUE_NAME
+            )
+            .bundled(USERNAME, PASSWORD)
+            .optional(
+                URL,
+                ROUTING_KEY,
+                EXCHANGE,
+                NETWORK_RECOVERY_INTERVAL,
+                TOPOLOGY_RECOVERY_ENABLED,
+                AUTOMATIC_RECOVERY_ENABLED,
+                CONNECTION_TIMEOUT
+            )
+            .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkWriter.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkWriter.java
new file mode 100644
index 000000000..a5d7c06b5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import java.util.Optional;
+
+public class RabbitmqSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> 
{
+    private RabbitmqClient rabbitMQClient;
+    private final JsonSerializationSchema jsonSerializationSchema;
+
+    public RabbitmqSinkWriter(RabbitmqConfig config, SeaTunnelRowType 
seaTunnelRowType) {
+        this.rabbitMQClient = new RabbitmqClient(config);
+        this.jsonSerializationSchema = new 
JsonSerializationSchema(seaTunnelRowType);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) {
+        rabbitMQClient.write(jsonSerializationSchema.serialize(element));
+    }
+
+    @Override
+    public Optional prepareCommit() {
+        return Optional.empty();
+    }
+
+    @Override
+    public void close() {
+        if (rabbitMQClient != null) {
+            rabbitMQClient.close();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/DeliveryMessage.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/DeliveryMessage.java
new file mode 100644
index 000000000..7e8ec4e87
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/DeliveryMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source;
+
+import com.rabbitmq.client.Delivery;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+@AllArgsConstructor
+@Setter
+@Getter
+public final class DeliveryMessage {
+    private final Delivery delivery;
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
new file mode 100644
index 000000000..28a5ddc7e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema.SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplitEnumeratorState;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSource.class)
+public class RabbitmqSource implements SeaTunnelSource<SeaTunnelRow, 
RabbitmqSplit, RabbitmqSplitEnumeratorState> {
+
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private JobContext jobContext;
+    private RabbitmqConfig rabbitMQConfig;
+
+    @Override
+    public Boundedness getBoundedness() {
+        if (!JobMode.STREAMING.equals(jobContext.getJobMode())) {
+            throw new 
RabbitmqConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                            getPluginName(), PluginType.SOURCE, "not support 
batch job mode"));
+        }
+        return rabbitMQConfig.isForE2ETesting() ? Boundedness.BOUNDED : 
Boundedness.UNBOUNDED;
+    }
+
+    @Override
+    public String getPluginName() {
+        return "RabbitMQ";
+    }
+
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(config, 
HOST.key(), PORT.key(), VIRTUAL_HOST.key(), USERNAME.key(), PASSWORD.key(), 
QUEUE_NAME.key(), SCHEMA.key());
+        if (!result.isSuccess()) {
+            throw new 
RabbitmqConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
+        }
+        this.rabbitMQConfig = new RabbitmqConfig(config);
+        setDeserialization(config);
+    }
+
+    @Override
+    public SeaTunnelDataType getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, RabbitmqSplit> 
createReader(SourceReader.Context readerContext) throws Exception {
+        return new RabbitmqSourceReader(deserializationSchema, readerContext, 
rabbitMQConfig);
+    }
+
+    @Override
+    public SourceSplitEnumerator<RabbitmqSplit, RabbitmqSplitEnumeratorState> 
createEnumerator(SourceSplitEnumerator.Context<RabbitmqSplit> 
enumeratorContext) throws Exception {
+        return new RabbitmqSplitEnumerator();
+    }
+
+    @Override
+    public SourceSplitEnumerator<RabbitmqSplit, RabbitmqSplitEnumeratorState> 
restoreEnumerator(SourceSplitEnumerator.Context<RabbitmqSplit> 
enumeratorContext, RabbitmqSplitEnumeratorState checkpointState) throws 
Exception {
+        return new RabbitmqSplitEnumerator();
+    }
+
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+    }
+
+    private void setDeserialization(Config config) {
+        // TODO: format SPI
+        //only support json deserializationSchema
+        SeaTunnelRowType rowType = 
SeaTunnelSchema.buildWithConfig(config.getConfig(SCHEMA.key())).getSeaTunnelRowType();
+        this.deserializationSchema = new JsonDeserializationSchema(false, 
false, rowType);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java
new file mode 100644
index 000000000..413cdaafa
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java
@@ -0,0 +1,82 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *     contributor license agreements.  See the NOTICE file distributed with
+ *     this work for additional information regarding copyright ownership.
+ *     The ASF licenses this file to You under the Apache License, Version 2.0
+ *     (the "License"); you may not use this file except in compliance with
+ *     the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     Unless required by applicable law or agreed to in writing, software
+ *     distributed under the License is distributed on an "AS IS" BASIS,
+ *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *     See the License for the specific language governing permissions and
+ *     limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema.SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.AUTOMATIC_RECOVERY_ENABLED;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.CONNECTION_TIMEOUT;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.DELIVERY_TIMEOUT;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.EXCHANGE;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.NETWORK_RECOVERY_INTERVAL;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PREFETCH_COUNT;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_CHANNEL_MAX;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_FRAME_MAX;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_HEARTBEAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class RabbitmqSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "RabbitMQ";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(
+                HOST,
+                PORT,
+                VIRTUAL_HOST,
+                USERNAME,
+                PASSWORD,
+                QUEUE_NAME,
+                SCHEMA
+            )
+            .bundled(USERNAME, PASSWORD)
+            .optional(
+                URL,
+                ROUTING_KEY,
+                EXCHANGE,
+                NETWORK_RECOVERY_INTERVAL,
+                TOPOLOGY_RECOVERY_ENABLED,
+                AUTOMATIC_RECOVERY_ENABLED,
+                CONNECTION_TIMEOUT,
+                REQUESTED_CHANNEL_MAX,
+                REQUESTED_FRAME_MAX,
+                REQUESTED_HEARTBEAT,
+                PREFETCH_COUNT,
+                DELIVERY_TIMEOUT
+            )
+            .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java
new file mode 100644
index 000000000..f1df92638
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorErrorCode.MESSAGE_ACK_FAILED;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorErrorCode.MESSAGE_ACK_REJECTED;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.Handover;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplit;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.Envelope;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+@Slf4j
+public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> 
{
+    protected final Handover<Delivery> handover;
+
+    protected final SourceReader.Context context;
+    protected transient Channel channel;
+    private final boolean usesCorrelationId = true;
+    protected transient boolean autoAck;
+
+    protected transient Set<String> correlationIdsProcessedButNotAcknowledged;
+    protected transient List<Long> deliveryTagsProcessedForCurrentSnapshot;
+
+    protected final SortedMap<Long, List<Long>> pendingDeliveryTagsToCommit;
+    protected final SortedMap<Long, Set<String>> pendingCorrelationIdsToCommit;
+
+    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private RabbitmqClient rabbitMQClient;
+    private DefaultConsumer consumer;
+    private final RabbitmqConfig config;
+
+    public RabbitmqSourceReader(DeserializationSchema<SeaTunnelRow> 
deserializationSchema,
+                                SourceReader.Context context,
+                                RabbitmqConfig config) {
+        this.handover = new Handover<>();
+        this.pendingDeliveryTagsToCommit = 
Collections.synchronizedSortedMap(new TreeMap<>());
+        this.pendingCorrelationIdsToCommit = 
Collections.synchronizedSortedMap(new TreeMap<>());
+        this.context = context;
+        this.deserializationSchema = deserializationSchema;
+        this.config = config;
+        this.rabbitMQClient = new RabbitmqClient(config);
+        this.channel = rabbitMQClient.getChannel();
+    }
+
+    @Override
+    public void open() throws Exception {
+        this.correlationIdsProcessedButNotAcknowledged = new HashSet<>();
+        this.deliveryTagsProcessedForCurrentSnapshot = new ArrayList<>();
+        consumer = rabbitMQClient.getQueueingConsumer(handover);
+
+        if (Boundedness.UNBOUNDED.equals(context.getBoundedness())) {
+            autoAck = false;
+            // enables transaction mode
+            channel.txSelect();
+        } else {
+            autoAck = true;
+        }
+
+        log.debug("Starting RabbitMQ source with autoAck status: " + autoAck);
+        channel.basicConsume(config.getQueueName(), autoAck, consumer);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (rabbitMQClient != null) {
+            rabbitMQClient.close();
+        }
+    }
+
+    @Override
+    public void pollNext(Collector output) throws Exception {
+        Optional<Delivery> deliveryOptional = handover.pollNext();
+        if (deliveryOptional.isPresent()) {
+            Delivery delivery = deliveryOptional.get();
+            AMQP.BasicProperties properties = delivery.getProperties();
+            byte[] body = delivery.getBody();
+            Envelope envelope = delivery.getEnvelope();
+            synchronized (output.getCheckpointLock()) {
+                boolean newMessage = 
verifyMessageIdentifier(properties.getCorrelationId(), 
envelope.getDeliveryTag());
+                if (!newMessage) {
+                    return;
+                }
+                
deliveryTagsProcessedForCurrentSnapshot.add(envelope.getDeliveryTag());
+                deserializationSchema.deserialize(body, output);
+            }
+
+            if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+                // signal to the source that we have reached the end of the 
data.
+                // rabbitmq source connector on support streaming mode, this 
is for test
+                context.signalNoMoreElement();
+            }
+        }
+    }
+
+    @Override
+    public List snapshotState(long checkpointId) throws Exception {
+
+        List<RabbitmqSplit> pendingSplit = Collections.singletonList(new 
RabbitmqSplit(deliveryTagsProcessedForCurrentSnapshot, 
correlationIdsProcessedButNotAcknowledged));
+        // perform a snapshot for these splits.
+        List<Long> deliveryTags =
+                pendingDeliveryTagsToCommit.computeIfAbsent(checkpointId, id 
-> new ArrayList<>());
+        Set<String> correlationIds =
+                pendingCorrelationIdsToCommit.computeIfAbsent(checkpointId, id 
-> new HashSet<>());
+        // put currentCheckPoint deliveryTags and CorrelationIds.
+        for (RabbitmqSplit split : pendingSplit) {
+            List<Long> currentCheckPointDeliveryTags = split.getDeliveryTags();
+            Set<String> currentCheckPointCorrelationIds = 
split.getCorrelationIds();
+
+            if (currentCheckPointDeliveryTags != null) {
+                deliveryTags.addAll(currentCheckPointDeliveryTags);
+            }
+            if (currentCheckPointCorrelationIds != null) {
+                correlationIds.addAll(currentCheckPointCorrelationIds);
+            }
+        }
+        return pendingSplit;
+    }
+
+    @Override
+    public void addSplits(List splits) {
+        //do nothing
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        //do nothing
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        log.debug("Committing cursors for checkpoint {}", checkpointId);
+        List<Long> pendingDeliveryTags = 
pendingDeliveryTagsToCommit.remove(checkpointId);
+        Set<String> pendingCorrelationIds = 
pendingCorrelationIdsToCommit.remove(checkpointId);
+
+        if (pendingDeliveryTags == null || pendingCorrelationIds == null) {
+            log.debug(
+                    "pending delivery tags or correlationIds checkpoint {} 
either do not exist or have already been committed.",
+                    checkpointId);
+            return;
+        }
+        acknowledgeDeliveryTags(pendingDeliveryTags);
+        
correlationIdsProcessedButNotAcknowledged.removeAll(pendingCorrelationIds);
+
+    }
+
+    protected void acknowledgeDeliveryTags(List<Long> deliveryTags) {
+        try {
+            for (long id : deliveryTags) {
+                channel.basicAck(id, false);
+            }
+            channel.txCommit();
+        } catch (IOException e) {
+            throw new RabbitmqConnectorException(MESSAGE_ACK_FAILED, e);
+        }
+    }
+
+    public boolean verifyMessageIdentifier(String correlationId, long 
deliveryTag) {
+        if (!autoAck) {
+            if (usesCorrelationId) {
+                com.google.common.base.Preconditions.checkNotNull(
+                        correlationId,
+                        "RabbitMQ source was instantiated with 
usesCorrelationId set to "
+                                + "true yet we couldn't extract the 
correlation id from it!");
+                if 
(!correlationIdsProcessedButNotAcknowledged.add(correlationId)) {
+                    // we have already processed this message
+                    try {
+                        channel.basicReject(deliveryTag, false);
+                    } catch (IOException e) {
+                        throw new 
RabbitmqConnectorException(MESSAGE_ACK_REJECTED, e);
+                    }
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSplitEnumerator.java
new file mode 100644
index 000000000..c01436830
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSplitEnumerator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import java.io.IOException;
+import java.util.List;
+
+public class RabbitmqSplitEnumerator implements SourceSplitEnumerator {
+
+    @Override
+    public void open() {
+        //do nothing
+    }
+
+    @Override
+    public void run() throws Exception {
+        //do nothing
+    }
+
+    @Override
+    public void close() throws IOException {
+        //do nothing
+    }
+
+    @Override
+    public void addSplitsBack(List splits, int subtaskId) {
+        //do nothing
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return 0;
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        //do nothing
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        //do nothing
+    }
+
+    @Override
+    public Object snapshotState(long checkpointId) throws Exception {
+        return null;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        //do nothing
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/split/RabbitmqSplit.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/split/RabbitmqSplit.java
new file mode 100644
index 000000000..70031aa66
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/split/RabbitmqSplit.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.split;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.List;
+import java.util.Set;
+
+@Getter
+@Setter
+@AllArgsConstructor
+public class RabbitmqSplit implements SourceSplit {
+    private List<Long> deliveryTags;
+    private Set<String> correlationIds;
+
+    @Override
+    public String splitId() {
+        return "";
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/split/RabbitmqSplitEnumeratorState.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/split/RabbitmqSplitEnumeratorState.java
new file mode 100644
index 000000000..d4aedf124
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/split/RabbitmqSplitEnumeratorState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.split;
+
+import java.io.Serializable;
+
+public class RabbitmqSplitEnumeratorState implements Serializable {
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 4d8776e9c..6eab714d8 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -63,6 +63,7 @@
         <module>connector-starrocks</module>
         <module>connector-google-sheets</module>
         <module>connector-slack</module>
+        <module>connector-rabbitmq</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 6e2d18033..b37a99356 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -369,6 +369,12 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-rabbitmq</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
             </dependencies>
         </profile>
         <profile>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/pom.xml
similarity index 58%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/pom.xml
index 3faa4c868..fb3d0cb3c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/pom.xml
@@ -17,49 +17,27 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
-        <artifactId>seatunnel-e2e</artifactId>
         <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
-    <modules>
-        <module>connector-assert-e2e</module>
-        <module>connector-jdbc-e2e</module>
-        <module>connector-redis-e2e</module>
-        <module>connector-clickhouse-e2e</module>
-        <module>connector-starrocks-e2e</module>
-        <module>connector-influxdb-e2e</module>
-        <module>connector-amazondynamodb-e2e</module>
-        <module>connector-file-local-e2e</module>
-        <module>connector-cassandra-e2e</module>
-        <module>connector-neo4j-e2e</module>
-        <module>connector-http-e2e</module>
-        <module>connector-kafka-e2e</module>
-    </modules>
 
-    <artifactId>seatunnel-connector-v2-e2e</artifactId>
+    <artifactId>connector-rabbitmq-e2e</artifactId>
 
     <dependencies>
+        <!-- SeaTunnel connectors -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-e2e-common</artifactId>
+            <artifactId>connector-rabbitmq</artifactId>
             <version>${project.version}</version>
-            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-starter</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-spark-starter</artifactId>
+            <artifactId>connector-console</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
-
-</project>
+</project>
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
new file mode 100644
index 000000000..517094118
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.rabbitmq;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.Handover;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Delivery;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+@Slf4j
+public class RabbitmqIT extends TestSuiteBase implements TestResource {
+    private static final String IMAGE = "rabbitmq:3-management";
+    private static final String HOST = "rabbitmq-e2e";
+    private static final int PORT = 5672;
+    private static final String QUEUE_NAME = "test";
+    private static final String SINK_QUEUE_NAME = "test1";
+    private static final String USERNAME = "guest";
+    private static final String PASSWORD = "guest";
+
+    private static final Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> 
TEST_DATASET = generateTestDataSet();
+    private static final JsonSerializationSchema JSON_SERIALIZATION_SCHEMA = 
new JsonSerializationSchema(TEST_DATASET._1());
+
+    private GenericContainer<?> rabbitmqContainer;
+    Connection connection;
+    RabbitmqClient rabbitmqClient;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        this.rabbitmqContainer = new 
GenericContainer<>(DockerImageName.parse(IMAGE))
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOST)
+                .withExposedPorts(PORT, 15672)
+                .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+                .waitingFor(new HostPortWaitStrategy()
+                        .withStartupTimeout(Duration.ofMinutes(2)));
+        Startables.deepStart(Stream.of(rabbitmqContainer)).join();
+        log.info("rabbitmq container started");
+        this.initRabbitMQ();
+    }
+
+    private void initSourceData() throws IOException, InterruptedException {
+        List<SeaTunnelRow> rows = TEST_DATASET._2();
+        for (int i = 0; i < rows.size(); i++) {
+            rabbitmqClient.write(new 
String(JSON_SERIALIZATION_SCHEMA.serialize(rows.get(1))).getBytes(StandardCharsets.UTF_8));
+        }
+    }
+
+    private static Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> 
generateTestDataSet() {
+
+        SeaTunnelRowType rowType = new SeaTunnelRowType(
+                new String[]{
+                    "id",
+                    "c_map",
+                    "c_array",
+                    "c_string",
+                    "c_boolean",
+                    "c_tinyint",
+                    "c_smallint",
+                    "c_int",
+                    "c_bigint",
+                    "c_float",
+                    "c_double",
+                    "c_decimal",
+                    "c_bytes",
+                    "c_date",
+                    "c_timestamp"
+                },
+                new SeaTunnelDataType[]{
+                    BasicType.LONG_TYPE,
+                    new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                    ArrayType.BYTE_ARRAY_TYPE,
+                    BasicType.STRING_TYPE,
+                    BasicType.BOOLEAN_TYPE,
+                    BasicType.BYTE_TYPE,
+                    BasicType.SHORT_TYPE,
+                    BasicType.INT_TYPE,
+                    BasicType.LONG_TYPE,
+                    BasicType.FLOAT_TYPE,
+                    BasicType.DOUBLE_TYPE,
+                    new DecimalType(2, 1),
+                    PrimitiveByteArrayType.INSTANCE,
+                    LocalTimeType.LOCAL_DATE_TYPE,
+                    LocalTimeType.LOCAL_DATE_TIME_TYPE
+                }
+        );
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                 new Object[]{
+                     Long.valueOf(1),
+                     Collections.singletonMap("key", Short.parseShort("1")),
+                     new Byte[]{Byte.parseByte("1")},
+                     "string",
+                     Boolean.FALSE,
+                     Byte.parseByte("1"),
+                     Short.parseShort("1"),
+                     Integer.parseInt("1"),
+                     Long.parseLong("1"),
+                     Float.parseFloat("1.1"),
+                     Double.parseDouble("1.1"),
+                     BigDecimal.valueOf(11, 1),
+                     "test".getBytes(),
+                     LocalDate.now(),
+                     LocalDateTime.now()
+                 });
+            rows.add(row);
+        }
+        return Tuple2.apply(rowType, rows);
+    }
+
+    private void initRabbitMQ() {
+        try {
+            RabbitmqConfig config = new RabbitmqConfig();
+            config.setHost(rabbitmqContainer.getHost());
+            config.setPort(rabbitmqContainer.getFirstMappedPort());
+            config.setQueueName(QUEUE_NAME);
+            config.setVirtualHost("/");
+            config.setUsername(USERNAME);
+            config.setPassword(PASSWORD);
+            rabbitmqClient = new RabbitmqClient(config);
+        } catch (Exception e) {
+            throw new RuntimeException("init Rabbitmq error", e);
+        }
+    }
+
+    private RabbitmqClient initSinkRabbitMQ() {
+
+        try {
+            RabbitmqConfig config = new RabbitmqConfig();
+            config.setHost(rabbitmqContainer.getHost());
+            config.setPort(rabbitmqContainer.getFirstMappedPort());
+            config.setQueueName(SINK_QUEUE_NAME);
+            config.setVirtualHost("/");
+            config.setUsername(USERNAME);
+            config.setPassword(PASSWORD);
+            return new RabbitmqClient(config);
+        } catch (Exception e) {
+            throw new RuntimeException("init Rabbitmq error", e);
+        }
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        rabbitmqContainer.close();
+    }
+
+    @TestTemplate
+    public void testRabbitMQ(TestContainer container) throws Exception {
+        //send data to source queue before executeJob start in every 
testContainer
+        initSourceData();
+
+        //init consumer client before executeJob start in every testContainer
+        RabbitmqClient sinkRabbitmqClient =  initSinkRabbitMQ();
+
+        Set<String> resultSet = new HashSet<>();
+        Handover  handover = new Handover<>();
+        DefaultConsumer consumer = 
sinkRabbitmqClient.getQueueingConsumer(handover);
+        sinkRabbitmqClient.getChannel().basicConsume(SINK_QUEUE_NAME, true, 
consumer);
+        // assert execute Job code
+        Container.ExecResult execResult = 
container.executeJob("/rabbitmq-to-rabbitmq.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        //consume data when every  testContainer finished
+        //try to poll five times
+        for (int i = 0; i < 5; i++) {
+            Optional<Delivery> deliveryOptional = handover.pollNext();
+            if (deliveryOptional.isPresent()) {
+                Delivery delivery = deliveryOptional.get();
+                byte[] body = delivery.getBody();
+                resultSet.add(new String(body));
+            }
+        }
+        // close to prevent rabbitmq client consumer in the next TestContainer 
to consume
+        sinkRabbitmqClient.close();
+        //assert source and sink data
+        Assertions.assertTrue(resultSet.size() > 0);
+        Assertions.assertTrue(resultSet.stream().findAny().get().equals(new 
String(JSON_SERIALIZATION_SCHEMA.serialize(TEST_DATASET._2().get(1)))));
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf
new file mode 100644
index 000000000..79494c8ba
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "STREAMING"
+}
+
+source {
+    RabbitMQ {
+        host = "rabbitmq-e2e"
+        port = 5672
+        virtual_host = "/"
+        username = "guest"
+        password = "guest"
+        queue_name = "test"
+        for_e2e_testing = true
+        schema = {
+            fields {
+                id = bigint
+                c_map = "map<string, smallint>"
+                c_array = "array<tinyint>"
+                c_string = string
+                c_boolean = boolean
+                c_tinyint = tinyint
+                c_smallint = smallint
+                c_int = int
+                c_bigint = bigint
+                c_float = float
+                c_double = double
+                c_decimal = "decimal(2, 1)"
+                c_bytes = bytes
+                c_date = date
+                c_timestamp = timestamp
+            }
+        }
+    }
+}
+
+transform {
+}
+
+sink {
+      RabbitMQ {
+          host = "rabbitmq-e2e"
+          port = 5672
+          virtual_host = "/"
+          username = "guest"
+          password = "guest"
+          queue_name = "test1"
+      }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 3faa4c868..d31cc7534 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -35,6 +35,7 @@
         <module>connector-cassandra-e2e</module>
         <module>connector-neo4j-e2e</module>
         <module>connector-http-e2e</module>
+        <module>connector-rabbitmq-e2e</module>
         <module>connector-kafka-e2e</module>
     </modules>
 

Reply via email to