[FLINK-4251] [Rabbit MQ] Allow users to override queue setup in order to customize queue config
This closes #2281 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4d9f571 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4d9f571 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4d9f571 Branch: refs/heads/release-1.1 Commit: a4d9f57153d4ff84395e56b68d8453384437254f Parents: 60b1085 Author: philippgrulich <[email protected]> Authored: Thu Jul 21 13:31:24 2016 -0700 Committer: Ufuk Celebi <[email protected]> Committed: Tue Aug 2 20:24:36 2016 +0200 ---------------------------------------------------------------------- .../streaming/connectors/rabbitmq/RMQSink.java | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a4d9f571/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java index be7e946..a0795d6 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java @@ -39,11 +39,11 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - private String queueName; - private RMQConnectionConfig rmqConnectionConfig; - private transient Connection connection; - private transient Channel channel; - private SerializationSchema<IN> schema; + protected final String 
queueName; + private final RMQConnectionConfig rmqConnectionConfig; + protected transient Connection connection; + protected transient Channel channel; + protected SerializationSchema<IN> schema; private boolean logFailuresOnly = false; /** @@ -58,6 +58,15 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { } /** + * Sets up the queue. The default implementation just declares the queue. The user may override + * this method to have a custom setup for the queue (e.g., binding the queue to an exchange or + * defining custom queue parameters). + */ + protected void setupQueue() throws IOException { + channel.queueDeclare(queueName, false, false, false, null); + } + + /** * Defines whether the producer should fail on errors, or only log them. * If this is set to true, then exceptions will be only logged, if set to false, * exceptions will be eventually thrown and cause the streaming program to @@ -79,7 +88,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { if (channel == null) { throw new RuntimeException("None of RabbitMQ channels are available"); } - channel.queueDeclare(queueName, false, false, false, null); + setupQueue(); } catch (IOException e) { throw new RuntimeException("Error while creating the channel", e); }
