jbsabbagh commented on code in PR #30877:
URL: https://github.com/apache/beam/pull/30877#discussion_r1555950658


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -191,6 +191,12 @@ private ReadFromKafkaDoFn(
     this.checkStopReadingFn = transform.getCheckStopReadingFn();
     this.badRecordRouter = transform.getBadRecordRouter();
     this.recordTag = recordTag;
+    if (transform.getConsumerPollingTimeout() != null) {
+      this.consumerPollingTimeout =
+          java.time.Duration.ofMillis(transform.getConsumerPollingTimeout().getMillis());
+    } else {
+      this.consumerPollingTimeout = KAFKA_POLL_TIMEOUT;
+    }

Review Comment:
   Maybe we could log the effective timeout here? That would signal to users 
that a timeout exists outside of the Kafka consumer configs.
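
   A minimal sketch of what I have in mind, not code from this PR. 
`PollTimeoutResolver` and `resolve` are hypothetical names, 
`java.util.logging` stands in for whatever logger the class already uses, and 
the 1 second fallback is an assumption that mirrors the default set in 
`KafkaIO` in this PR:

```java
import java.time.Duration;
import java.util.logging.Logger;

/** Hypothetical helper (not part of the PR): resolves the poll timeout and logs it. */
final class PollTimeoutResolver {
  private static final Logger LOG = Logger.getLogger(PollTimeoutResolver.class.getName());

  // Stand-in for KAFKA_POLL_TIMEOUT; the 1 second value is an assumption.
  static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(1);

  /** Returns the configured timeout, or the default when none was configured. */
  static Duration resolve(Long configuredMillis) {
    Duration timeout =
        configuredMillis != null ? Duration.ofMillis(configuredMillis) : DEFAULT_POLL_TIMEOUT;
    // Make it visible that this timeout is separate from the Kafka consumer configs.
    LOG.info(
        String.format(
            "Kafka consumer polling timeout is %d ms (set by the transform, not by consumer config)",
            timeout.toMillis()));
    return timeout;
  }
}
```

   In the constructor this would boil down to a single log call after the 
`if`/`else` that assigns `consumerPollingTimeout`.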



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -518,7 +525,7 @@ private ConsumerRecords<byte[], byte[]> poll(
         return rawRecords;
       }
       elapsed = sw.elapsed();
-      if (elapsed.toMillis() >= KAFKA_POLL_TIMEOUT.toMillis()) {
+      if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) {
         // timeout is over
         return rawRecords;

Review Comment:
   When `rawRecords.isEmpty()` is true, it might be worth adding a counter 
that tracks how many times `rawRecords` has been empty. If the count gets too 
high, maybe emit a warning log saying that empty polls are happening too often 
and that users should perhaps increase the poll timeout.
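
   A rough sketch of such a counter, not code from this PR. `EmptyPollTracker`, 
`record`, and the warning threshold are hypothetical, `java.util.logging` 
stands in for the existing logger, and counting consecutive empty polls 
(rather than a running total) is my own choice here:

```java
import java.util.logging.Logger;

/** Hypothetical tracker (not part of the PR): counts consecutive empty polls. */
final class EmptyPollTracker {
  private static final Logger LOG = Logger.getLogger(EmptyPollTracker.class.getName());

  private final int warnThreshold;
  private int consecutiveEmptyPolls = 0;

  EmptyPollTracker(int warnThreshold) {
    if (warnThreshold <= 0) {
      throw new IllegalArgumentException("warnThreshold must be positive");
    }
    this.warnThreshold = warnThreshold;
  }

  /** Records one poll result and returns true if a warning was logged. */
  boolean record(boolean pollWasEmpty, long timeoutMillis) {
    if (!pollWasEmpty) {
      // Any successful poll resets the streak.
      consecutiveEmptyPolls = 0;
      return false;
    }
    consecutiveEmptyPolls++;
    // Warn once per warnThreshold empty polls to avoid flooding the log.
    if (consecutiveEmptyPolls % warnThreshold != 0) {
      return false;
    }
    LOG.warning(
        String.format(
            "%d consecutive Kafka polls returned no records with a %d ms timeout; "
                + "consider increasing the consumer polling timeout",
            consecutiveEmptyPolls, timeoutMillis));
    return true;
  }
}
```

   `poll` could call something like 
`tracker.record(rawRecords.isEmpty(), consumerPollingTimeout.toMillis())` just 
before returning on timeout.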



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -587,6 +587,7 @@ public static <K, V> Read<K, V> read() {
         .setCommitOffsetsInFinalizeEnabled(false)
         .setDynamicRead(false)
         .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
+        .setConsumerPollingTimeout(Duration.standardSeconds(1L))

Review Comment:
   Should the default be increased slightly? Maybe 2 seconds? Given that this 
is causing issues even for US-to-US cross-regional reads, 1 second seems a bit 
too aggressive as a general default.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.