This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3f4b2561c58 [bug30870]: make consumer polling timeout configurable for
KafkaIO.Read (#30877)
3f4b2561c58 is described below
commit 3f4b2561c58c9c0bf76f4144fb9f06aaaad424f5
Author: xianhualiu <[email protected]>
AuthorDate: Tue Apr 9 19:44:06 2024 -0400
[bug30870]: make consumer polling timeout configurable for KafkaIO.Read
(#30877)
* [bug30870]: make consumer polling timeout configurable for KafkaIO.Read
* fixed spotless complaints
* fixed unit tests
* added logs and increased default polling timeout from 1 to 2 seconds.
* spotless apply changes
* Update CHANGES.md
updated changes.md with changes to make consumer polling timeout
configurable for KafkaIO.Read
* Update CHANGES.md
* Update CHANGES.md
added breaking changes
* Update CHANGES.md
---
CHANGES.md | 1 +
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 35 +++++++++++++++++++++-
.../KafkaIOReadImplementationCompatibility.java | 1 +
.../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 16 ++++++++--
.../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 12 ++++++++
.../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java | 14 +++++++++
6 files changed, 75 insertions(+), 4 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 5824c71a98d..941ba23a757 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -73,6 +73,7 @@
## Breaking Changes
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* Default consumer polling timeout for KafkaIO.Read was increased from 1
second to 2 seconds. Use KafkaIO.read().withConsumerPollingTimeout(Duration
duration) to configure this timeout value when necessary
([#30870](https://github.com/apache/beam/issues/30870)).
## Deprecations
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 231a1b9e49e..c56071e85ad 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -587,6 +587,7 @@ public class KafkaIO {
.setCommitOffsetsInFinalizeEnabled(false)
.setDynamicRead(false)
.setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
+ .setConsumerPollingTimeout(Duration.standardSeconds(2L))
.build();
}
@@ -706,6 +707,9 @@ public class KafkaIO {
@Pure
public abstract @Nullable ErrorHandler<BadRecord, ?>
getBadRecordErrorHandler();
+ @Pure
+ public abstract @Nullable Duration getConsumerPollingTimeout();
+
abstract Builder<K, V> toBuilder();
@AutoValue.Builder
@@ -762,6 +766,8 @@ public class KafkaIO {
return
setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn));
}
+ abstract Builder<K, V> setConsumerPollingTimeout(Duration
consumerPollingTimeout);
+
abstract Read<K, V> build();
static <K, V> void setupExternalBuilder(
@@ -1334,6 +1340,17 @@ public class KafkaIO {
return
toBuilder().setBadRecordErrorHandler(badRecordErrorHandler).build();
}
+ /**
+ * Sets the timeout time for Kafka consumer polling request in the {@link
ReadFromKafkaDoFn}.
+ * The default is 2 seconds.
+ */
+ public Read<K, V> withConsumerPollingTimeout(Duration duration) {
+ checkState(
+ duration == null || duration.compareTo(Duration.ZERO) > 0,
+ "Consumer polling timeout must be greater than 0.");
+ return toBuilder().setConsumerPollingTimeout(duration).build();
+ }
+
/** Returns a {@link PTransform} for PCollection of {@link KV}, dropping
Kafka metatdata. */
public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
return new TypedWithoutMetadata<>(this);
@@ -1596,7 +1613,8 @@ public class KafkaIO {
.withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
.withManualWatermarkEstimator()
.withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
- .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn());
+ .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
+
.withConsumerPollingTimeout(kafkaRead.getConsumerPollingTimeout());
if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
readTransform = readTransform.commitOffsets();
}
@@ -2036,6 +2054,9 @@ public class KafkaIO {
@Pure
abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
+ @Pure
+ abstract @Nullable Duration getConsumerPollingTimeout();
+
abstract boolean isBounded();
abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
@@ -2086,6 +2107,9 @@ public class KafkaIO {
abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> badRecordErrorHandler);
+ abstract ReadSourceDescriptors.Builder<K, V> setConsumerPollingTimeout(
+ @Nullable Duration duration);
+
abstract ReadSourceDescriptors.Builder<K, V> setBounded(boolean bounded);
abstract ReadSourceDescriptors<K, V> build();
@@ -2099,6 +2123,7 @@ public class KafkaIO {
.setBounded(false)
.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
.setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
+ .setConsumerPollingTimeout(Duration.standardSeconds(2L))
.build()
.withProcessingTime()
.withMonotonicallyIncreasingWatermarkEstimator();
@@ -2360,6 +2385,14 @@ public class KafkaIO {
.build();
}
+ /**
+ * Sets the timeout time for Kafka consumer polling request in the {@link
ReadFromKafkaDoFn}.
+ * The default is 2 seconds.
+ */
+ public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable
Duration duration) {
+ return toBuilder().setConsumerPollingTimeout(duration).build();
+ }
+
ReadAllFromRow<K, V> forExternalBuild() {
return new ReadAllFromRow<>(this);
}
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
index a2cc9aaeb4d..7e54407300d 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
@@ -112,6 +112,7 @@ class KafkaIOReadImplementationCompatibility {
VALUE_DESERIALIZER_PROVIDER,
CHECK_STOP_READING_FN(SDF),
BAD_RECORD_ERROR_HANDLER(SDF),
+ CONSUMER_POLLING_TIMEOUT,
;
@Nonnull private final ImmutableSet<KafkaIOReadImplementation>
supportedImplementations;
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 924833290f1..3a821ef9519 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -191,6 +191,12 @@ abstract class ReadFromKafkaDoFn<K, V>
this.checkStopReadingFn = transform.getCheckStopReadingFn();
this.badRecordRouter = transform.getBadRecordRouter();
this.recordTag = recordTag;
+ if (transform.getConsumerPollingTimeout() != null) {
+ this.consumerPollingTimeout =
+
java.time.Duration.ofMillis(transform.getConsumerPollingTimeout().getMillis());
+ } else {
+ this.consumerPollingTimeout = KAFKA_POLL_TIMEOUT;
+ }
}
private static final Logger LOG =
LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
@@ -217,8 +223,9 @@ abstract class ReadFromKafkaDoFn<K, V>
private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize>
avgRecordSize;
- private static final java.time.Duration KAFKA_POLL_TIMEOUT =
java.time.Duration.ofSeconds(1);
+ private static final java.time.Duration KAFKA_POLL_TIMEOUT =
java.time.Duration.ofSeconds(2);
+ @VisibleForTesting final java.time.Duration consumerPollingTimeout;
@VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
@VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
@VisibleForTesting final Map<String, Object> consumerConfig;
@@ -508,7 +515,7 @@ abstract class ReadFromKafkaDoFn<K, V>
java.time.Duration elapsed = java.time.Duration.ZERO;
while (true) {
final ConsumerRecords<byte[], byte[]> rawRecords =
- consumer.poll(KAFKA_POLL_TIMEOUT.minus(elapsed));
+ consumer.poll(consumerPollingTimeout.minus(elapsed));
if (!rawRecords.isEmpty()) {
// return as we have found some entries
return rawRecords;
@@ -518,8 +525,11 @@ abstract class ReadFromKafkaDoFn<K, V>
return rawRecords;
}
elapsed = sw.elapsed();
- if (elapsed.toMillis() >= KAFKA_POLL_TIMEOUT.toMillis()) {
+ if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) {
// timeout is over
+ LOG.warn(
+ "No messages retrieved with polling timeout {} seconds. Consider
increasing the consumer polling timeout using withConsumerPollingTimeout
method.",
+ consumerPollingTimeout.getSeconds());
return rawRecords;
}
}
diff --git
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 9b15b86051f..44c028f08a2 100644
---
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -2121,6 +2121,18 @@ public class KafkaIOTest {
}
}
+ @Test(expected = IllegalStateException.class)
+ public void testWithInvalidConsumerPollingTimeout() {
+ KafkaIO.<Integer,
Long>read().withConsumerPollingTimeout(Duration.standardSeconds(-5));
+ }
+
+ @Test
+ public void testWithValidConsumerPollingTimeout() {
+ KafkaIO.Read<Integer, Long> reader =
+ KafkaIO.<Integer,
Long>read().withConsumerPollingTimeout(Duration.standardSeconds(15));
+ assertEquals(15, reader.getConsumerPollingTimeout().getStandardSeconds());
+ }
+
private static void verifyProducerRecords(
MockProducer<Integer, Long> mockProducer,
String topic,
diff --git
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 48b5b060a29..8902f22164b 100644
---
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -641,6 +641,20 @@ public class ReadFromKafkaDoFnTest {
Assert.assertNotEquals(0, visitor.unboundedPCollections.size());
}
+ @Test
+ public void testConstructorWithPollTimeout() {
+ ReadSourceDescriptors<String, String> descriptors =
makeReadSourceDescriptor(consumer);
+ // default poll timeout = 2 seconds
+ ReadFromKafkaDoFn<String, String> dofnInstance =
ReadFromKafkaDoFn.create(descriptors, RECORDS);
+ Assert.assertEquals(Duration.ofSeconds(2L),
dofnInstance.consumerPollingTimeout);
+ // updated timeout = 5 seconds
+ descriptors =
+
descriptors.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(5L));
+ ReadFromKafkaDoFn<String, String> dofnInstanceNew =
+ ReadFromKafkaDoFn.create(descriptors, RECORDS);
+ Assert.assertEquals(Duration.ofSeconds(5L),
dofnInstanceNew.consumerPollingTimeout);
+ }
+
private BoundednessVisitor testBoundedness(
Function<ReadSourceDescriptors<String, String>,
ReadSourceDescriptors<String, String>>
readSourceDescriptorsDecorator) {