fapaul commented on a change in pull request #15140:
URL: https://github.com/apache/flink/pull/15140#discussion_r596803085



##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterAtLeastOnce.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink.writer.specialized;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+
+import com.rabbitmq.client.ConfirmCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * A {@link SinkWriter} implementation for {@link RabbitMQSink} that has 
at-least-once semantics,
+ * meaning it guarantees that outgoing message arrive at RabbitMQ at least 
once.
+ *
+ * <p>At-least-once consistency is implemented by assigning sequence numbers 
to arriving messages
+ * and buffering them together in the state of the writer until an ack arrives.
+ *
+ * <p>Checkpointing is required for at-least-once to work because messages are 
resend only when a
+ * checkpoint is triggered (to avoid complex time tracking mechanisms for each 
individual message).
+ * Thus on each checkpoint, all messages which were sent at least once before 
to RabbitMQ but are
+ * still unacknowledged will be send once again - duplications are possible by 
this behavior.
+ *
+ * <p>After a failure, a new writer gets initialized with one or more states 
that contain
+ * unacknowledged messages. These messages get resend immediately while 
buffering them in the new
+ * state of the writer.
+ *
+ * @param <T> Type of the elements in this sink
+ */
+public class RabbitMQSinkWriterAtLeastOnce<T> extends 
RabbitMQSinkWriterBase<T> {
+    protected final ConcurrentNavigableMap<Long, 
RabbitMQSinkMessageWrapper<T>> outstandingConfirms;
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSinkWriterAtLeastOnce.class);
+
+    private Set<Long> lastSeenMessageIds;
+
+    /**
+     * Create a new RabbitMQSinkWriterAtLeastOnce.
+     *
+     * @param connectionConfig configuration parameters used to connect to 
RabbitMQ
+     * @param queueName name of the queue to publish to
+     * @param serializationSchema serialization schema to turn elements into 
byte representation
+     * @param publishOptions optionally used to compute routing/exchange for 
messages
+     * @param returnListener returnListener
+     */
+    public RabbitMQSinkWriterAtLeastOnce(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            SerializationSchema<T> serializationSchema,
+            RabbitMQSinkPublishOptions<T> publishOptions,
+            SerializableReturnListener returnListener) {
+        super(connectionConfig, queueName, serializationSchema, 
publishOptions, returnListener);
+        this.outstandingConfirms = new ConcurrentSkipListMap<>();
+        this.lastSeenMessageIds = new HashSet<>();
+    }
+
+    /**
+     * On recover all stored messages in the states get resend.
+     *
+     * @param states a list of states to recover the reader with
+     * @throws IOException as messages are send to RabbitMQ
+     */
+    @Override
+    public void recoverFromStates(List<RabbitMQSinkWriterState<T>> states) 
throws IOException {
+        for (RabbitMQSinkWriterState<T> state : states) {
+            for (RabbitMQSinkMessageWrapper<T> message : 
state.getOutstandingMessages()) {
+                send(message);
+            }
+        }
+    }
+
+    private void send(RabbitMQSinkMessageWrapper<T> msg) throws IOException {
+        long sequenceNumber = getRmqChannel().getNextPublishSeqNo();
+        getRmqSinkConnection().send(msg);
+        outstandingConfirms.put(sequenceNumber, msg);
+    }
+
+    private void resendMessages() throws IOException {
+        Set<Long> temp = outstandingConfirms.keySet();
+        Set<Long> messagesToResend = new HashSet<>(temp);
+        messagesToResend.retainAll(lastSeenMessageIds);
+        for (Long id : messagesToResend) {
+            // remove the old message from the map, since the message was 
added a second time
+            // under a new id or is put into the list of messages to resend
+            RabbitMQSinkMessageWrapper<T> msg = outstandingConfirms.remove(id);
+            if (msg != null) {
+                send(msg);
+            }
+        }
+        lastSeenMessageIds = temp;
+    }
+
+    private ConfirmCallback handleAcknowledgements() {
+        return (sequenceNumber, multiple) -> {
+            // multiple flag indicates that all messages < sequenceNumber can 
be safely acknowledged
+            if (multiple) {
+                // create a view of the portion of the map that contains keys 
< sequenceNumber
+                ConcurrentNavigableMap<Long, RabbitMQSinkMessageWrapper<T>> 
confirmed =
+                        outstandingConfirms.headMap(sequenceNumber, true);
+                // changes to the view are reflected in the original map
+                confirmed.clear();
+            } else {
+                outstandingConfirms.remove(sequenceNumber);
+            }
+        };
+    }
+
+    private ConfirmCallback handleNegativeAcknowledgements() {
+        return (sequenceNumber, multiple) -> {
+            RabbitMQSinkMessageWrapper<T> message = 
outstandingConfirms.get(sequenceNumber);
+            LOG.error(
+                    "Message with body {} has been nack-ed. Sequence number: 
{}, multiple: {}",
+                    message.getMessage(),
+                    sequenceNumber,
+                    multiple);
+        };
+    }
+
+    @Override
+    protected void configureChannel() throws IOException {
+        ConfirmCallback ackCallback = handleAcknowledgements();
+        ConfirmCallback nackCallback = handleNegativeAcknowledgements();
+        // register callbacks for cases of ack and negative ack of messages 
(seq numbers)
+        getRmqChannel().addConfirmListener(ackCallback, nackCallback);
+        getRmqChannel().confirmSelect();
+    }
+
+    /**
+     * All messages that are sent to RabbitMQ and not acknowledge yet will be 
resend. A single state

Review comment:
       ```suggestion
        * All messages that are sent to RabbitMQ and not acknowledged yet will 
be resent. A single state
   ```

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/common/RabbitMQMessageWrapper.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source.common;
+
+/**
+ * A wrapper class for the message received from RabbitMQ that holds the 
deserialized message, the
+ * delivery tag and the correlation id.
+ *
+ * @param <T> The type of the message to hold.
+ */
+public class RabbitMQMessageWrapper<T> {

Review comment:
       Is this a duplicate of `RabbitMQSourceMessageWrapper` and therefore unused?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink.writer.specialized;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link SinkWriter} implementation for {@link RabbitMQSink} that provides 
