cmick commented on a change in pull request #16023:
URL: https://github.com/apache/flink/pull/16023#discussion_r654998778



##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -325,11 +325,15 @@ private void processMessage(Delivery delivery, RMQCollectorImpl collector) throw
     @Override
     public void run(SourceContext<OUT> ctx) throws Exception {
         final RMQCollectorImpl collector = new RMQCollectorImpl(ctx);
+        final long timeout = rmqConnectionConfig.getDeliveryTimeout();
         while (running) {
-            Delivery delivery = consumer.nextDelivery();
+            Delivery delivery =
+                    timeout > 0 ? consumer.nextDelivery(timeout) : consumer.nextDelivery();

Review comment:
       It can be set to zero. Zero keeps the current, problematic behavior (no
timeout). Any positive value enables the timeout, and the timeout will be
enabled by default. Do you mean that the "no timeout" case should be dropped
completely?
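
To make the two cases concrete, here is a minimal, self-contained sketch of the semantics described above. It is not the connector code: `DeliveryTimeoutSketch`, its `nextDelivery` helper, and the `BlockingQueue` standing in for the RabbitMQ consumer are all hypothetical, and returning `null` when the wait expires is an assumption of this sketch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names come from the PR.
public class DeliveryTimeoutSketch {

    // timeoutMillis > 0: wait at most that long and return null if nothing arrives.
    // timeoutMillis == 0: block until a message is available (the "no timeout" case).
    static String nextDelivery(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        return timeoutMillis > 0
                ? queue.poll(timeoutMillis, TimeUnit.MILLISECONDS)
                : queue.take();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Positive timeout on an empty queue: the call returns instead of blocking forever.
        System.out.println(nextDelivery(queue, 50));

        // Zero timeout: the call blocks, so a message is queued first to let it return.
        queue.add("msg");
        System.out.println(nextDelivery(queue, 0));
    }
}
```

With a positive timeout the caller gets control back periodically, so a surrounding loop can re-check its `running` flag; with zero it stays parked in the blocking call until a message arrives.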




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

