This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 93f2c68 [FLINK-14562] Let RabbitMQ source close consumer and channel
on close
93f2c68 is described below
commit 93f2c68c02080d30044924640db9643fdeb4dba8
Author: Nicolas Deslandes <[email protected]>
AuthorDate: Tue Oct 29 10:41:59 2019 -0400
[FLINK-14562] Let RabbitMQ source close consumer and channel on close
Closing method of RabbitMQ source must close consumer and channel in order
to prevent leaving idle consumer
This closes #10036.
---
.../streaming/connectors/rabbitmq/RMQSource.java | 19 +++++++++++++++++++
1 file changed, 19 insertions(+)
diff --git
a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index d454153..f079369 100644
---
a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++
b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -177,6 +177,25 @@ public class RMQSource<OUT> extends
MultipleIdsMessageAcknowledgingSourceBase<OU
@Override
public void close() throws Exception {
super.close();
+
+ try {
+ if (consumer != null && channel != null) {
+ channel.basicCancel(consumer.getConsumerTag());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error while cancelling RMQ
consumer on " + queueName
+ + " at " + rmqConnectionConfig.getHost(), e);
+ }
+
+ try {
+ if (channel != null) {
+ channel.close();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error while closing RMQ
channel with " + queueName
+ + " at " + rmqConnectionConfig.getHost(), e);
+ }
+
try {
if (connection != null) {
connection.close();