Repository: flink
Updated Branches:
  refs/heads/release-1.1 210230c4a -> 9c87f92cb


[FLINK-4586] [core] Broken AverageAccumulator

This closes #2639


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c87f92c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c87f92c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c87f92c

Branch: refs/heads/release-1.1
Commit: 9c87f92cbb9989281024c5c300cd18b962ac4357
Parents: 210230c
Author: Greg Hogan <[email protected]>
Authored: Fri Oct 14 16:18:52 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Wed Oct 19 10:39:23 2016 -0400

----------------------------------------------------------------------
 .../common/accumulators/AverageAccumulator.java | 27 ++++++++++----------
 .../accumulators/AverageAccumulatorTest.java    | 18 ++++++++-----
 2 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c87f92c/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
index 9c0f62f..67cf572 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
@@ -28,29 +28,30 @@ import org.apache.flink.annotation.Public;
 public class AverageAccumulator implements SimpleAccumulator<Double> {
 
        private static final long serialVersionUID = 3672555084179165255L;
-       
-       private double localValue;
+
        private long count;
 
+       private double sum;
+
        @Override
        public void add(Double value) {
                this.count++;
-               this.localValue += value;
+               this.sum += value;
        }
 
        public void add(double value) {
                this.count++;
-               this.localValue += value;
+               this.sum += value;
        }
 
        public void add(long value) {
                this.count++;
-               this.localValue += value;
+               this.sum += value;
        }
 
        public void add(int value) {
                this.count++;
-               this.localValue += value;
+               this.sum += value;
        }
 
        @Override
@@ -58,21 +59,21 @@ public class AverageAccumulator implements SimpleAccumulator<Double> {
                if (this.count == 0) {
                        return 0.0;
                }
-               return this.localValue / (double)this.count;
+               return this.sum / this.count;
        }
 
        @Override
        public void resetLocal() {
                this.count = 0;
-               this.localValue = 0;
+               this.sum = 0;
        }
 
        @Override
        public void merge(Accumulator<Double, Double> other) {
                if (other instanceof AverageAccumulator) {
-                       AverageAccumulator temp = (AverageAccumulator)other;
-                       this.count += temp.count;
-                       this.localValue += other.getLocalValue();
+                       AverageAccumulator avg = (AverageAccumulator)other;
+                       this.count += avg.count;
+                       this.sum += avg.sum;
                } else {
                        throw new IllegalArgumentException("The merged accumulator must be AverageAccumulator.");
                }
@@ -81,13 +82,13 @@ public class AverageAccumulator implements SimpleAccumulator<Double> {
        @Override
        public AverageAccumulator clone() {
                AverageAccumulator average = new AverageAccumulator();
-               average.localValue = this.localValue;
                average.count = this.count;
+               average.sum = this.sum;
                return average;
        }
 
        @Override
        public String toString() {
-               return "AverageAccumulator " + this.localValue + " count " + this.count;
+               return "AverageAccumulator " + this.getLocalValue() + " for " + this.count + " elements";
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9c87f92c/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
index 9ebd27c..585511f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
@@ -83,12 +83,18 @@ public class AverageAccumulatorTest {
 
        @Test
        public void testMergeSuccess() {
-               AverageAccumulator average = new AverageAccumulator();
-               AverageAccumulator averageNew = new AverageAccumulator();
-               average.add(1);
-               averageNew.add(2);
-               average.merge(averageNew);
-               assertEquals(1.5, average.getLocalValue(), 0.0);
+               AverageAccumulator avg1 = new AverageAccumulator();
+               for (int i = 0; i < 5; i++) {
+                       avg1.add(i);
+               }
+
+               AverageAccumulator avg2 = new AverageAccumulator();
+               for (int i = 5; i < 10; i++) {
+                       avg2.add(i);
+               }
+
+               avg1.merge(avg2);
+               assertEquals(4.5, avg1.getLocalValue(), 0.0);
        }
 
        @Test

Reply via email to