jbsabbagh commented on code in PR #30877:
URL: https://github.com/apache/beam/pull/30877#discussion_r1556092498
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -518,8 +525,11 @@ private ConsumerRecords<byte[], byte[]> poll(
return rawRecords;
}
elapsed = sw.elapsed();
- if (elapsed.toMillis() >= KAFKA_POLL_TIMEOUT.toMillis()) {
+ if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) {
// timeout is over
+ LOG.warn(
+ "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.",
+ consumerPollingTimeout.getSeconds());
Review Comment:
Whoops, sorry I missed this. I think this warning should be emitted only
when `rawRecords` is empty. There are still cases where the timeout is over and
`rawRecords` is not empty, and in those cases a "No messages retrieved" warning
would be misleading.
```suggestion
if (rawRecords.isEmpty()) {
  LOG.warn(
      "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.",
      consumerPollingTimeout.getSeconds());
}
```
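To make the intended condition easier to check in isolation, here is a minimal sketch of the same guard as a standalone helper. The class `PollTimeoutWarningSketch` and its methods `shouldWarn` and `warningMessage` are hypothetical names for illustration and are not part of `ReadFromKafkaDoFn`. The sketch also assumes the timeout is a `java.time.Duration`, which matches the `toMillis()` and `getSeconds()` calls in the diff.

```java
import java.time.Duration;

/** Hypothetical helper, not part of ReadFromKafkaDoFn: isolates the warning condition. */
final class PollTimeoutWarningSketch {

  private PollTimeoutWarningSketch() {}

  /**
   * Returns true only when the polling timeout has elapsed AND no records were retrieved.
   * A timeout with a non-empty batch should not trigger the "No messages" warning.
   */
  static boolean shouldWarn(Duration elapsed, Duration pollingTimeout, boolean recordsEmpty) {
    boolean timedOut = elapsed.toMillis() >= pollingTimeout.toMillis();
    return timedOut && recordsEmpty;
  }

  /** Builds the warning text; the code in the PR uses a logger "{}" placeholder instead. */
  static String warningMessage(Duration pollingTimeout) {
    return String.format(
        "No messages retrieved with polling timeout %d seconds. Consider increasing the "
            + "consumer polling timeout using withConsumerPollingTimeout method.",
        pollingTimeout.getSeconds());
  }
}
```

In the real method the third argument would be `rawRecords.isEmpty()`, so the warning stays silent whenever the timeout elapses with records in hand.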