This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 35cfad98818 remove listing topics when processing each element (#31897)
35cfad98818 is described below
commit 35cfad988187f45d62004579a38a331ab336053b
Author: Naireen Hussain <[email protected]>
AuthorDate: Tue Jul 23 10:18:23 2024 -0700
remove listing topics when processing each element (#31897)
Co-authored-by: Naireen <[email protected]>
---
.../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 34 +++++++++++++---------
.../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java | 2 +-
2 files changed, 22 insertions(+), 14 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 100f06d42d0..d6ec9015a95 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -436,19 +436,6 @@ abstract class ReadFromKafkaDoFn<K, V>
"Creating Kafka consumer for process continuation for {}",
kafkaSourceDescriptor.getTopicPartition());
try (Consumer<byte[], byte[]> consumer =
consumerFactoryFn.apply(updatedConsumerConfig)) {
- // Check whether current TopicPartition is still available to read.
- Set<TopicPartition> existingTopicPartitions = new HashSet<>();
- for (List<PartitionInfo> topicPartitionList :
consumer.listTopics().values()) {
- topicPartitionList.forEach(
- partitionInfo -> {
- existingTopicPartitions.add(
- new TopicPartition(partitionInfo.topic(),
partitionInfo.partition()));
- });
- }
- if
(!existingTopicPartitions.contains(kafkaSourceDescriptor.getTopicPartition())) {
- return ProcessContinuation.stop();
- }
-
ConsumerSpEL.evaluateAssign(
consumer,
ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
long startOffset = tracker.currentRestriction().getFrom();
@@ -462,6 +449,10 @@ abstract class ReadFromKafkaDoFn<K, V>
// When there are no records available for the current TopicPartition,
self-checkpoint
// and move to process the next element.
if (rawRecords.isEmpty()) {
+ if (!topicPartitionExists(
+ kafkaSourceDescriptor.getTopicPartition(),
consumer.listTopics())) {
+ return ProcessContinuation.stop();
+ }
if (timestampPolicy != null) {
updateWatermarkManually(timestampPolicy, watermarkEstimator,
tracker);
}
@@ -522,6 +513,23 @@ abstract class ReadFromKafkaDoFn<K, V>
}
}
+  /**
+   * Returns whether {@code topicPartition} is present in {@code topicListMap}, the metadata
+   * returned by {@link Consumer#listTopics()}. Membership is decided by the topic and partition
+   * reported in each {@link PartitionInfo}, not by the map keys.
+   */
+  private boolean topicPartitionExists(
+      TopicPartition topicPartition, Map<String, List<PartitionInfo>> topicListMap) {
+    Set<TopicPartition> existingTopicPartitions = new HashSet<>();
+    for (List<PartitionInfo> topicPartitionList : topicListMap.values()) {
+      for (PartitionInfo partitionInfo : topicPartitionList) {
+        existingTopicPartitions.add(
+            new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+      }
+    }
+    return existingTopicPartitions.contains(topicPartition);
+  }
+
// see https://github.com/apache/beam/issues/25962
private ConsumerRecords<byte[], byte[]> poll(
Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index b8ff08485c3..612b20393d7 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -515,7 +515,7 @@ public class ReadFromKafkaDoFnTest {
public void testProcessElementWhenTopicPartitionIsRemoved() throws Exception
{
MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
consumer.setRemoved();
- consumer.setNumOfRecordsPerPoll(10);
+ consumer.setNumOfRecordsPerPoll(-1);
OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L,
Long.MAX_VALUE));
ProcessContinuation result =
dofnInstance.processElement(