This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 682eaeff69d [KafkaIO] Fix potential data race in
ReadFromKafkaDoFn.AverageRecordSize (#33073)
682eaeff69d is described below
commit 682eaeff69d944dc1ed399db9e4ceeacdc72e710
Author: Steven van Rossum <[email protected]>
AuthorDate: Tue Nov 12 13:55:03 2024 +0100
[KafkaIO] Fix potential data race in ReadFromKafkaDoFn.AverageRecordSize
(#33073)
* Add comments clarifying offsets and record size calculation
---
.../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 44 +++++++++++++++++++---
1 file changed, 38 insertions(+), 6 deletions(-)
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 7c206488348..add76c9682a 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
@@ -338,13 +340,18 @@ abstract class ReadFromKafkaDoFn<K, V>
public double getSize(
@Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction
OffsetRange offsetRange)
throws Exception {
+ // If present, estimates the record size to offset gap ratio. Compacted
topics may hold fewer
+ // records than the estimated offset range due to record deletion within a
partition.
final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize =
Preconditions.checkStateNotNull(this.avgRecordSize);
- double numRecords =
+ // The tracker estimates the offset range by subtracting the last claimed
position from the
+ // currently observed end offset for the partition belonging to this split.
+ double estimatedOffsetRange =
restrictionTracker(kafkaSourceDescriptor,
offsetRange).getProgress().getWorkRemaining();
// Before processing elements, we don't have a good estimated size of
records and offset gap.
+ // Return the estimated offset range without scaling by a size to gap
ratio.
if
(!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition()))
{
- return numRecords;
+ return estimatedOffsetRange;
}
if (offsetEstimatorCache != null) {
for (Map.Entry<TopicPartition, KafkaLatestOffsetEstimator> tp :
@@ -353,7 +360,12 @@ abstract class ReadFromKafkaDoFn<K, V>
}
}
- return
avgRecordSize.get(kafkaSourceDescriptor.getTopicPartition()).getTotalSize(numRecords);
+ // When processing elements, a moving average estimates the size of
records and offset gap.
+ // Return the estimated offset range scaled by the estimated size to gap
ratio.
+ return estimatedOffsetRange
+ * avgRecordSize
+ .get(kafkaSourceDescriptor.getTopicPartition())
+ .estimateRecordByteSizeToOffsetCountRatio();
}
@NewTracker
@@ -665,8 +677,15 @@ abstract class ReadFromKafkaDoFn<K, V>
return config;
}
+ // TODO: Collapse the two moving average trackers into a single accumulator
using a single Guava
+ // AtomicDouble. Note that this requires that a single thread will call
update and that while get
+ // may be called by multiple threads, the method must only load the
accumulator itself.
+ @ThreadSafe
private static class AverageRecordSize {
+ @GuardedBy("this")
private MovingAvg avgRecordSize;
+
+ @GuardedBy("this")
private MovingAvg avgRecordGap;
public AverageRecordSize() {
@@ -674,13 +693,26 @@ abstract class ReadFromKafkaDoFn<K, V>
this.avgRecordGap = new MovingAvg();
}
- public void update(int recordSize, long gap) {
+ public synchronized void update(int recordSize, long gap) {
avgRecordSize.update(recordSize);
avgRecordGap.update(gap);
}
- public double getTotalSize(double numRecords) {
- return avgRecordSize.get() * numRecords / (1 + avgRecordGap.get());
+ public double estimateRecordByteSizeToOffsetCountRatio() {
+ double avgRecordSize;
+ double avgRecordGap;
+
+ synchronized (this) {
+ avgRecordSize = this.avgRecordSize.get();
+ avgRecordGap = this.avgRecordGap.get();
+ }
+
+ // The offset increases between records in a batch fetched from a
compacted topic may be
+ // greater than 1. Compacted topics only store records with the greatest
offset per key per
+ // partition, the records in between are deleted and will not be
observed by a consumer.
+ // The observed gap between offsets is used to estimate the number of
records that are likely
+ // to be observed for the provided offset range.
+ return avgRecordSize / (1 + avgRecordGap);
}
}