This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 35cfad98818 remove listing topics when processing each element (#31897)
35cfad98818 is described below

commit 35cfad988187f45d62004579a38a331ab336053b
Author: Naireen Hussain <[email protected]>
AuthorDate: Tue Jul 23 10:18:23 2024 -0700

    remove listing topics when processing each element (#31897)
    
    Co-authored-by: Naireen <[email protected]>
---
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java       | 34 +++++++++++++---------
 .../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java   |  2 +-
 2 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 100f06d42d0..d6ec9015a95 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -436,19 +436,6 @@ abstract class ReadFromKafkaDoFn<K, V>
         "Creating Kafka consumer for process continuation for {}",
         kafkaSourceDescriptor.getTopicPartition());
     try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
-      // Check whether current TopicPartition is still available to read.
-      Set<TopicPartition> existingTopicPartitions = new HashSet<>();
-      for (List<PartitionInfo> topicPartitionList : consumer.listTopics().values()) {
-        topicPartitionList.forEach(
-            partitionInfo -> {
-              existingTopicPartitions.add(
-                  new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
-            });
-      }
-      if (!existingTopicPartitions.contains(kafkaSourceDescriptor.getTopicPartition())) {
-        return ProcessContinuation.stop();
-      }
-
       ConsumerSpEL.evaluateAssign(
           consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
       long startOffset = tracker.currentRestriction().getFrom();
@@ -462,6 +449,10 @@ abstract class ReadFromKafkaDoFn<K, V>
         // When there are no records available for the current TopicPartition, self-checkpoint
         // and move to process the next element.
         if (rawRecords.isEmpty()) {
+          if (!topicPartitionExists(
+              kafkaSourceDescriptor.getTopicPartition(), consumer.listTopics())) {
+            return ProcessContinuation.stop();
+          }
           if (timestampPolicy != null) {
             updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
           }
@@ -522,6 +513,23 @@ abstract class ReadFromKafkaDoFn<K, V>
     }
   }
 
+  private boolean topicPartitionExists(
+      TopicPartition topicPartition, Map<String, List<PartitionInfo>> topicListMap) {
+    // Check if the current TopicPartition still exists.
+    Set<TopicPartition> existingTopicPartitions = new HashSet<>();
+    for (List<PartitionInfo> topicPartitionList : topicListMap.values()) {
+      topicPartitionList.forEach(
+          partitionInfo -> {
+            existingTopicPartitions.add(
+                new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+          });
+    }
+    if (!existingTopicPartitions.contains(topicPartition)) {
+      return false;
+    }
+    return true;
+  }
+
   // see https://github.com/apache/beam/issues/25962
   private ConsumerRecords<byte[], byte[]> poll(
       Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index b8ff08485c3..612b20393d7 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -515,7 +515,7 @@ public class ReadFromKafkaDoFnTest {
   public void testProcessElementWhenTopicPartitionIsRemoved() throws Exception {
     MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
     consumer.setRemoved();
-    consumer.setNumOfRecordsPerPoll(10);
+    consumer.setNumOfRecordsPerPoll(-1);
     OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
     ProcessContinuation result =
         dofnInstance.processElement(

Reply via email to