senegalo commented on a change in pull request #12729:
URL: https://github.com/apache/flink/pull/12729#discussion_r447089528
##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -141,6 +140,22 @@ protected Connection setupConnection() throws Exception {
return setupConnectionFactory().newConnection();
}
+ /**
+ * Initializes the consumer's {@link Channel}. If a prefetch count has been set in {@link RMQConnectionConfig},
+ * the new channel will use it for {@link Channel#basicQos(int)}.
+ *
+ * @param connection the consumer's {@link Connection}.
+ * @return the channel.
+ * @throws Exception if there is an issue creating or configuring the channel.
+ */
+ protected Channel setupChannel(Connection connection) throws Exception {
+ Channel chan = connection.createChannel();
+ if (rmqConnectionConfig.getPrefetchCount().isPresent()) {
+ chan.basicQos(rmqConnectionConfig.getPrefetchCount().get());
Review comment:
Hmm ... you're absolutely right. I also have a very basic setup: several
independent brokers, and we duplicate and deduplicate on the clients.
`To keep life interesting, using the global flag with an RabbitMQ older than
v3.3.0 will bring down the whole connection.` :rofl: Okay then, experimenting is
the way to go! Do you need a helping hand? I can spare a couple of hours
next weekend.
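
For reference while experimenting, here is a minimal, self-contained sketch of the per-channel variant in the diff above: apply the prefetch count only when one is configured, and never touch the global flag. `QosChannel`, `RecordingChannel`, `PrefetchSketch`, and `applyPrefetch` are hypothetical stand-ins so the snippet runs without a broker; in the real source the channel would be the RabbitMQ `Channel` and the value would come from `RMQConnectionConfig#getPrefetchCount()`.

```java
import java.io.IOException;
import java.util.Optional;

/** Hypothetical stand-in for the single Channel method used in this sketch. */
interface QosChannel {
    void basicQos(int prefetchCount) throws IOException;
}

/** Hypothetical test double that records the last prefetch count it received. */
final class RecordingChannel implements QosChannel {
    Integer lastPrefetch = null; // stays null until basicQos is called

    @Override
    public void basicQos(int prefetchCount) {
        lastPrefetch = prefetchCount;
    }
}

final class PrefetchSketch {
    /** Applies the prefetch count only if one is configured; the global flag is never used. */
    static <C extends QosChannel> C applyPrefetch(C channel, Optional<Integer> prefetchCount)
            throws IOException {
        if (prefetchCount.isPresent()) {
            channel.basicQos(prefetchCount.get());
        }
        return channel;
    }

    public static void main(String[] args) throws IOException {
        RecordingChannel configured = applyPrefetch(new RecordingChannel(), Optional.of(100));
        RecordingChannel untouched = applyPrefetch(new RecordingChannel(), Optional.empty());
        System.out.println(configured.lastPrefetch + " " + untouched.lastPrefetch);
    }
}
```

With an empty `Optional` the channel is left at its defaults, so this only shows the shape of the call. How the broker treats the prefetch limit across consumers still needs to be checked against a real RabbitMQ instance.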