This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f4b2561c58 [bug30870]: make consumer polling timeout configurable for 
KafkaIO.Read (#30877)
3f4b2561c58 is described below

commit 3f4b2561c58c9c0bf76f4144fb9f06aaaad424f5
Author: xianhualiu <[email protected]>
AuthorDate: Tue Apr 9 19:44:06 2024 -0400

    [bug30870]: make consumer polling timeout configurable for KafkaIO.Read 
(#30877)
    
    * [bug30870]: make consumer polling timeout configurable for KafkaIO.Read
    
    * fixed spotless complains
    
    * fixed unit tests
    
    * added logs and increased default polling timeout from 1 to 2 seconds.
    
    * spotless apply changes
    
    * Update CHANGES.md
    
    updated changes.md with changes to make consumer polling timeout 
configurable for KafkaIO.Read
    
    * Update CHANGES.md
    
    * Update CHANGES.md
    
    added breaking changes
    
    * Update CHANGES.md
---
 CHANGES.md                                         |  1 +
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 35 +++++++++++++++++++++-
 .../KafkaIOReadImplementationCompatibility.java    |  1 +
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java       | 16 ++++++++--
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  | 12 ++++++++
 .../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java   | 14 +++++++++
 6 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 5824c71a98d..941ba23a757 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -73,6 +73,7 @@
 ## Breaking Changes
 
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* Default consumer polling timeout for KafkaIO.Read was increased from 1 
second to 2 seconds. Use KafkaIO.read().withConsumerPollingTimeout(Duration 
duration) to configure this timeout value when necessary 
([#30870](https://github.com/apache/beam/issues/30870)).
 
 ## Deprecations
 
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 231a1b9e49e..c56071e85ad 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -587,6 +587,7 @@ public class KafkaIO {
         .setCommitOffsetsInFinalizeEnabled(false)
         .setDynamicRead(false)
         .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
+        .setConsumerPollingTimeout(Duration.standardSeconds(2L))
         .build();
   }
 
@@ -706,6 +707,9 @@ public class KafkaIO {
     @Pure
     public abstract @Nullable ErrorHandler<BadRecord, ?> 
getBadRecordErrorHandler();
 
+    @Pure
+    public abstract @Nullable Duration getConsumerPollingTimeout();
+
     abstract Builder<K, V> toBuilder();
 
     @AutoValue.Builder
@@ -762,6 +766,8 @@ public class KafkaIO {
         return 
setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn));
       }
 
+      abstract Builder<K, V> setConsumerPollingTimeout(Duration 
consumerPollingTimeout);
+
       abstract Read<K, V> build();
 
       static <K, V> void setupExternalBuilder(
@@ -1334,6 +1340,17 @@ public class KafkaIO {
       return 
toBuilder().setBadRecordErrorHandler(badRecordErrorHandler).build();
     }
 
+    /**
+     * Sets the timeout time for Kafka consumer polling request in the {@link 
ReadFromKafkaDoFn}.
+     * The default is 2 seconds.
+     */
+    public Read<K, V> withConsumerPollingTimeout(Duration duration) {
+      checkState(
+          duration == null || duration.compareTo(Duration.ZERO) > 0,
+          "Consumer polling timeout must be greater than 0.");
+      return toBuilder().setConsumerPollingTimeout(duration).build();
+    }
+
     /** Returns a {@link PTransform} for PCollection of {@link KV}, dropping 
Kafka metatdata. */
     public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
       return new TypedWithoutMetadata<>(this);
@@ -1596,7 +1613,8 @@ public class KafkaIO {
                 
.withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
                 .withManualWatermarkEstimator()
                 
.withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
-                .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn());
+                .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
+                
.withConsumerPollingTimeout(kafkaRead.getConsumerPollingTimeout());
         if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
           readTransform = readTransform.commitOffsets();
         }
@@ -2036,6 +2054,9 @@ public class KafkaIO {
     @Pure
     abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
 
+    @Pure
+    abstract @Nullable Duration getConsumerPollingTimeout();
+
     abstract boolean isBounded();
 
     abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
@@ -2086,6 +2107,9 @@ public class KafkaIO {
       abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(
           ErrorHandler<BadRecord, ?> badRecordErrorHandler);
 
+      abstract ReadSourceDescriptors.Builder<K, V> setConsumerPollingTimeout(
+          @Nullable Duration duration);
+
       abstract ReadSourceDescriptors.Builder<K, V> setBounded(boolean bounded);
 
       abstract ReadSourceDescriptors<K, V> build();
@@ -2099,6 +2123,7 @@ public class KafkaIO {
           .setBounded(false)
           .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
           .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
+          .setConsumerPollingTimeout(Duration.standardSeconds(2L))
           .build()
           .withProcessingTime()
           .withMonotonicallyIncreasingWatermarkEstimator();
@@ -2360,6 +2385,14 @@ public class KafkaIO {
           .build();
     }
 
+    /**
+     * Sets the timeout time for Kafka consumer polling request in the {@link 
ReadFromKafkaDoFn}.
+     * The default is 2 seconds.
+     */
+    public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable 
Duration duration) {
+      return toBuilder().setConsumerPollingTimeout(duration).build();
+    }
+
     ReadAllFromRow<K, V> forExternalBuild() {
       return new ReadAllFromRow<>(this);
     }
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
index a2cc9aaeb4d..7e54407300d 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
@@ -112,6 +112,7 @@ class KafkaIOReadImplementationCompatibility {
     VALUE_DESERIALIZER_PROVIDER,
     CHECK_STOP_READING_FN(SDF),
     BAD_RECORD_ERROR_HANDLER(SDF),
+    CONSUMER_POLLING_TIMEOUT,
     ;
 
     @Nonnull private final ImmutableSet<KafkaIOReadImplementation> 
supportedImplementations;
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 924833290f1..3a821ef9519 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -191,6 +191,12 @@ abstract class ReadFromKafkaDoFn<K, V>
     this.checkStopReadingFn = transform.getCheckStopReadingFn();
     this.badRecordRouter = transform.getBadRecordRouter();
     this.recordTag = recordTag;
+    if (transform.getConsumerPollingTimeout() != null) {
+      this.consumerPollingTimeout =
+          
java.time.Duration.ofMillis(transform.getConsumerPollingTimeout().getMillis());
+    } else {
+      this.consumerPollingTimeout = KAFKA_POLL_TIMEOUT;
+    }
   }
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
@@ -217,8 +223,9 @@ abstract class ReadFromKafkaDoFn<K, V>
 
   private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> 
avgRecordSize;
 
-  private static final java.time.Duration KAFKA_POLL_TIMEOUT = 
java.time.Duration.ofSeconds(1);
+  private static final java.time.Duration KAFKA_POLL_TIMEOUT = 
java.time.Duration.ofSeconds(2);
 
+  @VisibleForTesting final java.time.Duration consumerPollingTimeout;
   @VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
   @VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
   @VisibleForTesting final Map<String, Object> consumerConfig;
@@ -508,7 +515,7 @@ abstract class ReadFromKafkaDoFn<K, V>
     java.time.Duration elapsed = java.time.Duration.ZERO;
     while (true) {
       final ConsumerRecords<byte[], byte[]> rawRecords =
-          consumer.poll(KAFKA_POLL_TIMEOUT.minus(elapsed));
+          consumer.poll(consumerPollingTimeout.minus(elapsed));
       if (!rawRecords.isEmpty()) {
         // return as we have found some entries
         return rawRecords;
@@ -518,8 +525,11 @@ abstract class ReadFromKafkaDoFn<K, V>
         return rawRecords;
       }
       elapsed = sw.elapsed();
-      if (elapsed.toMillis() >= KAFKA_POLL_TIMEOUT.toMillis()) {
+      if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) {
         // timeout is over
+        LOG.warn(
+            "No messages retrieved with polling timeout {} seconds. Consider 
increasing the consumer polling timeout using withConsumerPollingTimeout 
method.",
+            consumerPollingTimeout.getSeconds());
         return rawRecords;
       }
     }
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 9b15b86051f..44c028f08a2 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -2121,6 +2121,18 @@ public class KafkaIOTest {
     }
   }
 
+  @Test(expected = IllegalStateException.class)
+  public void testWithInvalidConsumerPollingTimeout() {
+    KafkaIO.<Integer, 
Long>read().withConsumerPollingTimeout(Duration.standardSeconds(-5));
+  }
+
+  @Test
+  public void testWithValidConsumerPollingTimeout() {
+    KafkaIO.Read<Integer, Long> reader =
+        KafkaIO.<Integer, 
Long>read().withConsumerPollingTimeout(Duration.standardSeconds(15));
+    assertEquals(15, reader.getConsumerPollingTimeout().getStandardSeconds());
+  }
+
   private static void verifyProducerRecords(
       MockProducer<Integer, Long> mockProducer,
       String topic,
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 48b5b060a29..8902f22164b 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -641,6 +641,20 @@ public class ReadFromKafkaDoFnTest {
     Assert.assertNotEquals(0, visitor.unboundedPCollections.size());
   }
 
+  @Test
+  public void testConstructorWithPollTimeout() {
+    ReadSourceDescriptors<String, String> descriptors = 
makeReadSourceDescriptor(consumer);
+    // default poll timeout = 2 seconds
+    ReadFromKafkaDoFn<String, String> dofnInstance = 
ReadFromKafkaDoFn.create(descriptors, RECORDS);
+    Assert.assertEquals(Duration.ofSeconds(2L), 
dofnInstance.consumerPollingTimeout);
+    // updated timeout = 5 seconds
+    descriptors =
+        
descriptors.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(5L));
+    ReadFromKafkaDoFn<String, String> dofnInstanceNew =
+        ReadFromKafkaDoFn.create(descriptors, RECORDS);
+    Assert.assertEquals(Duration.ofSeconds(5L), 
dofnInstanceNew.consumerPollingTimeout);
+  }
+
   private BoundednessVisitor testBoundedness(
       Function<ReadSourceDescriptors<String, String>, 
ReadSourceDescriptors<String, String>>
           readSourceDescriptorsDecorator) {

Reply via email to