Repository: flink Updated Branches: refs/heads/master bf9cc81a7 -> 39ec54ff1
[FLINK-2130] [streaming] RMQ Source properly propagates exceptions Closes #767 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39ec54ff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39ec54ff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39ec54ff Branch: refs/heads/master Commit: 39ec54ff141fe3c0da58a53b695b7585eb9d5418 Parents: 3527f40 Author: mbalassi <[email protected]> Authored: Wed Jun 3 11:16:48 2015 +0200 Committer: mbalassi <[email protected]> Committed: Wed Jun 3 12:47:41 2015 +0200 ---------------------------------------------------------------------- .../streaming/connectors/rabbitmq/RMQSource.java | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/39ec54ff/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index a4c833e..d706b8c 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -22,8 +22,6 @@ import java.io.IOException; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.ConnectorSource; import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -33,8 +31,6 @@ import com.rabbitmq.client.QueueingConsumer; public class RMQSource<OUT> extends ConnectorSource<OUT> { private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class); - private final String QUEUE_NAME; private final String HOST_NAME; @@ -44,8 +40,6 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> { private transient QueueingConsumer consumer; private transient QueueingConsumer.Delivery delivery; - private volatile boolean isRunning = false; - OUT out; public RMQSource(String HOST_NAME, String QUEUE_NAME, @@ -97,9 +91,8 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> { try { delivery = consumer.nextDelivery(); } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME); - } + throw new RuntimeException("Error while reading message from RMQ source from " + QUEUE_NAME + + " at " + HOST_NAME, e); } out = schema.deserialize(delivery.getBody()); @@ -121,14 +114,13 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> { try { delivery = consumer.nextDelivery(); } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME); - } + throw new RuntimeException("Error while reading message from RMQ source from " + QUEUE_NAME + + " at " + HOST_NAME, e); } out = schema.deserialize(delivery.getBody()); if (schema.isEndOfStream(out)) { - throw new RuntimeException("RMQ source is at end."); + throw new RuntimeException("RMQ source is at end for " + QUEUE_NAME + " at " + HOST_NAME); } OUT result = out; out = null;