exactly-once delivery
+ * guarantees, which means incoming stream elements will be delivered to 
RabbitMQ exactly once. For
+ * this, checkpointing needs to be enabled.
+ *
+ * <p>Exactly-once consistency is implemented using a transactional RabbitMQ 
channel. All incoming
+ * stream elements are buffered in the state of this writer until the next 
checkpoint is triggered.
+ * All buffered {@code messages} are then send to RabbitMQ in a single 
transaction. When successful,
+ * all messages committed get removed from the state. If the transaction is 
aborted, all messages
+ * are put back into the state and send on the next checkpoint.
+ *
+ * <p>The transactional channel is heavyweight and will decrease throughput. 
If the system is under
+ * heavy load, consecutive checkpoints can be delayed if commits take longer 
than the checkpointing
+ * interval specified. Only use exactly-once if necessary (no duplicated 
messages in RabbitMQ
+ * allowed), otherwise consider using at-least-once.
+ *
+ * @param <T> Type of the elements in this sink
+ */
+public class RabbitMQSinkWriterExactlyOnce<T> extends 
RabbitMQSinkWriterBase<T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSinkWriterExactlyOnce.class);
+
+    // All messages that arrived and could not be committed this far.
+    private List<RabbitMQSinkMessageWrapper<T>> messages;
+
+    /**
+     * Create a new RabbitMQSinkWriterExactlyOnce.
+     *
+     * @param connectionConfig configuration parameters used to connect to 
RabbitMQ
+     * @param queueName name of the queue to publish to
+     * @param serializationSchema serialization schema to turn elements into 
byte representation
+     * @param publishOptions optionally used to compute routing/exchange for 
messages
+     * @param returnListener return listener
+     */
+    public RabbitMQSinkWriterExactlyOnce(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            SerializationSchema<T> serializationSchema,
+            RabbitMQSinkPublishOptions<T> publishOptions,
+            SerializableReturnListener returnListener) {
+        super(connectionConfig, queueName, serializationSchema, 
publishOptions, returnListener);
+        messages = Collections.synchronizedList(new ArrayList<>());

Review comment:
       ```suggestion
           this.messages = Collections.synchronizedList(new ArrayList<>());
   ```

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQContainerClient.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.common;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class provides a RabbitMQ container client which allows creating 
queues, sending messages to
+ * RabbitMQ and get the messages received by RabbitMQ.
+ */
+public class RabbitMQContainerClient<T> {
+
+    private final RabbitMQContainer container;
+    private Channel channel;
+    private final Queue<byte[]> messages;
+    private String queueName;
+    private CountDownLatch latch;
+    private DeserializationSchema<T> valueDeserializer;
+
+    public RabbitMQContainerClient(
+            RabbitMQContainer container,
+            DeserializationSchema<T> valueDeserializer,
+            int countDownLatchSize) {
+        container.withExposedPorts(5762).waitingFor(Wait.forListeningPort());
+        this.container = container;
+        this.messages = new LinkedList<>();
+        this.latch = new CountDownLatch(countDownLatchSize);
+        this.valueDeserializer = valueDeserializer;
+    }
+
+    public RabbitMQContainerClient(RabbitMQContainer container) {
+        this(container, null, 0);
+    }
+
+    public String createQueue(String queueName, boolean withConsumer)
+            throws IOException, TimeoutException {
+        this.queueName = queueName;
+        Connection connection = getRabbitMQConnection();
+        this.channel = connection.createChannel();
+        channel.queueDeclare(queueName, true, false, false, null);
+        if (withConsumer) {
+            final DeliverCallback deliverCallback = 
this::handleMessageReceivedCallback;
+            channel.basicConsume(queueName, true, deliverCallback, consumerTag 
-> {});
+        }
+        return this.queueName;
+    }
+
+    public String createQueue() throws IOException, TimeoutException {
+        return createQueue(UUID.randomUUID().toString(), true);
+    }
+
+    public String createQueue(boolean withConsumer) throws IOException, 
TimeoutException {
+        return createQueue(UUID.randomUUID().toString(), withConsumer);
+    }
+
+    public <T> void sendMessages(SerializationSchema<T> valueSerializer, T... 
messages)
+            throws IOException {
+        for (T message : messages) {
+            channel.basicPublish("", queueName, null, 
valueSerializer.serialize(message));
+        }
+    }
+
+    public <T> void sendMessages(

Review comment:
       ```suggestion
       public void sendMessages(
   ```

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkTest.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.GlobalBoolean;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQBaseTest;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQContainerClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The tests for the rabbitmq sink with different consistency modes. As the 
tests are working a lot
+ * with timeouts to uphold stream it is possible that tests might fail.
+ */
+public class RabbitMQSinkTest extends RabbitMQBaseTest {
+    @Test
+    public void atMostOnceTest() throws Exception {
+        List<String> messages = getRandomMessages(100);
+
+        DataStream<String> stream = env.fromCollection(messages);
+        RabbitMQContainerClient client =
+                addSinkOn(stream, ConsistencyMode.AT_MOST_ONCE, 
messages.size());
+        env.execute();
+        client.await();
+
+        List<String> receivedMessages = client.getConsumedMessages();
+        assertEquals(messages, receivedMessages);
+    }
+
+    @Test
+    public void atLeastOnceTest() throws Exception {
+        List<String> messages = getRandomMessages(100);
+        DataStream<String> stream = env.fromCollection(messages);
+        RabbitMQContainerClient client =
+                addSinkOn(stream, ConsistencyMode.AT_LEAST_ONCE, 
messages.size());
+
+        env.execute();
+        client.await();
+
+        List<String> receivedMessages = client.getConsumedMessages();
+        assertEquals(messages, receivedMessages);
+    }
+
+    @Test
+    public void atLeastOnceWithFlinkFailureTest() throws Exception {
+        List<String> messages = getRandomMessages(100);
+
+        GlobalBoolean shouldFail = new GlobalBoolean(true);

Review comment:
       Since this is the only place you use the `GlobalBoolean`, why not use a 
simple static `AtomicBoolean` as part of this class?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/state/RabbitMQSinkWriterStateSerializerTest.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink.state;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Test the sink writer state serializer. */
+public class RabbitMQSinkWriterStateSerializerTest {
+
+    private RabbitMQSinkWriterState<String> getSinkWriterState() {
+        List<RabbitMQSinkMessageWrapper<String>> outStandingMessages = new 
ArrayList<>();
+        SimpleStringSchema serializer = new SimpleStringSchema();
+        for (int i = 0; i < 5; i++) {
+            String message = "Message " + i;
+            RabbitMQSinkMessageWrapper<String> messageWrapper =
+                    new RabbitMQSinkMessageWrapper<>(message, 
serializer.serialize(message));
+            outStandingMessages.add(messageWrapper);
+        }
+        return new RabbitMQSinkWriterState<>(outStandingMessages);
+    }
+
+    @Test
+    public void testWriterStateSerializer() throws IOException {
+        RabbitMQSinkWriterState<String> writerState = getSinkWriterState();
+        RabbitMQSinkWriterStateSerializer<String> serializer =
+                new RabbitMQSinkWriterStateSerializer<>();
+
+        byte[] serializedWriterState = serializer.serialize(writerState);
+        RabbitMQSinkWriterState<String> deserializedWriterState =
+                serializer.deserialize(serializer.getVersion(), 
serializedWriterState);
+
+        List<byte[]> expectedBytes =
+                writerState.getOutstandingMessages().stream()
+                        .map(RabbitMQSinkMessageWrapper::getBytes)
+                        .collect(Collectors.toList());
+        List<byte[]> actualBytes =
+                deserializedWriterState.getOutstandingMessages().stream()
+                        .map(RabbitMQSinkMessageWrapper::getBytes)
+                        .collect(Collectors.toList());
+
+        assertEquals(expectedBytes.size(), actualBytes.size());
+        for (int i = 0; i < expectedBytes.size(); i++) {
+            Assert.assertArrayEquals(expectedBytes.get(i), actualBytes.get(i));
+        }
+
+        List<String> actualMessages =
+                deserializedWriterState.getOutstandingMessages().stream()
+                        .map(RabbitMQSinkMessageWrapper::getMessage)
+                        .collect(Collectors.toList());
+
+        for (String message : actualMessages) {
+            assertNull(message);
+        }
+    }
+
+    @Test
+    public void testWriterStateSerializerWithDeserializationSchema() throws 
IOException {
+        RabbitMQSinkWriterState<String> writerState = getSinkWriterState();
+        SimpleStringSchema deserializer = new SimpleStringSchema();
+        RabbitMQSinkWriterStateSerializer<String> serializer =
+                new RabbitMQSinkWriterStateSerializer<>(deserializer);
+
+        byte[] serializedWriterState = serializer.serialize(writerState);
+        RabbitMQSinkWriterState<String> deserializedWriterState =
+                serializer.deserialize(serializer.getVersion(), 
serializedWriterState);
+
+        List<String> expectedMessages =
+                writerState.getOutstandingMessages().stream()
+                        .map(RabbitMQSinkMessageWrapper::getMessage)
+                        .collect(Collectors.toList());
+        List<byte[]> expectedBytes =
+                writerState.getOutstandingMessages().stream()
+                        .map(RabbitMQSinkMessageWrapper::getBytes)
+                        .collect(Collectors.toList());
+
+        List<String> actualMessages =
+                writerState.getOutstandingMessages().stream()

Review comment:
       ```suggestion
                   deserializedWriterState.getOutstandingMessages().stream()
   ```
   ?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink.writer.specialized;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link SinkWriter} implementation for {@link RabbitMQSink} that provides 
exactly-once delivery
+ * guarantees, which means incoming stream elements will be delivered to 
RabbitMQ exactly once. For
+ * this, checkpointing needs to be enabled.
+ *
+ * <p>Exactly-once consistency is implemented using a transactional RabbitMQ 
channel. All incoming
+ * stream elements are buffered in the state of this writer until the next 
checkpoint is triggered.
+ * All buffered {@code messages} are then send to RabbitMQ in a single 
transaction. When successful,
+ * all messages committed get removed from the state. If the transaction is 
aborted, all messages
+ * are put back into the state and send on the next checkpoint.
+ *
+ * <p>The transactional channel is heavyweight and will decrease throughput. 
If the system is under
+ * heavy load, consecutive checkpoints can be delayed if commits take longer 
than the checkpointing
+ * interval specified. Only use exactly-once if necessary (no duplicated 
messages in RabbitMQ
+ * allowed), otherwise consider using at-least-once.
+ *
+ * @param <T> Type of the elements in this sink
+ */
+public class RabbitMQSinkWriterExactlyOnce<T> extends 
RabbitMQSinkWriterBase<T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSinkWriterExactlyOnce.class);
+
+    // All messages that arrived and could not be committed this far.
+    private List<RabbitMQSinkMessageWrapper<T>> messages;
+
+    /**
+     * Create a new RabbitMQSinkWriterExactlyOnce.
+     *
+     * @param connectionConfig configuration parameters used to connect to 
RabbitMQ
+     * @param queueName name of the queue to publish to
+     * @param serializationSchema serialization schema to turn elements into 
byte representation
+     * @param publishOptions optionally used to compute routing/exchange for 
messages
+     * @param returnListener return listener
+     */
+    public RabbitMQSinkWriterExactlyOnce(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            SerializationSchema<T> serializationSchema,
+            RabbitMQSinkPublishOptions<T> publishOptions,
+            SerializableReturnListener returnListener) {
+        super(connectionConfig, queueName, serializationSchema, 
publishOptions, returnListener);
+        messages = Collections.synchronizedList(new ArrayList<>());
+    }
+
+    /**
+     * On recover the messages are set to the outstanding messages from the 
states.
+     *
+     * @param states a list of states to recover the reader with
+     */
+    @Override
+    public void recoverFromStates(List<RabbitMQSinkWriterState<T>> states) {
+        List<RabbitMQSinkMessageWrapper<T>> messages =
+                Collections.synchronizedList(new ArrayList<>());
+        for (RabbitMQSinkWriterState<T> state : states) {
+            messages.addAll(state.getOutstandingMessages());
+        }
+        this.messages = messages;

Review comment:
       Why do we have to overwrite `messages` instead of adding to `this.messages` 
directly?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/common/RabbitMQSinkConnection.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink.common;
+
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class provides basic RabbitMQ functionality and common behaviour such 
as establishing and
+ * closing a connection via the {@code connectionConfig}. In addition, it 
provides methods for
+ * serializing and sending messages to RabbitMQ (with or without publish 
options).
+ *
+ * @param <T> The type of the messages that are published
+ */
+public class RabbitMQSinkConnection<T> {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSinkConnection.class);
+
+    private final RabbitMQConnectionConfig connectionConfig;
+    private final String queueName;
+    private Connection rmqConnection;
+    private Channel rmqChannel;
+
+    @Nullable private final RabbitMQSinkPublishOptions<T> publishOptions;
+
+    @Nullable private final SerializableReturnListener returnListener;
+
+    public RabbitMQSinkConnection(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            @Nullable RabbitMQSinkPublishOptions<T> publishOptions,
+            @Nullable SerializableReturnListener returnListener) {
+        this.connectionConfig = requireNonNull(connectionConfig);
+        this.queueName = requireNonNull(queueName);
+        this.publishOptions = publishOptions;
+        this.returnListener = returnListener;
+    }
+
+    /**
+     * Setup the RabbitMQ connection and a channel to send messages to.
+     *
+     * @throws Exception that might occur when setting up the connection and 
channel.
+     */
+    public void setupRabbitMQ() throws Exception {
+        LOG.info("Setup RabbitMQ");
+        this.rmqConnection = setupConnection(connectionConfig);
+        this.rmqChannel = setupChannel(rmqConnection, queueName, 
returnListener);
+    }
+
+    private Connection setupConnection(RabbitMQConnectionConfig 
connectionConfig) throws Exception {
+        return connectionConfig.getConnectionFactory().newConnection();
+    }
+
+    private Channel setupChannel(
+            Connection rmqConnection, String queueName, 
SerializableReturnListener returnListener)
+            throws IOException {
+        final Channel rmqChannel = rmqConnection.createChannel();
+        rmqChannel.queueDeclare(queueName, true, false, false, null);
+        if (returnListener != null) {
+            rmqChannel.addReturnListener(returnListener);
+        }
+        return rmqChannel;
+    }
+
+    /**
+     * Only used by at-least-once and exactly-once for resending messages that 
could not be
+     * delivered.
+     *
+     * @param message sink message wrapping the atomic message object
+     */
+    public void send(RabbitMQSinkMessageWrapper<T> message) throws IOException 
{
+        send(message.getMessage(), message.getBytes());
+    }
+
+    /**
+     * Publish a message to a queue in RabbitMQ. With publish options enabled, 
first compute the
+     * necessary publishing information.
+     *
+     * @param message original message, only required for publishing with 
publish options present
+     * @param serializedMessage serialized message to send to RabbitMQ
+     */
+    public void send(T message, byte[] serializedMessage) throws IOException {
+        if (publishOptions == null) {
+            rmqChannel.basicPublish("", queueName, null, serializedMessage);
+        } else {
+            publishWithOptions(message, serializedMessage);
+        }
+    }
+
+    private void publishWithOptions(T message, byte[] serializedMessage) 
throws IOException {
+        if (publishOptions == null) {
+            throw new RuntimeException("Try to publish with options without 
publishOptions.");
+        }
+
+        boolean mandatory = publishOptions.computeMandatory(message);
+        boolean immediate = publishOptions.computeImmediate(message);
+
+        Preconditions.checkState(
+                !(returnListener == null && (mandatory || immediate)),
+                "Setting mandatory and/or immediate flags to true requires a 
ReturnListener.");
+
+        String rk = publishOptions.computeRoutingKey(message);
+        String exchange = publishOptions.computeExchange(message);
+
+        rmqChannel.basicPublish(
+                exchange,
+                rk,
+                mandatory,
+                immediate,
+                publishOptions.computeProperties(message),
+                serializedMessage);
+    }
+
+    public void close() throws Exception {
+        // close the channel
+        if (rmqChannel != null) {
+            rmqChannel.close();
+        }
+
+        // close the connection
+        if (rmqConnection != null) {
+            rmqConnection.close();
+        }
+    }
+
+    public Channel getRmqChannel() {
+        return rmqChannel;
+    }

Review comment:
       Please add docs for the public methods

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/RabbitMQSourceReaderBase.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source.reader;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import 
org.apache.flink.connector.rabbitmq2.source.common.RabbitMQSourceMessageWrapper;
+import 
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumerator;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.Envelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The source reader for RabbitMQ queues. This is the base class of the 
different consistency modes.
+ *
+ * @param <T> The output type of the source.
+ */
+public abstract class RabbitMQSourceReaderBase<T> implements SourceReader<T, 
RabbitMQSourceSplit> {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSourceReaderBase.class);
+
+    // The assigned split from the enumerator.
+    private RabbitMQSourceSplit split;
+
+    private Connection rmqConnection;
+    private Channel rmqChannel;
+
+    private final SourceReaderContext sourceReaderContext;
+    // The deserialization schema for the messages of RabbitMQ.
+    private final DeserializationSchema<T> deliveryDeserializer;
+    // The collector keeps the messages received from RabbitMQ.
+    private final RabbitMQCollector<T> collector;
+
+    public RabbitMQSourceReaderBase(
+            SourceReaderContext sourceReaderContext,
+            DeserializationSchema<T> deliveryDeserializer) {
+        this.sourceReaderContext = requireNonNull(sourceReaderContext);
+        this.deliveryDeserializer = requireNonNull(deliveryDeserializer);
+        this.collector = new RabbitMQCollector<>();
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Starting source reader and send split request");
+        sourceReaderContext.sendSplitRequest();
+    }
+
+    // ------------- start RabbitMQ methods  --------------
+
+    private void setupRabbitMQ() {
+        try {
+            setupConnection();
+            setupChannel();
+            LOG.info(
+                    "RabbitMQ Connection was successful: Waiting for messages 
from the queue. To exit press CTRL+C");
+        } catch (Exception e) {
+            LOG.error(e.getMessage());
+        }
+    }
+
+    private ConnectionFactory setupConnectionFactory() throws Exception {
+        return split.getConnectionConfig().getConnectionFactory();
+    }
+
+    private void setupConnection() throws Exception {
+        rmqConnection = setupConnectionFactory().newConnection();
+    }
+
+    /** @return boolean whether messages should be automatically acknowledged 
to RabbitMQ. */
+    protected abstract boolean isAutoAck();
+
+    /**
+     * This function will be called when a new message from RabbitMQ gets 
pushed to the source. The
+     * message will be deserialized and forwarded to our message collector 
where it is buffered
+     * until it can be processed.
+     *
+     * @param consumerTag The consumer tag of the message.
+     * @param delivery The delivery from RabbitMQ.
+     * @throws IOException if something fails during deserialization.
+     */
+    protected void handleMessageReceivedCallback(String consumerTag, Delivery 
delivery)
+            throws IOException {
+
+        AMQP.BasicProperties properties = delivery.getProperties();
+        byte[] body = delivery.getBody();
+        Envelope envelope = delivery.getEnvelope();
+        collector.setMessageIdentifiers(properties.getCorrelationId(), 
envelope.getDeliveryTag());
+        deliveryDeserializer.deserialize(body, collector);
+    }
+
+    protected void setupChannel() throws IOException {
+        rmqChannel = rmqConnection.createChannel();
+        rmqChannel.queueDeclare(split.getQueueName(), true, false, false, 
null);
+
+        // Set maximum of unacknowledged messages
+        if (getSplit().getConnectionConfig().getPrefetchCount().isPresent()) {
+            // global: false - the prefetch count is set per consumer, not per 
RabbitMQ channel
+            
rmqChannel.basicQos(getSplit().getConnectionConfig().getPrefetchCount().get(), 
false);
+        }
+
+        final DeliverCallback deliverCallback = 
this::handleMessageReceivedCallback;
+        rmqChannel.basicConsume(
+                split.getQueueName(), isAutoAck(), deliverCallback, 
consumerTag -> {});
+    }
+
+    // ------------- end RabbitMQ methods  --------------
+
+    /**
+     * This method provides a hook that is called when a message gets polled 
by the output.
+     *
+     * @param message the message that was polled by the output.
+     */
+    protected void handleMessagePolled(RabbitMQSourceMessageWrapper<T> 
message) {}
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<T> output) {
+        RabbitMQSourceMessageWrapper<T> message = collector.pollMessage();
+        if (message == null) {
+            return InputStatus.NOTHING_AVAILABLE;
+        }
+
+        output.collect(message.getMessage());
+        handleMessagePolled(message);
+
+        return collector.hasUnpolledMessages()
+                ? InputStatus.MORE_AVAILABLE
+                : InputStatus.NOTHING_AVAILABLE;
+    }
+
+    @Override
+    public List<RabbitMQSourceSplit> snapshotState(long checkpointId) {
+        return split != null ? Collections.singletonList(split.copy()) : new 
ArrayList<>();
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        return CompletableFuture.runAsync(
+                () -> {
+                    while (!collector.hasUnpolledMessages()) {
+                        // supposed to be empty
+                    }
+                });
+    }
+
+    /**
+     * Assign the split from the enumerator. If the source reader already has 
a split nothing
+     * happens. After the split is assigned, the connection to RabbitMQ can be 
setup.
+     *
+     * @param list RabbitMQSourceSplits with only one element.
+     * @see RabbitMQSourceEnumerator
+     * @see RabbitMQSourceSplit
+     */
+    @Override
+    public void addSplits(List<RabbitMQSourceSplit> list) {
+        if (split != null) {
+            return;
+        }
+        if (list.size() != 1) {
+            throw new RuntimeException("The number of added splits should be 
exaclty one.");
+        }
+        split = list.get(0);
+        setupRabbitMQ();
+    }
+
+    @Override
+    public void notifyNoMoreSplits() {}
+
+    @Override
+    public void handleSourceEvents(SourceEvent sourceEvent) {}

Review comment:
       Why did you keep this method? My previous comment was marked as resolved 
but I think this method is not necessary and you can delete it.

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSource.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import 
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumState;
+import 
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumStateSerializer;
+import 
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumerator;
+import 
org.apache.flink.connector.rabbitmq2.source.reader.RabbitMQSourceReaderBase;
+import 
org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderAtLeastOnce;
+import 
org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderAtMostOnce;
+import 
org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderExactlyOnce;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+import 
org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQ source (consumer) that consumes messages from a RabbitMQ queue. It 
provides
+ * at-most-once, at-least-once and exactly-once processing semantics. For 
at-least-once and
+ * exactly-once, checkpointing needs to be enabled. The source operates as a 
StreamingSource and
+ * thus works in a streaming fashion. Please use a {@link 
RabbitMQSourceBuilder} to construct the
+ * source. The following example shows how to create a RabbitMQSource emitting 
records of <code>
+ * String</code> type.
+ *
+ * <pre>{@code
+ * RabbitMQSource<String> source = RabbitMQSource
+ *     .<String>builder()
+ *     .setConnectionConfig(MY_RMQ_CONNECTION_CONFIG)
+ *     .setQueueName("myQueue")
+ *     .setDeliveryDeserializer(new SimpleStringSchema())
+ *     .setConsistencyMode(MY_CONSISTENCY_MODE)
+ *     .build();
+ * }</pre>
+ *
+ * <p>When creating the source a {@code connectionConfig} must be specified 
via {@link
+ * RabbitMQConnectionConfig}. It contains required information for the 
RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a 
(virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to consume from 
and a {@link
+ * DeserializationSchema}
+ *
+ * <p>When using at-most-once consistency, messages are automatically 
acknowledged when received
+ * from RabbitMQ and later consumed by the output. In case of a failure, 
messages might be lost.
+ * More details in {@link RabbitMQSourceReaderAtMostOnce}.
+ *
+ * <p>In case of at-least-once consistency, message are buffered and later 
consumed by the output.
+ * Once a checkpoint is finished, the messages that were consumed by the 
output are acknowledged to
+ * RabbitMQ. This way, we ensure that the messages are successfully received 
by the output. In case
+ * of a system failure, the message that were acknowledged to RabbitMQ will be 
resend by RabbitMQ.
+ * More details in {@link RabbitMQSourceReaderAtLeastOnce}.
+ *
+ * <p>To ensure exactly-once consistency, messages are deduplicated through 
{@code correlationIds}.
+ * Similar to at-least-once consistency, we store the {@code deliveryTags} of 
the messages that are
+ * consumed by the output to acknowledge them later. A transactional RabbitMQ 
channel is used to
+ * ensure that all messages are successfully acknowledged to RabbitMQ. More 
details in {@link
+ * RabbitMQSourceReaderExactlyOnce}.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and 
performance will drop. Under
+ * heavy load, checkpoints can be delayed if a transaction takes longer than 
the specified
+ * checkpointing interval.
+ *
+ * @param <T> the output type of the source.
+ */
+public class RabbitMQSource<T>
+        implements Source<T, RabbitMQSourceSplit, RabbitMQSourceEnumState>, 
ResultTypeQueryable<T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSource.class);
+
+    private final RabbitMQConnectionConfig connectionConfig;
+    private final String queueName;
+    private final DeserializationSchema<T> deserializationSchema;
+    private final ConsistencyMode consistencyMode;
+
+    protected RabbitMQSource(

Review comment:
       Can this constructor be `private`?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQContainerClient.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.common;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class provides a RabbitMQ container client which allows creating 
queues, sending messages to
+ * RabbitMQ and get the messages received by RabbitMQ.
+ */
+public class RabbitMQContainerClient<T> {
+
+    private final RabbitMQContainer container;
+    private Channel channel;
+    private final Queue<byte[]> messages;
+    private String queueName;
+    private CountDownLatch latch;
+    private DeserializationSchema<T> valueDeserializer;
+
+    public RabbitMQContainerClient(
+            RabbitMQContainer container,
+            DeserializationSchema<T> valueDeserializer,
+            int countDownLatchSize) {
+        container.withExposedPorts(5762).waitingFor(Wait.forListeningPort());
+        this.container = container;
+        this.messages = new LinkedList<>();
+        this.latch = new CountDownLatch(countDownLatchSize);
+        this.valueDeserializer = valueDeserializer;
+    }
+
+    public RabbitMQContainerClient(RabbitMQContainer container) {
+        this(container, null, 0);
+    }
+
+    public String createQueue(String queueName, boolean withConsumer)
+            throws IOException, TimeoutException {
+        this.queueName = queueName;
+        Connection connection = getRabbitMQConnection();
+        this.channel = connection.createChannel();
+        channel.queueDeclare(queueName, true, false, false, null);
+        if (withConsumer) {
+            final DeliverCallback deliverCallback = 
this::handleMessageReceivedCallback;
+            channel.basicConsume(queueName, true, deliverCallback, consumerTag 
-> {});
+        }
+        return this.queueName;
+    }
+
+    public String createQueue() throws IOException, TimeoutException {
+        return createQueue(UUID.randomUUID().toString(), true);
+    }
+
+    public String createQueue(boolean withConsumer) throws IOException, 
TimeoutException {
+        return createQueue(UUID.randomUUID().toString(), withConsumer);
+    }
+
+    public <T> void sendMessages(SerializationSchema<T> valueSerializer, T... 
messages)

Review comment:
       ```suggestion
       public void sendMessages(SerializationSchema<T> valueSerializer, T... 
messages)
   ```

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink.writer.specialized;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link SinkWriter} implementation for {@link RabbitMQSink} that provides 
exactly-once delivery
+ * guarantees, which means incoming stream elements will be delivered to 
RabbitMQ exactly once. For
+ * this, checkpointing needs to be enabled.
+ *
+ * <p>Exactly-once consistency is implemented using a transactional RabbitMQ 
channel. All incoming
+ * stream elements are buffered in the state of this writer until the next 
checkpoint is triggered.
+ * All buffered {@code messages} are then send to RabbitMQ in a single 
transaction. When successful,
+ * all messages committed get removed from the state. If the transaction is 
aborted, all messages
+ * are put back into the state and send on the next checkpoint.
+ *
+ * <p>The transactional channel is heavyweight and will decrease throughput. 
If the system is under
+ * heavy load, consecutive checkpoints can be delayed if commits take longer 
than the checkpointing
+ * interval specified. Only use exactly-once if necessary (no duplicated 
messages in RabbitMQ
+ * allowed), otherwise consider using at-least-once.
+ *
+ * @param <T> Type of the elements in this sink
+ */
+public class RabbitMQSinkWriterExactlyOnce<T> extends 
RabbitMQSinkWriterBase<T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSinkWriterExactlyOnce.class);
+
+    // All messages that arrived and could not be committed this far.
+    private List<RabbitMQSinkMessageWrapper<T>> messages;
+
+    /**
+     * Create a new RabbitMQSinkWriterExactlyOnce.
+     *
+     * @param connectionConfig configuration parameters used to connect to 
RabbitMQ
+     * @param queueName name of the queue to publish to
+     * @param serializationSchema serialization schema to turn elements into 
byte representation
+     * @param publishOptions optionally used to compute routing/exchange for 
messages
+     * @param returnListener return listener
+     */
+    public RabbitMQSinkWriterExactlyOnce(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            SerializationSchema<T> serializationSchema,
+            RabbitMQSinkPublishOptions<T> publishOptions,
+            SerializableReturnListener returnListener) {
+        super(connectionConfig, queueName, serializationSchema, 
publishOptions, returnListener);
+        messages = Collections.synchronizedList(new ArrayList<>());
+    }
+
+    /**
+     * On recover the messages are set to the outstanding messages from the 
states.
+     *
+     * @param states a list of states to recover the reader with
+     */
+    @Override
+    public void recoverFromStates(List<RabbitMQSinkWriterState<T>> states) {
+        List<RabbitMQSinkMessageWrapper<T>> messages =
+                Collections.synchronizedList(new ArrayList<>());
+        for (RabbitMQSinkWriterState<T> state : states) {
+            messages.addAll(state.getOutstandingMessages());
+        }
+        this.messages = messages;
+    }
+
+    @Override
+    protected void configureChannel() throws IOException {
+        // puts channel in commit mode
+        getRmqChannel().txSelect();
+    }
+
+    @Override
+    public void write(T element, Context context) {
+        messages.add(
+                new RabbitMQSinkMessageWrapper<>(
+                        element, getSerializationSchema().serialize(element)));
+    }
+
+    @Override
+    public List<RabbitMQSinkWriterState<T>> snapshotState() {
+        commitMessages();
+        return Collections.singletonList(new 
RabbitMQSinkWriterState<>(messages));
+    }
+
+    private void commitMessages() {
+        List<RabbitMQSinkMessageWrapper<T>> messagesToSend = 
messages.subList(0, messages.size());
+        try {
+            for (RabbitMQSinkMessageWrapper<T> msg : messagesToSend) {
+                getRmqSinkConnection().send(msg);
+            }
+            getRmqChannel().txCommit();
+            LOG.info("Successfully committed {} messages.", 
messagesToSend.size());
+            messagesToSend.clear();
+        } catch (IOException e) {
+            LOG.error(
+                    "Error during commit of {} messages. Rollback Messages. 
Error: {}",
+                    messagesToSend.size(),
+                    e.getMessage());
+            try {
+                getRmqChannel().txRollback();

Review comment:
       I am afraid that if we never successfully commit, the state will grow 
endlessly. What do you think of failing the job after the rollback?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQBaseTest.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.common;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink;
+import org.apache.flink.connector.rabbitmq2.source.RabbitMQSource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * The base class for RabbitMQ tests. It sets up a flink cluster and a docker 
image for RabbitMQ. It
+ * provides behavior to easily add onto the stream, send message to RabbitMQ 
and get the messages in
+ * RabbitMQ.
+ */
+public abstract class RabbitMQBaseTest {
+
+    private static final int RABBITMQ_PORT = 5672;
+    private RabbitMQContainerClient client;

Review comment:
       Can you make sure that you do not use a raw type here and specify the 
generic type argument (`<>`)?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQBaseTest.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.common;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink;
+import org.apache.flink.connector.rabbitmq2.source.RabbitMQSource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * The base class for RabbitMQ tests. It sets up a flink cluster and a docker 
image for RabbitMQ. It
+ * provides behavior to easily add onto the stream, send message to RabbitMQ 
and get the messages in
+ * RabbitMQ.
+ */
+public abstract class RabbitMQBaseTest {
+
+    private static final int RABBITMQ_PORT = 5672;
+    private RabbitMQContainerClient client;
+
+    @Rule public Timeout globalTimeout = Timeout.seconds(20);
+
+    @Rule
+    public MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(1)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    protected StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+    @Rule
+    public RabbitMQContainer rabbitMq =
+            new RabbitMQContainer(
+                            
DockerImageName.parse("rabbitmq").withTag("3.7.25-management-alpine"))
+                    .withExposedPorts(RABBITMQ_PORT);
+
+    @Before
+    public void setUpContainerClient() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000));
+        this.client = new RabbitMQContainerClient(rabbitMq);
+    }
+
+    public RabbitMQContainerClient addSinkOn(
+            DataStream<String> stream, ConsistencyMode consistencyMode, int 
countDownLatchSize)
+            throws IOException, TimeoutException {
+        RabbitMQContainerClient client =
+                new RabbitMQContainerClient(rabbitMq, new 
SimpleStringSchema(), countDownLatchSize);
+        String queueName = client.createQueue();
+        final RabbitMQConnectionConfig connectionConfig =
+                new RabbitMQConnectionConfig.Builder()
+                        .setHost(rabbitMq.getHost())
+                        .setVirtualHost("/")
+                        .setUserName(rabbitMq.getAdminUsername())
+                        .setPassword(rabbitMq.getAdminPassword())
+                        .setPort(rabbitMq.getMappedPort(RABBITMQ_PORT))
+                        .build();
+
+        RabbitMQSink<String> sink =
+                RabbitMQSink.<String>builder()
+                        .setConnectionConfig(connectionConfig)
+                        .setQueueName(queueName)
+                        .setSerializationSchema(new SimpleStringSchema())
+                        .setConsistencyMode(consistencyMode)
+                        .build();
+        stream.sinkTo(sink).setParallelism(1);
+        return client;
+    }
+
+    public DataStream<String> addSourceOn(
+            StreamExecutionEnvironment env, ConsistencyMode consistencyMode)
+            throws IOException, TimeoutException {
+        String queueName = client.createQueue(false);
+
+        final RabbitMQConnectionConfig connectionConfig =
+                new RabbitMQConnectionConfig.Builder()
+                        .setHost(rabbitMq.getHost())
+                        .setVirtualHost("/")
+                        .setUserName(rabbitMq.getAdminUsername())
+                        .setPassword(rabbitMq.getAdminPassword())
+                        .setPort(rabbitMq.getMappedPort(RABBITMQ_PORT))
+                        .build();
+
+        RabbitMQSource<String> rabbitMQSource =
+                RabbitMQSource.<String>builder()
+                        .setConnectionConfig(connectionConfig)
+                        .setQueueName(queueName)
+                        .setDeserializationSchema(new SimpleStringSchema())
+                        .setConsistencyMode(consistencyMode)
+                        .build();
+
+        final DataStream<String> stream =
+                env.fromSource(rabbitMQSource, 
WatermarkStrategy.noWatermarks(), "RabbitMQSource")
+                        .setParallelism(1);
+
+        return stream;
+    }
+
+    public void sendToRabbit(List<String> messages) throws IOException {
+        for (String message : messages) {
+            client.sendMessages(new SimpleStringSchema(), message);
+        }
+    }
+
+    public void sendToRabbit(List<String> messages, List<String> 
correlationIds)
+            throws IOException {
+        for (int i = 0; i < messages.size(); i++) {
+            client.sendMessages(new SimpleStringSchema(), messages.get(i), 
correlationIds.get(i));
+        }
+    }
+
+    public List<String> getRandomMessages(int numberOfMessages) {
+        List<String> messages = new ArrayList<>();
+        for (int i = 0; i < numberOfMessages; i++) {
+            messages.add(UUID.randomUUID().toString());
+        }
+        return messages;
+    }
+
+    public List<String> getSequentialMessages(int numberOfMessages) {
+        List<String> messages = new ArrayList<>();
+        for (int i = 0; i < numberOfMessages; i++) {
+            messages.add("Message " + i);
+        }
+        return messages;
+    }
+
+    public List<String> getCollectedSinkMessages() {
+        List<String> messages = new ArrayList<>(CollectSink.VALUES);
+        CollectSink.VALUES.clear();
+        return messages;
+    }
+
+    public void addCollectorSink(
+            DataStream<String> stream, CountDownLatch latch, int 
failAtNthMessage) {
+        stream.addSink(new CollectSink(latch, failAtNthMessage));
+    }
+
+    public void addCollectorSink(DataStream<String> stream, CountDownLatch 
latch) {
+        stream.addSink(new CollectSink(latch));
+    }
+
+    /** CollectSink to access the messages from the stream. */
+    public static class CollectSink implements SinkFunction<String> {

Review comment:
       To reduce the scope of the class a bit, what do you think of moving this 
class to `RabbitMQSourceTest`? It would also allow specifying the latch as a 
static variable of `RabbitMQSourceTest` and accessing it directly in the sink, 
without the need for the constructor parameter.

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSourceTest.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source;
+
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQBaseTest;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The tests for the RabbitMQ source with different consistency modes. As the 
tests are working a
+ * lot with timeouts to uphold stream it is possible that tests might fail.
+ */
+public class RabbitMQSourceTest extends RabbitMQBaseTest {
+
+    // --------------- at most once ---------------
+    @Test
+    public void atMostOnceTest() throws Exception {
+        List<String> messages = getRandomMessages(100);
+        CountDownLatch latch = new CountDownLatch(messages.size());
+
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.AT_MOST_ONCE);
+        addCollectorSink(stream, latch);
+        env.executeAsync();
+
+        sendToRabbit(messages);
+        latch.await();
+
+        assertEquals(
+                CollectionUtils.getCardinalityMap(messages),
+                CollectionUtils.getCardinalityMap(getCollectedSinkMessages()));
+    }
+
+    // --------------- at least once ---------------
+    @Test
+    public void atLeastOnceTest() throws Exception {
+        List<String> messages = getRandomMessages(100);
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.AT_LEAST_ONCE);
+        CountDownLatch latch = new CountDownLatch(messages.size());
+        addCollectorSink(stream, latch);
+        env.executeAsync();
+
+        sendToRabbit(messages);
+        latch.await();
+
+        assertEquals(
+                CollectionUtils.getCardinalityMap(messages),
+                CollectionUtils.getCardinalityMap(getCollectedSinkMessages()));
+    }
+
+    @Test
+    public void atLeastOnceFailureTest() throws Exception {
+        // An exception is thrown in the MapFunction in order to trigger a 
restart of Flink and it
+        // is assured that the source receives the messages again.
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.AT_LEAST_ONCE);
+
+        List<String> messages = getSequentialMessages(100);
+        int failAtNthMessage = 30;
+        CountDownLatch latch = new CountDownLatch(messages.size() + 
failAtNthMessage - 1);
+        addCollectorSink(stream, latch, failAtNthMessage);
+
+        env.executeAsync();
+
+        sendToRabbit(messages);
+        latch.await();
+
+        List<String> collectedMessages = getCollectedSinkMessages();
+        assertTrue(collectedMessages.containsAll(messages));
+    }
+
+    // --------------- exactly once ---------------
+    @Test
+    public void exactlyOnceTest() throws Exception {
+        List<String> messages = getRandomMessages(1000);
+        CountDownLatch latch = new CountDownLatch(messages.size());
+
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.EXACTLY_ONCE);
+        addCollectorSink(stream, latch);
+        env.executeAsync();
+
+        // use messages as correlation ids here
+        sendToRabbit(messages, messages);
+
+        latch.await();
+
+        List<String> collectedMessages = getCollectedSinkMessages();
+        assertEquals(messages, collectedMessages);
+    }
+
+    @Test
+    public void exactlyOnceFilterCorrelationIdsTest() throws Exception {
+        List<String> messages = getRandomMessages(5);
+        CountDownLatch latch = new CountDownLatch(3);
+
+        env.enableCheckpointing(5000);
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.EXACTLY_ONCE);
+        addCollectorSink(stream, latch);
+        env.executeAsync();
+
+        List<String> correlationIds = Arrays.asList("1", "2", "3", "3", "3");
+        sendToRabbit(messages, correlationIds);
+
+        latch.await();
+
+        List<String> collectedMessages = getCollectedSinkMessages();
+        List<String> expectedMessages = messages.subList(0, 3);
+        assertEquals(expectedMessages, collectedMessages);
+    }
+
+    @Test
+    public void exactlyOnceWithFailureAndMessageDuplicationTest() throws 
Exception {
+        // An exception is thrown in order to trigger a restart of Flink and it
+        // is assured that the system receives the messages only once. We 
disable
+        // (by setting the interval higher than the test duration) checkpoint 
to
+        // expect receiving all pre-exception messages once again.
+        env.enableCheckpointing(500000);
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.EXACTLY_ONCE);
+
+        List<String> messages = getRandomMessages(100);
+        List<String> correlationIds = messages;
+
+        int failAtNthMessage = 30;
+        CountDownLatch latch = new CountDownLatch(messages.size() + 
failAtNthMessage - 1);
+        addCollectorSink(stream, latch, failAtNthMessage);
+        env.executeAsync();
+
+        sendToRabbit(messages, correlationIds);
+        latch.await();
+
+        List<String> collectedMessages = getCollectedSinkMessages();
+        collectedMessages =
+                collectedMessages.subList(failAtNthMessage - 1, 
collectedMessages.size());
+        assertEquals(messages, collectedMessages);
+    }
+
+    @Test
+    public void exactlyOnceWithFailureTest() throws Exception {
+        env.enableCheckpointing(1000);
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.EXACTLY_ONCE);
+
+        List<String> messages = getSequentialMessages(60);
+        List<String> messagesA = messages.subList(0, 30);
+        List<String> messagesB = messages.subList(30, messages.size());
+
+        int failAtNthMessage = messagesA.size() + 1;
+        CountDownLatch latch = new CountDownLatch(messagesA.size() + 
messagesB.size());
+        addCollectorSink(stream, latch, failAtNthMessage);
+        env.executeAsync();
+
+        sendToRabbit(messagesA, messagesA);
+        TimeUnit.MILLISECONDS.sleep(2500);

Review comment:
       Unfortunately, you have to rework this sleep-based timeout, because such 
timeouts are unreliable. Overall, I am not sure what this test covers in 
addition to `exactlyOnceWithFailureAndMessageDuplicationTest`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to