senegalo commented on a change in pull request #12729:
URL: https://github.com/apache/flink/pull/12729#discussion_r446643314
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -141,6 +140,22 @@ protected Connection setupConnection() throws Exception {
return setupConnectionFactory().newConnection();
}
+ /**
+ * Initializes the consumer's {@link Channel}. If a prefetch count has been set in {@link RMQConnectionConfig},
+ * the new channel will be use it for {@link Channel#basicQos(int)}.
+ *
+ * @param connection the consumer's {@link Connection}.
+ * @return the channel.
+ * @throws Exception if there is an issue creating or configuring the channel.
+ */
+ protected Channel setupChannel(Connection connection) throws Exception {
+ Channel chan = connection.createChannel();
+ if (rmqConnectionConfig.getPrefetchCount().isPresent()) {
+ chan.basicQos(rmqConnectionConfig.getPrefetchCount().get());
Review comment:
Should we set the `global` flag to true, to enforce a single limit across
the whole channel?
I am worried that if someone later adds functionality that requires multiple
consumers on the same channel, the prefetch count would then apply per
consumer, and the value set by the client would suddenly mean something
completely different. If we enforce it per channel here, it keeps the same
effect for the user.
According to the documentation for `chan.basicQos(int prefetchCount, boolean
global)`, the boolean `global` means:
```
false | applied separately to each new consumer on the channel
true | shared across all consumers on the channel
```
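To make the concern concrete, here is a rough sketch in plain Java. It is only a toy model of the two settings in the table above, not the RabbitMQ client API: `PrefetchModel` and `maxUnackedOnChannel` are made-up names, and it assumes a positive prefetch count (in RabbitMQ, 0 means no limit).

```java
/** Toy model of the table above; hypothetical names, not the RabbitMQ client API. */
public class PrefetchModel {

    /**
     * Upper bound on unacknowledged messages outstanding on one channel.
     *
     * @param prefetchCount the count passed to basicQos (assumed > 0 here)
     * @param consumers     number of consumers registered on the channel
     * @param global        the global flag passed to basicQos
     */
    public static int maxUnackedOnChannel(int prefetchCount, int consumers, boolean global) {
        // global = true:  one limit shared across all consumers on the channel.
        // global = false: the limit is applied separately to each consumer.
        return global ? prefetchCount : prefetchCount * consumers;
    }

    public static void main(String[] args) {
        int prefetch = 100;

        // Today: a single consumer, so both settings give the same bound.
        System.out.println(maxUnackedOnChannel(prefetch, 1, false)); // 100
        System.out.println(maxUnackedOnChannel(prefetch, 1, true));  // 100

        // Hypothetical future: three consumers on the same channel.
        System.out.println(maxUnackedOnChannel(prefetch, 3, false)); // 300
        System.out.println(maxUnackedOnChannel(prefetch, 3, true));  // 100
    }
}
```

With one consumer the two settings are indistinguishable, which is why the difference would only show up once a second consumer is added. In `setupChannel` the per-channel variant would correspond to calling `chan.basicQos(prefetchCount, true)` rather than the single-argument overload.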