fapaul commented on a change in pull request #15140: URL: https://github.com/apache/flink/pull/15140#discussion_r596803085
########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterAtLeastOnce.java ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq2.sink.writer.specialized; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink; +import org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper; +import org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions; +import org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState; +import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase; + +import com.rabbitmq.client.ConfirmCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * A {@link SinkWriter} implementation for {@link RabbitMQSink} that has at-least-once semantics, + * meaning it guarantees that outgoing message arrive at RabbitMQ at least once. + * + * <p>At-least-once consistency is implemented by assigning sequence numbers to arriving messages + * and buffering them together in the state of the writer until an ack arrives. + * + * <p>Checkpointing is required for at-least-once to work because messages are resend only when a + * checkpoint is triggered (to avoid complex time tracking mechanisms for each individual message). + * Thus on each checkpoint, all messages which were sent at least once before to RabbitMQ but are + * still unacknowledged will be send once again - duplications are possible by this behavior. 
+ * + * <p>After a failure, a new writer gets initialized with one or more states that contain + * unacknowledged messages. These messages get resend immediately while buffering them in the new + * state of the writer. + * + * @param <T> Type of the elements in this sink + */ +public class RabbitMQSinkWriterAtLeastOnce<T> extends RabbitMQSinkWriterBase<T> { + protected final ConcurrentNavigableMap<Long, RabbitMQSinkMessageWrapper<T>> outstandingConfirms; + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkWriterAtLeastOnce.class); + + private Set<Long> lastSeenMessageIds; + + /** + * Create a new RabbitMQSinkWriterAtLeastOnce. + * + * @param connectionConfig configuration parameters used to connect to RabbitMQ + * @param queueName name of the queue to publish to + * @param serializationSchema serialization schema to turn elements into byte representation + * @param publishOptions optionally used to compute routing/exchange for messages + * @param returnListener returnListener + */ + public RabbitMQSinkWriterAtLeastOnce( + RabbitMQConnectionConfig connectionConfig, + String queueName, + SerializationSchema<T> serializationSchema, + RabbitMQSinkPublishOptions<T> publishOptions, + SerializableReturnListener returnListener) { + super(connectionConfig, queueName, serializationSchema, publishOptions, returnListener); + this.outstandingConfirms = new ConcurrentSkipListMap<>(); + this.lastSeenMessageIds = new HashSet<>(); + } + + /** + * On recover all stored messages in the states get resend. 
+ * + * @param states a list of states to recover the reader with + * @throws IOException as messages are send to RabbitMQ + */ + @Override + public void recoverFromStates(List<RabbitMQSinkWriterState<T>> states) throws IOException { + for (RabbitMQSinkWriterState<T> state : states) { + for (RabbitMQSinkMessageWrapper<T> message : state.getOutstandingMessages()) { + send(message); + } + } + } + + private void send(RabbitMQSinkMessageWrapper<T> msg) throws IOException { + long sequenceNumber = getRmqChannel().getNextPublishSeqNo(); + getRmqSinkConnection().send(msg); + outstandingConfirms.put(sequenceNumber, msg); + } + + private void resendMessages() throws IOException { + Set<Long> temp = outstandingConfirms.keySet(); + Set<Long> messagesToResend = new HashSet<>(temp); + messagesToResend.retainAll(lastSeenMessageIds); + for (Long id : messagesToResend) { + // remove the old message from the map, since the message was added a second time + // under a new id or is put into the list of messages to resend + RabbitMQSinkMessageWrapper<T> msg = outstandingConfirms.remove(id); + if (msg != null) { + send(msg); + } + } + lastSeenMessageIds = temp; + } + + private ConfirmCallback handleAcknowledgements() { + return (sequenceNumber, multiple) -> { + // multiple flag indicates that all messages < sequenceNumber can be safely acknowledged + if (multiple) { + // create a view of the portion of the map that contains keys < sequenceNumber + ConcurrentNavigableMap<Long, RabbitMQSinkMessageWrapper<T>> confirmed = + outstandingConfirms.headMap(sequenceNumber, true); + // changes to the view are reflected in the original map + confirmed.clear(); + } else { + outstandingConfirms.remove(sequenceNumber); + } + }; + } + + private ConfirmCallback handleNegativeAcknowledgements() { + return (sequenceNumber, multiple) -> { + RabbitMQSinkMessageWrapper<T> message = outstandingConfirms.get(sequenceNumber); + LOG.error( + "Message with body {} has been nack-ed. 
Sequence number: {}, multiple: {}", + message.getMessage(), + sequenceNumber, + multiple); + }; + } + + @Override + protected void configureChannel() throws IOException { + ConfirmCallback ackCallback = handleAcknowledgements(); + ConfirmCallback nackCallback = handleNegativeAcknowledgements(); + // register callbacks for cases of ack and negative ack of messages (seq numbers) + getRmqChannel().addConfirmListener(ackCallback, nackCallback); + getRmqChannel().confirmSelect(); + } + + /** + * All messages that are sent to RabbitMQ and not acknowledge yet will be resend. A single state Review comment: ```suggestion * All messages that are sent to RabbitMQ and not acknowledged yet will be resend. A single state ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/common/RabbitMQMessageWrapper.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.source.common; + +/** + * A wrapper class for the message received from RabbitMQ that holds the deserialized message, the + * delivery tag and the correlation id. + * + * @param <T> The type of the message to hold. 
+ */ +public class RabbitMQMessageWrapper<T> { Review comment: Duplicate of `RabbitMQSourceMessageWrapper` and therefore unused? ########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq2.sink.writer.specialized; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink; +import org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper; +import org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions; +import org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState; +import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A {@link SinkWriter} implementation for {@link RabbitMQSink} that provides exactly-once delivery + * guarantees, which means incoming stream elements will be delivered to RabbitMQ exactly once. For + * this, checkpointing needs to be enabled. + * + * <p>Exactly-once consistency is implemented using a transactional RabbitMQ channel. All incoming + * stream elements are buffered in the state of this writer until the next checkpoint is triggered. + * All buffered {@code messages} are then send to RabbitMQ in a single transaction. When successful, + * all messages committed get removed from the state. If the transaction is aborted, all messages + * are put back into the state and send on the next checkpoint. + * + * <p>The transactional channel is heavyweight and will decrease throughput. If the system is under + * heavy load, consecutive checkpoints can be delayed if commits take longer than the checkpointing + * interval specified. 
Only use exactly-once if necessary (no duplicated messages in RabbitMQ + * allowed), otherwise consider using at-least-once. + * + * @param <T> Type of the elements in this sink + */ +public class RabbitMQSinkWriterExactlyOnce<T> extends RabbitMQSinkWriterBase<T> { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkWriterExactlyOnce.class); + + // All messages that arrived and could not be committed this far. + private List<RabbitMQSinkMessageWrapper<T>> messages; + + /** + * Create a new RabbitMQSinkWriterExactlyOnce. + * + * @param connectionConfig configuration parameters used to connect to RabbitMQ + * @param queueName name of the queue to publish to + * @param serializationSchema serialization schema to turn elements into byte representation + * @param publishOptions optionally used to compute routing/exchange for messages + * @param returnListener return listener + */ + public RabbitMQSinkWriterExactlyOnce( + RabbitMQConnectionConfig connectionConfig, + String queueName, + SerializationSchema<T> serializationSchema, + RabbitMQSinkPublishOptions<T> publishOptions, + SerializableReturnListener returnListener) { + super(connectionConfig, queueName, serializationSchema, publishOptions, returnListener); + messages = Collections.synchronizedList(new ArrayList<>()); Review comment: ```suggestion this.messages = Collections.synchronizedList(new ArrayList<>()); ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQContainerClient.java ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.common; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; +import com.rabbitmq.client.Delivery; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; + +/** + * This class provides a RabbitMQ container client which allows creating queues, sending messages to + * RabbitMQ and get the messages received by RabbitMQ. 
+ */ +public class RabbitMQContainerClient<T> { + + private final RabbitMQContainer container; + private Channel channel; + private final Queue<byte[]> messages; + private String queueName; + private CountDownLatch latch; + private DeserializationSchema<T> valueDeserializer; + + public RabbitMQContainerClient( + RabbitMQContainer container, + DeserializationSchema<T> valueDeserializer, + int countDownLatchSize) { + container.withExposedPorts(5762).waitingFor(Wait.forListeningPort()); + this.container = container; + this.messages = new LinkedList<>(); + this.latch = new CountDownLatch(countDownLatchSize); + this.valueDeserializer = valueDeserializer; + } + + public RabbitMQContainerClient(RabbitMQContainer container) { + this(container, null, 0); + } + + public String createQueue(String queueName, boolean withConsumer) + throws IOException, TimeoutException { + this.queueName = queueName; + Connection connection = getRabbitMQConnection(); + this.channel = connection.createChannel(); + channel.queueDeclare(queueName, true, false, false, null); + if (withConsumer) { + final DeliverCallback deliverCallback = this::handleMessageReceivedCallback; + channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); + } + return this.queueName; + } + + public String createQueue() throws IOException, TimeoutException { + return createQueue(UUID.randomUUID().toString(), true); + } + + public String createQueue(boolean withConsumer) throws IOException, TimeoutException { + return createQueue(UUID.randomUUID().toString(), withConsumer); + } + + public <T> void sendMessages(SerializationSchema<T> valueSerializer, T... 
messages) + throws IOException { + for (T message : messages) { + channel.basicPublish("", queueName, null, valueSerializer.serialize(message)); + } + } + + public <T> void sendMessages( Review comment: ```suggestion public void sendMessages( ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkTest.java ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.sink; + +import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq2.common.GlobalBoolean; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQBaseTest; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQContainerClient; +import org.apache.flink.streaming.api.datastream.DataStream; + +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * The tests for the rabbitmq sink with different consistency modes. As the tests are working a lot + * with timeouts to uphold stream it is possible that tests might fail. 
+ */ +public class RabbitMQSinkTest extends RabbitMQBaseTest { + @Test + public void atMostOnceTest() throws Exception { + List<String> messages = getRandomMessages(100); + + DataStream<String> stream = env.fromCollection(messages); + RabbitMQContainerClient client = + addSinkOn(stream, ConsistencyMode.AT_MOST_ONCE, messages.size()); + env.execute(); + client.await(); + + List<String> receivedMessages = client.getConsumedMessages(); + assertEquals(messages, receivedMessages); + } + + @Test + public void atLeastOnceTest() throws Exception { + List<String> messages = getRandomMessages(100); + DataStream<String> stream = env.fromCollection(messages); + RabbitMQContainerClient client = + addSinkOn(stream, ConsistencyMode.AT_LEAST_ONCE, messages.size()); + + env.execute(); + client.await(); + + List<String> receivedMessages = client.getConsumedMessages(); + assertEquals(messages, receivedMessages); + } + + @Test + public void atLeastOnceWithFlinkFailureTest() throws Exception { + List<String> messages = getRandomMessages(100); + + GlobalBoolean shouldFail = new GlobalBoolean(true); Review comment: Since this is the only place you use the `GlobalBoolean`, why not use a simple static `AtomicBoolean` as part of this class? ########## File path: flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/state/RabbitMQSinkWriterStateSerializerTest.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.sink.state; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** Test the sink writer state serializer. */ +public class RabbitMQSinkWriterStateSerializerTest { + + private RabbitMQSinkWriterState<String> getSinkWriterState() { + List<RabbitMQSinkMessageWrapper<String>> outStandingMessages = new ArrayList<>(); + SimpleStringSchema serializer = new SimpleStringSchema(); + for (int i = 0; i < 5; i++) { + String message = "Message " + i; + RabbitMQSinkMessageWrapper<String> messageWrapper = + new RabbitMQSinkMessageWrapper<>(message, serializer.serialize(message)); + outStandingMessages.add(messageWrapper); + } + return new RabbitMQSinkWriterState<>(outStandingMessages); + } + + @Test + public void testWriterStateSerializer() throws IOException { + RabbitMQSinkWriterState<String> writerState = getSinkWriterState(); + RabbitMQSinkWriterStateSerializer<String> serializer = + new RabbitMQSinkWriterStateSerializer<>(); + + byte[] serializedWriterState = serializer.serialize(writerState); + RabbitMQSinkWriterState<String> deserializedWriterState = + serializer.deserialize(serializer.getVersion(), serializedWriterState); + + List<byte[]> 
expectedBytes = + writerState.getOutstandingMessages().stream() + .map(RabbitMQSinkMessageWrapper::getBytes) + .collect(Collectors.toList()); + List<byte[]> actualBytes = + deserializedWriterState.getOutstandingMessages().stream() + .map(RabbitMQSinkMessageWrapper::getBytes) + .collect(Collectors.toList()); + + assertEquals(expectedBytes.size(), actualBytes.size()); + for (int i = 0; i < expectedBytes.size(); i++) { + Assert.assertArrayEquals(expectedBytes.get(i), actualBytes.get(i)); + } + + List<String> actualMessages = + deserializedWriterState.getOutstandingMessages().stream() + .map(RabbitMQSinkMessageWrapper::getMessage) + .collect(Collectors.toList()); + + for (String message : actualMessages) { + assertNull(message); + } + } + + @Test + public void testWriterStateSerializerWithDeserializationSchema() throws IOException { + RabbitMQSinkWriterState<String> writerState = getSinkWriterState(); + SimpleStringSchema deserializer = new SimpleStringSchema(); + RabbitMQSinkWriterStateSerializer<String> serializer = + new RabbitMQSinkWriterStateSerializer<>(deserializer); + + byte[] serializedWriterState = serializer.serialize(writerState); + RabbitMQSinkWriterState<String> deserializedWriterState = + serializer.deserialize(serializer.getVersion(), serializedWriterState); + + List<String> expectedMessages = + writerState.getOutstandingMessages().stream() + .map(RabbitMQSinkMessageWrapper::getMessage) + .collect(Collectors.toList()); + List<byte[]> expectedBytes = + writerState.getOutstandingMessages().stream() + .map(RabbitMQSinkMessageWrapper::getBytes) + .collect(Collectors.toList()); + + List<String> actualMessages = + writerState.getOutstandingMessages().stream() Review comment: ```suggestion deserializedWriterState.getOutstandingMessages().stream() ``` ? 
########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq2.sink.writer.specialized; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink; +import org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper; +import org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions; +import org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState; +import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A {@link SinkWriter} implementation for {@link RabbitMQSink} that provides exactly-once delivery + * guarantees, which means incoming stream elements will be delivered to RabbitMQ exactly once. For + * this, checkpointing needs to be enabled. + * + * <p>Exactly-once consistency is implemented using a transactional RabbitMQ channel. All incoming + * stream elements are buffered in the state of this writer until the next checkpoint is triggered. + * All buffered {@code messages} are then send to RabbitMQ in a single transaction. When successful, + * all messages committed get removed from the state. If the transaction is aborted, all messages + * are put back into the state and send on the next checkpoint. + * + * <p>The transactional channel is heavyweight and will decrease throughput. If the system is under + * heavy load, consecutive checkpoints can be delayed if commits take longer than the checkpointing + * interval specified. 
Only use exactly-once if necessary (no duplicated messages in RabbitMQ + * allowed), otherwise consider using at-least-once. + * + * @param <T> Type of the elements in this sink + */ +public class RabbitMQSinkWriterExactlyOnce<T> extends RabbitMQSinkWriterBase<T> { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkWriterExactlyOnce.class); + + // All messages that arrived and could not be committed this far. + private List<RabbitMQSinkMessageWrapper<T>> messages; + + /** + * Create a new RabbitMQSinkWriterExactlyOnce. + * + * @param connectionConfig configuration parameters used to connect to RabbitMQ + * @param queueName name of the queue to publish to + * @param serializationSchema serialization schema to turn elements into byte representation + * @param publishOptions optionally used to compute routing/exchange for messages + * @param returnListener return listener + */ + public RabbitMQSinkWriterExactlyOnce( + RabbitMQConnectionConfig connectionConfig, + String queueName, + SerializationSchema<T> serializationSchema, + RabbitMQSinkPublishOptions<T> publishOptions, + SerializableReturnListener returnListener) { + super(connectionConfig, queueName, serializationSchema, publishOptions, returnListener); + messages = Collections.synchronizedList(new ArrayList<>()); + } + + /** + * On recover the messages are set to the outstanding messages from the states. + * + * @param states a list of states to recover the reader with + */ + @Override + public void recoverFromStates(List<RabbitMQSinkWriterState<T>> states) { + List<RabbitMQSinkMessageWrapper<T>> messages = + Collections.synchronizedList(new ArrayList<>()); + for (RabbitMQSinkWriterState<T> state : states) { + messages.addAll(state.getOutstandingMessages()); + } + this.messages = messages; Review comment: Why do we have to overwrite the `messages` field with a new list instead of adding to `this.messages` directly? 
########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/common/RabbitMQSinkConnection.java ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.sink.common; + +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.util.Preconditions; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static java.util.Objects.requireNonNull; + +/** + * This class provides basic RabbitMQ functionality and common behaviour such as establishing and + * closing a connection via the {@code connectionConfig}. In addition, it provides methods for + * serializing and sending messages to RabbitMQ (with or without publish options). 
+ * + * @param <T> The type of the messages that are published + */ +public class RabbitMQSinkConnection<T> { + protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkConnection.class); + + private final RabbitMQConnectionConfig connectionConfig; + private final String queueName; + private Connection rmqConnection; + private Channel rmqChannel; + + @Nullable private final RabbitMQSinkPublishOptions<T> publishOptions; + + @Nullable private final SerializableReturnListener returnListener; + + public RabbitMQSinkConnection( + RabbitMQConnectionConfig connectionConfig, + String queueName, + @Nullable RabbitMQSinkPublishOptions<T> publishOptions, + @Nullable SerializableReturnListener returnListener) { + this.connectionConfig = requireNonNull(connectionConfig); + this.queueName = requireNonNull(queueName); + this.publishOptions = publishOptions; + this.returnListener = returnListener; + } + + /** + * Setup the RabbitMQ connection and a channel to send messages to. + * + * @throws Exception that might occur when setting up the connection and channel. + */ + public void setupRabbitMQ() throws Exception { + LOG.info("Setup RabbitMQ"); + this.rmqConnection = setupConnection(connectionConfig); + this.rmqChannel = setupChannel(rmqConnection, queueName, returnListener); + } + + private Connection setupConnection(RabbitMQConnectionConfig connectionConfig) throws Exception { + return connectionConfig.getConnectionFactory().newConnection(); + } + + private Channel setupChannel( + Connection rmqConnection, String queueName, SerializableReturnListener returnListener) + throws IOException { + final Channel rmqChannel = rmqConnection.createChannel(); + rmqChannel.queueDeclare(queueName, true, false, false, null); + if (returnListener != null) { + rmqChannel.addReturnListener(returnListener); + } + return rmqChannel; + } + + /** + * Only used by at-least-once and exactly-once for resending messages that could not be + * delivered. 
+ * + * @param message sink message wrapping the atomic message object + */ + public void send(RabbitMQSinkMessageWrapper<T> message) throws IOException { + send(message.getMessage(), message.getBytes()); + } + + /** + * Publish a message to a queue in RabbitMQ. With publish options enabled, first compute the + * necessary publishing information. + * + * @param message original message, only required for publishing with publish options present + * @param serializedMessage serialized message to send to RabbitMQ + */ + public void send(T message, byte[] serializedMessage) throws IOException { + if (publishOptions == null) { + rmqChannel.basicPublish("", queueName, null, serializedMessage); + } else { + publishWithOptions(message, serializedMessage); + } + } + + private void publishWithOptions(T message, byte[] serializedMessage) throws IOException { + if (publishOptions == null) { + throw new RuntimeException("Try to publish with options without publishOptions."); + } + + boolean mandatory = publishOptions.computeMandatory(message); + boolean immediate = publishOptions.computeImmediate(message); + + Preconditions.checkState( + !(returnListener == null && (mandatory || immediate)), + "Setting mandatory and/or immediate flags to true requires a ReturnListener."); + + String rk = publishOptions.computeRoutingKey(message); + String exchange = publishOptions.computeExchange(message); + + rmqChannel.basicPublish( + exchange, + rk, + mandatory, + immediate, + publishOptions.computeProperties(message), + serializedMessage); + } + + public void close() throws Exception { + // close the channel + if (rmqChannel != null) { + rmqChannel.close(); + } + + // close the connection + if (rmqConnection != null) { + rmqConnection.close(); + } + } + + public Channel getRmqChannel() { + return rmqChannel; + } Review comment: Please add docs for the public methods ########## File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/RabbitMQSourceReaderBase.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.source.reader; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.rabbitmq2.source.common.RabbitMQSourceMessageWrapper; +import org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumerator; +import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit; +import org.apache.flink.core.io.InputStatus; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; +import com.rabbitmq.client.Delivery; +import com.rabbitmq.client.Envelope; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; + +/** + * The source reader for RabbitMQ queues. This is the base class of the different consistency modes. + * + * @param <T> The output type of the source. + */ +public abstract class RabbitMQSourceReaderBase<T> implements SourceReader<T, RabbitMQSourceSplit> { + protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceReaderBase.class); + + // The assigned split from the enumerator. + private RabbitMQSourceSplit split; + + private Connection rmqConnection; + private Channel rmqChannel; + + private final SourceReaderContext sourceReaderContext; + // The deserialization schema for the messages of RabbitMQ. + private final DeserializationSchema<T> deliveryDeserializer; + // The collector keeps the messages received from RabbitMQ. + private final RabbitMQCollector<T> collector; + + public RabbitMQSourceReaderBase( + SourceReaderContext sourceReaderContext, + DeserializationSchema<T> deliveryDeserializer) { + this.sourceReaderContext = requireNonNull(sourceReaderContext); + this.deliveryDeserializer = requireNonNull(deliveryDeserializer); + this.collector = new RabbitMQCollector<>(); + } + + @Override + public void start() { + LOG.info("Starting source reader and send split request"); + sourceReaderContext.sendSplitRequest(); + } + + // ------------- start RabbitMQ methods -------------- + + private void setupRabbitMQ() { + try { + setupConnection(); + setupChannel(); + LOG.info( + "RabbitMQ Connection was successful: Waiting for messages from the queue. 
To exit press CTRL+C"); + } catch (Exception e) { + LOG.error(e.getMessage()); + } + } + + private ConnectionFactory setupConnectionFactory() throws Exception { + return split.getConnectionConfig().getConnectionFactory(); + } + + private void setupConnection() throws Exception { + rmqConnection = setupConnectionFactory().newConnection(); + } + + /** @return boolean whether messages should be automatically acknowledged to RabbitMQ. */ + protected abstract boolean isAutoAck(); + + /** + * This function will be called when a new message from RabbitMQ gets pushed to the source. The + * message will be deserialized and forwarded to our message collector where it is buffered + * until it can be processed. + * + * @param consumerTag The consumer tag of the message. + * @param delivery The delivery from RabbitMQ. + * @throws IOException if something fails during deserialization. + */ + protected void handleMessageReceivedCallback(String consumerTag, Delivery delivery) + throws IOException { + + AMQP.BasicProperties properties = delivery.getProperties(); + byte[] body = delivery.getBody(); + Envelope envelope = delivery.getEnvelope(); + collector.setMessageIdentifiers(properties.getCorrelationId(), envelope.getDeliveryTag()); + deliveryDeserializer.deserialize(body, collector); + } + + protected void setupChannel() throws IOException { + rmqChannel = rmqConnection.createChannel(); + rmqChannel.queueDeclare(split.getQueueName(), true, false, false, null); + + // Set maximum of unacknowledged messages + if (getSplit().getConnectionConfig().getPrefetchCount().isPresent()) { + // global: false - the prefetch count is set per consumer, not per RabbitMQ channel + rmqChannel.basicQos(getSplit().getConnectionConfig().getPrefetchCount().get(), false); + } + + final DeliverCallback deliverCallback = this::handleMessageReceivedCallback; + rmqChannel.basicConsume( + split.getQueueName(), isAutoAck(), deliverCallback, consumerTag -> {}); + } + + // ------------- end RabbitMQ methods 
-------------- + + /** + * This method provides a hook that is called when a message gets polled by the output. + * + * @param message the message that was polled by the output. + */ + protected void handleMessagePolled(RabbitMQSourceMessageWrapper<T> message) {} + + @Override + public InputStatus pollNext(ReaderOutput<T> output) { + RabbitMQSourceMessageWrapper<T> message = collector.pollMessage(); + if (message == null) { + return InputStatus.NOTHING_AVAILABLE; + } + + output.collect(message.getMessage()); + handleMessagePolled(message); + + return collector.hasUnpolledMessages() + ? InputStatus.MORE_AVAILABLE + : InputStatus.NOTHING_AVAILABLE; + } + + @Override + public List<RabbitMQSourceSplit> snapshotState(long checkpointId) { + return split != null ? Collections.singletonList(split.copy()) : new ArrayList<>(); + } + + @Override + public CompletableFuture<Void> isAvailable() { + return CompletableFuture.runAsync( + () -> { + while (!collector.hasUnpolledMessages()) { + // supposed to be empty + } + }); + } + + /** + * Assign the split from the enumerator. If the source reader already has a split nothing + * happens. After the split is assigned, the connection to RabbitMQ can be setup. + * + * @param list RabbitMQSourceSplits with only one element. + * @see RabbitMQSourceEnumerator + * @see RabbitMQSourceSplit + */ + @Override + public void addSplits(List<RabbitMQSourceSplit> list) { + if (split != null) { + return; + } + if (list.size() != 1) { + throw new RuntimeException("The number of added splits should be exaclty one."); + } + split = list.get(0); + setupRabbitMQ(); + } + + @Override + public void notifyNoMoreSplits() {} + + @Override + public void handleSourceEvents(SourceEvent sourceEvent) {} Review comment: Why did you keep this method? My previous comment was marked as resolved but I think this method is not necessary and you can delete it. 
########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSource.java ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.source; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumState; +import org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumStateSerializer; +import 
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumerator; +import org.apache.flink.connector.rabbitmq2.source.reader.RabbitMQSourceReaderBase; +import org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderAtLeastOnce; +import org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderAtMostOnce; +import org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderExactlyOnce; +import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit; +import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +/** + * RabbitMQ source (consumer) that consumes messages from a RabbitMQ queue. It provides + * at-most-once, at-least-once and exactly-once processing semantics. For at-least-once and + * exactly-once, checkpointing needs to be enabled. The source operates as a StreamingSource and + * thus works in a streaming fashion. Please use a {@link RabbitMQSourceBuilder} to construct the + * source. The following example shows how to create a RabbitMQSource emitting records of <code> + * String</code> type. + * + * <pre>{@code + * RabbitMQSource<String> source = RabbitMQSource + * .<String>builder() + * .setConnectionConfig(MY_RMQ_CONNECTION_CONFIG) + * .setQueueName("myQueue") + * .setDeliveryDeserializer(new SimpleStringSchema()) + * .setConsistencyMode(MY_CONSISTENCY_MODE) + * .build(); + * }</pre> + * + * <p>When creating the source a {@code connectionConfig} must be specified via {@link + * RabbitMQConnectionConfig}. It contains required information for the RabbitMQ java client to + * connect to the RabbitMQ server. A minimum configuration contains a (virtual) host, a username, a + * password and a port. 
Besides that, the {@code queueName} to consume from and a {@link + * DeserializationSchema} must be specified. + * + * <p>When using at-most-once consistency, messages are automatically acknowledged when received + * from RabbitMQ and later consumed by the output. In case of a failure, messages might be lost. + * More details in {@link RabbitMQSourceReaderAtMostOnce}. + * + * <p>In case of at-least-once consistency, messages are buffered and later consumed by the output. + * Once a checkpoint is finished, the messages that were consumed by the output are acknowledged to + * RabbitMQ. This way, we ensure that the messages are successfully received by the output. In case + * of a system failure, the messages that were not yet acknowledged to RabbitMQ will be resent by RabbitMQ. + * More details in {@link RabbitMQSourceReaderAtLeastOnce}. + * + * <p>To ensure exactly-once consistency, messages are deduplicated through {@code correlationIds}. + * Similar to at-least-once consistency, we store the {@code deliveryTags} of the messages that are + * consumed by the output to acknowledge them later. A transactional RabbitMQ channel is used to + * ensure that all messages are successfully acknowledged to RabbitMQ. More details in {@link + * RabbitMQSourceReaderExactlyOnce}. + * + * <p>Keep in mind that the transactional channels are heavyweight and performance will drop. Under + * heavy load, checkpoints can be delayed if a transaction takes longer than the specified + * checkpointing interval. + * + * @param <T> the output type of the source. 
+ */ +public class RabbitMQSource<T> + implements Source<T, RabbitMQSourceSplit, RabbitMQSourceEnumState>, ResultTypeQueryable<T> { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSource.class); + + private final RabbitMQConnectionConfig connectionConfig; + private final String queueName; + private final DeserializationSchema<T> deserializationSchema; + private final ConsistencyMode consistencyMode; + + protected RabbitMQSource( Review comment: private? ########## File path: flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQContainerClient.java ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq2.common; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; +import com.rabbitmq.client.Delivery; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; + +/** + * This class provides a RabbitMQ container client which allows creating queues, sending messages to + * RabbitMQ and get the messages received by RabbitMQ. + */ +public class RabbitMQContainerClient<T> { + + private final RabbitMQContainer container; + private Channel channel; + private final Queue<byte[]> messages; + private String queueName; + private CountDownLatch latch; + private DeserializationSchema<T> valueDeserializer; + + public RabbitMQContainerClient( + RabbitMQContainer container, + DeserializationSchema<T> valueDeserializer, + int countDownLatchSize) { + container.withExposedPorts(5762).waitingFor(Wait.forListeningPort()); + this.container = container; + this.messages = new LinkedList<>(); + this.latch = new CountDownLatch(countDownLatchSize); + this.valueDeserializer = valueDeserializer; + } + + public RabbitMQContainerClient(RabbitMQContainer container) { + this(container, null, 0); + } + + public String createQueue(String queueName, boolean withConsumer) + throws IOException, TimeoutException { + this.queueName = queueName; + Connection connection = getRabbitMQConnection(); + this.channel = connection.createChannel(); + 
channel.queueDeclare(queueName, true, false, false, null); + if (withConsumer) { + final DeliverCallback deliverCallback = this::handleMessageReceivedCallback; + channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); + } + return this.queueName; + } + + public String createQueue() throws IOException, TimeoutException { + return createQueue(UUID.randomUUID().toString(), true); + } + + public String createQueue(boolean withConsumer) throws IOException, TimeoutException { + return createQueue(UUID.randomUUID().toString(), withConsumer); + } + + public <T> void sendMessages(SerializationSchema<T> valueSerializer, T... messages) Review comment: ```suggestion public void sendMessages(SerializationSchema<T> valueSerializer, T... messages) ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq2.sink.writer.specialized; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink; +import org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper; +import org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions; +import org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState; +import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A {@link SinkWriter} implementation for {@link RabbitMQSink} that provides exactly-once delivery + * guarantees, which means incoming stream elements will be delivered to RabbitMQ exactly once. For + * this, checkpointing needs to be enabled. + * + * <p>Exactly-once consistency is implemented using a transactional RabbitMQ channel. All incoming + * stream elements are buffered in the state of this writer until the next checkpoint is triggered. + * All buffered {@code messages} are then send to RabbitMQ in a single transaction. When successful, + * all messages committed get removed from the state. If the transaction is aborted, all messages + * are put back into the state and send on the next checkpoint. + * + * <p>The transactional channel is heavyweight and will decrease throughput. If the system is under + * heavy load, consecutive checkpoints can be delayed if commits take longer than the checkpointing + * interval specified. 
Only use exactly-once if necessary (no duplicated messages in RabbitMQ + * allowed), otherwise consider using at-least-once. + * + * @param <T> Type of the elements in this sink + */ +public class RabbitMQSinkWriterExactlyOnce<T> extends RabbitMQSinkWriterBase<T> { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkWriterExactlyOnce.class); + + // All messages that arrived and could not be committed this far. + private List<RabbitMQSinkMessageWrapper<T>> messages; + + /** + * Create a new RabbitMQSinkWriterExactlyOnce. + * + * @param connectionConfig configuration parameters used to connect to RabbitMQ + * @param queueName name of the queue to publish to + * @param serializationSchema serialization schema to turn elements into byte representation + * @param publishOptions optionally used to compute routing/exchange for messages + * @param returnListener return listener + */ + public RabbitMQSinkWriterExactlyOnce( + RabbitMQConnectionConfig connectionConfig, + String queueName, + SerializationSchema<T> serializationSchema, + RabbitMQSinkPublishOptions<T> publishOptions, + SerializableReturnListener returnListener) { + super(connectionConfig, queueName, serializationSchema, publishOptions, returnListener); + messages = Collections.synchronizedList(new ArrayList<>()); + } + + /** + * On recover the messages are set to the outstanding messages from the states. 
+ * + * @param states a list of states to recover the reader with + */ + @Override + public void recoverFromStates(List<RabbitMQSinkWriterState<T>> states) { + List<RabbitMQSinkMessageWrapper<T>> messages = + Collections.synchronizedList(new ArrayList<>()); + for (RabbitMQSinkWriterState<T> state : states) { + messages.addAll(state.getOutstandingMessages()); + } + this.messages = messages; + } + + @Override + protected void configureChannel() throws IOException { + // puts channel in commit mode + getRmqChannel().txSelect(); + } + + @Override + public void write(T element, Context context) { + messages.add( + new RabbitMQSinkMessageWrapper<>( + element, getSerializationSchema().serialize(element))); + } + + @Override + public List<RabbitMQSinkWriterState<T>> snapshotState() { + commitMessages(); + return Collections.singletonList(new RabbitMQSinkWriterState<>(messages)); + } + + private void commitMessages() { + List<RabbitMQSinkMessageWrapper<T>> messagesToSend = messages.subList(0, messages.size()); + try { + for (RabbitMQSinkMessageWrapper<T> msg : messagesToSend) { + getRmqSinkConnection().send(msg); + } + getRmqChannel().txCommit(); + LOG.info("Successfully committed {} messages.", messagesToSend.size()); + messagesToSend.clear(); + } catch (IOException e) { + LOG.error( + "Error during commit of {} messages. Rollback Messages. Error: {}", + messagesToSend.size(), + e.getMessage()); + try { + getRmqChannel().txRollback(); Review comment: I am afraid that if we never successfully commit, the state increases endlessly. WDYT of failing the job after the rollback? ########## File path: flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQBaseTest.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.common; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink; +import org.apache.flink.connector.rabbitmq2.source.RabbitMQSource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.Timeout; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; + +/** + * The base class for RabbitMQ tests. It sets up a flink cluster and a docker image for RabbitMQ. It + * provides behavior to easily add onto the stream, send message to RabbitMQ and get the messages in + * RabbitMQ. 
+ */ +public abstract class RabbitMQBaseTest { + + private static final int RABBITMQ_PORT = 5672; + private RabbitMQContainerClient client; Review comment: Can you check that you do not use a raw type and specify the generic `<>`? ########## File path: flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQBaseTest.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq2.common; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink; +import org.apache.flink.connector.rabbitmq2.source.RabbitMQSource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.Timeout; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; + +/** + * The base class for RabbitMQ tests. It sets up a flink cluster and a docker image for RabbitMQ. It + * provides behavior to easily add onto the stream, send message to RabbitMQ and get the messages in + * RabbitMQ. 
+ */ +public abstract class RabbitMQBaseTest { + + private static final int RABBITMQ_PORT = 5672; + private RabbitMQContainerClient client; + + @Rule public Timeout globalTimeout = Timeout.seconds(20); + + @Rule + public MiniClusterWithClientResource flinkCluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build()); + + protected StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + @Rule + public RabbitMQContainer rabbitMq = + new RabbitMQContainer( + DockerImageName.parse("rabbitmq").withTag("3.7.25-management-alpine")) + .withExposedPorts(RABBITMQ_PORT); + + @Before + public void setUpContainerClient() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000)); + this.client = new RabbitMQContainerClient(rabbitMq); + } + + public RabbitMQContainerClient addSinkOn( + DataStream<String> stream, ConsistencyMode consistencyMode, int countDownLatchSize) + throws IOException, TimeoutException { + RabbitMQContainerClient client = + new RabbitMQContainerClient(rabbitMq, new SimpleStringSchema(), countDownLatchSize); + String queueName = client.createQueue(); + final RabbitMQConnectionConfig connectionConfig = + new RabbitMQConnectionConfig.Builder() + .setHost(rabbitMq.getHost()) + .setVirtualHost("/") + .setUserName(rabbitMq.getAdminUsername()) + .setPassword(rabbitMq.getAdminPassword()) + .setPort(rabbitMq.getMappedPort(RABBITMQ_PORT)) + .build(); + + RabbitMQSink<String> sink = + RabbitMQSink.<String>builder() + .setConnectionConfig(connectionConfig) + .setQueueName(queueName) + .setSerializationSchema(new SimpleStringSchema()) + .setConsistencyMode(consistencyMode) + .build(); + stream.sinkTo(sink).setParallelism(1); + return client; + } + + public DataStream<String> addSourceOn( + StreamExecutionEnvironment env, 
ConsistencyMode consistencyMode) + throws IOException, TimeoutException { + String queueName = client.createQueue(false); + + final RabbitMQConnectionConfig connectionConfig = + new RabbitMQConnectionConfig.Builder() + .setHost(rabbitMq.getHost()) + .setVirtualHost("/") + .setUserName(rabbitMq.getAdminUsername()) + .setPassword(rabbitMq.getAdminPassword()) + .setPort(rabbitMq.getMappedPort(RABBITMQ_PORT)) + .build(); + + RabbitMQSource<String> rabbitMQSource = + RabbitMQSource.<String>builder() + .setConnectionConfig(connectionConfig) + .setQueueName(queueName) + .setDeserializationSchema(new SimpleStringSchema()) + .setConsistencyMode(consistencyMode) + .build(); + + final DataStream<String> stream = + env.fromSource(rabbitMQSource, WatermarkStrategy.noWatermarks(), "RabbitMQSource") + .setParallelism(1); + + return stream; + } + + public void sendToRabbit(List<String> messages) throws IOException { + for (String message : messages) { + client.sendMessages(new SimpleStringSchema(), message); + } + } + + public void sendToRabbit(List<String> messages, List<String> correlationIds) + throws IOException { + for (int i = 0; i < messages.size(); i++) { + client.sendMessages(new SimpleStringSchema(), messages.get(i), correlationIds.get(i)); + } + } + + public List<String> getRandomMessages(int numberOfMessages) { + List<String> messages = new ArrayList<>(); + for (int i = 0; i < numberOfMessages; i++) { + messages.add(UUID.randomUUID().toString()); + } + return messages; + } + + public List<String> getSequentialMessages(int numberOfMessages) { + List<String> messages = new ArrayList<>(); + for (int i = 0; i < numberOfMessages; i++) { + messages.add("Message " + i); + } + return messages; + } + + public List<String> getCollectedSinkMessages() { + List<String> messages = new ArrayList<>(CollectSink.VALUES); + CollectSink.VALUES.clear(); + return messages; + } + + public void addCollectorSink( + DataStream<String> stream, CountDownLatch latch, int failAtNthMessage) { + 
stream.addSink(new CollectSink(latch, failAtNthMessage)); + } + + public void addCollectorSink(DataStream<String> stream, CountDownLatch latch) { + stream.addSink(new CollectSink(latch)); + } + + /** CollectSink to access the messages from the stream. */ + public static class CollectSink implements SinkFunction<String> { Review comment: To reduce the scope of the class a bit, WDYT of moving this class to `RabbitMQSourceTest`? It would also allow specifying the latch as a static variable of `RabbitMQSourceTest` and directly accessing it in the sink without the need for the constructor parameter. ########## File path: flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSourceTest.java ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq2.source; + +import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQBaseTest; +import org.apache.flink.streaming.api.datastream.DataStream; + +import org.apache.commons.collections.CollectionUtils; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * The tests for the RabbitMQ source with different consistency modes. As the tests are working a + * lot with timeouts to uphold stream it is possible that tests might fail. + */ +public class RabbitMQSourceTest extends RabbitMQBaseTest { + + // --------------- at most once --------------- + @Test + public void atMostOnceTest() throws Exception { + List<String> messages = getRandomMessages(100); + CountDownLatch latch = new CountDownLatch(messages.size()); + + DataStream<String> stream = addSourceOn(env, ConsistencyMode.AT_MOST_ONCE); + addCollectorSink(stream, latch); + env.executeAsync(); + + sendToRabbit(messages); + latch.await(); + + assertEquals( + CollectionUtils.getCardinalityMap(messages), + CollectionUtils.getCardinalityMap(getCollectedSinkMessages())); + } + + // --------------- at least once --------------- + @Test + public void atLeastOnceTest() throws Exception { + List<String> messages = getRandomMessages(100); + DataStream<String> stream = addSourceOn(env, ConsistencyMode.AT_LEAST_ONCE); + CountDownLatch latch = new CountDownLatch(messages.size()); + addCollectorSink(stream, latch); + env.executeAsync(); + + sendToRabbit(messages); + latch.await(); + + assertEquals( + CollectionUtils.getCardinalityMap(messages), + CollectionUtils.getCardinalityMap(getCollectedSinkMessages())); + } + + @Test + public void atLeastOnceFailureTest() throws Exception { + // An exception is thrown in 
the MapFunction in order to trigger a restart of Flink and it + // is assured that the source receives the messages again. + DataStream<String> stream = addSourceOn(env, ConsistencyMode.AT_LEAST_ONCE); + + List<String> messages = getSequentialMessages(100); + int failAtNthMessage = 30; + CountDownLatch latch = new CountDownLatch(messages.size() + failAtNthMessage - 1); + addCollectorSink(stream, latch, failAtNthMessage); + + env.executeAsync(); + + sendToRabbit(messages); + latch.await(); + + List<String> collectedMessages = getCollectedSinkMessages(); + assertTrue(collectedMessages.containsAll(messages)); + } + + // --------------- exactly once --------------- + @Test + public void exactlyOnceTest() throws Exception { + List<String> messages = getRandomMessages(1000); + CountDownLatch latch = new CountDownLatch(messages.size()); + + DataStream<String> stream = addSourceOn(env, ConsistencyMode.EXACTLY_ONCE); + addCollectorSink(stream, latch); + env.executeAsync(); + + // use messages as correlation ids here + sendToRabbit(messages, messages); + + latch.await(); + + List<String> collectedMessages = getCollectedSinkMessages(); + assertEquals(messages, collectedMessages); + } + + @Test + public void exactlyOnceFilterCorrelationIdsTest() throws Exception { + List<String> messages = getRandomMessages(5); + CountDownLatch latch = new CountDownLatch(3); + + env.enableCheckpointing(5000); + DataStream<String> stream = addSourceOn(env, ConsistencyMode.EXACTLY_ONCE); + addCollectorSink(stream, latch); + env.executeAsync(); + + List<String> correlationIds = Arrays.asList("1", "2", "3", "3", "3"); + sendToRabbit(messages, correlationIds); + + latch.await(); + + List<String> collectedMessages = getCollectedSinkMessages(); + List<String> expectedMessages = messages.subList(0, 3); + assertEquals(expectedMessages, collectedMessages); + } + + @Test + public void exactlyOnceWithFailureAndMessageDuplicationTest() throws Exception { + // An exception is thrown in order to trigger a 
restart of Flink and it + // is assured that the system receives the messages only once. We disable + // (by setting the interval higher than the test duration) checkpoint to + // expect receiving all pre-exception messages once again. + env.enableCheckpointing(500000); + DataStream<String> stream = addSourceOn(env, ConsistencyMode.EXACTLY_ONCE); + + List<String> messages = getRandomMessages(100); + List<String> correlationIds = messages; + + int failAtNthMessage = 30; + CountDownLatch latch = new CountDownLatch(messages.size() + failAtNthMessage - 1); + addCollectorSink(stream, latch, failAtNthMessage); + env.executeAsync(); + + sendToRabbit(messages, correlationIds); + latch.await(); + + List<String> collectedMessages = getCollectedSinkMessages(); + collectedMessages = + collectedMessages.subList(failAtNthMessage - 1, collectedMessages.size()); + assertEquals(messages, collectedMessages); + } + + @Test + public void exactlyOnceWithFailureTest() throws Exception { + env.enableCheckpointing(1000); + DataStream<String> stream = addSourceOn(env, ConsistencyMode.EXACTLY_ONCE); + + List<String> messages = getSequentialMessages(60); + List<String> messagesA = messages.subList(0, 30); + List<String> messagesB = messages.subList(30, messages.size()); + + int failAtNthMessage = messagesA.size() + 1; + CountDownLatch latch = new CountDownLatch(messagesA.size() + messagesB.size()); + addCollectorSink(stream, latch, failAtNthMessage); + env.executeAsync(); + + sendToRabbit(messagesA, messagesA); + TimeUnit.MILLISECONDS.sleep(2500); Review comment: Unfortunately, you have to rework this timeout because such timeouts are unreliable. Overall, I am not sure what this test does in addition to `exactlyOnceWithFailureAndMessageDuplicationTest`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: [email protected]
