fapaul commented on a change in pull request #15140:
URL: https://github.com/apache/flink/pull/15140#discussion_r595436525
##########
File path: flink-connectors/flink-connector-rabbitmq2/README.md
##########
@@ -0,0 +1,24 @@
+# License of the Rabbit MQ Connector
+
+Flink's RabbitMQ connector defines a Maven dependency on the
+"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public
License 1.1 ("MPL"),
+the GNU General Public License version 2 ("GPL") and the Apache License
version 2 ("ASL").
+
+Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
+nor packages binaries from the "RabbitMQ AMQP Java Client".
+
+Users that create and publish derivative work based on Flink's
+RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
+must be aware that this may be subject to conditions declared in the
+Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
("GPL")
+and the Apache License version 2 ("ASL").
+
+This connector allows consuming messages from and publishing to RabbitMQ. It
supports the
Review comment:
```suggestion
This connector allows consuming messages from and publishing to RabbitMQ. It
supports the
```
```suggestion
This connector allows consuming messages from and publishing to RabbitMQ. It
implements the
```
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import
org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterStateSerializer;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+import
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce;
+import
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce;
+import
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQ sink that publishes messages to a RabbitMQ queue. It provides
at-most-once,
+ * at-least-once or exactly-once processing semantics. For at-least-once and
exactly-once,
+ * checkpointing needs to be enabled.
+ *
+ * <pre>{@code
+ * RabbitMQSink
+ * .builder()
+ * .setConnectionConfig(connectionConfig)
+ * .setQueueName("queue")
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
+ * .build();
+ * }</pre>
+ *
+ * <p>When creating the sink a {@code connectionConfig} must be specified via
{@link
+ * RabbitMQConnectionConfig}. It contains required information for the
RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a
(virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to publish to and
a {@link
+ * SerializationSchema} for the sink input type is required. {@code
publishOptions} can be added
+ * optionally to route messages in RabbitMQ.
+ *
+ * <p>If at-least-once is required messages are buffered until an
acknowledgement arrives because
+ * delivery needs to be guaranteed. On each checkpoint, all unacknowledged
messages will be resent
+ * to RabbitMQ. In case of a failure, all unacknowledged messages can be
restored and resend.
+ *
+ * <p>In the case of exactly-once a transactional RabbitMQ channel is used to
achieve that all
+ * messages within a checkpoint are delivered once and only once. All messages
that arrive in a
+ * checkpoint interval are buffered and sent to RabbitMQ in a single
transaction when the checkpoint
+ * is triggered. If the transaction fails, all messages that were a part of
the transaction are put
+ * back into the buffer and a resend is issued in the next checkpoint.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and the
performance will drop.
+ * Under heavy load, checkpoints can be delayed if a transaction takes longer
than the specified
+ * checkpointing interval.
+ *
+ * <p>If publish options are used and the checkpointing mode is at-least-once
or exactly-once, they
+ * require a {@link DeserializationSchema} to be provided because messages
that were persisted as
+ * part of an earlier checkpoint are needed to recompute routing/exchange.
+ */
+public class RabbitMQSink<T> implements Sink<T, Void,
RabbitMQSinkWriterState<T>, Void> {
+
+ private final RabbitMQConnectionConfig connectionConfig;
+ private final String queueName;
+ private final SerializationSchema<T> serializationSchema;
+ private final RabbitMQSinkPublishOptions<T> publishOptions;
+ private final ConsistencyMode consistencyMode;
+ private final SerializableReturnListener returnListener;
+
+ private static final ConsistencyMode DEFAULT_CONSISTENCY_MODE =
ConsistencyMode.AT_MOST_ONCE;
+
+ private RabbitMQSink(
+ RabbitMQConnectionConfig connectionConfig,
+ String queueName,
+ SerializationSchema<T> serializationSchema,
+ ConsistencyMode consistencyMode,
+ SerializableReturnListener returnListener,
+ @Nullable RabbitMQSinkPublishOptions<T> publishOptions) {
+ this.connectionConfig = connectionConfig;
+ this.queueName = queueName;
+ this.serializationSchema = serializationSchema;
+ this.consistencyMode = consistencyMode;
+ this.returnListener = returnListener;
+ this.publishOptions = publishOptions;
+
+ requireNonNull(connectionConfig);
Review comment:
You can do
```suggestion
this.connectionConfig = requireNonNull(connectionConfig);
```
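For illustration, a minimal self-contained sketch of the same pattern. `ConnectionSettings` and `ExampleSink` are hypothetical stand-ins, not classes from this PR. It relies only on `Objects.requireNonNull` returning the reference it checked, so the check and the assignment fit in one statement:

```java
import java.util.Objects;

public class RequireNonNullSketch {

    /** Hypothetical stand-in for a connection configuration object. */
    static final class ConnectionSettings {
        final String host;

        ConnectionSettings(String host) {
            this.host = host;
        }
    }

    /** Hypothetical sink-like class that validates its arguments while assigning them. */
    static final class ExampleSink {
        private final ConnectionSettings settings;
        private final String queueName;

        ExampleSink(ConnectionSettings settings, String queueName) {
            // requireNonNull returns the checked reference, so no separate check is needed.
            this.settings = Objects.requireNonNull(settings, "settings must not be null");
            this.queueName = Objects.requireNonNull(queueName, "queueName must not be null");
        }

        String describe() {
            return settings.host + "/" + queueName;
        }
    }

    public static void main(String[] args) {
        System.out.println(
                new ExampleSink(new ConnectionSettings("localhost"), "queue").describe());
        try {
            new ExampleSink(null, "queue");
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A side effect of this form is that a field can never be assigned before its argument has been validated.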
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink.writer.specialized;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink;
+import
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper;
+import
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link SinkWriter} implementation for {@link RabbitMQSink} that provides
exactly-once delivery
+ * guarantees, which means incoming stream elements will be delivered to
RabbitMQ exactly once. For
+ * this, checkpointing needs to be enabled.
+ *
+ * <p>Exactly-once behaviour is implemented using a transactional RabbitMQ
channel. All incoming
+ * stream elements are buffered in the state of this writer until the next
checkpoint is triggered.
+ * All buffered {@code messages} are then send to RabbitMQ in a single
transaction. When successful,
+ * all messages committed get removed from the state. If the transaction is
aborted, all messages
+ * are put back into the state and send on the next checkpoint.
+ *
+ * <p>The transactional channel is heavyweight and will decrease throughput.
If the system is under
+ * heavy load, consecutive checkpoints can be delayed if commits take longer
than the checkpointing
+ * interval specified. Only use exactly-once if necessary (no duplicated
messages in RabbitMQ
+ * allowed), otherwise consider using at-least-once.
+ *
+ * @param <T> Type of the elements in this sink
+ */
+public class RabbitMQSinkWriterExactlyOnce<T> extends
RabbitMQSinkWriterBase<T> {
+
+ /** All messages that arrived and could not be committed this far. */
+ private List<RabbitMQSinkMessageWrapper<T>> messages;
+
+ /**
+ * Create a new RabbitMQSinkWriterExactlyOnce.
+ *
+ * @param connectionConfig configuration parameters used to connect to
RabbitMQ
+ * @param queueName name of the queue to publish to
+ * @param serializationSchema serialization schema to turn elements into
byte representation
+ * @param publishOptions optionally used to compute routing/exchange for
messages
+ * @param returnListener return listener
+ * @param states a list of states to initialize this reader with
+ */
+ public RabbitMQSinkWriterExactlyOnce(
+ RabbitMQConnectionConfig connectionConfig,
+ String queueName,
+ SerializationSchema<T> serializationSchema,
+ RabbitMQSinkPublishOptions<T> publishOptions,
+ SerializableReturnListener returnListener,
+ List<RabbitMQSinkWriterState<T>> states)
+ throws Exception {
+ super(connectionConfig, queueName, serializationSchema,
publishOptions, returnListener);
+ messages = Collections.synchronizedList(new ArrayList<>());
+ initWithState(states);
+ }
+
+ private void initWithState(List<RabbitMQSinkWriterState<T>> states) {
+ List<RabbitMQSinkMessageWrapper<T>> messages =
+ Collections.synchronizedList(new ArrayList<>());
+ for (RabbitMQSinkWriterState<T> state : states) {
+ messages.addAll(state.getOutstandingMessages());
+ }
+ this.messages = messages;
+ }
+
+ @Override
+ protected void configureChannel() throws IOException {
+ // puts channel in commit mode
+ getRmqChannel().txSelect();
+ }
+
+ @Override
+ public void write(T element, Context context) {
+ messages.add(
+ new RabbitMQSinkMessageWrapper<>(element,
serializationSchema.serialize(element)));
+ }
+
+ @Override
+ public List<RabbitMQSinkWriterState<T>> snapshotState() {
+ commitMessages();
+ return Collections.singletonList(new
RabbitMQSinkWriterState<>(messages));
+ }
+
+ private void commitMessages() {
+ List<RabbitMQSinkMessageWrapper<T>> messagesToSend = new
ArrayList<>(messages);
+ messages.subList(0, messagesToSend.size()).clear();
+ try {
+ for (RabbitMQSinkMessageWrapper<T> msg : messagesToSend) {
+ super.send(msg);
+ }
+ getRmqChannel().txCommit();
+ LOG.info("Successfully committed {} messages.",
messagesToSend.size());
+ } catch (IOException e) {
+ LOG.error(
+ "Error during commit of {} messages. Rollback Messages.
Error: {}",
+ messagesToSend.size(),
+ e.getMessage());
+ messages.addAll(0, messagesToSend);
+ try {
+ getRmqChannel().txRollback();
+ } catch (IOException rollbackException) {
+ throw new RuntimeException(rollbackException.getMessage());
Review comment:
The exception thrown for this failure should say what went wrong, something like `Cannot rollback
RabbitMQ transaction, this might leave the transaction in a pending state.`
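A rough sketch of what I have in mind, assuming a hypothetical `TxChannel` interface in place of the real RabbitMQ channel. Passing the original exception along as the cause is an extra suggestion on my side, so the stack trace of the rollback failure is not lost:

```java
import java.io.IOException;

public class RollbackFailureSketch {

    /** Hypothetical stand-in for the transactional part of a channel. */
    interface TxChannel {
        void txRollback() throws IOException;
    }

    /** Rolls back and, if that fails, reports it with context and the original cause. */
    static void rollback(TxChannel channel) {
        try {
            channel.txRollback();
        } catch (IOException rollbackException) {
            throw new RuntimeException(
                    "Cannot rollback RabbitMQ transaction, "
                            + "this might leave the transaction in a pending state.",
                    rollbackException);
        }
    }

    public static void main(String[] args) {
        try {
            // Simulate a channel whose rollback fails.
            rollback(() -> {
                throw new IOException("connection reset");
            });
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + " Cause: " + e.getCause().getMessage());
        }
    }
}
```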
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink.writer.specialized;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink;
+import
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper;
+import
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link SinkWriter} implementation for {@link RabbitMQSink} that provides
exactly-once delivery
+ * guarantees, which means incoming stream elements will be delivered to
RabbitMQ exactly once. For
+ * this, checkpointing needs to be enabled.
+ *
+ * <p>Exactly-once behaviour is implemented using a transactional RabbitMQ
channel. All incoming
+ * stream elements are buffered in the state of this writer until the next
checkpoint is triggered.
+ * All buffered {@code messages} are then send to RabbitMQ in a single
transaction. When successful,
+ * all messages committed get removed from the state. If the transaction is
aborted, all messages
+ * are put back into the state and send on the next checkpoint.
+ *
+ * <p>The transactional channel is heavyweight and will decrease throughput.
If the system is under
+ * heavy load, consecutive checkpoints can be delayed if commits take longer
than the checkpointing
+ * interval specified. Only use exactly-once if necessary (no duplicated
messages in RabbitMQ
+ * allowed), otherwise consider using at-least-once.
+ *
+ * @param <T> Type of the elements in this sink
+ */
+public class RabbitMQSinkWriterExactlyOnce<T> extends
RabbitMQSinkWriterBase<T> {
+
+ /** All messages that arrived and could not be committed this far. */
+ private List<RabbitMQSinkMessageWrapper<T>> messages;
+
+ /**
+ * Create a new RabbitMQSinkWriterExactlyOnce.
+ *
+ * @param connectionConfig configuration parameters used to connect to
RabbitMQ
+ * @param queueName name of the queue to publish to
+ * @param serializationSchema serialization schema to turn elements into
byte representation
+ * @param publishOptions optionally used to compute routing/exchange for
messages
+ * @param returnListener return listener
+ * @param states a list of states to initialize this reader with
+ */
+ public RabbitMQSinkWriterExactlyOnce(
+ RabbitMQConnectionConfig connectionConfig,
+ String queueName,
+ SerializationSchema<T> serializationSchema,
+ RabbitMQSinkPublishOptions<T> publishOptions,
+ SerializableReturnListener returnListener,
+ List<RabbitMQSinkWriterState<T>> states)
+ throws Exception {
+ super(connectionConfig, queueName, serializationSchema,
publishOptions, returnListener);
+ messages = Collections.synchronizedList(new ArrayList<>());
+ initWithState(states);
+ }
+
+ private void initWithState(List<RabbitMQSinkWriterState<T>> states) {
+ List<RabbitMQSinkMessageWrapper<T>> messages =
+ Collections.synchronizedList(new ArrayList<>());
+ for (RabbitMQSinkWriterState<T> state : states) {
+ messages.addAll(state.getOutstandingMessages());
+ }
+ this.messages = messages;
+ }
+
+ @Override
+ protected void configureChannel() throws IOException {
+ // puts channel in commit mode
+ getRmqChannel().txSelect();
+ }
+
+ @Override
+ public void write(T element, Context context) {
+ messages.add(
+ new RabbitMQSinkMessageWrapper<>(element,
serializationSchema.serialize(element)));
+ }
+
+ @Override
+ public List<RabbitMQSinkWriterState<T>> snapshotState() {
+ commitMessages();
+ return Collections.singletonList(new
RabbitMQSinkWriterState<>(messages));
+ }
+
+ private void commitMessages() {
+ List<RabbitMQSinkMessageWrapper<T>> messagesToSend = new
ArrayList<>(messages);
+ messages.subList(0, messagesToSend.size()).clear();
Review comment:
Are you sure this magic of clearing the list first and adding the messages back on failure works? ;)
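One possible alternative, only as a sketch and not tested against the connector: leave the buffer untouched until the commit has succeeded and drop the committed prefix afterwards. `CommitBufferSketch` and its `Committer` interface are hypothetical and use plain strings instead of `RabbitMQSinkMessageWrapper`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CommitBufferSketch {

    /** Hypothetical stand-in for publishing a batch and committing the transaction. */
    interface Committer {
        void commit(List<String> batch) throws Exception;
    }

    private final List<String> messages = Collections.synchronizedList(new ArrayList<>());

    void write(String message) {
        messages.add(message);
    }

    /** Tries to commit a snapshot of the buffer; elements are removed only after success. */
    boolean commitMessages(Committer committer) {
        List<String> batch;
        synchronized (messages) {
            batch = new ArrayList<>(messages);
        }
        try {
            committer.commit(batch);
        } catch (Exception e) {
            // Nothing was removed, so the buffer is still complete and in order.
            return false;
        }
        synchronized (messages) {
            // Elements appended during the commit sit behind the committed prefix.
            messages.subList(0, batch.size()).clear();
        }
        return true;
    }

    List<String> outstanding() {
        synchronized (messages) {
            return new ArrayList<>(messages);
        }
    }

    public static void main(String[] args) {
        CommitBufferSketch buffer = new CommitBufferSketch();
        buffer.write("a");
        buffer.write("b");
        // A failing commit leaves both messages in the buffer.
        buffer.commitMessages(batch -> {
            throw new Exception("broker unavailable");
        });
        System.out.println(buffer.outstanding());
        // A successful commit removes only what was part of the batch.
        buffer.commitMessages(batch -> buffer.write("c"));
        System.out.println(buffer.outstanding());
    }
}
```

With this ordering there is no window in which the messages are neither in the state nor committed, and no re-insertion is needed on failure.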
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQBaseTest.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.common;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink;
+import org.apache.flink.connector.rabbitmq2.source.RabbitMQSource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * The base class for RabbitMQ tests. It sets up a flink cluster and a docker
image for RabbitMQ. It
+ * provides behavior to easily add onto the stream, send message to RabbitMQ
and get the messages in
+ * RabbitMQ.
+ */
+public abstract class RabbitMQBaseTest {
+
+ private static final int RABBITMQ_PORT = 5672;
+ private RabbitMQContainerClient client;
+ private String queueName;
+
+ @Rule public Timeout globalTimeout = Timeout.seconds(10);
+
+ @Rule
+ public MiniClusterWithClientResource flinkCluster =
Review comment:
I think you should use the MiniCluster for your testing. Currently, every time
you call `env.execute` a new Flink cluster is started in the background, which
makes testing difficult, as you have seen, because you do not have access to
the objects.
With the MiniCluster you can get the clusterClient and submit your job, e.g.
https://github.com/apache/flink/blob/e63783f69da48b32e2c91e5acb05809ffdb02950/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java#L504
This way all your objects are part of the same process, hence you access the
same instances.
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQContainerClient.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.common;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class provides a RabbitMQ container client which allows creating
queues, sending messages to
+ * RabbitMQ and get the messages received by RabbitMQ.
+ */
+public class RabbitMQContainerClient {
+
+ private final RabbitMQContainer container;
+ private Channel channel;
+ private final Queue<byte[]> messages;
+ private String queueName;
+ private CountDownLatch latch;
+
+ public RabbitMQContainerClient(RabbitMQContainer container) {
+ container.withExposedPorts(5762).waitingFor(Wait.forListeningPort());
+ this.container = container;
+ this.messages = new LinkedList<>();
+ }
+
+ public void createQueue(String queueName, Boolean withConsumer)
+ throws IOException, TimeoutException {
+ this.queueName = queueName;
+ Connection connection = getRabbitMQConnection();
+ this.channel = connection.createChannel();
+ channel.queueDeclare(queueName, true, false, false, null);
+ if (withConsumer) {
+ messages.clear();
+ final DeliverCallback deliverCallback =
this::handleMessageReceivedCallback;
+ channel.basicConsume(queueName, true, deliverCallback, consumerTag
-> {});
+ }
+ }
+
+ public void createQueue(String queueName) throws IOException,
TimeoutException {
+ createQueue(queueName, false);
+ }
+
+ public <T> void sendMessages(SerializationSchema<T> valueSerializer, T...
messages)
+ throws IOException {
+ for (T message : messages) {
+ channel.basicPublish("", queueName, null,
valueSerializer.serialize(message));
+ }
+ }
+
+ public <T> void sendMessages(
+ SerializationSchema<T> valueSerializer, T message, String
correlationId)
+ throws IOException {
+ AMQP.BasicProperties.Builder builder = new
AMQP.BasicProperties.Builder();
+ builder.correlationId(correlationId);
+ AMQP.BasicProperties properties = builder.build();
+ channel.basicPublish("", queueName, properties,
valueSerializer.serialize(message));
+ }
+
+ public <T> List<T> readMessages(DeserializationSchema<T>
valueDeserializer) throws IOException {
+ List<T> deserializedMessages = new ArrayList<>();
+ while (!messages.isEmpty()) {
+ T message = valueDeserializer.deserialize(messages.poll());
+ deserializedMessages.add(message);
+ }
+ return deserializedMessages;
+ }
+
+ protected void handleMessageReceivedCallback(String consumerTag, Delivery
delivery) {
Review comment:
Why protected?
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSource.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumState;
+import
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumStateSerializer;
+import
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumerator;
+import
org.apache.flink.connector.rabbitmq2.source.reader.RabbitMQSourceReaderBase;
+import
org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderAtLeastOnce;
+import
org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderAtMostOnce;
+import
org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderExactlyOnce;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+import
org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RabbitMQ source (consumer) that consumes messages from a RabbitMQ queue. It
provides
+ * at-most-once, at-least-once and exactly-once processing semantics. For
at-least-once and
+ * exactly-once, checkpointing needs to be enabled. The source operates as a
StreamingSource and
+ * thus works in a streaming fashion. Please use a {@link
RabbitMQSourceBuilder} to construct the
+ * source. The following example shows how to create a RabbitMQSource emitting
records of <code>
+ * String</code> type.
+ *
+ * <pre>{@code
+ * RabbitMQSource<String> source = RabbitMQSource
+ * .<String>builder()
+ * .setConnectionConfig(MY_RMQ_CONNECTION_CONFIG)
+ * .setQueueName("myQueue")
+ * .setDeliveryDeserializer(new SimpleStringSchema())
+ * .setConsistencyMode(MY_CONSISTENCY_MODE)
+ * .build();
+ * }</pre>
+ *
+ * <p>When creating the source a {@code connectionConfig} must be specified
via {@link
+ * RabbitMQConnectionConfig}. It contains required information for the
RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a
(virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to consume from
and a {@link
+ * DeserializationSchema}
+ *
+ * <p>When using at-most-once consistency, messages are automatically
acknowledged when received
+ * from RabbitMQ and later consumed by the output. In case of a failure,
messages might be lost.
+ * More details in {@link RabbitMQSourceReaderAtMostOnce}.
+ *
+ * <p>In case of at-least-once consistency, message are buffered and later
consumed by the output.
+ * Once a checkpoint is finished, the messages that were consumed by the
output are acknowledged to
+ * RabbitMQ. This way, we ensure that the messages are successfully received
by the output. In case
+ * of a system failure, the message that were acknowledged to RabbitMQ will be
resend by RabbitMQ.
+ * More details in {@link RabbitMQSourceReaderAtLeastOnce}.
+ *
+ * <p>To ensure exactly-once consistency, messages are deduplicated through
{@code correlationIds}.
+ * Similar to at-least-once consistency, we store the {@code deliveryTags} of
the messages that are
+ * consumed by the output to acknowledge them later. A transactional RabbitMQ
channel is used to
+ * ensure that all messages are successfully acknowledged to RabbitMQ. More
details in {@link
+ * RabbitMQSourceReaderExactlyOnce}.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and
performance will drop. Under
+ * heavy load, checkpoints can be delayed if a transaction takes longer than
the specified
+ * checkpointing interval.
+ *
+ * @param <T> the output type of the source.
+ */
+public class RabbitMQSource<T>
+ implements Source<T, RabbitMQSourceSplit, RabbitMQSourceEnumState>,
ResultTypeQueryable<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RabbitMQSource.class);
+
+ private final RabbitMQConnectionConfig connectionConfig;
+ private final String queueName;
+ private final DeserializationSchema<T> deserializationSchema;
+ private final ConsistencyMode consistencyMode;
+
+ public RabbitMQSource(
+ RabbitMQConnectionConfig connectionConfig,
+ String queueName,
+ DeserializationSchema<T> deserializationSchema,
+ ConsistencyMode consistencyMode) {
+ this.connectionConfig = connectionConfig;
+ this.queueName = queueName;
+ this.deserializationSchema = deserializationSchema;
+ this.consistencyMode = consistencyMode;
+
+ LOG.info("Create RabbitMQ source");
+ }
+
+ /**
+ * Get a {@link RabbitMQSourceBuilder} for the source.
+ *
+ * @param <T> type of the source.
+ * @return a source builder
+ * @see RabbitMQSourceBuilder
+ */
+ public static <T> RabbitMQSourceBuilder<T> builder() {
+ return new RabbitMQSourceBuilder<>();
+ }
+
+ /**
+ * The boundedness is always continuous unbounded as this is a
streaming-only source.
+ *
+ * @return Boundedness continuous unbounded.
+ * @see Boundedness
+ */
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ /**
+ * Returns a new initialized source reader of the source's consistency
mode.
+ *
+ * @param sourceReaderContext context which the reader will be created in.
+ * @return RabbitMQSourceReader a source reader of the specified
consistency type.
+ * @see RabbitMQSourceReaderBase
+ */
+ @Override
+ public SourceReader<T, RabbitMQSourceSplit> createReader(
+ SourceReaderContext sourceReaderContext) {
+ LOG.info("New Source Reader of type " + consistencyMode + "
requested.");
+ switch (consistencyMode) {
+ case AT_MOST_ONCE:
+ return new RabbitMQSourceReaderAtMostOnce<>(
+ sourceReaderContext, deserializationSchema);
+ case AT_LEAST_ONCE:
+ return new RabbitMQSourceReaderAtLeastOnce<>(
+ sourceReaderContext, deserializationSchema);
+ case EXACTLY_ONCE:
+ return new RabbitMQSourceReaderExactlyOnce<>(
+ sourceReaderContext, deserializationSchema);
+ default:
+ LOG.error("The requested reader of type " + consistencyMode +
" is not supported");
+ return null;
Review comment:
Please fail if an unknown consistency mode is specified.
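A minimal sketch of what failing fast could look like. The enum below is a local stand-in for the connector's `ConsistencyMode`, the `UNSUPPORTED_FOR_DEMO` constant is hypothetical and exists only to exercise the default branch, and reader names are returned as strings instead of constructing the real readers:

```java
public class ReaderSelectionSketch {
    /** Local stand-in for ConsistencyMode; the last constant is hypothetical. */
    public enum ConsistencyMode {
        AT_MOST_ONCE,
        AT_LEAST_ONCE,
        EXACTLY_ONCE,
        UNSUPPORTED_FOR_DEMO
    }

    /** Returns the reader class name for a mode, or throws for an unhandled mode. */
    public static String selectReader(ConsistencyMode mode) {
        switch (mode) {
            case AT_MOST_ONCE:
                return "RabbitMQSourceReaderAtMostOnce";
            case AT_LEAST_ONCE:
                return "RabbitMQSourceReaderAtLeastOnce";
            case EXACTLY_ONCE:
                return "RabbitMQSourceReaderExactlyOnce";
            default:
                // Fail fast instead of logging an error and returning null.
                throw new IllegalStateException("Unsupported consistency mode: " + mode);
        }
    }
}
```

With this shape the caller never has to null-check the returned reader.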
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import
org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterStateSerializer;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+import
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce;
+import
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce;
+import
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQ sink that publishes messages to a RabbitMQ queue. It provides
at-most-once,
+ * at-least-once or exactly-once processing semantics. For at-least-once and
exactly-once,
+ * checkpointing needs to be enabled.
+ *
+ * <pre>{@code
+ * RabbitMQSink
+ * .builder()
+ * .setConnectionConfig(connectionConfig)
+ * .setQueueName("queue")
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
+ * .build();
+ * }</pre>
+ *
+ * <p>When creating the sink a {@code connectionConfig} must be specified via
{@link
+ * RabbitMQConnectionConfig}. It contains required information for the
RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a
(virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to publish to and
a {@link
+ * SerializationSchema} for the sink input type is required. {@code
publishOptions} can be added
+ * optionally to route messages in RabbitMQ.
+ *
+ * <p>If at-least-once is required messages are buffered until an
acknowledgement arrives because
+ * delivery needs to be guaranteed. On each checkpoint, all unacknowledged
messages will be resent
+ * to RabbitMQ. In case of a failure, all unacknowledged messages can be
restored and resend.
+ *
+ * <p>In the case of exactly-once a transactional RabbitMQ channel is used to
achieve that all
+ * messages within a checkpoint are delivered once and only once. All messages
that arrive in a
+ * checkpoint interval are buffered and sent to RabbitMQ in a single
transaction when the checkpoint
+ * is triggered. If the transaction fails, all messages that were a part of
the transaction are put
+ * back into the buffer and a resend is issued in the next checkpoint.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and the
performance will drop.
+ * Under heavy load, checkpoints can be delayed if a transaction takes longer
than the specified
+ * checkpointing interval.
+ *
+ * <p>If publish options are used and the checkpointing mode is at-least-once
or exactly-once, they
+ * require a {@link DeserializationSchema} to be provided because messages
that were persisted as
+ * part of an earlier checkpoint are needed to recompute routing/exchange.
+ */
+public class RabbitMQSink<T> implements Sink<T, Void,
RabbitMQSinkWriterState<T>, Void> {
+
+ private final RabbitMQConnectionConfig connectionConfig;
+ private final String queueName;
+ private final SerializationSchema<T> serializationSchema;
+ private final RabbitMQSinkPublishOptions<T> publishOptions;
+ private final ConsistencyMode consistencyMode;
+ private final SerializableReturnListener returnListener;
+
+ private static final ConsistencyMode DEFAULT_CONSISTENCY_MODE =
ConsistencyMode.AT_MOST_ONCE;
+
+ private RabbitMQSink(
+ RabbitMQConnectionConfig connectionConfig,
+ String queueName,
+ SerializationSchema<T> serializationSchema,
+ ConsistencyMode consistencyMode,
+ SerializableReturnListener returnListener,
+ @Nullable RabbitMQSinkPublishOptions<T> publishOptions) {
+ this.connectionConfig = connectionConfig;
+ this.queueName = queueName;
+ this.serializationSchema = serializationSchema;
+ this.consistencyMode = consistencyMode;
+ this.returnListener = returnListener;
+ this.publishOptions = publishOptions;
+
+ requireNonNull(connectionConfig);
+ requireNonNull(queueName);
+ requireNonNull(serializationSchema);
+
+ Preconditions.checkState(
+ verifyPublishOptions(),
+ "If consistency mode is stronger than at-most-once and publish
options are defined"
+ + "then publish options need a deserialization
schema");
+ }
+
+ private boolean verifyPublishOptions() {
+ // If at-most-once, doesnt matter if publish options are provided (no
state in writer)
+ if (consistencyMode == ConsistencyMode.AT_MOST_ONCE) {
+ return true;
+ }
+ if (publishOptions == null) {
+ return true;
+ }
+ // If we are at-least or exactly-once and publish options are set, we
need a deserialization
+ // schema to recover the original messages from the state to recompute
publish options
+ return publishOptions.getDeserializationSchema().isPresent();
+ }
+
+ public static <T> RabbitMQSinkBuilder<T> builder() {
+ return new RabbitMQSinkBuilder<>();
+ }
+
+ /**
+ * Create and return an extension of {@link RabbitMQSinkWriterBase} based
on the selected {@link
+ * ConsistencyMode}.
+ *
+ * @param context The initialization context of the Sink
+ * @param states A list of states to initialize the writer with
+ * @return The SinkWriter implementation depending on the consistency mode
set by the user
+ */
+ @Override
+ public SinkWriter<T, Void, RabbitMQSinkWriterState<T>> createWriter(
+ InitContext context, List<RabbitMQSinkWriterState<T>> states) {
+ try {
+ switch (consistencyMode) {
+ case AT_MOST_ONCE:
+ return new RabbitMQSinkWriterAtMostOnce<>(
+ connectionConfig,
+ queueName,
+ serializationSchema,
+ publishOptions,
+ returnListener);
+ case AT_LEAST_ONCE:
+ return new RabbitMQSinkWriterAtLeastOnce<>(
+ connectionConfig,
+ queueName,
+ serializationSchema,
+ publishOptions,
+ returnListener,
+ states);
+ case EXACTLY_ONCE:
+ return new RabbitMQSinkWriterExactlyOnce<>(
+ connectionConfig,
+ queueName,
+ serializationSchema,
+ publishOptions,
+ returnListener,
+ states);
+ default:
+ throw new RuntimeException(
+ "Error in creating a SinkWriter: "
+ + "No valid consistency mode was
specified.");
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage());
Review comment:
I think you should create the underlying channel lazily so that the
constructor call is safe and does not throw an exception. In general, it is
not recommended that constructors throw exceptions.
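Roughly what I have in mind, as a sketch only. `Channel` and `ChannelFactory` here are hypothetical stand-ins, not the RabbitMQ client types, and the real writer would open the connection and channel inside the factory:

```java
import java.io.IOException;

public class LazyChannelSketch {
    /** Hypothetical stand-in for the RabbitMQ channel. */
    public interface Channel {
        void publish(byte[] body) throws IOException;
    }

    /** Hypothetical factory that performs the actual connection setup. */
    public interface ChannelFactory {
        Channel open() throws IOException;
    }

    private final ChannelFactory factory;
    private Channel channel; // stays null until the first write

    /** Only stores the factory, so the constructor performs no I/O and cannot fail on it. */
    public LazyChannelSketch(ChannelFactory factory) {
        this.factory = factory;
    }

    /** Opens the channel on first use and reuses it afterwards. */
    private Channel getOrCreateChannel() throws IOException {
        if (channel == null) {
            channel = factory.open();
        }
        return channel;
    }

    public void write(byte[] body) throws IOException {
        getOrCreateChannel().publish(body);
    }
}
```

Connection errors then surface on the first `write` call, where an `IOException` is already part of the contract, and the `try`/`catch` around the `switch` in `createWriter` would no longer be needed.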
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/writer/specialized/RabbitMQSinkWriterAtLeastOnce.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink.writer.specialized;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink;
+import
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper;
+import
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+
+import com.rabbitmq.client.ConfirmCallback;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * A {@link SinkWriter} implementation for {@link RabbitMQSink} that has
at-least-once semantics,
+ * meaning it guarantees that outgoing message arrive at RabbitMQ at least
once.
+ *
+ * <p>At-least-once behaviour is implemented by assigning sequence numbers to
arriving messages and
+ * buffering them together in the state of the writer until an ack arrives.
+ *
+ * <p>Checkpointing is required for at-least-once to work because messages are
resend only when a
+ * checkpoint is triggered (to avoid complex time tracking mechanisms for each
individual message).
+ * Thus on each checkpoint, all messages which were sent at least once before
to RabbitMQ but are
+ * still unacknowledged will be send once again - duplications are possible by
this behavior.
+ *
+ * <p>After a failure, a new writer gets initialized with one or more states
that contain
+ * unacknowledged messages. These messages get resend immediately while
buffering them in the new
+ * state of the writer.
+ *
+ * @param <T> Type of the elements in this sink
+ */
+public class RabbitMQSinkWriterAtLeastOnce<T> extends
RabbitMQSinkWriterBase<T> {
+ protected final ConcurrentNavigableMap<Long,
RabbitMQSinkMessageWrapper<T>> outstandingConfirms;
+ private Set<Long> lastSeenMessageIds;
+
+ /**
+ * Create a new RabbitMQSinkWriterExactlyOnce.
+ *
+ * @param connectionConfig configuration parameters used to connect to
RabbitMQ
+ * @param queueName name of the queue to publish to
+ * @param serializationSchema serialization schema to turn elements into
byte representation
+ * @param publishOptions optionally used to compute routing/exchange for
messages
+ * @param returnListener returnListener
+ * @param states a list of states to initialize this reader with
+ */
+ public RabbitMQSinkWriterAtLeastOnce(
+ RabbitMQConnectionConfig connectionConfig,
+ String queueName,
+ SerializationSchema<T> serializationSchema,
+ RabbitMQSinkPublishOptions<T> publishOptions,
+ SerializableReturnListener returnListener,
+ List<RabbitMQSinkWriterState<T>> states)
+ throws Exception {
+ super(connectionConfig, queueName, serializationSchema,
publishOptions, returnListener);
+ this.outstandingConfirms = new ConcurrentSkipListMap<>();
+ this.lastSeenMessageIds = new HashSet<>();
+ initWithState(states);
+ }
+
+ private void initWithState(List<RabbitMQSinkWriterState<T>> states) throws
IOException {
+ for (RabbitMQSinkWriterState<T> state : states) {
+ for (RabbitMQSinkMessageWrapper<T> message :
state.getOutstandingMessages()) {
+ send(message);
+ }
+ }
+ }
+
+ @Override
+ protected void send(RabbitMQSinkMessageWrapper<T> msg) throws IOException {
+ long sequenceNumber = getRmqChannel().getNextPublishSeqNo();
+ super.send(msg);
+ outstandingConfirms.put(sequenceNumber, msg);
+ }
+
+ private void resendMessages() throws IOException {
+ Set<Long> temp = outstandingConfirms.keySet();
+ Set<Long> messagesToResend = new HashSet<>(temp);
+ messagesToResend.retainAll(lastSeenMessageIds);
+ for (Long id : messagesToResend) {
+ // remove the old message from the map, since the message was
added a second time
+ // under a new id or is put into the list of messages to resend
+ RabbitMQSinkMessageWrapper<T> msg = outstandingConfirms.remove(id);
+ if (msg != null) {
+ send(msg);
+ }
+ }
+ lastSeenMessageIds = temp;
+ }
+
+ private ConfirmCallback handleAcknowledgements() {
+ return (sequenceNumber, multiple) -> {
+ // multiple flag indicates that all messages < sequenceNumber can
be safely acknowledged
+ if (multiple) {
+ // create a view of the portion of the map that contains keys
< sequenceNumber
+ ConcurrentNavigableMap<Long, RabbitMQSinkMessageWrapper<T>>
confirmed =
+ outstandingConfirms.headMap(sequenceNumber, true);
+ // changes to the view are reflected in the original map
+ confirmed.clear();
+ } else {
+ outstandingConfirms.remove(sequenceNumber);
+ }
+ };
+ }
+
+ private ConfirmCallback handleNegativeAcknowledgements() {
+ return (sequenceNumber, multiple) -> {
+ RabbitMQSinkMessageWrapper<T> message =
outstandingConfirms.get(sequenceNumber);
+ LOG.error(
+ "Message with body {} has been nack-ed. Sequence number:
{}, multiple: {}",
+ message.getMessage(),
+ sequenceNumber,
+ multiple);
+ };
+ }
+
+ @Override
+ protected void configureChannel() throws IOException {
+ ConfirmCallback ackCallback = handleAcknowledgements();
+ ConfirmCallback nackCallback = handleNegativeAcknowledgements();
+ // register callbacks for cases of ack and negative ack of messages
(seq numbers)
+ getRmqChannel().addConfirmListener(ackCallback, nackCallback);
+ getRmqChannel().confirmSelect();
+ }
+
+ /**
+ * All messages that are older than the minimal resend interval will get
resend. A single state
Review comment:
I guess the `resend interval` is a left-over?
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/README.md
##########
@@ -0,0 +1,63 @@
+# RabbitMQ Sink
+
+Flink's RabbitMQ connector provides a sink which enables you to publish your
stream directly
+to a RabbitMQ exchange in three different consistency modes: at-most-once,
at-least-once,
+and exactly-once. Furthermore, user defined publish options can be used to
customize each message
+options in regard to exchange and publish settings in the RabbitMQ context.
+
+## Consistency Behaviour
Review comment:
Now the source says `Consistency Modes` and the sink uses `Consistency
Behaviour`?!
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/common/RabbitMQSinkConnection.java
##########
@@ -0,0 +1,134 @@
+package org.apache.flink.connector.rabbitmq2.sink.common;
+
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * This class provides basic RabbitMQ functionality and common behaviour such
as establishing and
+ * closing a connection via the {@code connectionConfig}. In addition, it
provides methods for
+ * serializing and sending messages to RabbitMQ (with or without publish
options).
+ *
+ * @param <T> The type of the messages that are published.
+ */
+public class RabbitMQSinkConnection<T> {
Review comment:
I'd prefer reducing the inheritance hierarchy and creating an instance of
this class in `RabbitMQSinkWriterBase`.
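A sketch of the composition I mean. Both classes are simplified, hypothetical stand-ins (`SinkConnection` for `RabbitMQSinkConnection`, `SinkWriterBase` for `RabbitMQSinkWriterBase`), and messages are collected in a list instead of being published:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class CompositionSketch {
    /** Hypothetical stand-in for RabbitMQSinkConnection: owns connection state and sending. */
    public static class SinkConnection {
        private final List<byte[]> sent = new ArrayList<>();
        private boolean open = true;

        public void send(byte[] body) {
            if (!open) {
                throw new IllegalStateException("Connection is closed");
            }
            sent.add(body);
        }

        public void close() {
            open = false;
        }

        public int sentCount() {
            return sent.size();
        }
    }

    /** Hypothetical writer base that holds a connection instead of extending it. */
    public static class SinkWriterBase {
        private final SinkConnection connection;

        public SinkWriterBase(SinkConnection connection) {
            this.connection = connection;
        }

        public void write(String element) {
            // Serialization is inlined here; the real writer would use its SerializationSchema.
            connection.send(element.getBytes(StandardCharsets.UTF_8));
        }

        public void close() {
            connection.close();
        }
    }
}
```

The writer then depends only on the connection's public methods, and the connection can be replaced by a test double without subclassing.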
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import
org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterStateSerializer;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+import
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce;
+import
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce;
+import
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQ sink that publishes messages to a RabbitMQ queue. It provides
at-most-once,
+ * at-least-once or exactly-once processing semantics. For at-least-once and
exactly-once,
+ * checkpointing needs to be enabled.
+ *
+ * <pre>{@code
+ * RabbitMQSink
+ * .builder()
+ * .setConnectionConfig(connectionConfig)
+ * .setQueueName("queue")
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
+ * .build();
+ * }</pre>
+ *
+ * <p>When creating the sink a {@code connectionConfig} must be specified via
{@link
+ * RabbitMQConnectionConfig}. It contains required information for the
RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a
(virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to publish to and
a {@link
+ * SerializationSchema} for the sink input type is required. {@code
publishOptions} can be added
+ * optionally to route messages in RabbitMQ.
+ *
+ * <p>If at-least-once is required messages are buffered until an
acknowledgement arrives because
+ * delivery needs to be guaranteed. On each checkpoint, all unacknowledged
messages will be resent
+ * to RabbitMQ. In case of a failure, all unacknowledged messages can be
restored and resend.
+ *
+ * <p>In the case of exactly-once a transactional RabbitMQ channel is used to
achieve that all
+ * messages within a checkpoint are delivered once and only once. All messages
that arrive in a
+ * checkpoint interval are buffered and sent to RabbitMQ in a single
transaction when the checkpoint
+ * is triggered. If the transaction fails, all messages that were a part of
the transaction are put
+ * back into the buffer and a resend is issued in the next checkpoint.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and the
performance will drop.
+ * Under heavy load, checkpoints can be delayed if a transaction takes longer
than the specified
+ * checkpointing interval.
+ *
+ * <p>If publish options are used and the checkpointing mode is at-least-once
or exactly-once, they
+ * require a {@link DeserializationSchema} to be provided because messages
that were persisted as
+ * part of an earlier checkpoint are needed to recompute routing/exchange.
+ */
+public class RabbitMQSink<T> implements Sink<T, Void,
RabbitMQSinkWriterState<T>, Void> {
+
+ private final RabbitMQConnectionConfig connectionConfig;
+ private final String queueName;
+ private final SerializationSchema<T> serializationSchema;
+ private final RabbitMQSinkPublishOptions<T> publishOptions;
+ private final ConsistencyMode consistencyMode;
+ private final SerializableReturnListener returnListener;
+
+ private static final ConsistencyMode DEFAULT_CONSISTENCY_MODE =
ConsistencyMode.AT_MOST_ONCE;
+
+ private RabbitMQSink(
+ RabbitMQConnectionConfig connectionConfig,
+ String queueName,
+ SerializationSchema<T> serializationSchema,
+ ConsistencyMode consistencyMode,
+ SerializableReturnListener returnListener,
+ @Nullable RabbitMQSinkPublishOptions<T> publishOptions) {
+ this.connectionConfig = connectionConfig;
+ this.queueName = queueName;
+ this.serializationSchema = serializationSchema;
+ this.consistencyMode = consistencyMode;
+ this.returnListener = returnListener;
+ this.publishOptions = publishOptions;
+
+ requireNonNull(connectionConfig);
+ requireNonNull(queueName);
+ requireNonNull(serializationSchema);
+
+ Preconditions.checkState(
+ verifyPublishOptions(),
+ "If consistency mode is stronger than at-most-once and publish
options are defined"
+ + "then publish options need a deserialization
schema");
+ }
+
+ private boolean verifyPublishOptions() {
+ // If at-most-once, doesnt matter if publish options are provided (no
state in writer)
+ if (consistencyMode == ConsistencyMode.AT_MOST_ONCE) {
+ return true;
+ }
+ if (publishOptions == null) {
+ return true;
+ }
+ // If we are at-least or exactly-once and publish options are set, we
need a deserialization
+ // schema to recover the original messages from the state to recompute
publish options
+ return publishOptions.getDeserializationSchema().isPresent();
+ }
+
+ public static <T> RabbitMQSinkBuilder<T> builder() {
+ return new RabbitMQSinkBuilder<>();
+ }
+
+ /**
+ * Create and return an extension of {@link RabbitMQSinkWriterBase} based
on the selected {@link
+ * ConsistencyMode}.
+ *
+ * @param context The initialization context of the Sink
+ * @param states A list of states to initialize the writer with
+ * @return The SinkWriter implementation depending on the consistency mode
set by the user
+ */
+ @Override
+ public SinkWriter<T, Void, RabbitMQSinkWriterState<T>> createWriter(
+ InitContext context, List<RabbitMQSinkWriterState<T>> states) {
+ try {
+ switch (consistencyMode) {
+ case AT_MOST_ONCE:
+ return new RabbitMQSinkWriterAtMostOnce<>(
+ connectionConfig,
+ queueName,
+ serializationSchema,
+ publishOptions,
+ returnListener);
+ case AT_LEAST_ONCE:
+ return new RabbitMQSinkWriterAtLeastOnce<>(
+ connectionConfig,
+ queueName,
+ serializationSchema,
+ publishOptions,
+ returnListener,
+ states);
+ case EXACTLY_ONCE:
+ return new RabbitMQSinkWriterExactlyOnce<>(
+ connectionConfig,
+ queueName,
+ serializationSchema,
+ publishOptions,
+ returnListener,
+ states);
+ default:
+ throw new RuntimeException(
Review comment:
Nit: This can be an `IllegalStateException`
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * The source enumerator provides the source readers with the split. All
source readers receive the
+ * same split as it only contains information about the connection and in case
of exactly-once, the
+ * seen correlation ids. But in this case, the enumerator makes sure that at
maximum one source
+ * reader receives the split. During exactly-once if multiple reader should be
assigned a split a
+ * {@link RuntimeException} is thrown.
+ */
+public class RabbitMQSourceEnumerator
+ implements SplitEnumerator<RabbitMQSourceSplit,
RabbitMQSourceEnumState> {
+ private final SplitEnumeratorContext<RabbitMQSourceSplit> context;
+ private final ConsistencyMode consistencyMode;
+ private static final Logger LOG =
LoggerFactory.getLogger(RabbitMQSourceEnumerator.class);
+ private RabbitMQSourceSplit split;
+
+ public RabbitMQSourceEnumerator(
+ SplitEnumeratorContext<RabbitMQSourceSplit> context,
+ ConsistencyMode consistencyMode,
+ RabbitMQConnectionConfig connectionConfig,
+ String rmqQueueName,
+ RabbitMQSourceEnumState enumState) {
+ // The enumState is not used since the enumerator has no state in this
architecture.
+ this(context, consistencyMode, connectionConfig, rmqQueueName);
+ }
+
+ public RabbitMQSourceEnumerator(
+ SplitEnumeratorContext<RabbitMQSourceSplit> context,
+ ConsistencyMode consistencyMode,
+ RabbitMQConnectionConfig connectionConfig,
+ String rmqQueueName) {
+
+ if (consistencyMode == ConsistencyMode.EXACTLY_ONCE &&
context.currentParallelism() > 1) {
+ throw new RuntimeException(
+ "The consistency mode is exactly-once and a parallelism
higher than one was defined. "
+ + "For exactly once a parallelism higher than one
is forbidden.");
+ }
Review comment:
I know we have discussed this offline, but I think a new enumerator is not
created during upscaling, so this constructor check would not stop someone
from scaling up in exactly-once mode.
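One possible direction, as a sketch under the assumption that the enumerator instance survives the rescale: move the check to the point where readers register, for example a call from `addReader(int subtaskId)`. The class and method names below are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;

public class ExactlyOnceReaderGuardSketch {
    private final boolean exactlyOnce;
    private final Set<Integer> registeredReaders = new HashSet<>();

    public ExactlyOnceReaderGuardSketch(boolean exactlyOnce) {
        this.exactlyOnce = exactlyOnce;
    }

    /** Intended to be called whenever a reader registers with the enumerator. */
    public void onReaderRegistered(int subtaskId) {
        boolean isNewReader = !registeredReaders.contains(subtaskId);
        if (exactlyOnce && isNewReader && !registeredReaders.isEmpty()) {
            // A second distinct reader means the parallelism is now higher than one.
            throw new IllegalStateException(
                    "Exactly-once mode supports only one reader, but subtask "
                            + subtaskId
                            + " registered in addition to "
                            + registeredReaders);
        }
        registeredReaders.add(subtaskId);
    }
}
```

A reader that re-registers under the same subtask id is still accepted, so a plain reader restart would not trip the guard.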
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/state/RabbitMQSinkWriterStateSerializer.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink.state;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkMessageWrapper;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializer for a {@link RabbitMQSinkWriterState} used for at-least and
exactly-once behaviour of
+ * the sink.
+ */
+public class RabbitMQSinkWriterStateSerializer<T>
+ implements SimpleVersionedSerializer<RabbitMQSinkWriterState<T>> {
+ private final DeserializationSchema<T> deserializationSchema;
+
+ public RabbitMQSinkWriterStateSerializer(
+ @Nullable DeserializationSchema<T> deserializationSchema) {
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ public RabbitMQSinkWriterStateSerializer() {
+ this(null);
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ /**
+ * Serializes all {@code outstandingMessages} of a state of a single sink
writer.
+ *
+ * @param rabbitMQSinkWriterState A state containing a list of {@code
outstandingMessages}
+ * @throws IOException If the output stream can't write the required data
+ */
+ @Override
+ public byte[] serialize(RabbitMQSinkWriterState<T>
rabbitMQSinkWriterState) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+ serializeV1(out, rabbitMQSinkWriterState.getOutstandingMessages());
+ return baos.toByteArray();
+ }
+
+ private void serializeV1(DataOutputStream out,
List<RabbitMQSinkMessageWrapper<T>> messages)
+ throws IOException {
+ out.writeInt(messages.size());
+ for (RabbitMQSinkMessageWrapper<T> message : messages) {
+ out.writeInt(message.getBytes().length);
+ out.write(message.getBytes());
+ }
+ out.flush();
+ }
+
+ /**
+ * Deserializes {@link RabbitMQSinkMessageWrapper} objects that wrap the
byte representation of
+ * a message that needs to be delivered to RabbitMQ as well as the
original object
+ * representation if a deserialization schema is provided.
+ *
+ * @param version which deserialization version should be used
+ * @param bytes Serialized outstanding sink messages
+ * @return A list of messages that need to be redelivered to RabbitMQ
+ * @throws IOException If the input stream can't read the required data
+ */
+ @Override
+ public RabbitMQSinkWriterState<T> deserialize(int version, byte[] bytes)
throws IOException {
+ switch (version) {
+ case 1:
+ return deserializeV1(bytes);
+ default:
+ throw new IOException("Unrecognized version or corrupt state:
" + version);
+ }
+ }
+
+ private RabbitMQSinkWriterState<T> deserializeV1(byte[] bytes) throws
IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ DataInputStream in = new DataInputStream(bais);
+ return new RabbitMQSinkWriterState<>(readSinkMessages(in));
+ }
+
+ private List<RabbitMQSinkMessageWrapper<T>>
readSinkMessages(DataInputStream in)
+ throws IOException {
+ final int numberOfMessages = in.readInt();
+ List<RabbitMQSinkMessageWrapper<T>> messages = new ArrayList<>();
+ for (int i = 0; i < numberOfMessages; i++) {
+ byte[] bytes = in.readNBytes(in.readInt());
Review comment:
The method `readNBytes(int)` was introduced with Java 11 AFAIK (the
array-based overload with Java 9), but Flink builds with Java 8, so please
double-check your changes with the correct JDK version.
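One Java 8 compatible option is `DataInputStream.readFully`, which has been part of the JDK since 1.0. The sketch below is only an illustration of the same length-prefixed framing as `serializeV1`/`readSinkMessages`; the class name `LengthPrefixedFrames` and its methods are hypothetical and not part of the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not from the PR): length-prefixed framing that compiles on Java 8. */
public final class LengthPrefixedFrames {
    private LengthPrefixedFrames() {}

    /** Writes the message count, then each message as (length, payload). */
    public static byte[] write(List<byte[]> messages) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(messages.size());
            for (byte[] message : messages) {
                out.writeInt(message.length);
                out.write(message);
            }
        }
        return baos.toByteArray();
    }

    /** Reads the frames back without using readNBytes. */
    public static List<byte[]> read(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int numberOfMessages = in.readInt();
            List<byte[]> messages = new ArrayList<>();
            for (int i = 0; i < numberOfMessages; i++) {
                byte[] message = new byte[in.readInt()];
                // readFully fills the whole array or throws EOFException on truncated input.
                in.readFully(message);
                messages.add(message);
            }
            return messages;
        }
    }
}
```

Unlike a plain `read(byte[])`, `readFully` never returns a partially filled array, so truncated state surfaces as an `EOFException` instead of silently corrupt messages.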
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSource.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumState;
+import
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumStateSerializer;
+import
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumerator;
+import
org.apache.flink.connector.rabbitmq2.source.reader.RabbitMQSourceReaderBase;
+import
org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderAtLeastOnce;
+import
org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderAtMostOnce;
+import
org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderExactlyOnce;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+import
org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RabbitMQ source (consumer) that consumes messages from a RabbitMQ queue. It
provides
+ * at-most-once, at-least-once and exactly-once processing semantics. For
at-least-once and
+ * exactly-once, checkpointing needs to be enabled. The source operates as a
StreamingSource and
+ * thus works in a streaming fashion. Please use a {@link
RabbitMQSourceBuilder} to construct the
+ * source. The following example shows how to create a RabbitMQSource emitting
records of <code>
+ * String</code> type.
+ *
+ * <pre>{@code
+ * RabbitMQSource<String> source = RabbitMQSource
+ * .<String>builder()
+ * .setConnectionConfig(MY_RMQ_CONNECTION_CONFIG)
+ * .setQueueName("myQueue")
+ * .setDeliveryDeserializer(new SimpleStringSchema())
+ * .setConsistencyMode(MY_CONSISTENCY_MODE)
+ * .build();
+ * }</pre>
+ *
+ * <p>When creating the source a {@code connectionConfig} must be specified
via {@link
+ * RabbitMQConnectionConfig}. It contains required information for the
RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a
(virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to consume from
and a {@link
+ * DeserializationSchema}
+ *
+ * <p>When using at-most-once consistency, messages are automatically
acknowledged when received
+ * from RabbitMQ and later consumed by the output. In case of a failure,
messages might be lost.
+ * More details in {@link RabbitMQSourceReaderAtMostOnce}.
+ *
+ * <p>In case of at-least-once consistency, message are buffered and later
consumed by the output.
+ * Once a checkpoint is finished, the messages that were consumed by the
output are acknowledged to
+ * RabbitMQ. This way, we ensure that the messages are successfully received
by the output. In case
+ * of a system failure, the messages that were not acknowledged to RabbitMQ will be
resent by RabbitMQ.
+ * More details in {@link RabbitMQSourceReaderAtLeastOnce}.
+ *
+ * <p>To ensure exactly-once consistency, messages are deduplicated through
{@code correlationIds}.
+ * Similar to at-least-once consistency, we store the {@code deliveryTags} of
the messages that are
+ * consumed by the output to acknowledge them later. A transactional RabbitMQ
channel is used to
+ * ensure that all messages are successfully acknowledged to RabbitMQ. More
details in {@link
+ * RabbitMQSourceReaderExactlyOnce}.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and
performance will drop. Under
+ * heavy load, checkpoints can be delayed if a transaction takes longer than
the specified
+ * checkpointing interval.
+ *
+ * @param <T> the output type of the source.
+ */
+public class RabbitMQSource<T>
+ implements Source<T, RabbitMQSourceSplit, RabbitMQSourceEnumState>,
ResultTypeQueryable<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RabbitMQSource.class);
+
+ private final RabbitMQConnectionConfig connectionConfig;
+ private final String queueName;
+ private final DeserializationSchema<T> deserializationSchema;
+ private final ConsistencyMode consistencyMode;
+
+ public RabbitMQSource(
Review comment:
Provide either a builder or a public constructor; having both might
confuse users.
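A minimal sketch of the builder-only variant, assuming the constructor can be made private. `QueueSource` and its single field are hypothetical stand-ins for `RabbitMQSource` and its configuration, chosen only to keep the example self-contained.

```java
import java.util.Objects;

/** Hypothetical stand-in for the source: instances can only be created via the builder. */
public final class QueueSource {
    private final String queueName;

    // Private on purpose, so the builder is the single entry point.
    private QueueSource(String queueName) {
        this.queueName = queueName;
    }

    public static Builder builder() {
        return new Builder();
    }

    public String getQueueName() {
        return queueName;
    }

    /** Collects the configuration and validates it once in build(). */
    public static final class Builder {
        private String queueName;

        public Builder setQueueName(String queueName) {
            this.queueName = queueName;
            return this;
        }

        public QueueSource build() {
            Objects.requireNonNull(queueName, "queueName must be set");
            return new QueueSource(queueName);
        }
    }
}
```

With this shape, validation lives in one place (`build()`), and users cannot bypass it through a second construction path.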
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkTest.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.GlobalBoolean;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQBaseTest;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The tests for the rabbitmq sink with different consistency modes. As the
tests are working a lot
+ * with timeouts to uphold stream it is possible that tests might fail.
+ */
+public class RabbitMQSinkTest extends RabbitMQBaseTest {
+ @Test
+ public void atMostOnceTest() throws Exception {
+ List<String> messages = getRandomMessages(100);
+ CountDownLatch latch =
setContainerClientCountDownLatch(messages.size());
Review comment:
I find the current way the latch is handled very complicated. I think
you should restructure it so that the latch is never an instance variable and
therefore cannot live across tests. You may have to give up your inheritance
model ;) Overall, it's a great idea to build a basic consumer!
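One possible shape, sketched under the assumption that each test creates its own collector object and hands it to the test client. `LatchedCollector` and `onMessage` are hypothetical names, not taken from the PR.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper: owns a latch that is created per instance, i.e. per test. */
public final class LatchedCollector {
    private final List<String> received = new CopyOnWriteArrayList<>();
    private final CountDownLatch latch;

    public LatchedCollector(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    /** Intended to be called from the consumer callback of the test client. */
    public void onMessage(String message) {
        received.add(message);
        latch.countDown();
    }

    /** Returns true if all expected messages arrived within the timeout. */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    public List<String> getReceived() {
        return received;
    }
}
```

A test would then create `new LatchedCollector(messages.size())` as a local variable, so no latch state can leak into the next test.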
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/RabbitMQSourceReaderBase.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source.reader;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import
org.apache.flink.connector.rabbitmq2.source.common.RabbitMQMessageWrapper;
+import
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumerator;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.Envelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The source reader for RabbitMQ queues. This is the base class of the
different consistency modes.
+ *
+ * @param <T> The output type of the source.
+ */
+public abstract class RabbitMQSourceReaderBase<T> implements SourceReader<T,
RabbitMQSourceSplit> {
+ protected static final Logger LOG =
LoggerFactory.getLogger(RabbitMQSourceReaderBase.class);
+
+ // The assigned split from the enumerator.
+ private RabbitMQSourceSplit split;
+
+ private Connection rmqConnection;
+ private Channel rmqChannel;
+
+ private final SourceReaderContext sourceReaderContext;
+ // The deserialization schema for the messages of RabbitMQ.
+ private final DeserializationSchema<T> deliveryDeserializer;
+ // The collector keeps the messages received from RabbitMQ.
+ private final RabbitMQCollector<T> collector;
+
+ public RabbitMQSourceReaderBase(
+ SourceReaderContext sourceReaderContext,
+ DeserializationSchema<T> deliveryDeserializer) {
+ this.sourceReaderContext = sourceReaderContext;
+ this.deliveryDeserializer = deliveryDeserializer;
+ this.collector = new RabbitMQCollector<>();
+ }
+
+ @Override
+ public void start() {
+ LOG.info("Starting source reader and send split request");
+ sourceReaderContext.sendSplitRequest();
+ }
+
+ // ------------- start RabbitMQ methods --------------
+
+ private void setupRabbitMQ() {
+ try {
+ setupConnection();
+ setupChannel();
+ LOG.info(
+ "RabbitMQ Connection was successful: Waiting for messages
from the queue. To exit press CTRL+C");
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ }
+ }
+
+ private ConnectionFactory setupConnectionFactory() throws Exception {
+ return split.getConnectionConfig().getConnectionFactory();
+ }
+
+ private void setupConnection() throws Exception {
+ rmqConnection = setupConnectionFactory().newConnection();
+ }
+
+ /** @return boolean whether messages should be automatically acknowledged
to RabbitMQ. */
+ protected abstract boolean isAutoAck();
+
+ /**
+ * This function will be called when a new message from RabbitMQ gets
pushed to the source. The
+ * message will be deserialized and forwarded to our message collector
where it is buffered
+ * until it can be processed.
+ *
+ * @param consumerTag The consumer tag of the message.
+ * @param delivery The delivery from RabbitMQ.
+ * @throws IOException if something fails during deserialization.
+ */
+ protected void handleMessageReceivedCallback(String consumerTag, Delivery
delivery)
+ throws IOException {
+ AMQP.BasicProperties properties = delivery.getProperties();
+ byte[] body = delivery.getBody();
+ Envelope envelope = delivery.getEnvelope();
+ collector.setMessageIdentifiers(properties.getCorrelationId(),
envelope.getDeliveryTag());
+ deliveryDeserializer.deserialize(body, collector);
+ }
+
+ protected void setupChannel() throws IOException {
+ rmqChannel = rmqConnection.createChannel();
+ rmqChannel.queueDeclare(split.getQueueName(), true, false, false,
null);
+
+ // Set maximum of unacknowledged messages
+ if (getSplit().getConnectionConfig().getPrefetchCount().isPresent()) {
+ // global: false - the prefetch count is set per consumer, not per
RabbitMQ channel
+
rmqChannel.basicQos(getSplit().getConnectionConfig().getPrefetchCount().get(),
false);
+ }
+
+ final DeliverCallback deliverCallback =
this::handleMessageReceivedCallback;
+ rmqChannel.basicConsume(
+ split.getQueueName(), isAutoAck(), deliverCallback,
consumerTag -> {});
+ }
+
+ // ------------- end RabbitMQ methods --------------
+
+ /**
+ * This method provides a hook that is called when a message gets polled
by the output.
+ *
+ * @param message the message that was polled by the output.
+ */
+ protected void handleMessagePolled(RabbitMQMessageWrapper<T> message) {}
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<T> output) {
+ RabbitMQMessageWrapper<T> message = collector.pollMessage();
+
+ if (message == null) {
+ return InputStatus.NOTHING_AVAILABLE;
+ }
+
+ output.collect(message.getMessage());
+ handleMessagePolled(message);
+
+ return collector.hasUnpolledMessages()
+ ? InputStatus.MORE_AVAILABLE
+ : InputStatus.NOTHING_AVAILABLE;
+ }
+
+ @Override
+ public List<RabbitMQSourceSplit> snapshotState(long checkpointId) {
+ return split != null ? Collections.singletonList(split.copy()) : new
ArrayList<>();
+ }
+
+ @Override
+ public CompletableFuture<Void> isAvailable() {
+ return CompletableFuture.runAsync(
+ () -> {
+ while (!collector.hasUnpolledMessages()) {
+ // supposed to be empty
+ }
+ });
+ }
+
+ /**
+ * Assign the split from the enumerator. If the source reader already has
a split nothing
+ * happens. After the split is assigned, the connection to RabbitMQ can be
setup.
+ *
+ * @param list RabbitMQSourceSplits with only one element.
+ * @see RabbitMQSourceEnumerator
+ * @see RabbitMQSourceSplit
+ */
+ @Override
+ public void addSplits(List<RabbitMQSourceSplit> list) {
+ if (split != null) {
+ return;
+ }
+ if (list.size() != 1) {
+ throw new RuntimeException("The number of added splits should be
exactly one.");
+ }
+ split = list.get(0);
+ setupRabbitMQ();
+ }
+
+ @Override
+ public void notifyNoMoreSplits() {}
+
+ @Override
+ public void handleSourceEvents(SourceEvent sourceEvent) {}
Review comment:
I think you can remove this method because the interface provides a
default implementation and you never override it.
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/specialized/RabbitMQSourceReaderExactlyOnce.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source.reader.specialized;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import
org.apache.flink.connector.rabbitmq2.source.common.RabbitMQMessageWrapper;
+import
org.apache.flink.connector.rabbitmq2.source.reader.RabbitMQSourceReaderBase;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * The RabbitMQSourceReaderExactlyOnce provides exactly-once guarantee. The
deliveryTag from the
+ * received messages are used to acknowledge the messages once it is assured
that they are safely
+ * consumed by the output. In addition, correlation ids are used to
deduplicate messages. Messages
+ * polled by the output are stored so they can be later acknowledged. During a
checkpoint the
+ * messages that were polled since the last checkpoint are associated with the
id of the current
+ * checkpoint. Once the checkpoint is completed, the messages for the
checkpoint are acknowledged in
+ * a transaction to assure that RabbitMQ successfully receives the
acknowledgements.
+ *
+ * <p>In order for the exactly-once source reader to work, checkpointing needs
to be enabled and the
+ * message from RabbitMQ need to have a correlation id.
+ *
+ * @param <T> The output type of the source.
+ * @see RabbitMQSourceReaderBase
+ */
+public class RabbitMQSourceReaderExactlyOnce<T> extends
RabbitMQSourceReaderBase<T> {
+ // Message that were polled by the output since the last checkpoint was
created.
+ // These messages are currently forward but not yet acknowledged to
RabbitMQ.
+ // It needs to be ensured they are persisted before they can be
acknowledged and thus be delete
+ // in RabbitMQ.
+ private final List<RabbitMQMessageWrapper<T>>
+ polledAndUnacknowledgedMessagesSinceLastCheckpoint;
+
+ // All messages in polledAndUnacknowledgedMessagesSinceLastCheckpoint will
move here when
+ // a new checkpoint is created and therefore the messages can be mapped to
it. This mapping is
+ // necessary to ensure we acknowledge only message which belong to a
completed checkpoint.
+ private final BlockingQueue<Tuple2<Long, List<RabbitMQMessageWrapper<T>>>>
+ polledAndUnacknowledgedMessagesPerCheckpoint;
+
+ // Set of correlation ids that have been seen and are not acknowledged yet.
+ // The message publisher (who pushes the messages to RabbitMQ) is
obligated to set the
+ // correlation id per message and ensure their uniqueness.
+ private final ConcurrentHashMap.KeySetView<String, Boolean> correlationIds;
+
+ public RabbitMQSourceReaderExactlyOnce(
+ SourceReaderContext sourceReaderContext,
+ DeserializationSchema<T> deliveryDeserializer) {
+ super(sourceReaderContext, deliveryDeserializer);
+ this.polledAndUnacknowledgedMessagesSinceLastCheckpoint =
+ Collections.synchronizedList(new ArrayList<>());
+ this.polledAndUnacknowledgedMessagesPerCheckpoint = new
LinkedBlockingQueue<>();
+ this.correlationIds = ConcurrentHashMap.newKeySet();
+ }
+
+ @Override
+ protected boolean isAutoAck() {
+ return false;
+ }
+
+ @Override
+ protected void handleMessagePolled(RabbitMQMessageWrapper<T> message) {
+ this.polledAndUnacknowledgedMessagesSinceLastCheckpoint.add(message);
+ }
+
+ @Override
+ protected void handleMessageReceivedCallback(String consumerTag, Delivery
delivery)
+ throws IOException {
+ AMQP.BasicProperties properties = delivery.getProperties();
+ String correlationId = properties.getCorrelationId();
+ Preconditions.checkNotNull(
+ correlationId,
+ "RabbitMQ source was instantiated "
+ + "with consistencyMode set EXACTLY_ONCE yet we
couldn't extract the correlation id from it !");
+
+ Envelope envelope = delivery.getEnvelope();
+ long deliveryTag = envelope.getDeliveryTag();
+
+ if (correlationIds.add(correlationId)) {
+ // Handle the message only if the correlation id hasn't been seen
before.
+ // The message will follow the normal process and be acknowledge
when it got polled.
+ super.handleMessageReceivedCallback(consumerTag, delivery);
+ } else {
+ // Otherwise, store the new delivery-tag for later
acknowledgments. The correlation id
+ // was seen before and therefore this is a duplicate received from
RabbitMQ.
+ // Instead of letting the message to be polled, the message will
directly be marked
+ // to be acknowledged in the next wave of acknowledgments under
their new deliveryTag.
+ polledAndUnacknowledgedMessagesSinceLastCheckpoint.add(
+ new RabbitMQMessageWrapper<>(deliveryTag, correlationId));
+ }
+ }
+
+ @Override
+ public List<RabbitMQSourceSplit> snapshotState(long checkpointId) {
+ Tuple2<Long, List<RabbitMQMessageWrapper<T>>> tuple =
+ new Tuple2<>(checkpointId,
polledAndUnacknowledgedMessagesSinceLastCheckpoint);
+ polledAndUnacknowledgedMessagesPerCheckpoint.add(tuple);
+ polledAndUnacknowledgedMessagesSinceLastCheckpoint.clear();
+
+ if (getSplit() != null) {
+ getSplit().setCorrelationIds(correlationIds);
+ }
+ return super.snapshotState(checkpointId);
+ }
+
+ @Override
+ public void addSplits(List<RabbitMQSourceSplit> list) {
+ super.addSplits(list);
+ correlationIds.addAll(getSplit().getCorrelationIds());
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ Iterator<Tuple2<Long, List<RabbitMQMessageWrapper<T>>>>
checkpointIterator =
+ polledAndUnacknowledgedMessagesPerCheckpoint.iterator();
+ while (checkpointIterator.hasNext()) {
+ final Tuple2<Long, List<RabbitMQMessageWrapper<T>>> nextCheckpoint
=
+ checkpointIterator.next();
+ long nextCheckpointId = nextCheckpoint.f0;
+ if (nextCheckpointId <= checkpointId) {
+ try {
+ acknowledgeMessages(nextCheckpoint.f1);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Messages could not be acknowledged during
checkpoint complete.", e);
+ }
Review comment:
You can propagate the `IOException`.
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/specialized/RabbitMQSourceReaderAtLeastOnce.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source.reader.specialized;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import
org.apache.flink.connector.rabbitmq2.source.common.RabbitMQMessageWrapper;
+import
org.apache.flink.connector.rabbitmq2.source.reader.RabbitMQSourceReaderBase;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * The RabbitMQSourceReaderAtLeastOnce provides at-least-once guarantee. The
deliveryTag from the
+ * received messages are used to acknowledge the messages once it is assured
that they are safely
+ * consumed by the output. This means that the deliveryTags of the messages
that were polled by the
+ * output are stored separately. Once a snapshot is executed the deliveryTags
get associated with
+ * the checkpoint id. When the checkpoint is completed successfully, all
messages from before are
+ * acknowledged. In the case of a system failure and a successful restart, the
messages that are
+ * unacknowledged, are resend by RabbitMQ. This way at-least-once is
guaranteed.
+ *
+ * <p>In order for the at-least-once source reader to work, checkpointing
needs to be enabled.
+ *
+ * @param <T> The output type of the source.
+ * @see RabbitMQSourceReaderBase
+ */
+public class RabbitMQSourceReaderAtLeastOnce<T> extends
RabbitMQSourceReaderBase<T> {
+ // DeliveryTags which corresponding messages were polled by the output
since the last
+ // checkpoint.
+ private final List<Long> polledAndUnacknowledgedMessageIds;
+ // List of tuples of checkpoint id and deliveryTags that were polled by
the output since the
+ // last checkpoint.
+ private final BlockingQueue<Tuple2<Long, List<Long>>>
+ polledAndUnacknowledgedMessageIdsPerCheckpoint;
+
+ public RabbitMQSourceReaderAtLeastOnce(
+ SourceReaderContext sourceReaderContext,
+ DeserializationSchema<T> deliveryDeserializer) {
+ super(sourceReaderContext, deliveryDeserializer);
+ this.polledAndUnacknowledgedMessageIds =
Collections.synchronizedList(new ArrayList<>());
+ this.polledAndUnacknowledgedMessageIdsPerCheckpoint = new
LinkedBlockingQueue<>();
+ }
+
+ @Override
+ protected boolean isAutoAck() {
+ return false;
+ }
+
+ @Override
+ protected void handleMessagePolled(RabbitMQMessageWrapper<T> message) {
+ this.polledAndUnacknowledgedMessageIds.add(message.getDeliveryTag());
+ }
+
+ @Override
+ public List<RabbitMQSourceSplit> snapshotState(long checkpointId) {
+ Tuple2<Long, List<Long>> tuple =
+ new Tuple2<>(checkpointId, polledAndUnacknowledgedMessageIds);
+ polledAndUnacknowledgedMessageIdsPerCheckpoint.add(tuple);
+ polledAndUnacknowledgedMessageIds.clear();
+
+ return super.snapshotState(checkpointId);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ Iterator<Tuple2<Long, List<Long>>> checkpointIterator =
+ polledAndUnacknowledgedMessageIdsPerCheckpoint.iterator();
+ while (checkpointIterator.hasNext()) {
+ final Tuple2<Long, List<Long>> nextCheckpoint =
checkpointIterator.next();
+ long nextCheckpointId = nextCheckpoint.f0;
+ if (nextCheckpointId <= checkpointId) {
+ try {
+ acknowledgeMessageIds(nextCheckpoint.f1);
+ } catch (IOException e) {
+ throw new RuntimeException(
Review comment:
Why do you wrap the `IOException`? The interface allows throwing
exceptions from this method.
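A sketch of what propagating could look like. To stay self-contained it does not use Flink or the RabbitMQ client: `CheckpointCallback` and `Acknowledger` are hypothetical stand-ins for the listener interface (whose method is declared with `throws Exception`) and for the channel acknowledgement call.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class AckOnCheckpoint {
    /** Stand-in for the listener interface; the method may throw checked exceptions. */
    public interface CheckpointCallback {
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }

    /** Stand-in for the call that acknowledges delivery tags on the channel. */
    public interface Acknowledger {
        void acknowledge(List<Long> deliveryTags) throws IOException;
    }

    public static final class Reader implements CheckpointCallback {
        private final Deque<Pending> pending = new ArrayDeque<>();
        private final Acknowledger acknowledger;

        public Reader(Acknowledger acknowledger) {
            this.acknowledger = acknowledger;
        }

        public void snapshot(long checkpointId, List<Long> deliveryTags) {
            // Defensive copy, so clearing the caller's list later does not empty this entry.
            pending.add(new Pending(checkpointId, new ArrayList<>(deliveryTags)));
        }

        // An override may narrow the throws clause; the IOException is not wrapped.
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws IOException {
            Iterator<Pending> iterator = pending.iterator();
            while (iterator.hasNext()) {
                Pending next = iterator.next();
                if (next.checkpointId <= checkpointId) {
                    acknowledger.acknowledge(next.deliveryTags);
                    iterator.remove();
                }
            }
        }
    }

    private static final class Pending {
        final long checkpointId;
        final List<Long> deliveryTags;

        Pending(long checkpointId, List<Long> deliveryTags) {
            this.checkpointId = checkpointId;
            this.deliveryTags = deliveryTags;
        }
    }
}
```

Callers then see the original `IOException` with its stack trace instead of a generic `RuntimeException`.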
##########
File path:
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/RabbitMQSourceReaderBase.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source.reader;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import
org.apache.flink.connector.rabbitmq2.source.common.RabbitMQMessageWrapper;
+import
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumerator;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.Envelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The source reader for RabbitMQ queues. This is the base class of the different consistency modes.
+ *
+ * @param <T> The output type of the source.
+ */
+public abstract class RabbitMQSourceReaderBase<T> implements SourceReader<T, RabbitMQSourceSplit> {
+ protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceReaderBase.class);
+
+ // The assigned split from the enumerator.
+ private RabbitMQSourceSplit split;
+
+ private Connection rmqConnection;
+ private Channel rmqChannel;
+
+ private final SourceReaderContext sourceReaderContext;
+ // The deserialization schema for the messages of RabbitMQ.
+ private final DeserializationSchema<T> deliveryDeserializer;
+ // The collector keeps the messages received from RabbitMQ.
+ private final RabbitMQCollector<T> collector;
+
+ public RabbitMQSourceReaderBase(
+ SourceReaderContext sourceReaderContext,
+ DeserializationSchema<T> deliveryDeserializer) {
+ this.sourceReaderContext = sourceReaderContext;
+ this.deliveryDeserializer = deliveryDeserializer;
+ this.collector = new RabbitMQCollector<>();
+ }
+
+ @Override
+ public void start() {
+ LOG.info("Starting source reader and send split request");
+ sourceReaderContext.sendSplitRequest();
+ }
+
+ // ------------- start RabbitMQ methods --------------
+
+ private void setupRabbitMQ() {
+ try {
+ setupConnection();
+ setupChannel();
+ LOG.info(
+ "RabbitMQ Connection was successful: Waiting for messages from the queue. To exit press CTRL+C");
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ }
+ }
+
+ private ConnectionFactory setupConnectionFactory() throws Exception {
+ return split.getConnectionConfig().getConnectionFactory();
+ }
+
+ private void setupConnection() throws Exception {
+ rmqConnection = setupConnectionFactory().newConnection();
+ }
+
+ /** @return boolean whether messages should be automatically acknowledged to RabbitMQ. */
+ protected abstract boolean isAutoAck();
+
+ /**
+ * This function will be called when a new message from RabbitMQ gets pushed to the source. The
+ * message will be deserialized and forwarded to our message collector where it is buffered
+ * until it can be processed.
+ *
+ * @param consumerTag The consumer tag of the message.
+ * @param delivery The delivery from RabbitMQ.
+ * @throws IOException if something fails during deserialization.
+ */
+ protected void handleMessageReceivedCallback(String consumerTag, Delivery delivery)
+ throws IOException {
+ AMQP.BasicProperties properties = delivery.getProperties();
+ byte[] body = delivery.getBody();
+ Envelope envelope = delivery.getEnvelope();
+ collector.setMessageIdentifiers(properties.getCorrelationId(), envelope.getDeliveryTag());
+ deliveryDeserializer.deserialize(body, collector);
+ }
+
+ protected void setupChannel() throws IOException {
+ rmqChannel = rmqConnection.createChannel();
+ rmqChannel.queueDeclare(split.getQueueName(), true, false, false, null);
+
+ // Set maximum of unacknowledged messages
+ if (getSplit().getConnectionConfig().getPrefetchCount().isPresent()) {
+ // global: false - the prefetch count is set per consumer, not per RabbitMQ channel
+ rmqChannel.basicQos(getSplit().getConnectionConfig().getPrefetchCount().get(), false);
+ }
+
+ final DeliverCallback deliverCallback = this::handleMessageReceivedCallback;
+ rmqChannel.basicConsume(
+ split.getQueueName(), isAutoAck(), deliverCallback, consumerTag -> {});
+ }
+
+ // ------------- end RabbitMQ methods --------------
+
+ /**
+ * This method provides a hook that is called when a message gets polled by the output.
+ *
+ * @param message the message that was polled by the output.
+ */
+ protected void handleMessagePolled(RabbitMQMessageWrapper<T> message) {}
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<T> output) {
+ RabbitMQMessageWrapper<T> message = collector.pollMessage();
+
+ if (message == null) {
+ return InputStatus.NOTHING_AVAILABLE;
+ }
+
+ output.collect(message.getMessage());
+ handleMessagePolled(message);
+
+ return collector.hasUnpolledMessages()
+ ? InputStatus.MORE_AVAILABLE
+ : InputStatus.NOTHING_AVAILABLE;
+ }
+
+ @Override
+ public List<RabbitMQSourceSplit> snapshotState(long checkpointId) {
+ return split != null ? Collections.singletonList(split.copy()) : new ArrayList<>();
+ }
+
+ @Override
+ public CompletableFuture<Void> isAvailable() {
+ return CompletableFuture.runAsync(
+ () -> {
+ while (!collector.hasUnpolledMessages()) {
+ // supposed to be empty
+ }
+ });
+ }
+
+ /**
+ * Assign the split from the enumerator. If the source reader already has a split nothing
+ * happens. After the split is assigned, the connection to RabbitMQ can be setup.
+ *
+ * @param list RabbitMQSourceSplits with only one element.
+ * @see RabbitMQSourceEnumerator
+ * @see RabbitMQSourceSplit
+ */
+ @Override
+ public void addSplits(List<RabbitMQSourceSplit> list) {
+ if (split != null) {
Review comment:
Can this case happen in a normal environment? I think it would also be fine to
fail if the enumerator tries to assign a second split; at first glance, that
looks like a bug.
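
For illustration only, failing fast could look roughly like the sketch below. `SingleSplitHolder` is a hypothetical stand-in for the reader's split handling, not the connector's class, and the exception types are my assumption.

```java
import java.util.List;

public class SingleSplitExample {
    /** Hypothetical holder that accepts exactly one split over its lifetime. */
    public static class SingleSplitHolder<S> {
        private S split;

        public void addSplits(List<S> splits) {
            if (split != null) {
                // A second assignment most likely indicates an enumerator bug,
                // so surface it instead of silently ignoring it.
                throw new IllegalStateException(
                        "A split is already assigned; refusing a second assignment.");
            }
            if (splits.size() != 1) {
                throw new IllegalArgumentException(
                        "Expected exactly one split but got " + splits.size());
            }
            split = splits.get(0);
        }

        public S getSplit() {
            return split;
        }
    }
}
```

With this shape the first split is kept and any later assignment surfaces immediately as an error.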
##########
File path: flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkTest.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.GlobalBoolean;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQBaseTest;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The tests for the rabbitmq sink with different consistency modes. As the tests are working a lot
+ * with timeouts to uphold stream it is possible that tests might fail.
+ */
+public class RabbitMQSinkTest extends RabbitMQBaseTest {
+ @Test
+ public void atMostOnceTest() throws Exception {
+ List<String> messages = getRandomMessages(100);
+ CountDownLatch latch = setContainerClientCountDownLatch(messages.size());
+
+ DataStream<String> stream = env.fromCollection(messages);
+ addSinkOn(stream, ConsistencyMode.AT_MOST_ONCE);
+ env.execute();
+ latch.await();
+
+ List<String> receivedMessages = getMessageFromRabbit();
+ assertEquals(messages, receivedMessages);
+ }
+
+ @Test
+ public void atLeastOnceTest() throws Exception {
+ List<String> messages = getRandomMessages(100);
+ CountDownLatch latch = setContainerClientCountDownLatch(messages.size());
+
+ DataStream<String> stream = env.fromCollection(messages);
+ addSinkOn(stream, ConsistencyMode.AT_LEAST_ONCE);
+
+ env.execute();
+ latch.await();
+
+ List<String> receivedMessages = getMessageFromRabbit();
+ assertEquals(messages, receivedMessages);
+ }
+
+ @Test
+ public void atLeastOnceWithFlinkFailureTest() throws Exception {
+ List<String> messages = getRandomMessages(100);
+ CountDownLatch latch = setContainerClientCountDownLatch(messages.size());
+
+ DataStream<String> stream = env.fromCollection(messages);
+ GlobalBoolean shouldFail = new GlobalBoolean(true);
Review comment:
As I wrote before, using the MiniCluster should let you get rid of these
static variables.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]