austince commented on a change in pull request #12729:
URL: https://github.com/apache/flink/pull/12729#discussion_r446980990
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -141,6 +140,22 @@ protected Connection setupConnection() throws Exception {
return setupConnectionFactory().newConnection();
}
+ /**
+ * Initializes the consumer's {@link Channel}. If a prefetch count has
been set in {@link RMQConnectionConfig},
+ * the new channel will be use it for {@link Channel#basicQos(int)}.
+ *
+ * @param connection the consumer's {@link Connection}.
+ * @return the channel.
+ * @throws Exception if there is an issue creating or configuring the
channel.
+ */
+ protected Channel setupChannel(Connection connection) throws Exception {
+ Channel chan = connection.createChannel();
+ if (rmqConnectionConfig.getPrefetchCount().isPresent()) {
+
chan.basicQos(rmqConnectionConfig.getPrefetchCount().get());
Review comment:
Thanks for going through this! That's a good point, I'm not sure what a
good answer is. I've read in the Nodejs client I used that [sending that flag
to a RMQ cluster < version 3.3.0 will kill it
entirely](https://www.squaremobius.net/amqp.node/channel_api.html#channel_prefetch),
but I haven't tested that out at all or seen it elsewhere. I'll put together a
playground this week.
We're always reading off of one queue per source in our jobs, but our use
case is pretty simple so I'm not sure I've got the best understanding of other
RMQ users. What kinds of reasons do people have for sharing a channel?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]