scwhittle commented on code in PR #33073:
URL: https://github.com/apache/beam/pull/33073#discussion_r1836465914
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -674,13 +674,21 @@ public AverageRecordSize() {
       this.avgRecordGap = new MovingAvg();
     }
-    public void update(int recordSize, long gap) {
+    public synchronized void update(int recordSize, long gap) {
       avgRecordSize.update(recordSize);
       avgRecordGap.update(gap);
     }
     public double getTotalSize(double numRecords) {
-      return avgRecordSize.get() * numRecords / (1 + avgRecordGap.get());
+      double avgRecordSize;
+      double avgRecordGap;
+
+      synchronized (this) {
+        avgRecordSize = this.avgRecordSize.get();
+        avgRecordGap = this.avgRecordGap.get();
+      }
+
+      return avgRecordSize * numRecords / (1 + avgRecordGap);
Review Comment:
Not introduced by your change, but why do we divide by (1 + avgRecordGap)?
Why is it not just avgRecordSize * numRecords?
It would be good to add a comment if there is a reason.
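
One possible reason, stated as an assumption rather than something confirmed in this thread: if `numRecords` is really an offset span and `gap` counts the offsets skipped between consecutive records (for example on a compacted topic), then dividing by `(1 + avgRecordGap)` converts offsets into an estimated record count. A minimal sketch of that arithmetic, using hypothetical names (`GapEstimate`, `estimateBytes`) that are not part of the Beam codebase:

```java
// Hypothetical illustration only; these names do not exist in Beam.
class GapEstimate {

  // Estimates the bytes covered by an offset span, ASSUMING that on average
  // each record is followed by avgOffsetGap skipped offsets, so one record
  // occupies (1 + avgOffsetGap) offsets.
  static double estimateBytes(double avgRecordSize, double offsetSpan, double avgOffsetGap) {
    double estimatedRecords = offsetSpan / (1 + avgOffsetGap);
    return avgRecordSize * estimatedRecords;
  }

  public static void main(String[] args) {
    // No gaps: every offset holds a record, so the estimate is size * span.
    System.out.println(estimateBytes(100.0, 1000.0, 0.0));
    // Average gap of 1: only every other offset holds a record.
    System.out.println(estimateBytes(100.0, 1000.0, 1.0));
  }
}
```

Under this assumption the gap-free case reduces to avgRecordSize * numRecords, and the divisor only matters when offsets are sparse. If that is the intent, a short comment on getTotalSize saying so would answer the question.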