This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a44c4f19ce3 [#30870]: support consumer polling timeout in KafkaIO
expansion service (#30915)
a44c4f19ce3 is described below
commit a44c4f19ce318c91f744ead9e0dd12dfd9eefac6
Author: xianhualiu <[email protected]>
AuthorDate: Tue Apr 16 09:22:35 2024 -0400
[#30870]: support consumer polling timeout in KafkaIO expansion service
(#30915)
* [#30870]: support consumer polling timeout in KafkaIO expansion service
* fixed spotless complaints
* fixed python format complaints
* Update sdks/python/apache_beam/io/kafka.py
Co-authored-by: Jonathan Sabbagh
<[email protected]>
* Update
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
Co-authored-by: Jonathan Sabbagh
<[email protected]>
* Update
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
Co-authored-by: Jonathan Sabbagh
<[email protected]>
* fixed formatting issue
* fixed pylint and pydoc issues
* shorten the variable name
* fixed format and upgrade test
* fixed test
* fixed test
---------
Co-authored-by: Jonathan Sabbagh
<[email protected]>
---
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 25 ++++++++++++++++++----
.../beam/sdk/io/kafka/KafkaIOExternalTest.java | 5 ++++-
.../io/kafka/upgrade/KafkaIOTranslationTest.java | 1 +
sdks/python/apache_beam/io/kafka.py | 11 ++++++++--
4 files changed, 35 insertions(+), 7 deletions(-)
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index c56071e85ad..6a0df589706 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -831,6 +831,16 @@ public class KafkaIO {
// We can expose dynamic read to external build when ReadFromKafkaDoFn
is the default
// implementation.
builder.setDynamicRead(false);
+
+ if (config.consumerPollingTimeout != null) {
+ if (config.consumerPollingTimeout <= 0) {
+ throw new IllegalArgumentException("consumerPollingTimeout should
be > 0.");
+ }
+ builder.setConsumerPollingTimeout(
+ Duration.standardSeconds(config.consumerPollingTimeout));
+ } else {
+ builder.setConsumerPollingTimeout(Duration.standardSeconds(2L));
+ }
}
private static <T> Coder<T> resolveCoder(Class<Deserializer<T>>
deserializer) {
@@ -893,6 +903,7 @@ public class KafkaIO {
private Long maxNumRecords;
private Long maxReadTime;
private Boolean commitOffsetInFinalize;
+ private Long consumerPollingTimeout;
private String timestampPolicy;
public void setConsumerConfig(Map<String, String> consumerConfig) {
@@ -934,6 +945,10 @@ public class KafkaIO {
public void setTimestampPolicy(String timestampPolicy) {
this.timestampPolicy = timestampPolicy;
}
+
+ public void setConsumerPollingTimeout(Long consumerPollingTimeout) {
+ this.consumerPollingTimeout = consumerPollingTimeout;
+ }
}
}
@@ -1341,8 +1356,9 @@ public class KafkaIO {
}
/**
- * Sets the timeout time for Kafka consumer polling request in the {@link
ReadFromKafkaDoFn}.
- * The default is 2 second.
+ * Sets the timeout time for Kafka consumer polling request in the {@link
ReadFromKafkaDoFn}. A
+ * lower timeout optimizes for latency. Increase the timeout if the
consumer is not fetching
+ * enough (or any) records. The default is 2 seconds.
*/
public Read<K, V> withConsumerPollingTimeout(Duration duration) {
checkState(
@@ -2386,8 +2402,9 @@ public class KafkaIO {
}
/**
- * Sets the timeout time for Kafka consumer polling request in the {@link
ReadFromKafkaDoFn}.
- * The default is 2 second.
+ * Sets the timeout time for Kafka consumer polling request in the {@link
ReadFromKafkaDoFn}. A
+ * lower timeout optimizes for latency. Increase the timeout if the
consumer is not fetching
+ * enough (or any) records. The default is 2 seconds.
*/
public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable
Duration duration) {
return toBuilder().setConsumerPollingTimeout(duration).build();
diff --git
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index dd859af5086..246fdd80d73 100644
---
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -107,7 +107,8 @@ public class KafkaIOExternalTest {
Field.of("value_deserializer", FieldType.STRING),
Field.of("start_read_time", FieldType.INT64),
Field.of("commit_offset_in_finalize",
FieldType.BOOLEAN),
- Field.of("timestamp_policy", FieldType.STRING)))
+ Field.of("timestamp_policy", FieldType.STRING),
+ Field.of("consumer_polling_timeout", FieldType.INT64)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
@@ -115,6 +116,7 @@ public class KafkaIOExternalTest {
.withFieldValue("start_read_time", startReadTime)
.withFieldValue("commit_offset_in_finalize", false)
.withFieldValue("timestamp_policy", "ProcessingTime")
+ .withFieldValue("consumer_polling_timeout", 5L)
.build());
RunnerApi.Components defaultInstance =
RunnerApi.Components.getDefaultInstance();
@@ -265,6 +267,7 @@ public class KafkaIOExternalTest {
expansionService.expand(request, observer);
ExpansionApi.ExpansionResponse result = observer.result;
RunnerApi.PTransform transform = result.getTransform();
+
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
diff --git
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
index 18708c56001..f69b9c3649b 100644
---
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
+++
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
@@ -75,6 +75,7 @@ public class KafkaIOTranslationTest {
READ_TRANSFORM_SCHEMA_MAPPING.put(
"getValueDeserializerProvider", "value_deserializer_provider");
READ_TRANSFORM_SCHEMA_MAPPING.put("getCheckStopReadingFn",
"check_stop_reading_fn");
+ READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerPollingTimeout",
"consumer_polling_timeout");
}
// A mapping from Write transform builder methods to the corresponding
schema fields in
diff --git a/sdks/python/apache_beam/io/kafka.py
b/sdks/python/apache_beam/io/kafka.py
index b96576b4efb..e1aeab8d3a8 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -93,7 +93,8 @@ ReadFromKafkaSchema = typing.NamedTuple(
('value_deserializer', str), ('start_read_time', typing.Optional[int]),
('max_num_records', typing.Optional[int]),
('max_read_time', typing.Optional[int]),
- ('commit_offset_in_finalize', bool), ('timestamp_policy', str)])
+ ('commit_offset_in_finalize', bool), ('timestamp_policy', str),
+ ('consumer_polling_timeout', typing.Optional[int])])
def default_io_expansion_service(append_args=None):
@@ -134,6 +135,7 @@ class ReadFromKafka(ExternalTransform):
max_read_time=None,
commit_offset_in_finalize=False,
timestamp_policy=processing_time_policy,
+ consumer_polling_timeout=None,
with_metadata=False,
expansion_service=None,
):
@@ -159,6 +161,10 @@ class ReadFromKafka(ExternalTransform):
:param commit_offset_in_finalize: Whether to commit offsets when
finalizing.
:param timestamp_policy: The built-in timestamp policy which is used for
extracting timestamp from KafkaRecord.
+ :param consumer_polling_timeout: Kafka client polling request
+ timeout time in seconds. A lower timeout optimizes for latency.
Increase
+ the timeout if the consumer is not fetching any records. Default is 2
+ seconds.
:param with_metadata: whether the returned PCollection should contain
Kafka related metadata or not. If False (default), elements of the
returned PCollection will be of type 'bytes' if True, elements of the
@@ -186,7 +192,8 @@ class ReadFromKafka(ExternalTransform):
max_read_time=max_read_time,
start_read_time=start_read_time,
commit_offset_in_finalize=commit_offset_in_finalize,
- timestamp_policy=timestamp_policy)),
+ timestamp_policy=timestamp_policy,
+ consumer_polling_timeout=consumer_polling_timeout)),
expansion_service or default_io_expansion_service())