scwhittle commented on code in PR #33073:
URL: https://github.com/apache/beam/pull/33073#discussion_r1837724469
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -665,22 +668,42 @@ private Map<String, Object> overrideBootstrapServersConfig(
return config;
}
+ // TODO: Collapse the two moving average trackers into a single accumulator using a single Guava
+ // AtomicDouble. Note that this requires that a single thread will call update and that while get
+ // may be called by multiple threads the method must only load the accumulator itself.
+ @ThreadSafe
private static class AverageRecordSize {
+ @GuardedBy("this")
private MovingAvg avgRecordSize;
+
+ @GuardedBy("this")
private MovingAvg avgRecordGap;
public AverageRecordSize() {
this.avgRecordSize = new MovingAvg();
this.avgRecordGap = new MovingAvg();
}
- public void update(int recordSize, long gap) {
+ public synchronized void update(int recordSize, long gap) {
avgRecordSize.update(recordSize);
avgRecordGap.update(gap);
}
- public double getTotalSize(double numRecords) {
- return avgRecordSize.get() * numRecords / (1 + avgRecordGap.get());
+ public double getGapsFilledWithZeros() {
Review Comment:
This method name isn't particularly clear to me.
How about estimateRecordByteToOffsetCountRatio?
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -340,11 +342,11 @@ public double getSize(
throws Exception {
final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize =
Preconditions.checkStateNotNull(this.avgRecordSize);
- double numRecords =
+ double estimatedNumRecords =
Review Comment:
Can you add a comment noting that Kafka offsets are in terms of records?
Part of my confusion came from thinking they were in bytes.
You could also name this offsetCount or offsetRange to capture more precisely
what it measures. Then, when returning it directly, you can comment that since
we don't have more information we treat it as a record count. In the bottom
case it will then be easier to see that the units line up.
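To illustrate the naming, here is a rough sketch of how the units could read. It is not the code in the PR: the class name OffsetSizeEstimate, the helper estimateSizeBytes, and all parameter names are made up for illustration, and the ratio is only inferred from the removed getTotalSize line (average record size divided by one plus the average gap), since the body of the new method is not shown in this hunk.

```java
// Illustrative sketch only; names and structure are assumptions, not the PR's code.
final class OffsetSizeEstimate {
  private OffsetSizeEstimate() {}

  /**
   * Estimated bytes per offset. Kafka offsets count records, not bytes, and
   * some offsets in a range may hold no record, so the average record size is
   * spread over one record plus the average gap that follows it.
   */
  static double estimateRecordByteToOffsetCountRatio(
      double avgRecordSizeBytes, double avgOffsetGap) {
    return avgRecordSizeBytes / (1 + avgOffsetGap);
  }

  /** Estimated bytes in a range: (offsets) * (bytes per offset) = bytes. */
  static double estimateSizeBytes(long offsetCount, double recordByteToOffsetCountRatio) {
    return offsetCount * recordByteToOffsetCountRatio;
  }
}
```

With names like these, the case that returns offsetCount directly reads as "treat every offset as one record", and the case that multiplies by the ratio reads as offsets times bytes per offset.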