This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 444e10d29e0 Kafka adaptive timeout implementation to handle empty topic cases (#29400)
444e10d29e0 is described below

commit 444e10d29e04e3e144ef08c0a505bd76aa08984f
Author: Talat UYARER <[email protected]>
AuthorDate: Tue Nov 21 11:02:57 2023 -0800

    Kafka adaptive timeout implementation to handle empty topic cases (#29400)
    
    * Kafka adaptive timeout implementation to handle empty topic cases
    
    * Format fix with spotless
    
    ---------
    
    Co-authored-by: tuyarer <[email protected]>
---
 .../apache/beam/sdk/io/kafka/KafkaUnboundedReader.java | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 55c71384162..054eb502cd8 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -357,7 +357,9 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
    */
   private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
 
-  private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = 
Duration.millis(10);
+  private Duration recordsDequeuePollTimeout;
+  private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MIN = 
Duration.millis(1);
+  private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MAX = 
Duration.millis(20);
   private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = 
Duration.millis(100);
 
   // Use a separate thread to read Kafka messages. Kafka Consumer does all its 
work including
@@ -543,6 +545,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
     bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
     backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
     backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
+    recordsDequeuePollTimeout = Duration.millis(10);
   }
 
   private void consumerPollLoop() {
@@ -614,8 +617,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
     try {
       // poll available records, wait (if necessary) up to the specified 
timeout.
       records =
-          availableRecordsQueue.poll(
-              RECORDS_DEQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
+          availableRecordsQueue.poll(recordsDequeuePollTimeout.getMillis(), 
TimeUnit.MILLISECONDS);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOG.warn("{}: Unexpected", this, e);
@@ -627,9 +629,19 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
       if (consumerPollException.get() != null) {
         throw new IOException("Exception while reading from Kafka", 
consumerPollException.get());
       }
+      if 
(recordsDequeuePollTimeout.isLongerThan(RECORDS_DEQUEUE_POLL_TIMEOUT_MIN)) {
+        recordsDequeuePollTimeout = 
recordsDequeuePollTimeout.minus(Duration.millis(1));
+        LOG.debug("Reducing poll timeout for reader to " + 
recordsDequeuePollTimeout.getMillis());
+      }
       return;
     }
 
+    if 
(recordsDequeuePollTimeout.isShorterThan(RECORDS_DEQUEUE_POLL_TIMEOUT_MAX)) {
+      recordsDequeuePollTimeout = 
recordsDequeuePollTimeout.plus(Duration.millis(1));
+      LOG.debug("Increasing poll timeout for reader to " + 
recordsDequeuePollTimeout.getMillis());
+      LOG.debug("Record count: " + records.count());
+    }
+
     partitionStates.forEach(p -> p.recordIter = 
records.records(p.topicPartition).iterator());
 
     // cycle through the partitions in order to interleave records from each.

Reply via email to