sjvanrossum commented on code in PR #33073:
URL: https://github.com/apache/beam/pull/33073#discussion_r1836904057


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -674,13 +674,21 @@ public AverageRecordSize() {
       this.avgRecordGap = new MovingAvg();
     }
 
-    public void update(int recordSize, long gap) {
+    public synchronized void update(int recordSize, long gap) {
       avgRecordSize.update(recordSize);
       avgRecordGap.update(gap);
     }
 
     public double getTotalSize(double numRecords) {
-      return avgRecordSize.get() * numRecords / (1 + avgRecordGap.get());
+      double avgRecordSize;
+      double avgRecordGap;
+
+      synchronized (this) {
+        avgRecordSize = this.avgRecordSize.get();
+        avgRecordGap = this.avgRecordGap.get();
+      }
+
+      return avgRecordSize * numRecords / (1 + avgRecordGap);

Review Comment:
   A partition's next record offset is incremented for every record written to
that partition. Topic compaction retains only the record with the greatest
offset for each key within a partition, and the offsets of the retained records
remain untouched. The backlog measured in offsets
(`endOffset - beginningOffset`) may therefore be greater than the number of
records stored in the partition, and fetches from such partitions may return
relatively few records compared to the expected size of the backlog. The
average record gap scales the average record size by the expected fraction of
retained records relative to the observed offsets. If I'm not mistaken, this
estimator can be simplified to only use a (Guava) `AtomicDouble`.
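
   As a rough illustration of that scaling, here is a minimal standalone sketch
of the arithmetic in `getTotalSize`. The class name `BacklogSizeExample` and the
numbers are hypothetical, and it assumes the gap counts the offsets skipped
between consecutive fetched records (0 for an uncompacted partition).

```java
/** Illustrative only: mirrors the arithmetic of getTotalSize in the diff above. */
final class BacklogSizeExample {
  /**
   * avgRecordSize: average size of a fetched record, in bytes.
   * avgRecordGap: average number of offsets skipped between consecutive fetched
   *     records (assumption: 0 when no offsets are missing).
   * numRecords: backlog measured in offsets, e.g. endOffset - beginningOffset.
   */
  static double totalSize(double avgRecordSize, double avgRecordGap, double numRecords) {
    // (1 + avgRecordGap) is the average number of offsets consumed per retained record.
    return avgRecordSize * numRecords / (1 + avgRecordGap);
  }

  public static void main(String[] args) {
    // Hypothetical backlog of 1,000 offsets with 200-byte records.
    System.out.println(totalSize(200.0, 0.0, 1000.0)); // no gaps: 200000.0
    System.out.println(totalSize(200.0, 3.0, 1000.0)); // one record per four offsets: 50000.0
  }
}
```

With an average gap of 3, only about a quarter of the offsets are expected to
hold a record, so the estimated backlog size shrinks by the same factor.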
   
   Adding a `TODO` and some context in a comment block.
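
   One possible reading of the `AtomicDouble` simplification mentioned above,
sketched under several assumptions: the estimator tracks a single "bytes per
offset" average instead of two separate averages, it uses a hypothetical
exponential smoothing factor rather than the existing `MovingAvg`, and it
stores the double in a `java.util.concurrent.atomic.AtomicLong` (not Guava's
`AtomicDouble`) so the example stays self-contained. Averaging the per-record
ratio is not numerically identical to dividing two averages, so this is an
approximation and not a drop-in replacement.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical lock-free estimator; not the implementation in this PR. */
final class BytesPerOffsetEstimator {
  // Assumed smoothing factor, chosen only for illustration.
  private static final double ALPHA = 0.1;

  // The double is stored as raw long bits; NaN marks "no samples yet".
  private final AtomicLong bits = new AtomicLong(Double.doubleToRawLongBits(Double.NaN));

  void update(int recordSize, long gap) {
    // Spread the record's bytes over the offsets it accounts for.
    double sample = recordSize / (1.0 + gap);
    long prev;
    long next;
    do {
      prev = bits.get();
      double current = Double.longBitsToDouble(prev);
      double updated = Double.isNaN(current) ? sample : current + ALPHA * (sample - current);
      next = Double.doubleToRawLongBits(updated);
    } while (!bits.compareAndSet(prev, next)); // retry if another thread won the race
  }

  double getTotalSize(double numOffsets) {
    double current = Double.longBitsToDouble(bits.get());
    return Double.isNaN(current) ? 0.0 : current * numOffsets;
  }
}
```

Because both the update and the read touch a single atomic value, neither
method would need `synchronized` in this variant.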


