GitHub user greghogan commented on a diff in the pull request:
https://github.com/apache/flink/pull/2639#discussion_r84054277
--- Diff:
flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
---
@@ -28,51 +28,52 @@
public class AverageAccumulator implements SimpleAccumulator<Double> {
private static final long serialVersionUID = 3672555084179165255L;
-
- private double localValue;
+
private long count;
+ private double sum;
+
@Override
public void add(Double value) {
this.count++;
- this.localValue += value;
+ this.sum += value;
}
public void add(double value) {
this.count++;
- this.localValue += value;
+ this.sum += value;
}
public void add(long value) {
this.count++;
- this.localValue += value;
+ this.sum += value;
}
public void add(int value) {
this.count++;
- this.localValue += value;
+ this.sum += value;
}
@Override
public Double getLocalValue() {
if (this.count == 0) {
return 0.0;
}
- return this.localValue / (double)this.count;
+ return this.sum / this.count;
}
@Override
public void resetLocal() {
this.count = 0;
- this.localValue = 0;
+ this.sum = 0;
}
@Override
public void merge(Accumulator<Double, Double> other) {
if (other instanceof AverageAccumulator) {
- AverageAccumulator temp = (AverageAccumulator)other;
- this.count += temp.count;
- this.localValue += other.getLocalValue();
--- End diff --
Yes, and the test did not catch the bug because it only merged accumulators
that each held a single value, in which case the average equals the sum.
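
To make the failure mode concrete, here is a minimal standalone sketch. It is not the Flink class: `AverageMergeSketch`, `Avg`, `of`, `mergeBuggy`, and `mergeFixed` are hypothetical names used only for illustration, and `mergeFixed` is just one plausible way to merge by sum, not necessarily what this PR ends up doing. It shows that adding `other.getLocalValue()` (an average) to the running sum happens to give the right answer for single-value accumulators and the wrong one otherwise.

```java
public class AverageMergeSketch {

    /** Simplified stand-in for AverageAccumulator (hypothetical, not the Flink class). */
    static class Avg {
        long count;
        double sum;

        void add(double value) {
            count++;
            sum += value;
        }

        double getLocalValue() {
            return count == 0 ? 0.0 : sum / count;
        }

        /** Mirrors the removed lines: adds the other side's average to the running sum. */
        void mergeBuggy(Avg other) {
            count += other.count;
            sum += other.getLocalValue();
        }

        /** Assumed fix for illustration: adds the other side's running sum instead. */
        void mergeFixed(Avg other) {
            count += other.count;
            sum += other.sum;
        }
    }

    /** Helper that builds an accumulator from the given values. */
    static Avg of(double... values) {
        Avg acc = new Avg();
        for (double v : values) {
            acc.add(v);
        }
        return acc;
    }

    public static void main(String[] args) {
        // Single values on both sides: the average of {4} is 4, so the bug is invisible.
        Avg single = of(2.0);
        single.mergeBuggy(of(4.0));
        System.out.println("single values, buggy merge: " + single.getLocalValue());

        // Several values on the merged side: the true average of {1,2,3,10,20} is 36/5.
        Avg buggy = of(1.0, 2.0, 3.0);
        buggy.mergeBuggy(of(10.0, 20.0)); // sum becomes 6 + 15 instead of 6 + 30
        System.out.println("multi values, buggy merge:  " + buggy.getLocalValue());

        Avg fixed = of(1.0, 2.0, 3.0);
        fixed.mergeFixed(of(10.0, 20.0));
        System.out.println("multi values, fixed merge:  " + fixed.getLocalValue());
    }
}
```

A test that merges accumulators holding more than one value each, as in the second case, would have caught the bug.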