This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 93f2c68  [FLINK-14562] Let RabbitMQ source close consumer and channel 
on close
93f2c68 is described below

commit 93f2c68c02080d30044924640db9643fdeb4dba8
Author: Nicolas Deslandes <[email protected]>
AuthorDate: Tue Oct 29 10:41:59 2019 -0400

    [FLINK-14562] Let RabbitMQ source close consumer and channel on close
    
    Closing method of RabbitMQ source must close consumer and channel in order 
to prevent leaving idle consumer
    
    This closes #10036.
---
 .../streaming/connectors/rabbitmq/RMQSource.java      | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
 
b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index d454153..f079369 100644
--- 
a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ 
b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -177,6 +177,25 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
        @Override
        public void close() throws Exception {
                super.close();
+
+               try {
+                       if (consumer != null && channel != null) {
+                               channel.basicCancel(consumer.getConsumerTag());
+                       }
+               } catch (IOException e) {
+                       throw new RuntimeException("Error while cancelling RMQ 
consumer on " + queueName
+                               + " at " + rmqConnectionConfig.getHost(), e);
+               }
+
+               try {
+                       if (channel != null) {
+                               channel.close();
+                       }
+               } catch (IOException e) {
+                       throw new RuntimeException("Error while closing RMQ 
channel with " + queueName
+                               + " at " + rmqConnectionConfig.getHost(), e);
+               }
+
                try {
                        if (connection != null) {
                                connection.close();

Reply via email to