This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 682eaeff69d [KafkaIO] Fix potential data race in ReadFromKafkaDoFn.AverageRecordSize (#33073)
682eaeff69d is described below

commit 682eaeff69d944dc1ed399db9e4ceeacdc72e710
Author: Steven van Rossum <[email protected]>
AuthorDate: Tue Nov 12 13:55:03 2024 +0100

    [KafkaIO] Fix potential data race in ReadFromKafkaDoFn.AverageRecordSize (#33073)
    
    * Add comments clarifying offsets and record size calculation
---
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java       | 44 +++++++++++++++++++---
 1 file changed, 38 insertions(+), 6 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 7c206488348..add76c9682a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
 import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
@@ -338,13 +340,18 @@ abstract class ReadFromKafkaDoFn<K, V>
   public double getSize(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction 
OffsetRange offsetRange)
       throws Exception {
+    // If present, estimates the record size to offset gap ratio. Compacted 
topics may hold less
+    // records than the estimated offset range due to record deletion within a 
partition.
     final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize =
         Preconditions.checkStateNotNull(this.avgRecordSize);
-    double numRecords =
+    // The tracker estimates the offset range by subtracting the last claimed 
position from the
+    // currently observed end offset for the partition belonging to this split.
+    double estimatedOffsetRange =
         restrictionTracker(kafkaSourceDescriptor, 
offsetRange).getProgress().getWorkRemaining();
     // Before processing elements, we don't have a good estimated size of 
records and offset gap.
+    // Return the estimated offset range without scaling by a size to gap 
ratio.
     if 
(!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition())) 
{
-      return numRecords;
+      return estimatedOffsetRange;
     }
     if (offsetEstimatorCache != null) {
       for (Map.Entry<TopicPartition, KafkaLatestOffsetEstimator> tp :
@@ -353,7 +360,12 @@ abstract class ReadFromKafkaDoFn<K, V>
       }
     }
 
-    return 
avgRecordSize.get(kafkaSourceDescriptor.getTopicPartition()).getTotalSize(numRecords);
+    // When processing elements, a moving average estimates the size of 
records and offset gap.
+    // Return the estimated offset range scaled by the estimated size to gap 
ratio.
+    return estimatedOffsetRange
+        * avgRecordSize
+            .get(kafkaSourceDescriptor.getTopicPartition())
+            .estimateRecordByteSizeToOffsetCountRatio();
   }
 
   @NewTracker
@@ -665,8 +677,15 @@ abstract class ReadFromKafkaDoFn<K, V>
     return config;
   }
 
+  // TODO: Collapse the two moving average trackers into a single accumulator 
using a single Guava
+  // AtomicDouble. Note that this requires that a single thread will call 
update and that while get
+  // may be called by multiple threads the method must only load the 
accumulator itself.
+  @ThreadSafe
   private static class AverageRecordSize {
+    @GuardedBy("this")
     private MovingAvg avgRecordSize;
+
+    @GuardedBy("this")
     private MovingAvg avgRecordGap;
 
     public AverageRecordSize() {
@@ -674,13 +693,26 @@ abstract class ReadFromKafkaDoFn<K, V>
       this.avgRecordGap = new MovingAvg();
     }
 
-    public void update(int recordSize, long gap) {
+    public synchronized void update(int recordSize, long gap) {
       avgRecordSize.update(recordSize);
       avgRecordGap.update(gap);
     }
 
-    public double getTotalSize(double numRecords) {
-      return avgRecordSize.get() * numRecords / (1 + avgRecordGap.get());
+    public double estimateRecordByteSizeToOffsetCountRatio() {
+      double avgRecordSize;
+      double avgRecordGap;
+
+      synchronized (this) {
+        avgRecordSize = this.avgRecordSize.get();
+        avgRecordGap = this.avgRecordGap.get();
+      }
+
+      // The offset increases between records in a batch fetched from a 
compacted topic may be
+      // greater than 1. Compacted topics only store records with the greatest 
offset per key per
+      // partition, the records in between are deleted and will not be 
observed by a consumer.
+      // The observed gap between offsets is used to estimate the number of 
records that are likely
+      // to be observed for the provided number of records.
+      return avgRecordSize / (1 + avgRecordGap);
     }
   }
 

Reply via email to