RocMarshal commented on a change in pull request #15140:
URL: https://github.com/apache/flink/pull/15140#discussion_r774276762



##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The source enumerator provides the source readers with the split. All 
source readers receive the
+ * same split as it only contains information about the connection and in case 
of exactly-once, the
+ * seen correlation ids. But in this case, the enumerator makes sure that at 
maximum one source
+ * reader receives the split. During exactly-once if multiple reader should be 
assigned a split a
+ * {@link RuntimeException} is thrown.
+ */
+public class RabbitMQSourceEnumerator
+        implements SplitEnumerator<RabbitMQSourceSplit, 
RabbitMQSourceEnumState> {
+    private final SplitEnumeratorContext<RabbitMQSourceSplit> context;
+    private final ConsistencyMode consistencyMode;
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSourceEnumerator.class);
+    private RabbitMQSourceSplit split;
+
+    public RabbitMQSourceEnumerator(
+            SplitEnumeratorContext<RabbitMQSourceSplit> context,
+            ConsistencyMode consistencyMode,
+            RabbitMQConnectionConfig connectionConfig,
+            String rmqQueueName,
+            RabbitMQSourceEnumState enumState) {
+        // The enumState is not used since the enumerator has no state in this 
architecture.
+        this(context, consistencyMode, connectionConfig, rmqQueueName);
+    }
+
+    public RabbitMQSourceEnumerator(
+            SplitEnumeratorContext<RabbitMQSourceSplit> context,
+            ConsistencyMode consistencyMode,
+            RabbitMQConnectionConfig connectionConfig,
+            String rmqQueueName) {
+        this.context = requireNonNull(context);
+        this.consistencyMode = requireNonNull(consistencyMode);
+        this.split = new RabbitMQSourceSplit(connectionConfig, rmqQueueName);
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Start RabbitMQ source enumerator");
+    }
+
+    @Override
+    public void handleSplitRequest(int i, @Nullable String s) {
+        LOG.info("Split request from reader " + i);
+        assignSplitToReader(i, split);
+    }
+
+    @Override
+    public void addSplitsBack(List<RabbitMQSourceSplit> list, int i) {
+        if (list.size() == 0) {

Review comment:
       ```suggestion
           if (list == null || list.size() == 0) {
   ```

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import 
org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterStateSerializer;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+import 
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce;
+import 
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce;
+import 
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQ sink that publishes messages to a RabbitMQ queue. It provides 
at-most-once,
+ * at-least-once or exactly-once processing semantics. For at-least-once and 
exactly-once,
+ * checkpointing needs to be enabled.
+ *
+ * <pre>{@code
+ * RabbitMQSink
+ *     .builder()
+ *     .setConnectionConfig(connectionConfig)
+ *     .setQueueName("queue")
+ *     .setSerializationSchema(new SimpleStringSchema())
+ *     .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
+ *     .build();
+ * }</pre>
+ *
+ * <p>When creating the sink a {@code connectionConfig} must be specified via 
{@link
+ * RabbitMQConnectionConfig}. It contains required information for the 
RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a 
(virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to publish to and 
a {@link
+ * SerializationSchema} for the sink input type is required. {@code 
publishOptions} can be added
+ * optionally to route messages in RabbitMQ.
+ *
+ * <p>If at-least-once is required messages are buffered until an 
acknowledgement arrives because
+ * delivery needs to be guaranteed. On each checkpoint, all unacknowledged 
messages will be resent
+ * to RabbitMQ. In case of a failure, all unacknowledged messages can be 
restored and resend.
+ *
+ * <p>In the case of exactly-once a transactional RabbitMQ channel is used to 
achieve that all
+ * messages within a checkpoint are delivered once and only once. All messages 
that arrive in a
+ * checkpoint interval are buffered and sent to RabbitMQ in a single 
transaction when the checkpoint
+ * is triggered. If the transaction fails, all messages that were a part of 
the transaction are put
+ * back into the buffer and a resend is issued in the next checkpoint.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and the 
performance will drop.
+ * Under heavy load, checkpoints can be delayed if a transaction takes longer 
than the specified
+ * checkpointing interval.
+ *
+ * <p>If publish options are used and the checkpointing mode is at-least-once 
or exactly-once, they
+ * require a {@link DeserializationSchema} to be provided because messages 
that were persisted as
+ * part of an earlier checkpoint are needed to recompute routing/exchange.
+ */
+public class RabbitMQSink<T> implements Sink<T, Void, 
RabbitMQSinkWriterState<T>, Void> {
+
+    private final RabbitMQConnectionConfig connectionConfig;
+    private final String queueName;
+    private final SerializationSchema<T> serializationSchema;
+    private final RabbitMQSinkPublishOptions<T> publishOptions;
+    private final ConsistencyMode consistencyMode;
+    private final SerializableReturnListener returnListener;
+
+    private static final ConsistencyMode DEFAULT_CONSISTENCY_MODE = 
ConsistencyMode.AT_MOST_ONCE;
+
+    private RabbitMQSink(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            SerializationSchema<T> serializationSchema,
+            ConsistencyMode consistencyMode,
+            @Nullable SerializableReturnListener returnListener,
+            @Nullable RabbitMQSinkPublishOptions<T> publishOptions) {
+        this.connectionConfig = requireNonNull(connectionConfig);
+        this.queueName = requireNonNull(queueName);
+        this.serializationSchema = requireNonNull(serializationSchema);
+        this.consistencyMode = requireNonNull(consistencyMode);
+
+        this.returnListener = returnListener;
+
+        Preconditions.checkState(
+                verifyPublishOptions(),
+                "If consistency mode is stronger than at-most-once and publish 
options are defined"
+                        + "then publish options need a deserialization 
schema");
+        this.publishOptions = publishOptions;
+    }
+
+    private boolean verifyPublishOptions() {
+        // If at-most-once, doesnt matter if publish options are provided (no 
state in writer)

Review comment:
       ```suggestion
           // If at-most-once, doesn't matter if publish options are provided (no state in writer)
   ```

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The source enumerator provides the source readers with the split. All 
source readers receive the
+ * same split as it only contains information about the connection and in case 
of exactly-once, the
+ * seen correlation ids. But in this case, the enumerator makes sure that at 
maximum one source
+ * reader receives the split. During exactly-once if multiple reader should be 
assigned a split a
+ * {@link RuntimeException} is thrown.
+ */
+public class RabbitMQSourceEnumerator
+        implements SplitEnumerator<RabbitMQSourceSplit, 
RabbitMQSourceEnumState> {
+    private final SplitEnumeratorContext<RabbitMQSourceSplit> context;
+    private final ConsistencyMode consistencyMode;
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSourceEnumerator.class);
+    private RabbitMQSourceSplit split;
+
+    public RabbitMQSourceEnumerator(
+            SplitEnumeratorContext<RabbitMQSourceSplit> context,
+            ConsistencyMode consistencyMode,
+            RabbitMQConnectionConfig connectionConfig,
+            String rmqQueueName,
+            RabbitMQSourceEnumState enumState) {
+        // The enumState is not used since the enumerator has no state in this 
architecture.
+        this(context, consistencyMode, connectionConfig, rmqQueueName);
+    }
+
+    public RabbitMQSourceEnumerator(
+            SplitEnumeratorContext<RabbitMQSourceSplit> context,
+            ConsistencyMode consistencyMode,
+            RabbitMQConnectionConfig connectionConfig,
+            String rmqQueueName) {
+        this.context = requireNonNull(context);
+        this.consistencyMode = requireNonNull(consistencyMode);
+        this.split = new RabbitMQSourceSplit(connectionConfig, rmqQueueName);
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Start RabbitMQ source enumerator");
+    }
+
+    @Override
+    public void handleSplitRequest(int i, @Nullable String s) {
+        LOG.info("Split request from reader " + i);

Review comment:
       ```suggestion
           LOG.info("Split request from reader {}.", i);
   ```

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import 
org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterStateSerializer;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+import 
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce;
+import 
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce;
+import 
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQ sink that publishes messages to a RabbitMQ queue. It provides 
at-most-once,
+ * at-least-once or exactly-once processing semantics. For at-least-once and 
exactly-once,
+ * checkpointing needs to be enabled.
+ *
+ * <pre>{@code
+ * RabbitMQSink
+ *     .builder()
+ *     .setConnectionConfig(connectionConfig)
+ *     .setQueueName("queue")
+ *     .setSerializationSchema(new SimpleStringSchema())
+ *     .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
+ *     .build();
+ * }</pre>
+ *
+ * <p>When creating the sink a {@code connectionConfig} must be specified via 
{@link
+ * RabbitMQConnectionConfig}. It contains required information for the 
RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a 
(virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to publish to and 
a {@link
+ * SerializationSchema} for the sink input type is required. {@code 
publishOptions} can be added
+ * optionally to route messages in RabbitMQ.
+ *
+ * <p>If at-least-once is required messages are buffered until an 
acknowledgement arrives because
+ * delivery needs to be guaranteed. On each checkpoint, all unacknowledged 
messages will be resent
+ * to RabbitMQ. In case of a failure, all unacknowledged messages can be 
restored and resend.
+ *
+ * <p>In the case of exactly-once a transactional RabbitMQ channel is used to 
achieve that all
+ * messages within a checkpoint are delivered once and only once. All messages 
that arrive in a
+ * checkpoint interval are buffered and sent to RabbitMQ in a single 
transaction when the checkpoint
+ * is triggered. If the transaction fails, all messages that were a part of 
the transaction are put
+ * back into the buffer and a resend is issued in the next checkpoint.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and the 
performance will drop.
+ * Under heavy load, checkpoints can be delayed if a transaction takes longer 
than the specified
+ * checkpointing interval.
+ *
+ * <p>If publish options are used and the checkpointing mode is at-least-once 
or exactly-once, they
+ * require a {@link DeserializationSchema} to be provided because messages 
that were persisted as
+ * part of an earlier checkpoint are needed to recompute routing/exchange.
+ */
+public class RabbitMQSink<T> implements Sink<T, Void, 
RabbitMQSinkWriterState<T>, Void> {
+
+    private final RabbitMQConnectionConfig connectionConfig;
+    private final String queueName;
+    private final SerializationSchema<T> serializationSchema;
+    private final RabbitMQSinkPublishOptions<T> publishOptions;
+    private final ConsistencyMode consistencyMode;
+    private final SerializableReturnListener returnListener;
+
+    private static final ConsistencyMode DEFAULT_CONSISTENCY_MODE = 
ConsistencyMode.AT_MOST_ONCE;
+
+    private RabbitMQSink(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            SerializationSchema<T> serializationSchema,
+            ConsistencyMode consistencyMode,
+            @Nullable SerializableReturnListener returnListener,
+            @Nullable RabbitMQSinkPublishOptions<T> publishOptions) {
+        this.connectionConfig = requireNonNull(connectionConfig);
+        this.queueName = requireNonNull(queueName);
+        this.serializationSchema = requireNonNull(serializationSchema);
+        this.consistencyMode = requireNonNull(consistencyMode);
+
+        this.returnListener = returnListener;
+
+        Preconditions.checkState(
+                verifyPublishOptions(),
+                "If consistency mode is stronger than at-most-once and publish 
options are defined"

Review comment:
       ```suggestion
                   "If consistency mode is stronger than at-most-once and publish options are defined "
   ```

##########
File path: flink-connectors/flink-connector-rabbitmq2/pom.xml
##########
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.13-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       
<artifactId>flink-connector-rabbitmq2_${scala.binary.version}</artifactId>
+       <name>Flink : Connectors : RabbitMQ2</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <rabbitmq.version>5.9.0</rabbitmq.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-base</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.rabbitmq</groupId>
+                       <artifactId>amqp-client</artifactId>
+                       <version>${rabbitmq.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>rabbitmq</artifactId>
+                       <version>1.15.1</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>

Review comment:
       ```suggestion
                        <scope>test</scope>
                        <type>test-jar</type>
   ```

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSource.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import 
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumState;
+import 
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumStateSerializer;
+import 
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumerator;
+import 
org.apache.flink.connector.rabbitmq2.source.reader.RabbitMQSourceReaderBase;
+import 
org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderAtLeastOnce;
+import 
org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderAtMostOnce;
+import 
org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderExactlyOnce;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+import 
org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQ source (consumer) that consumes messages from a RabbitMQ queue. It 
provides
+ * at-most-once, at-least-once and exactly-once processing semantics. For 
at-least-once and
+ * exactly-once, checkpointing needs to be enabled. The source operates as a 
StreamingSource and
+ * thus works in a streaming fashion. Please use a {@link 
RabbitMQSourceBuilder} to construct the
+ * source. The following example shows how to create a RabbitMQSource emitting 
records of <code>
+ * String</code> type.
+ *
+ * <pre>{@code
+ * RabbitMQSource<String> source = RabbitMQSource
+ *     .<String>builder()
+ *     .setConnectionConfig(MY_RMQ_CONNECTION_CONFIG)
+ *     .setQueueName("myQueue")
+ *     .setDeliveryDeserializer(new SimpleStringSchema())
+ *     .setConsistencyMode(MY_CONSISTENCY_MODE)
+ *     .build();
+ * }</pre>
+ *
+ * <p>When creating the source a {@code connectionConfig} must be specified 
via {@link
+ * RabbitMQConnectionConfig}. It contains required information for the 
RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a 
(virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to consume from 
and a {@link
+ * DeserializationSchema}
+ *
+ * <p>When using at-most-once consistency, messages are automatically 
acknowledged when received
+ * from RabbitMQ and later consumed by the output. In case of a failure, 
messages might be lost.
+ * More details in {@link RabbitMQSourceReaderAtMostOnce}.
+ *
+ * <p>In case of at-least-once consistency, message are buffered and later 
consumed by the output.
+ * Once a checkpoint is finished, the messages that were consumed by the 
output are acknowledged to
+ * RabbitMQ. This way, we ensure that the messages are successfully received 
by the output. In case
+ * of a system failure, the message that were acknowledged to RabbitMQ will be 
resend by RabbitMQ.
+ * More details in {@link RabbitMQSourceReaderAtLeastOnce}.
+ *
+ * <p>To ensure exactly-once consistency, messages are deduplicated through 
{@code correlationIds}.
+ * Similar to at-least-once consistency, we store the {@code deliveryTags} of 
the messages that are
+ * consumed by the output to acknowledge them later. A transactional RabbitMQ 
channel is used to
+ * ensure that all messages are successfully acknowledged to RabbitMQ. More 
details in {@link
+ * RabbitMQSourceReaderExactlyOnce}.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and 
performance will drop. Under
+ * heavy load, checkpoints can be delayed if a transaction takes longer than 
the specified
+ * checkpointing interval.
+ *
+ * @param <T> the output type of the source.
+ */
+public class RabbitMQSource<T>
+        implements Source<T, RabbitMQSourceSplit, RabbitMQSourceEnumState>, 
ResultTypeQueryable<T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSource.class);
+
+    private final RabbitMQConnectionConfig connectionConfig;
+    private final String queueName;
+    private final DeserializationSchema<T> deserializationSchema;
+    private final ConsistencyMode consistencyMode;
+
+    private RabbitMQSource(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            DeserializationSchema<T> deserializationSchema,
+            ConsistencyMode consistencyMode) {
+        this.connectionConfig = requireNonNull(connectionConfig);
+        this.queueName = requireNonNull(queueName);
+        this.deserializationSchema = requireNonNull(deserializationSchema);
+        this.consistencyMode = requireNonNull(consistencyMode);
+
+        LOG.info("Create RabbitMQ source");
+    }
+
+    /**
+     * Get a {@link RabbitMQSourceBuilder} for the source.
+     *
+     * @param <T> type of the source.
+     * @return a source builder
+     * @see RabbitMQSourceBuilder
+     */
+    public static <T> RabbitMQSourceBuilder<T> builder() {
+        return new RabbitMQSourceBuilder<>();
+    }
+
+    /**
+     * The boundedness is always continuous unbounded as this is a 
streaming-only source.
+     *
+     * @return Boundedness continuous unbounded.
+     * @see Boundedness
+     */
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    /**
+     * Returns a new initialized source reader of the source's consistency 
mode.
+     *
+     * @param sourceReaderContext context which the reader will be created in.
+     * @return RabbitMQSourceReader a source reader of the specified 
consistency type.
+     * @see RabbitMQSourceReaderBase
+     */
+    @Override
+    public SourceReader<T, RabbitMQSourceSplit> createReader(
+            SourceReaderContext sourceReaderContext) {
+        LOG.info("New Source Reader of type " + consistencyMode + " 
requested.");

Review comment:
       ```suggestion
           LOG.info("New Source Reader of type {} requested.", consistencyMode);
   ```

##########
File path: flink-connectors/flink-connector-rabbitmq2/pom.xml
##########
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.13-SNAPSHOT</version>

Review comment:
       ```suggestion
                <version>1.15-SNAPSHOT</version>
   ```

##########
File path: flink-connectors/flink-connector-rabbitmq2/README.md
##########
@@ -0,0 +1,24 @@
+# License of the Rabbit MQ Connector

Review comment:
       nit: What about merging the `README.md` of RMQSource and the `README.md` 
of RMQSink into this one?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkITCase.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQBaseTest;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQContainerClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The tests for the RabbitMQ sink with different consistency modes. As the 
tests are working a lot
+ * with timeouts to uphold stream it is possible that tests might fail.
+ */
+public class RabbitMQSinkITCase extends RabbitMQBaseTest {
+
+    private static AtomicBoolean shouldFail;
+
+    @Before
+    public void setup() {
+        shouldFail = new AtomicBoolean(true);
+    }
+
+    private class GeneratorFailureSource implements SourceFunction<String> {

Review comment:
       ```suggestion
       private static class GeneratorFailureSource implements 
SourceFunction<String> {
   ```

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The source enumerator provides the source readers with the split. All 
source readers receive the
+ * same split as it only contains information about the connection and in case 
of exactly-once, the
+ * seen correlation ids. But in this case, the enumerator makes sure that at 
maximum one source
+ * reader receives the split. During exactly-once if multiple reader should be 
assigned a split a
+ * {@link RuntimeException} is thrown.
+ */
+public class RabbitMQSourceEnumerator
+        implements SplitEnumerator<RabbitMQSourceSplit, 
RabbitMQSourceEnumState> {
+    private final SplitEnumeratorContext<RabbitMQSourceSplit> context;
+    private final ConsistencyMode consistencyMode;
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSourceEnumerator.class);
+    private RabbitMQSourceSplit split;
+
+    public RabbitMQSourceEnumerator(
+            SplitEnumeratorContext<RabbitMQSourceSplit> context,
+            ConsistencyMode consistencyMode,
+            RabbitMQConnectionConfig connectionConfig,
+            String rmqQueueName,
+            RabbitMQSourceEnumState enumState) {
+        // The enumState is not used since the enumerator has no state in this 
architecture.
+        this(context, consistencyMode, connectionConfig, rmqQueueName);
+    }
+
+    public RabbitMQSourceEnumerator(
+            SplitEnumeratorContext<RabbitMQSourceSplit> context,
+            ConsistencyMode consistencyMode,
+            RabbitMQConnectionConfig connectionConfig,
+            String rmqQueueName) {
+        this.context = requireNonNull(context);
+        this.consistencyMode = requireNonNull(consistencyMode);
+        this.split = new RabbitMQSourceSplit(connectionConfig, rmqQueueName);
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Start RabbitMQ source enumerator");
+    }
+
+    @Override
+    public void handleSplitRequest(int i, @Nullable String s) {
+        LOG.info("Split request from reader " + i);
+        assignSplitToReader(i, split);
+    }
+
+    @Override
+    public void addSplitsBack(List<RabbitMQSourceSplit> list, int i) {
+        if (list.size() == 0) {
+            return;
+        }
+
+        // Every Source Reader will only receive one split, thus we will never 
get back more.
+        if (list.size() != 1) {
+            throw new RuntimeException(
+                    "There should only be one split added back at a time. per 
reader");
+        }
+
+        LOG.info("Split returned from reader " + i);
+        // In case of exactly-once (parallelism 1) the single split gets 
updated with the
+        // correlation ids and in case of a recovery we have to store this 
split until we can
+        // assign it to the recovered reader.
+        split = list.get(0);
+    }
+
+    /**
+     * In the case of exactly-once multiple readers are not allowed.
+     *
+     * @see RabbitMQSourceEnumerator#assignSplitToReader(int, 
RabbitMQSourceSplit)
+     * @param i reader id
+     */
+    @Override
+    public void addReader(int i) {}
+
+    /** @return empty enum state object */
+    @Override
+    public RabbitMQSourceEnumState snapshotState() {

Review comment:
       ```suggestion
       public RabbitMQSourceEnumState snapshotState(long checkpointId) throws 
Exception {
   ```

##########
File path: flink-connectors/flink-connector-rabbitmq2/pom.xml
##########
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.13-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       
<artifactId>flink-connector-rabbitmq2_${scala.binary.version}</artifactId>
+       <name>Flink : Connectors : RabbitMQ2</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <rabbitmq.version>5.9.0</rabbitmq.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-base</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.rabbitmq</groupId>
+                       <artifactId>amqp-client</artifactId>
+                       <version>${rabbitmq.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>rabbitmq</artifactId>
+                       <version>1.15.1</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>

Review comment:
       ```suggestion
                        <artifactId>flink-streaming-java</artifactId>
   ```

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The source enumerator provides the source readers with the split. All 
source readers receive the
+ * same split as it only contains information about the connection and in case 
of exactly-once, the
+ * seen correlation ids. But in this case, the enumerator makes sure that at 
maximum one source
+ * reader receives the split. During exactly-once if multiple reader should be 
assigned a split a
+ * {@link RuntimeException} is thrown.
+ */
+public class RabbitMQSourceEnumerator
+        implements SplitEnumerator<RabbitMQSourceSplit, 
RabbitMQSourceEnumState> {
+    private final SplitEnumeratorContext<RabbitMQSourceSplit> context;
+    private final ConsistencyMode consistencyMode;
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSourceEnumerator.class);
+    private RabbitMQSourceSplit split;
+
+    public RabbitMQSourceEnumerator(
+            SplitEnumeratorContext<RabbitMQSourceSplit> context,
+            ConsistencyMode consistencyMode,
+            RabbitMQConnectionConfig connectionConfig,
+            String rmqQueueName,
+            RabbitMQSourceEnumState enumState) {
+        // The enumState is not used since the enumerator has no state in this 
architecture.
+        this(context, consistencyMode, connectionConfig, rmqQueueName);
+    }
+
+    public RabbitMQSourceEnumerator(
+            SplitEnumeratorContext<RabbitMQSourceSplit> context,
+            ConsistencyMode consistencyMode,
+            RabbitMQConnectionConfig connectionConfig,
+            String rmqQueueName) {
+        this.context = requireNonNull(context);
+        this.consistencyMode = requireNonNull(consistencyMode);
+        this.split = new RabbitMQSourceSplit(connectionConfig, rmqQueueName);
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Start RabbitMQ source enumerator");
+    }
+
+    @Override
+    public void handleSplitRequest(int i, @Nullable String s) {
+        LOG.info("Split request from reader " + i);
+        assignSplitToReader(i, split);
+    }
+
+    @Override
+    public void addSplitsBack(List<RabbitMQSourceSplit> list, int i) {
+        if (list.size() == 0) {
+            return;
+        }
+
+        // Every Source Reader will only receive one split, thus we will never 
get back more.
+        if (list.size() != 1) {
+            throw new RuntimeException(
+                    "There should only be one split added back at a time. per 
reader");
+        }
+
+        LOG.info("Split returned from reader " + i);

Review comment:
       ```suggestion
           LOG.info("Split returned from reader {}.", i);
   ```

##########
File path: flink-connectors/flink-connector-rabbitmq2/pom.xml
##########
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.13-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       
<artifactId>flink-connector-rabbitmq2_${scala.binary.version}</artifactId>
+       <name>Flink : Connectors : RabbitMQ2</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <rabbitmq.version>5.9.0</rabbitmq.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>

Review comment:
       ```suggestion
                        <artifactId>flink-streaming-java</artifactId>
   ```

##########
File path: flink-connectors/flink-connector-rabbitmq2/pom.xml
##########
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.13-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       
<artifactId>flink-connector-rabbitmq2_${scala.binary.version}</artifactId>
+       <name>Flink : Connectors : RabbitMQ2</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <rabbitmq.version>5.9.0</rabbitmq.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-base</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.rabbitmq</groupId>
+                       <artifactId>amqp-client</artifactId>
+                       <version>${rabbitmq.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>rabbitmq</artifactId>
+                       <version>1.15.1</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>

Review comment:
       ```suggestion
                        <artifactId>flink-test-utils</artifactId>
   ```

##########
File path: flink-connectors/flink-connector-rabbitmq2/pom.xml
##########
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.13-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       
<artifactId>flink-connector-rabbitmq2_${scala.binary.version}</artifactId>

Review comment:
       ```suggestion
        <artifactId>flink-connector-rabbitmq2</artifactId>
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to