sjvanrossum commented on code in PR #33073:
URL: https://github.com/apache/beam/pull/33073#discussion_r1836904057
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -674,13 +674,21 @@ public AverageRecordSize() {
this.avgRecordGap = new MovingAvg();
}
- public void update(int recordSize, long gap) {
+ public synchronized void update(int recordSize, long gap) {
avgRecordSize.update(recordSize);
avgRecordGap.update(gap);
}
public double getTotalSize(double numRecords) {
- return avgRecordSize.get() * numRecords / (1 + avgRecordGap.get());
+ double avgRecordSize;
+ double avgRecordGap;
+
+ synchronized (this) {
+ avgRecordSize = this.avgRecordSize.get();
+ avgRecordGap = this.avgRecordGap.get();
+ }
+
+ return avgRecordSize * numRecords / (1 + avgRecordGap);
Review Comment:
The next record offset for a partition is incremented for every record
stored in that partition. Topic compaction only retains the record with the
greatest offset for each key within a partition, but it does not renumber the
retained records: their offsets remain untouched, and the offsets of removed
records are simply left as gaps. The size of the backlog
(`endOffset - beginningOffset`) may therefore be greater than the number of
records actually stored in the partition.
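
A small worked example of the mismatch, with made-up numbers: suppose offsets
`0` to `9` were assigned, but compaction kept only the records at offsets `2`,
`5` and `9` (100 bytes each). The backlog is `10 - 0 = 10` offsets, yet only 3
records remain. The sketch below assumes a record's gap is the number of offsets
skipped right before it, uses plain arithmetic means instead of Beam's
`MovingAvg`, and plugs them into the same `avgSize * numOffsets / (1 + avgGap)`
shape as `getTotalSize`. `CompactionBacklogExample` and `estimateBacklogBytes`
are hypothetical names.

```java
/** Hypothetical illustration of gap-scaled backlog estimation on a compacted partition. */
final class CompactionBacklogExample {

  /**
   * Estimates backlog bytes as avgSize * numOffsets / (1 + avgGap), using plain
   * arithmetic means (not Beam's MovingAvg) over the retained records.
   */
  static double estimateBacklogBytes(
      long[] retainedOffsets, int[] recordSizes, long beginningOffset, long endOffset) {
    if (retainedOffsets.length == 0) {
      return 0.0; // Nothing observed yet, so no basis for an estimate.
    }
    long expectedOffset = beginningOffset;
    double sizeSum = 0;
    double gapSum = 0;
    for (int i = 0; i < retainedOffsets.length; i++) {
      // Offsets skipped before this record belonged to records removed by compaction.
      long gap = retainedOffsets[i] - expectedOffset;
      expectedOffset = retainedOffsets[i] + 1;
      sizeSum += recordSizes[i];
      gapSum += gap;
    }
    double avgSize = sizeSum / retainedOffsets.length;
    double avgGap = gapSum / retainedOffsets.length;
    long backlogOffsets = endOffset - beginningOffset;
    // 1 / (1 + avgGap) approximates the fraction of offsets that still hold a record.
    return avgSize * backlogOffsets / (1 + avgGap);
  }
}
```

With these inputs the gaps are 2, 2 and 3, so the average gap is 7/3 and the
estimate is roughly `100 * 10 / (1 + 7/3) = 300` bytes, matching the 3 retained
records, whereas ignoring the gap would predict 1,000 bytes.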
Fetches from these partitions may return relatively few records compared to
the expected size of the backlog. The average record gap (the number of offsets
skipped between consecutive fetched records) scales the average record size
down to the expected fraction of offsets that still hold a record. If I'm not
mistaken, this estimator can be simplified to use only a single (Guava)
`AtomicDouble`.
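
One possible reading of the `AtomicDouble` suggestion, sketched under
assumptions: keep a single running estimate of bytes per offset, updated with
`recordSize / (1 + gap)` for every record, so no lock is needed to read two
averages consistently. The JDK has no `AtomicDouble`, so this sketch stores the
double's bits in an `AtomicLong` (Guava's `AtomicDouble` uses the same
bit-packing idea). `BytesPerOffsetEstimator` and the smoothing weight `ALPHA`
are hypothetical, and averaging the per-record ratio is not numerically
identical to the current `avgRecordSize / (1 + avgRecordGap)`, so treat it as a
sketch rather than a drop-in replacement.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical lock-free estimator that keeps a single smoothed bytes-per-offset value. */
final class BytesPerOffsetEstimator {
  // Weight of each new sample in the exponential moving average (assumed value).
  private static final double ALPHA = 0.001;

  // Raw bits of a double; NaN means "no samples yet".
  private final AtomicLong bytesPerOffsetBits =
      new AtomicLong(Double.doubleToRawLongBits(Double.NaN));

  void update(int recordSize, long gap) {
    // A record preceded by `gap` skipped offsets accounts for 1 + gap offsets.
    final double sample = (double) recordSize / (1 + gap);
    // getAndUpdate may re-run the function under contention, so it must be side-effect free.
    bytesPerOffsetBits.getAndUpdate(
        bits -> {
          double current = Double.longBitsToDouble(bits);
          double next = Double.isNaN(current) ? sample : current + ALPHA * (sample - current);
          return Double.doubleToRawLongBits(next);
        });
  }

  double getTotalSize(double numOffsets) {
    double current = Double.longBitsToDouble(bytesPerOffsetBits.get());
    return Double.isNaN(current) ? 0.0 : current * numOffsets;
  }
}
```

For example, after a single `update(100, 3)` the estimate is 25 bytes per
offset, so `getTotalSize(1000)` returns `25000.0`. A fixed-weight exponential
average was chosen here because it needs no update counter, which is what lets
the state fit in one atomic value.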
I'll add a `TODO` with this context in a comment block.