fapaul commented on a change in pull request #15140:
URL: https://github.com/apache/flink/pull/15140#discussion_r597494080



##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQBaseTest.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.common;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink;
+import org.apache.flink.connector.rabbitmq2.source.RabbitMQSource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * The base class for RabbitMQ tests. It sets up a flink cluster and a docker 
image for RabbitMQ. It
+ * provides behavior to easily add onto the stream, send message to RabbitMQ 
and get the messages in
+ * RabbitMQ.
+ */
+public abstract class RabbitMQBaseTest {
+
+    private static final int RABBITMQ_PORT = 5672;
+    private RabbitMQContainerClient<String> client;
+
+    @Rule public Timeout globalTimeout = Timeout.seconds(20);
+
+    @Rule
+    public MiniClusterWithClientResource flinkCluster =

Review comment:
       This field is never used. Please remove it.

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSourceITCase.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQBaseTest;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The tests for the RabbitMQ source with different consistency modes. As the 
tests are working a
+ * lot with timeouts to uphold stream it is possible that tests might fail.
+ */
+public class RabbitMQSourceITCase extends RabbitMQBaseTest {
+
+    private static final List<String> collectedMessages =
+            Collections.synchronizedList(new ArrayList<>());
+    private static CountDownLatch messageLatch;
+    private static CountDownLatch checkpointLatch;
+    private static int failAtNthMessage;
+
+    @Before
+    public void setup() {
+        collectedMessages.clear();
+        failAtNthMessage = -1;
+        messageLatch = null;
+    }
+
+    /** CollectSink to access the messages from the stream. */
+    public static class CollectSink implements SinkFunction<String>, 
CheckpointListener {
+        @Override
+        public void invoke(String value, Context context) throws Exception {
+            if (failAtNthMessage > 0) {
+                failAtNthMessage -= 1;
+                if (failAtNthMessage == 0) {
+                    throw new Exception("This is supposed to be thrown.");
+                }
+            }
+            collectedMessages.add(value);
+            messageLatch.countDown();
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long l) {
+            if (checkpointLatch != null) {
+                checkpointLatch.countDown();

Review comment:
       Very good 👍 

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSourceITCase.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQBaseTest;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The tests for the RabbitMQ source with different consistency modes. As the 
tests are working a
+ * lot with timeouts to uphold stream it is possible that tests might fail.
+ */
+public class RabbitMQSourceITCase extends RabbitMQBaseTest {
+
+    private static final List<String> collectedMessages =
+            Collections.synchronizedList(new ArrayList<>());
+    private static CountDownLatch messageLatch;
+    private static CountDownLatch checkpointLatch;
+    private static int failAtNthMessage;
+
+    @Before
+    public void setup() {
+        collectedMessages.clear();
+        failAtNthMessage = -1;
+        messageLatch = null;
+    }
+
+    /** CollectSink to access the messages from the stream. */
+    public static class CollectSink implements SinkFunction<String>, 
CheckpointListener {

Review comment:
       Can this class be `private`?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkITCase.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQBaseTest;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQContainerClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The tests for the RabbitMQ sink with different consistency modes. As the 
tests are working a lot
+ * with timeouts to uphold stream it is possible that tests might fail.
+ */
+public class RabbitMQSinkITCase extends RabbitMQBaseTest {
+
+    private static AtomicBoolean shouldFail;
+
+    @Before
+    public void setup() {
+        shouldFail = new AtomicBoolean(true);
+    }
+
+    private class GeneratorFailureSource implements SourceFunction<String> {
+
+        private final BlockingQueue<String> messagesToSend;
+        private int failAtNthMessage;
+
+        public GeneratorFailureSource(BlockingQueue<String> messagesToSend, 
int failAtNthMessage) {
+            this.messagesToSend = messagesToSend;
+            this.failAtNthMessage = failAtNthMessage;

Review comment:
       The value of `failAtNthMessage` is never changed after it is set in the constructor. Is this intended?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSourceITCase.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQBaseTest;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The tests for the RabbitMQ source with different consistency modes. As the 
tests are working a
+ * lot with timeouts to uphold stream it is possible that tests might fail.
+ */
+public class RabbitMQSourceITCase extends RabbitMQBaseTest {
+
+    private static final List<String> collectedMessages =
+            Collections.synchronizedList(new ArrayList<>());
+    private static CountDownLatch messageLatch;
+    private static CountDownLatch checkpointLatch;
+    private static int failAtNthMessage;
+
+    @Before
+    public void setup() {
+        collectedMessages.clear();
+        failAtNthMessage = -1;
+        messageLatch = null;
+    }
+
+    /** CollectSink to access the messages from the stream. */
+    public static class CollectSink implements SinkFunction<String>, 
CheckpointListener {
+        @Override
+        public void invoke(String value, Context context) throws Exception {
+            if (failAtNthMessage > 0) {
+                failAtNthMessage -= 1;
+                if (failAtNthMessage == 0) {
+                    throw new Exception("This is supposed to be thrown.");
+                }
+            }
+            collectedMessages.add(value);
+            messageLatch.countDown();
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long l) {
+            if (checkpointLatch != null) {
+                checkpointLatch.countDown();
+            }
+        }
+    }
+
+    // --------------- at most once ---------------
+    @Test
+    public void atMostOnceTest() throws Exception {
+        List<String> messages = getRandomMessages(100);
+        messageLatch = new CountDownLatch(messages.size());
+
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.AT_MOST_ONCE);
+        stream.addSink(new CollectSink());
+        env.executeAsync();
+
+        sendToRabbit(messages);
+        messageLatch.await();
+
+        assertEquals(
+                CollectionUtils.getCardinalityMap(messages),
+                CollectionUtils.getCardinalityMap(collectedMessages));
+    }
+
+    // --------------- at least once ---------------
+    @Test
+    public void atLeastOnceTest() throws Exception {
+        List<String> messages = getRandomMessages(100);
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.AT_LEAST_ONCE);
+        messageLatch = new CountDownLatch(messages.size());
+        stream.addSink(new CollectSink());
+        env.executeAsync();
+
+        sendToRabbit(messages);
+        messageLatch.await();
+
+        assertEquals(
+                CollectionUtils.getCardinalityMap(messages),
+                CollectionUtils.getCardinalityMap(collectedMessages));
+    }
+
+    @Test
+    public void atLeastOnceFailureTest() throws Exception {
+        // An exception is thrown in the MapFunction in order to trigger a 
restart of Flink and it
+        // is assured that the source receives the messages again.
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.AT_LEAST_ONCE);
+
+        List<String> messages = getSequentialMessages(100);
+        failAtNthMessage = 30;
+        messageLatch = new CountDownLatch(messages.size() + failAtNthMessage - 
1);
+        stream.addSink(new CollectSink());
+
+        env.executeAsync();
+
+        sendToRabbit(messages);
+        messageLatch.await();
+
+        assertTrue(collectedMessages.containsAll(messages));
+    }
+
+    // --------------- exactly once ---------------
+    @Test
+    public void exactlyOnceTest() throws Exception {
+        List<String> messages = getRandomMessages(1000);
+        messageLatch = new CountDownLatch(messages.size());
+
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.EXACTLY_ONCE);
+        stream.addSink(new CollectSink());
+        env.executeAsync();
+
+        // use messages as correlation ids here
+        sendToRabbit(messages, messages);
+
+        messageLatch.await();
+
+        assertEquals(messages, collectedMessages);
+    }
+
+    @Test
+    public void exactlyOnceFilterCorrelationIdsTest() throws Exception {
+        List<String> messages = getRandomMessages(5);
+        messageLatch = new CountDownLatch(3);
+
+        env.enableCheckpointing(5000);
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.EXACTLY_ONCE);
+        stream.addSink(new CollectSink());
+        env.executeAsync();
+
+        List<String> correlationIds = Arrays.asList("1", "2", "3", "3", "3");
+        sendToRabbit(messages, correlationIds);
+
+        messageLatch.await();
+
+        List<String> expectedMessages = messages.subList(0, 3);
+        assertEquals(expectedMessages, collectedMessages);
+    }
+
+    @Test
+    public void exactlyOnceWithFailureAndMessageDuplicationTest() throws 
Exception {
+        // An exception is thrown in order to trigger a restart of Flink and it
+        // is assured that the system receives the messages only once. We 
disable
+        // (by setting the interval higher than the test duration) checkpoint 
to
+        // expect receiving all pre-exception messages once again.
+        env.enableCheckpointing(500000);
+        DataStream<String> stream = addSourceOn(env, 
ConsistencyMode.EXACTLY_ONCE);
+
+        List<String> messages = getRandomMessages(100);
+
+        int originalFailAthNthMessage = 30;
+        failAtNthMessage = originalFailAthNthMessage;
+        messageLatch = new CountDownLatch(messages.size() + failAtNthMessage - 
1);
+        stream.addSink(new CollectSink());
+        env.executeAsync();
+
+        sendToRabbit(messages, messages);
+        messageLatch.await();
+
+        List<String> expectedMessage =
+                collectedMessages.subList(originalFailAthNthMessage - 1, 
collectedMessages.size());
+        assertEquals(messages, expectedMessage);
+    }
+
+    @Test
+    public void exactlyOnceWithFailureWithNoMessageDuplicationTest() throws 
Exception {

Review comment:
       I am not sure I understand the difference between these two tests. Could you either rename one of them or add a short docstring describing what kind of scenario you are testing?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/RabbitMQSourceReaderBase.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.source.reader;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import 
org.apache.flink.connector.rabbitmq2.source.common.RabbitMQSourceMessageWrapper;
+import 
org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumerator;
+import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.Envelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The source reader for RabbitMQ queues. This is the base class of the 
different consistency modes.
+ *
+ * @param <T> The output type of the source.
+ */
+public abstract class RabbitMQSourceReaderBase<T> implements SourceReader<T, 
RabbitMQSourceSplit> {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSourceReaderBase.class);
+
+    // The assigned split from the enumerator.
+    private RabbitMQSourceSplit split;
+
+    private Connection rmqConnection;
+    private Channel rmqChannel;
+
+    private final SourceReaderContext sourceReaderContext;
+    // The deserialization schema for the messages of RabbitMQ.
+    private final DeserializationSchema<T> deliveryDeserializer;
+    // The collector keeps the messages received from RabbitMQ.
+    private final RabbitMQCollector<T> collector;
+
+    public RabbitMQSourceReaderBase(
+            SourceReaderContext sourceReaderContext,
+            DeserializationSchema<T> deliveryDeserializer) {
+        this.sourceReaderContext = requireNonNull(sourceReaderContext);
+        this.deliveryDeserializer = requireNonNull(deliveryDeserializer);
+        this.collector = new RabbitMQCollector<>();
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Starting source reader and send split request");
+        sourceReaderContext.sendSplitRequest();
+    }
+
+    // ------------- start RabbitMQ methods  --------------
+
+    private void setupRabbitMQ() {
+        try {
+            setupConnection();
+            setupChannel();
+            LOG.info(
+                    "RabbitMQ Connection was successful: Waiting for messages 
from the queue. To exit press CTRL+C");
+        } catch (Exception e) {
+            LOG.error(e.getMessage());

Review comment:
       This case should probably be fatal: instead of only logging the error message, wrap the exception (keeping it as the cause) in a `RuntimeException` and rethrow it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to