cmick commented on a change in pull request #16023:
URL: https://github.com/apache/flink/pull/16023#discussion_r654998778
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -325,11 +325,15 @@ private void processMessage(Delivery delivery, RMQCollectorImpl collector) throw
     @Override
     public void run(SourceContext<OUT> ctx) throws Exception {
         final RMQCollectorImpl collector = new RMQCollectorImpl(ctx);
+        final long timeout = rmqConnectionConfig.getDeliveryTimeout();
         while (running) {
-            Delivery delivery = consumer.nextDelivery();
+            Delivery delivery =
+                    timeout > 0 ? consumer.nextDelivery(timeout) : consumer.nextDelivery();
Review comment:
It can be set to zero. Zero keeps the current, problematic behavior (no
timeout). Any positive number enables the timeout (and the timeout will be
enabled by default). Do you mean that the "no timeout" case should be dropped
completely?
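
To make the two cases concrete, here is a minimal, self-contained sketch of the same selection logic. It is not the connector code: a `BlockingQueue` stands in for the RabbitMQ consumer, and the class and method names (`DeliveryTimeoutSketch`, `nextDelivery`) are hypothetical. It assumes, as I understand the client API, that the timed call returns `null` when nothing arrives in time, so the caller has to handle `null`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer: a BlockingQueue plays the role of the delivery queue.
public class DeliveryTimeoutSketch {

    // timeoutMillis > 0: wait at most that long and return null if nothing arrives.
    // timeoutMillis <= 0: block until a message arrives (the current "no timeout" behavior).
    static <T> T nextDelivery(BlockingQueue<T> queue, long timeoutMillis)
            throws InterruptedException {
        return timeoutMillis > 0
                ? queue.poll(timeoutMillis, TimeUnit.MILLISECONDS)
                : queue.take();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("msg-1");

        // A message is available, so this returns immediately.
        System.out.println(nextDelivery(queue, 50));

        // The queue is now empty: the timed variant gives up after 50 ms and yields null,
        // which lets a loop like `while (running)` re-check its flag instead of blocking forever.
        System.out.println(nextDelivery(queue, 50));
    }
}
```

With a timeout of zero and an empty queue, the second call would never return, which is the shutdown problem the timeout is meant to avoid.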