Repository: flink
Updated Branches:
  refs/heads/master 45edafda5 -> a0c3b879b


[FLINK-4053] Add tests for RMQ sink and check connection for null

This closes #2128


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0c3b879
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0c3b879
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0c3b879

Branch: refs/heads/master
Commit: a0c3b879b25b5e935e9540422b50bbc031a20735
Parents: 45edafd
Author: Ivan Mushketyk <ivan.mushke...@gmail.com>
Authored: Fri Jun 17 21:45:25 2016 +0100
Committer: zentol <ches...@apache.org>
Committed: Fri Jul 15 12:16:53 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/rabbitmq/RMQSink.java  |   5 +-
 .../connectors/rabbitmq/RMQSource.java          |   3 +
 .../connectors/rabbitmq/RMQSourceTest.java      |  18 +++
 .../connectors/rabbitmq/common/RMQSinkTest.java | 125 +++++++++++++++++++
 4 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0c3b879/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 6473164..be7e946 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -47,7 +47,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
        private boolean logFailuresOnly = false;
 
        /**
-        * @param rmqConnectionConfig The RabbiMQ connection configuration 
{@link RMQConnectionConfig}.
+        * @param rmqConnectionConfig The RabbitMQ connection configuration 
{@link RMQConnectionConfig}.
         * @param queueName The queue to publish messages to.
         * @param schema A {@link SerializationSchema} for turning the Java 
objects received into bytes
      */
@@ -76,6 +76,9 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
                try {
                        connection = factory.newConnection();
                        channel = connection.createChannel();
+                       if (channel == null) {
+                               throw new RuntimeException("None of RabbitMQ 
channels are available");
+                       }
                        channel.queueDeclare(queueName, false, false, false, 
null);
                } catch (IOException e) {
                        throw new RuntimeException("Error while creating the 
channel", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/a0c3b879/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 0892d61..33cf52c 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -148,6 +148,9 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
                try {
                        connection = factory.newConnection();
                        channel = connection.createChannel();
+                       if (channel == null) {
+                               throw new RuntimeException("None of RabbitMQ 
channels are available");
+                       }
                        setupQueue();
                        consumer = new QueueingConsumer(channel);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0c3b879/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 31128a9..b63c835 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -108,6 +108,24 @@ public class RMQSourceTest {
        }
 
        @Test
+       public void throwExceptionIfConnectionFactoryReturnNull() throws 
Exception {
+               RMQConnectionConfig connectionConfig = 
Mockito.mock(RMQConnectionConfig.class);
+               ConnectionFactory connectionFactory = 
Mockito.mock(ConnectionFactory.class);
+               Connection connection = Mockito.mock(Connection.class);
+               
Mockito.when(connectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
+               
Mockito.when(connectionFactory.newConnection()).thenReturn(connection);
+               Mockito.when(connection.createChannel()).thenReturn(null);
+
+               RMQSource<String> rmqSource = new RMQSource<>(
+                       connectionConfig, "queueDummy", true, new 
StringDeserializationScheme());
+               try {
+                       rmqSource.open(new Configuration());
+               } catch (RuntimeException ex) {
+                       assertEquals("None of RabbitMQ channels are available", 
ex.getMessage());
+               }
+       }
+
+       @Test
        public void testCheckpointing() throws Exception {
                source.autoAck = false;
                sourceThread.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/a0c3b879/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
new file mode 100644
index 0000000..199cd1e
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+public class RMQSinkTest {
+
+       private static final String QUEUE_NAME = "queue";
+       private static final String MESSAGE_STR = "msg";
+       private static final byte[] MESSAGE = new byte[1];
+
+       private RMQConnectionConfig rmqConnectionConfig;
+       private ConnectionFactory connectionFactory;
+       private Connection connection;
+       private Channel channel;
+       private SerializationSchema<String> serializationSchema;
+
+
+       @Before
+       public void before() throws Exception {
+               serializationSchema = spy(new DummySerializationSchema());
+               rmqConnectionConfig = mock(RMQConnectionConfig.class);
+               connectionFactory = mock(ConnectionFactory.class);
+               connection = mock(Connection.class);
+               channel = mock(Channel.class);
+
+               
when(rmqConnectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
+               when(connectionFactory.newConnection()).thenReturn(connection);
+               when(connection.createChannel()).thenReturn(channel);
+       }
+
+       @Test
+       public void openCallDeclaresQueue() throws Exception {
+               createRMQSink();
+
+               verify(channel).queueDeclare(QUEUE_NAME, false, false, false, 
null);
+       }
+
+       @Test
+       public void throwExceptionIfChannelIsNull() throws Exception {
+               when(connection.createChannel()).thenReturn(null);
+               try {
+                       createRMQSink();
+               } catch (RuntimeException ex) {
+                       assertEquals("None of RabbitMQ channels are available", 
ex.getMessage());
+               }
+       }
+
+       private RMQSink<String> createRMQSink() throws Exception {
+               RMQSink rmqSink = new RMQSink<String>(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+               rmqSink.open(new Configuration());
+               return rmqSink;
+       }
+
+       @Test
+       public void invokePublishBytesToQueue() throws Exception {
+               RMQSink<String> rmqSink = createRMQSink();
+
+               rmqSink.invoke(MESSAGE_STR);
+               verify(serializationSchema).serialize(MESSAGE_STR);
+               verify(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+       }
+
+       @Test(expected = RuntimeException.class)
+       public void exceptionDuringPublishingIsNotIgnored() throws Exception {
+               RMQSink<String> rmqSink = createRMQSink();
+
+               doThrow(IOException.class).when(channel).basicPublish("", 
QUEUE_NAME, null, MESSAGE);
+               rmqSink.invoke("msg");
+       }
+
+       @Test
+       public void exceptionDuringPublishingIsIgnoredIfLogFailuresOnly() 
throws Exception {
+               RMQSink<String> rmqSink = createRMQSink();
+               rmqSink.setLogFailuresOnly(true);
+
+               doThrow(IOException.class).when(channel).basicPublish("", 
QUEUE_NAME, null, MESSAGE);
+               rmqSink.invoke("msg");
+       }
+
+       @Test
+       public void closeAllResources() throws Exception {
+               RMQSink<String> rmqSink = createRMQSink();
+
+               rmqSink.close();
+
+               verify(channel).close();
+               verify(connection).close();
+       }
+
+       private class DummySerializationSchema implements 
SerializationSchema<String> {
+               @Override
+               public byte[] serialize(String element) {
+                       return MESSAGE;
+               }
+       }
+}

Reply via email to