Repository: flink Updated Branches: refs/heads/release-1.1 210230c4a -> 9c87f92cb
[FLINK-4586] [core] Broken AverageAccumulator This closes #2639 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c87f92c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c87f92c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c87f92c Branch: refs/heads/release-1.1 Commit: 9c87f92cbb9989281024c5c300cd18b962ac4357 Parents: 210230c Author: Greg Hogan <[email protected]> Authored: Fri Oct 14 16:18:52 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Wed Oct 19 10:39:23 2016 -0400 ---------------------------------------------------------------------- .../common/accumulators/AverageAccumulator.java | 27 ++++++++++---------- .../accumulators/AverageAccumulatorTest.java | 18 ++++++++----- 2 files changed, 26 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9c87f92c/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java index 9c0f62f..67cf572 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java @@ -28,29 +28,30 @@ import org.apache.flink.annotation.Public; public class AverageAccumulator implements SimpleAccumulator<Double> { private static final long serialVersionUID = 3672555084179165255L; - - private double localValue; + private long count; + private double sum; + @Override public void add(Double value) { this.count++; - this.localValue += value; + this.sum += value; } public void add(double value) { this.count++; - this.localValue += value; + this.sum += value; } public void add(long value) { this.count++; - this.localValue += value; + this.sum += value; } public void add(int value) { this.count++; - this.localValue += value; + this.sum += value; } @Override @@ -58,21 +59,21 @@ public class AverageAccumulator implements SimpleAccumulator<Double> { if (this.count == 0) { return 0.0; } - return this.localValue / (double)this.count; + return this.sum / this.count; } @Override public void resetLocal() { this.count = 0; - this.localValue = 0; + this.sum = 0; } @Override public void merge(Accumulator<Double, Double> other) { if (other instanceof AverageAccumulator) { - AverageAccumulator temp = (AverageAccumulator)other; - this.count += temp.count; - this.localValue += other.getLocalValue(); + AverageAccumulator avg = (AverageAccumulator)other; + this.count += avg.count; + this.sum += avg.sum; } else { throw new IllegalArgumentException("The merged accumulator must be AverageAccumulator."); } @@ -81,13 +82,13 @@ public class AverageAccumulator implements SimpleAccumulator<Double> { @Override public AverageAccumulator clone() { AverageAccumulator average = new AverageAccumulator(); - average.localValue = this.localValue; average.count = this.count; + average.sum = this.sum; return average; } @Override public String toString() { - return "AverageAccumulator " + this.localValue + " count " + this.count; + return "AverageAccumulator " + this.getLocalValue() + " for " + this.count + " elements"; } } http://git-wip-us.apache.org/repos/asf/flink/blob/9c87f92c/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java index 9ebd27c..585511f 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java @@ -83,12 +83,18 @@ public class AverageAccumulatorTest { @Test public void testMergeSuccess() { - AverageAccumulator average = new AverageAccumulator(); - AverageAccumulator averageNew = new AverageAccumulator(); - average.add(1); - averageNew.add(2); - average.merge(averageNew); - assertEquals(1.5, average.getLocalValue(), 0.0); + AverageAccumulator avg1 = new AverageAccumulator(); + for (int i = 0; i < 5; i++) { + avg1.add(i); + } + + AverageAccumulator avg2 = new AverageAccumulator(); + for (int i = 5; i < 10; i++) { + avg2.add(i); + } + + avg1.merge(avg2); + assertEquals(4.5, avg1.getLocalValue(), 0.0); } @Test
