austince commented on a change in pull request #12729:
URL: https://github.com/apache/flink/pull/12729#discussion_r458283453



##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -141,6 +140,22 @@ protected Connection setupConnection() throws Exception {
                return setupConnectionFactory().newConnection();
        }
 
+       /**
+        * Initializes the consumer's {@link Channel}. If a prefetch count has been set in {@link RMQConnectionConfig},
+        * the new channel will use it for {@link Channel#basicQos(int)}.
+        *
+        * @param connection the consumer's {@link Connection}.
+        * @return the channel.
+        * @throws Exception if there is an issue creating or configuring the channel.
+        */
+       protected Channel setupChannel(Connection connection) throws Exception {
+               Channel chan = connection.createChannel();
+               if (rmqConnectionConfig.getPrefetchCount().isPresent()) {
+                       chan.basicQos(rmqConnectionConfig.getPrefetchCount().get());

Review comment:
Finally got around to this, sorry about the late reply! While putting together the example, I found that RMQ 3.3 is not even available on Docker Hub.
It has [been out of service since 2015](https://www.rabbitmq.com/versions.html), and [the last patch was released in 2016](https://github.com/rabbitmq/rabbitmq-server/releases/tag/rabbitmq_v3_3_5),
so I think we can safely just set the global flag and not care about that version.
Here's the playground running this branch's code: https://github.com/austince/flink-rmq-playground
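
For illustration only, here is a rough sketch of what "just set the global flag" could look like. It is not the code in this branch. The RabbitMQ Java client's `Channel` has a `basicQos(int prefetchCount, boolean global)` overload; to keep the snippet runnable without the client on the classpath, `QosChannel` below is a hypothetical stand-in exposing only that one method, and `PrefetchSketch` and `applyPrefetch` are made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class PrefetchSketch {

    /**
     * Hypothetical stand-in for the single method of
     * com.rabbitmq.client.Channel that this sketch needs.
     */
    interface QosChannel {
        void basicQos(int prefetchCount, boolean global) throws Exception;
    }

    /**
     * Applies the configured prefetch count with the global flag set.
     * Does nothing when no prefetch count was configured.
     */
    static void applyPrefetch(QosChannel chan, Optional<Integer> prefetchCount) throws Exception {
        if (prefetchCount.isPresent()) {
            // Assumption: the flag is always passed as true, as suggested above.
            chan.basicQos(prefetchCount.get(), true);
        }
    }

    public static void main(String[] args) throws Exception {
        // Record the calls instead of talking to a real broker.
        List<String> calls = new ArrayList<>();
        QosChannel recording = (count, global) -> calls.add(count + ":" + global);

        applyPrefetch(recording, Optional.of(30));   // configured: one basicQos call
        applyPrefetch(recording, Optional.empty());  // not configured: no call

        System.out.println(calls);
    }
}
```

In `setupChannel` the same idea would amount to swapping the one-argument `basicQos` call for the two-argument overload, assuming `getPrefetchCount()` keeps returning an `Optional`.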



