Repository: flink
Updated Branches:
  refs/heads/master bf9cc81a7 -> 39ec54ff1


[FLINK-2130] [streaming] RMQ Source properly propagates exceptions

Closes #767


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39ec54ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39ec54ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39ec54ff

Branch: refs/heads/master
Commit: 39ec54ff141fe3c0da58a53b695b7585eb9d5418
Parents: 3527f40
Author: mbalassi <[email protected]>
Authored: Wed Jun 3 11:16:48 2015 +0200
Committer: mbalassi <[email protected]>
Committed: Wed Jun 3 12:47:41 2015 +0200

----------------------------------------------------------------------
 .../streaming/connectors/rabbitmq/RMQSource.java  | 18 +++++-------------
 1 file changed, 5 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39ec54ff/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index a4c833e..d706b8c 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -22,8 +22,6 @@ import java.io.IOException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.ConnectorSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
@@ -33,8 +31,6 @@ import com.rabbitmq.client.QueueingConsumer;
 public class RMQSource<OUT> extends ConnectorSource<OUT> {
        private static final long serialVersionUID = 1L;
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(RMQSource.class);
-
        private final String QUEUE_NAME;
        private final String HOST_NAME;
 
@@ -44,8 +40,6 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
        private transient QueueingConsumer consumer;
        private transient QueueingConsumer.Delivery delivery;
 
-       private volatile boolean isRunning = false;
-
        OUT out;
 
        public RMQSource(String HOST_NAME, String QUEUE_NAME,
@@ -97,9 +91,8 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
                try {
                        delivery = consumer.nextDelivery();
                } catch (Exception e) {
-                       if (LOG.isErrorEnabled()) {
-                               LOG.error("Cannot recieve RMQ message {} at 
{}", QUEUE_NAME, HOST_NAME);
-                       }
+                       throw new RuntimeException("Error while reading message 
from RMQ source from " + QUEUE_NAME
+                                       + " at " + HOST_NAME, e);
                }
 
                out = schema.deserialize(delivery.getBody());
@@ -121,14 +114,13 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
                try {
                        delivery = consumer.nextDelivery();
                } catch (Exception e) {
-                       if (LOG.isErrorEnabled()) {
-                               LOG.error("Cannot recieve RMQ message {} at 
{}", QUEUE_NAME, HOST_NAME);
-                       }
+                       throw new RuntimeException("Error while reading message 
from RMQ source from " + QUEUE_NAME
+                                       + " at " + HOST_NAME, e);
                }
 
                out = schema.deserialize(delivery.getBody());
                if (schema.isEndOfStream(out)) {
-                       throw new RuntimeException("RMQ source is at end.");
+                       throw new RuntimeException("RMQ source is at end for " 
+ QUEUE_NAME + " at " + HOST_NAME);
                }
                OUT result = out;
                out = null;

Reply via email to