This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a44c4f19ce3 [#30870]: support consumer polling timeout in KafkaIO 
expansion service (#30915)
a44c4f19ce3 is described below

commit a44c4f19ce318c91f744ead9e0dd12dfd9eefac6
Author: xianhualiu <[email protected]>
AuthorDate: Tue Apr 16 09:22:35 2024 -0400

    [#30870]: support consumer polling timeout in KafkaIO expansion service 
(#30915)
    
    * [#30870]: support consumer polling timeout in KafkaIO expansion service
    
    * fixed spotless complaints
    
    * fixed python format complaints
    
    * Update sdks/python/apache_beam/io/kafka.py
    
    Co-authored-by: Jonathan Sabbagh 
<[email protected]>
    
    * Update 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
    
    Co-authored-by: Jonathan Sabbagh 
<[email protected]>
    
    * Update 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
    
    Co-authored-by: Jonathan Sabbagh 
<[email protected]>
    
    * fixed formatting issue
    
    * fixed pylint and pydoc issues
    
    * shorten the variable name
    
    * fixed format and upgrade test
    
    * fixed test
    
    * fixed test
    
    ---------
    
    Co-authored-by: Jonathan Sabbagh 
<[email protected]>
---
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 25 ++++++++++++++++++----
 .../beam/sdk/io/kafka/KafkaIOExternalTest.java     |  5 ++++-
 .../io/kafka/upgrade/KafkaIOTranslationTest.java   |  1 +
 sdks/python/apache_beam/io/kafka.py                | 11 ++++++++--
 4 files changed, 35 insertions(+), 7 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index c56071e85ad..6a0df589706 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -831,6 +831,16 @@ public class KafkaIO {
         // We can expose dynamic read to external build when ReadFromKafkaDoFn 
is the default
         // implementation.
         builder.setDynamicRead(false);
+
+        if (config.consumerPollingTimeout != null) {
+          if (config.consumerPollingTimeout <= 0) {
+            throw new IllegalArgumentException("consumerPollingTimeout should 
be > 0.");
+          }
+          builder.setConsumerPollingTimeout(
+              Duration.standardSeconds(config.consumerPollingTimeout));
+        } else {
+          builder.setConsumerPollingTimeout(Duration.standardSeconds(2L));
+        }
       }
 
       private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> 
deserializer) {
@@ -893,6 +903,7 @@ public class KafkaIO {
         private Long maxNumRecords;
         private Long maxReadTime;
         private Boolean commitOffsetInFinalize;
+        private Long consumerPollingTimeout;
         private String timestampPolicy;
 
         public void setConsumerConfig(Map<String, String> consumerConfig) {
@@ -934,6 +945,10 @@ public class KafkaIO {
         public void setTimestampPolicy(String timestampPolicy) {
           this.timestampPolicy = timestampPolicy;
         }
+
+        public void setConsumerPollingTimeout(Long consumerPollingTimeout) {
+          this.consumerPollingTimeout = consumerPollingTimeout;
+        }
       }
     }
 
@@ -1341,8 +1356,9 @@ public class KafkaIO {
     }
 
     /**
-     * Sets the timeout time for Kafka consumer polling request in the {@link 
ReadFromKafkaDoFn}.
-     * The default is 2 second.
+     * Sets the timeout time for Kafka consumer polling request in the {@link 
ReadFromKafkaDoFn}. A
+     * lower timeout optimizes for latency. Increase the timeout if the 
consumer is not fetching
+     * enough (or any) records. The default is 2 seconds.
      */
     public Read<K, V> withConsumerPollingTimeout(Duration duration) {
       checkState(
@@ -2386,8 +2402,9 @@ public class KafkaIO {
     }
 
     /**
-     * Sets the timeout time for Kafka consumer polling request in the {@link 
ReadFromKafkaDoFn}.
-     * The default is 2 second.
+     * Sets the timeout time for Kafka consumer polling request in the {@link 
ReadFromKafkaDoFn}. A
+     * lower timeout optimizes for latency. Increase the timeout if the 
consumer is not fetching
+     * enough (or any) records. The default is 2 seconds.
      */
     public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable 
Duration duration) {
       return toBuilder().setConsumerPollingTimeout(duration).build();
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index dd859af5086..246fdd80d73 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -107,7 +107,8 @@ public class KafkaIOExternalTest {
                         Field.of("value_deserializer", FieldType.STRING),
                         Field.of("start_read_time", FieldType.INT64),
                         Field.of("commit_offset_in_finalize", 
FieldType.BOOLEAN),
-                        Field.of("timestamp_policy", FieldType.STRING)))
+                        Field.of("timestamp_policy", FieldType.STRING),
+                        Field.of("consumer_polling_timeout", FieldType.INT64)))
                 .withFieldValue("topics", topics)
                 .withFieldValue("consumer_config", consumerConfig)
                 .withFieldValue("key_deserializer", keyDeserializer)
@@ -115,6 +116,7 @@ public class KafkaIOExternalTest {
                 .withFieldValue("start_read_time", startReadTime)
                 .withFieldValue("commit_offset_in_finalize", false)
                 .withFieldValue("timestamp_policy", "ProcessingTime")
+                .withFieldValue("consumer_polling_timeout", 5L)
                 .build());
 
     RunnerApi.Components defaultInstance = 
RunnerApi.Components.getDefaultInstance();
@@ -265,6 +267,7 @@ public class KafkaIOExternalTest {
     expansionService.expand(request, observer);
     ExpansionApi.ExpansionResponse result = observer.result;
     RunnerApi.PTransform transform = result.getTransform();
+
     assertThat(
         transform.getSubtransformsList(),
         Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
diff --git 
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
 
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
index 18708c56001..f69b9c3649b 100644
--- 
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
+++ 
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
@@ -75,6 +75,7 @@ public class KafkaIOTranslationTest {
     READ_TRANSFORM_SCHEMA_MAPPING.put(
         "getValueDeserializerProvider", "value_deserializer_provider");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getCheckStopReadingFn", 
"check_stop_reading_fn");
+    READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerPollingTimeout", 
"consumer_polling_timeout");
   }
 
   // A mapping from Write transform builder methods to the corresponding 
schema fields in
diff --git a/sdks/python/apache_beam/io/kafka.py 
b/sdks/python/apache_beam/io/kafka.py
index b96576b4efb..e1aeab8d3a8 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -93,7 +93,8 @@ ReadFromKafkaSchema = typing.NamedTuple(
      ('value_deserializer', str), ('start_read_time', typing.Optional[int]),
      ('max_num_records', typing.Optional[int]),
      ('max_read_time', typing.Optional[int]),
-     ('commit_offset_in_finalize', bool), ('timestamp_policy', str)])
+     ('commit_offset_in_finalize', bool), ('timestamp_policy', str),
+     ('consumer_polling_timeout', typing.Optional[int])])
 
 
 def default_io_expansion_service(append_args=None):
@@ -134,6 +135,7 @@ class ReadFromKafka(ExternalTransform):
       max_read_time=None,
       commit_offset_in_finalize=False,
       timestamp_policy=processing_time_policy,
+      consumer_polling_timeout=None,
       with_metadata=False,
       expansion_service=None,
   ):
@@ -159,6 +161,10 @@ class ReadFromKafka(ExternalTransform):
     :param commit_offset_in_finalize: Whether to commit offsets when 
finalizing.
     :param timestamp_policy: The built-in timestamp policy which is used for
         extracting timestamp from KafkaRecord.
+    :param consumer_polling_timeout: Kafka client polling request
+        timeout time in seconds. A lower timeout optimizes for latency. 
Increase                                   
+        the timeout if the consumer is not fetching any records. Default is 2
+        seconds.
     :param with_metadata: whether the returned PCollection should contain
         Kafka related metadata or not. If False (default), elements of the
         returned PCollection will be of type 'bytes' if True, elements of the
@@ -186,7 +192,8 @@ class ReadFromKafka(ExternalTransform):
                 max_read_time=max_read_time,
                 start_read_time=start_read_time,
                 commit_offset_in_finalize=commit_offset_in_finalize,
-                timestamp_policy=timestamp_policy)),
+                timestamp_policy=timestamp_policy,
+                consumer_polling_timeout=consumer_polling_timeout)),
         expansion_service or default_io_expansion_service())
 
 

Reply via email to