xianhualiu commented on code in PR #30877:
URL: https://github.com/apache/beam/pull/30877#discussion_r1556357679


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -518,8 +525,11 @@ private ConsumerRecords<byte[], byte[]> poll(
         return rawRecords;
       }
       elapsed = sw.elapsed();
-      if (elapsed.toMillis() >= KAFKA_POLL_TIMEOUT.toMillis()) {
+      if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) {
         // timeout is over
+        LOG.warn(
+            "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.",
+            consumerPollingTimeout.getSeconds());

Review Comment:
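   Hmm, if rawRecords is not empty, the method should already have returned earlier, at:
   
       if (!rawRecords.isEmpty()) {
         // return as we have found some entries
         return rawRecords;
       }
   
   So the timeout branch, and the new warning, is only reached when no records were retrieved.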
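
To illustrate the control flow, here is a minimal, self-contained sketch. It is not the actual ReadFromKafkaDoFn code: the class name `PollLoopSketch`, the `fetch` supplier, and the `timeoutWarnings` counter (standing in for `LOG.warn`) are hypothetical, and Kafka types are replaced with plain lists so it runs on its own.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

public class PollLoopSketch {
  /** Counts how often the timeout branch is hit; a stand-in for LOG.warn. */
  static int timeoutWarnings = 0;

  /**
   * Simplified poll loop (assumption: mirrors only the shape of the real method).
   * `fetch` is a hypothetical stand-in for a single consumer poll.
   */
  static List<String> poll(Supplier<List<String>> fetch, Duration timeout) {
    long start = System.nanoTime();
    while (true) {
      List<String> rawRecords = fetch.get();
      if (!rawRecords.isEmpty()) {
        // return as we have found some entries
        return rawRecords;
      }
      Duration elapsed = Duration.ofNanos(System.nanoTime() - start);
      if (elapsed.toMillis() >= timeout.toMillis()) {
        // Only reachable when every fetch so far came back empty,
        // so a warning placed here never fires for non-empty results.
        timeoutWarnings++;
        return rawRecords;
      }
    }
  }
}
```

With a supplier that returns records, the method returns on the first iteration and the counter stays at zero. With a supplier that always returns an empty list, the counter is incremented once when the timeout elapses.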



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

