[FLINK-4251] [Rabbit MQ] Allow users to override queue setup in order to 
customize queue config

This closes #2281


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4d9f571
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4d9f571
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4d9f571

Branch: refs/heads/release-1.1
Commit: a4d9f57153d4ff84395e56b68d8453384437254f
Parents: 60b1085
Author: philippgrulich <[email protected]>
Authored: Thu Jul 21 13:31:24 2016 -0700
Committer: Ufuk Celebi <[email protected]>
Committed: Tue Aug 2 20:24:36 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/rabbitmq/RMQSink.java  | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a4d9f571/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index be7e946..a0795d6 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -39,11 +39,11 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-       private String queueName;
-       private RMQConnectionConfig rmqConnectionConfig;
-       private transient Connection connection;
-       private transient Channel channel;
-       private SerializationSchema<IN> schema;
+       protected final String queueName;
+       private final RMQConnectionConfig rmqConnectionConfig;
+       protected transient Connection connection;
+       protected transient Channel channel;
+       protected SerializationSchema<IN> schema;
        private boolean logFailuresOnly = false;
 
        /**
@@ -58,6 +58,15 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
        }
 
        /**
+        * Sets up the queue. The default implementation just declares the 
queue. The user may override
+        * this method to have a custom setup for the queue (i.e. binding the 
queue to an exchange or
+        * defining custom queue parameters)
+        */
+       protected void setupQueue() throws IOException {
+               channel.queueDeclare(queueName, false, false, false, null);
+       }
+
+       /**
         * Defines whether the producer should fail on errors, or only log them.
         * If this is set to true, then exceptions will be only logged, if set 
to false,
         * exceptions will be eventually thrown and cause the streaming program 
to
@@ -79,7 +88,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
                        if (channel == null) {
                                throw new RuntimeException("None of RabbitMQ 
channels are available");
                        }
-                       channel.queueDeclare(queueName, false, false, false, 
null);
+                       setupQueue();
                } catch (IOException e) {
                        throw new RuntimeException("Error while creating the 
channel", e);
                }

Reply via email to