jbsabbagh commented on code in PR #30877:
URL: https://github.com/apache/beam/pull/30877#discussion_r1556092498


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -518,8 +525,11 @@ private ConsumerRecords<byte[], byte[]> poll(
         return rawRecords;
       }
       elapsed = sw.elapsed();
-      if (elapsed.toMillis() >= KAFKA_POLL_TIMEOUT.toMillis()) {
+      if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) {
         // timeout is over
+        LOG.warn(
+            "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.",
+            consumerPollingTimeout.getSeconds());

Review Comment:
   Whoops - sorry I missed this. I think this warning should be emitted only when `rawRecords` is empty. There are still cases where the timeout is over and `rawRecords` is not empty.
   ```suggestion
           if (rawRecords.isEmpty()) {
             LOG.warn(
                 "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.",
                 consumerPollingTimeout.getSeconds());
           }
   ```
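
To illustrate the guard outside of Beam, here is a minimal, self-contained sketch. It is not the `ReadFromKafkaDoFn` code: `PollTimeoutSketch`, `PollResult`, and `pollWithTimeout` are hypothetical names, a `Supplier<List<String>>` stands in for the Kafka consumer, and warnings are collected in a list instead of going to a logger so the behavior is easy to check.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the poll loop; not Beam's actual implementation.
final class PollTimeoutSketch {

  /** Records from the last poll plus any warnings produced by the loop. */
  static final class PollResult {
    final List<String> records;
    final List<String> warnings;

    PollResult(List<String> records, List<String> warnings) {
      this.records = records;
      this.warnings = warnings;
    }
  }

  /**
   * Polls until records arrive or the timeout elapses. The "no messages"
   * warning is recorded only when the timeout is over AND the last poll
   * returned nothing.
   */
  static PollResult pollWithTimeout(Supplier<List<String>> poller, Duration timeout) {
    List<String> warnings = new ArrayList<>();
    long startNanos = System.nanoTime();
    while (true) {
      List<String> records = poller.get();
      Duration elapsed = Duration.ofNanos(System.nanoTime() - startNanos);
      if (elapsed.compareTo(timeout) >= 0) {
        // Timeout is over: warn only if nothing was retrieved.
        if (records.isEmpty()) {
          warnings.add(
              "No messages retrieved with polling timeout "
                  + timeout.getSeconds()
                  + " seconds.");
        }
        return new PollResult(records, warnings);
      }
      if (!records.isEmpty()) {
        // Records arrived before the timeout; no warning is needed.
        return new PollResult(records, warnings);
      }
    }
  }

  public static void main(String[] args) {
    // A zero timeout forces the "timeout is over" branch on the first poll.
    PollResult empty = pollWithTimeout(Collections::emptyList, Duration.ZERO);
    System.out.println("warnings when empty: " + empty.warnings.size());

    PollResult nonEmpty = pollWithTimeout(() -> Arrays.asList("a", "b"), Duration.ZERO);
    System.out.println("warnings when non-empty: " + nonEmpty.warnings.size());
  }
}
```

With a zero timeout both calls reach the timeout branch, so the only difference between them is the `isEmpty()` guard: the empty poll records one warning and the non-empty poll records none.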



