Repository: flink
Updated Branches:
  refs/heads/master 776253cbb -> 6cc1c179a


[FLINK-3763] Some RMQ cleanups

This closes #2054


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cc1c179
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cc1c179
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cc1c179

Branch: refs/heads/master
Commit: 6cc1c179af5e48b97d66fd3bcbd23f6704d43f62
Parents: 86a8033
Author: Robert Metzger <[email protected]>
Authored: Thu Jun 9 13:31:12 2016 +0200
Committer: Robert Metzger <[email protected]>
Committed: Thu Jun 9 15:16:32 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/rabbitmq/RMQSink.java  | 72 +++++++++++++-------
 .../connectors/rabbitmq/RMQSource.java          | 17 ++---
 .../rabbitmq/common/RMQConnectionConfig.java    | 14 ++--
 3 files changed, 59 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6cc1c179/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index bf0cef7..6473164 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -30,6 +30,10 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
+/**
+ * A Sink for publishing data into RabbitMQ
+ * @param <IN>
+ */
 public class RMQSink<IN> extends RichSinkFunction<IN> {
        private static final long serialVersionUID = 1L;
 
@@ -40,6 +44,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
        private transient Connection connection;
        private transient Channel channel;
        private SerializationSchema<IN> schema;
+       private boolean logFailuresOnly = false;
 
        /**
         * @param rmqConnectionConfig The RabbiMQ connection configuration 
{@link RMQConnectionConfig}.
@@ -53,6 +58,31 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
        }
 
        /**
+        * Defines whether the producer should fail on errors, or only log them.
+        * If this is set to true, then exceptions will be only logged, if set 
to false,
+        * exceptions will be eventually thrown and cause the streaming program 
to
+        * fail (and enter recovery).
+        *
+        * @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+        */
+       public void setLogFailuresOnly(boolean logFailuresOnly) {
+               this.logFailuresOnly = logFailuresOnly;
+       }
+
+
+       @Override
+       public void open(Configuration config) throws Exception {
+               ConnectionFactory factory = 
rmqConnectionConfig.getConnectionFactory();
+               try {
+                       connection = factory.newConnection();
+                       channel = connection.createChannel();
+                       channel.queueDeclare(queueName, false, false, false, 
null);
+               } catch (IOException e) {
+                       throw new RuntimeException("Error while creating the 
channel", e);
+               }
+       }
+
+       /**
         * Called when new data arrives to the sink, and forwards it to RMQ.
         *
         * @param value
@@ -64,45 +94,37 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
                        byte[] msg = schema.serialize(value);
 
                        channel.basicPublish("", queueName, null, msg);
-
                } catch (IOException e) {
-                       if (LOG.isErrorEnabled()) {
-                               LOG.error("Cannot send RMQ message {} at {}", 
queueName, rmqConnectionConfig.getHost());
+                       if (logFailuresOnly) {
+                               LOG.error("Cannot send RMQ message {} at {}", 
queueName, rmqConnectionConfig.getHost(), e);
+                       } else {
+                               throw new RuntimeException("Cannot send RMQ 
message " + queueName +" at " + rmqConnectionConfig.getHost(), e);
                        }
                }
 
        }
 
-       /**
-        * Closes the connection.
-        */
-       private void closeChannel() {
+       @Override
+       public void close() {
+               IOException t = null;
                try {
                        channel.close();
-                       connection.close();
                } catch (IOException e) {
-                       throw new RuntimeException("Error while closing RMQ 
connection with " + queueName
-                               + " at " + rmqConnectionConfig.getHost(), e);
+                       t = e;
                }
 
-       }
-
-       @Override
-       public void open(Configuration config) throws Exception {
-               ConnectionFactory factory = 
rmqConnectionConfig.getConnectionFactory();
                try {
-                       connection = factory.newConnection();
-                       channel = connection.createChannel();
-                       channel.queueDeclare(queueName, false, false, false, 
null);
-
+                       connection.close();
                } catch (IOException e) {
-                       throw new RuntimeException(e);
+                       if(t != null) {
+                               LOG.warn("Both channel and connection closing 
failed. Logging channel exception and failing with connection exception", t);
+                       }
+                       t = e;
+               }
+               if(t != null) {
+                       throw new RuntimeException("Error while closing RMQ 
connection with " + queueName
+                                       + " at " + 
rmqConnectionConfig.getHost(), t);
                }
-       }
-
-       @Override
-       public void close() {
-               closeChannel();
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cc1c179/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 8297f9c..20a8b0b 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -141,10 +141,9 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
                channel.queueDeclare(queueName, true, false, false, null);
        }
 
-       /**
-        * Initializes the connection to RMQ.
-        */
-       private void initializeConnection() throws Exception {
+       @Override
+       public void open(Configuration config) throws Exception {
+               super.open(config);
                ConnectionFactory factory = setupConnectionFactory();
                try {
                        connection = factory.newConnection();
@@ -154,7 +153,7 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
 
                        RuntimeContext runtimeContext = getRuntimeContext();
                        if (runtimeContext instanceof StreamingRuntimeContext
-                               && ((StreamingRuntimeContext) 
runtimeContext).isCheckpointingEnabled()) {
+                                       && ((StreamingRuntimeContext) 
runtimeContext).isCheckpointingEnabled()) {
                                autoAck = false;
                                // enables transaction mode
                                channel.txSelect();
@@ -167,14 +166,8 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
 
                } catch (IOException e) {
                        throw new RuntimeException("Cannot create RMQ 
connection with " + queueName + " at "
-                               + rmqConnectionConfig.getHost(), e);
+                                       + rmqConnectionConfig.getHost(), e);
                }
-       }
-
-       @Override
-       public void open(Configuration config) throws Exception {
-               super.open(config);
-               initializeConnection();
                running = true;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6cc1c179/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
index 0ce7e79..72bac1c 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
@@ -73,7 +73,7 @@ public class RMQConnectionConfig implements Serializable {
        * @param requestedFrameMax requested maximum frame size
        * @param requestedHeartbeat requested heartbeat interval
        * @throws NullPointerException if host or virtual host or username or 
password is null
-    */
+       */
        private RMQConnectionConfig(String host, Integer port, String 
virtualHost, String username, String password,
                                                                Integer 
networkRecoveryInterval, Boolean automaticRecovery,
                                                                Boolean 
topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
@@ -162,7 +162,7 @@ public class RMQConnectionConfig implements Serializable {
        /**
         * Retrieve the URI.
         * @return the connection URI when connecting to the broker
-     */
+        */
        public String getUri() {
                return uri;
        }
@@ -227,15 +227,15 @@ public class RMQConnectionConfig implements Serializable {
         *
         * @return Connection Factory for RMQ
         * @throws URISyntaxException, NoSuchAlgorithmException, 
KeyManagementException if Malformed URI has been passed
-     */
+        */
        public ConnectionFactory getConnectionFactory() throws 
URISyntaxException,
                NoSuchAlgorithmException, KeyManagementException {
                ConnectionFactory factory = new ConnectionFactory();
                if (this.uri != null && !this.uri.isEmpty()){
                        try {
-                               factory.setUri(getUri());
-                       }catch (URISyntaxException | NoSuchAlgorithmException | 
KeyManagementException e){
-                               LOG.error("Failed to parse uri {}", 
e.getMessage());
+                               factory.setUri(this.uri);
+                       } catch (URISyntaxException | NoSuchAlgorithmException 
| KeyManagementException e) {
+                               LOG.error("Failed to parse uri", e);
                                throw e;
                        }
                } else {
@@ -432,7 +432,7 @@ public class RMQConnectionConfig implements Serializable {
                 * else URI will be used to initialize the client connection
                 * {@link RMQConnectionConfig#RMQConnectionConfig(String, 
Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
                 * @return RMQConnectionConfig
-         */
+                */
                public RMQConnectionConfig build(){
                        if(this.uri != null) {
                                return new RMQConnectionConfig(this.uri, 
this.networkRecoveryInterval,

Reply via email to