This is an automated email from the ASF dual-hosted git repository.

jjramos pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 77b3f0f  GEODE-6913: Add Aggregate Functions Unit Tests (#3785)
77b3f0f is described below

commit 77b3f0f9b9bd9327a0b3a5be6dc25ae4217a11e4
Author: Juan José Ramos <[email protected]>
AuthorDate: Thu Jul 18 09:17:26 2019 -0300

    GEODE-6913: Add Aggregate Functions Unit Tests (#3785)
    
    - Fixed some minor warnings.
    - Added Unit tests for all aggregate functions.
---
 .../org/apache/geode/cache/query/Aggregator.java   |  12 +-
 .../internal/aggregate/AbstractAggregator.java     |   6 +-
 .../geode/cache/query/internal/aggregate/Avg.java  |  15 +-
 .../query/internal/aggregate/AvgBucketNode.java    |  13 +-
 .../query/internal/aggregate/AvgDistinct.java      |  12 --
 .../internal/aggregate/AvgDistinctPRQueryNode.java |   2 -
 .../query/internal/aggregate/AvgPRQueryNode.java   |   8 +-
 .../cache/query/internal/aggregate/Count.java      |  17 +-
 .../query/internal/aggregate/CountDistinct.java    |   6 +-
 .../aggregate/CountDistinctPRQueryNode.java        |  12 +-
 .../query/internal/aggregate/CountPRQueryNode.java |  19 +-
 .../internal/aggregate/DistinctAggregator.java     |  18 +-
 .../cache/query/internal/aggregate/MaxMin.java     |  21 +--
 .../geode/cache/query/internal/aggregate/Sum.java  |  15 +-
 .../query/internal/aggregate/SumDistinct.java      |   3 +-
 .../internal/aggregate/SumDistinctPRQueryNode.java |  10 +-
 .../internal/aggregate/AbstractAggregatorTest.java |  61 +++++++
 .../internal/aggregate/AggregatorJUnitTest.java    | 202 ---------------------
 .../internal/aggregate/AvgBucketNodeTest.java      |  74 ++++++++
 .../aggregate/AvgDistinctPRQueryNodeTest.java      |  55 ++++++
 .../query/internal/aggregate/AvgDistinctTest.java} |  36 ++--
 .../internal/aggregate/AvgPRQueryNodeTest.java     |  49 +++++
 .../cache/query/internal/aggregate/AvgTest.java    |  75 ++++++++
 .../aggregate/CountDistinctPRQueryNodeTest.java    |  55 ++++++
 .../internal/aggregate/CountDistinctTest.java}     |  36 ++--
 .../internal/aggregate/CountPRQueryNodeTest.java}  |  49 +++--
 .../cache/query/internal/aggregate/CountTest.java  |  64 +++++++
 .../aggregate/DistinctAggregatorTest.java}         |  49 ++---
 .../cache/query/internal/aggregate/MaxMinTest.java |  91 ++++++++++
 .../aggregate/SumDistinctPRQueryNodeTest.java      |  55 ++++++
 .../query/internal/aggregate/SumDistinctTest.java} |  34 ++--
 .../cache/query/internal/aggregate/SumTest.java    |  74 ++++++++
 32 files changed, 838 insertions(+), 410 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/Aggregator.java 
b/geode-core/src/main/java/org/apache/geode/cache/query/Aggregator.java
index 4fa55e0..8a9f9f4 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/Aggregator.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/Aggregator.java
@@ -17,23 +17,21 @@ package org.apache.geode.cache.query;
 /**
  * Behavior of a user-defined aggregator. Aggregates values and returns a 
result. In addition to the
  * methods in the interface, implementing classes must have a 0-arg public 
constructor.
- *
- *
  */
 public interface Aggregator {
 
   /**
-   * Accumulate the next scalar value
-   *
+   * Initialize the Aggregator
    */
-  void accumulate(Object value);
+  void init();
 
   /**
-   * Initialize the Aggregator
+   * Accumulate the next scalar value
    */
-  void init();
+  void accumulate(Object value);
 
   /**
+   * Finish the aggregation and return the operation result.
    *
    * @return Return the result scalar value
    */
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AbstractAggregator.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AbstractAggregator.java
index c7a3c5f..18ee950 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AbstractAggregator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AbstractAggregator.java
@@ -17,14 +17,13 @@ package org.apache.geode.cache.query.internal.aggregate;
 import org.apache.geode.cache.query.Aggregator;
 
 /**
- * Abstract Aggregator class providing support for downcasting the result
- *
- *
+ * Abstract Aggregator class providing support for down casting the result.
  */
 public abstract class AbstractAggregator implements Aggregator {
 
   public static Number downCast(double value) {
     Number retVal;
+
     if (value % 1 == 0) {
       long longValue = (long) value;
       if (longValue <= Integer.MAX_VALUE && longValue >= Integer.MIN_VALUE) {
@@ -39,6 +38,7 @@ public abstract class AbstractAggregator implements 
Aggregator {
         retVal = Double.valueOf(value);
       }
     }
+
     return retVal;
   }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/Avg.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/Avg.java
index 9b325e8..2528a4b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/Avg.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/Avg.java
@@ -18,12 +18,17 @@ import org.apache.geode.cache.query.QueryService;
 
 /**
  * Computes the non distinct average for replicated region based queries
- *
- *
  */
 public class Avg extends Sum {
   private int num = 0;
 
+  int getNum() {
+    return num;
+  }
+
+  @Override
+  public void init() {}
+
   @Override
   public void accumulate(Object value) {
     if (value != null && value != QueryService.UNDEFINED) {
@@ -33,15 +38,9 @@ public class Avg extends Sum {
   }
 
   @Override
-  public void init() {
-
-  }
-
-  @Override
   public Object terminate() {
     double sum = ((Number) super.terminate()).doubleValue();
     double result = sum / num;
     return downCast(result);
   }
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgBucketNode.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgBucketNode.java
index f4cb923..7e41ba2 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgBucketNode.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgBucketNode.java
@@ -17,15 +17,17 @@ package org.apache.geode.cache.query.internal.aggregate;
 import org.apache.geode.cache.query.QueryService;
 
 /**
- * The aggregator for compuing average which is used on the bucket node for 
partitioned region based
+ * The aggregator for computing average which is used on the bucket node for 
partitioned region
+ * based
  * queries.
- *
- *
  */
 public class AvgBucketNode extends Sum {
-
   private int count = 0;
 
+  int getCount() {
+    return count;
+  }
+
   @Override
   public void accumulate(Object value) {
     if (value != null && value != QueryService.UNDEFINED) {
@@ -39,7 +41,6 @@ public class AvgBucketNode extends Sum {
    */
   @Override
   public Object terminate() {
-    return new Object[] {Integer.valueOf(count), super.terminate()};
+    return new Object[] {count, super.terminate()};
   }
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinct.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinct.java
index a6d0d43..5374023 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinct.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinct.java
@@ -14,27 +14,15 @@
  */
 package org.apache.geode.cache.query.internal.aggregate;
 
-import org.apache.geode.cache.query.QueryService;
-
 /**
  * Computes the average of distinct values for replicated region based queries.
- *
- *
  */
 public class AvgDistinct extends SumDistinct {
 
   @Override
-  public void accumulate(Object value) {
-    if (value != null && value != QueryService.UNDEFINED) {
-      super.accumulate(value);
-    }
-  }
-
-  @Override
   public Object terminate() {
     double sum = ((Number) super.terminate()).doubleValue();
     double result = sum / this.distinct.size();
     return downCast(result);
   }
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinctPRQueryNode.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinctPRQueryNode.java
index cea3dd1..8aa277d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinctPRQueryNode.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinctPRQueryNode.java
@@ -17,8 +17,6 @@ package org.apache.geode.cache.query.internal.aggregate;
 /**
  * Computes the final average of distinct values for the partitioned region 
based queries. This
  * aggregator is initialized on the PR query node & acts on the results 
obtained from bucket nodes.
- *
- *
  */
 public class AvgDistinctPRQueryNode extends SumDistinctPRQueryNode {
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgPRQueryNode.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgPRQueryNode.java
index 6a52818..38996b1 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgPRQueryNode.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/AvgPRQueryNode.java
@@ -17,12 +17,14 @@ package org.apache.geode.cache.query.internal.aggregate;
 /**
  * Computes the final non distinct average for a partitioned region based 
query. This aggregator is
  * instantiated on the PR query node.
- *
- *
  */
 public class AvgPRQueryNode extends Sum {
   private int count = 0;
 
+  int getCount() {
+    return count;
+  }
+
   /**
    * Takes the input of data received from bucket nodes. The data is of the 
form of two element
    * array. The first element is the number of values, while the second 
element is the sum of the
@@ -31,7 +33,7 @@ public class AvgPRQueryNode extends Sum {
   @Override
   public void accumulate(Object value) {
     Object[] array = (Object[]) value;
-    this.count += ((Integer) array[0]).intValue();
+    this.count += ((Integer) array[0]);
     super.accumulate(array[1]);
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/Count.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/Count.java
index 38a2aba..051102d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/Count.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/Count.java
@@ -19,12 +19,17 @@ import org.apache.geode.cache.query.QueryService;
 
 /**
  * Computes the count of the non distinct rows for replicated & PR based 
queries.
- *
- *
  */
 public class Count implements Aggregator {
   private int count = 0;
 
+  int getCount() {
+    return count;
+  }
+
+  @Override
+  public void init() {}
+
   @Override
   public void accumulate(Object value) {
     if (value != null && value != QueryService.UNDEFINED) {
@@ -33,13 +38,7 @@ public class Count implements Aggregator {
   }
 
   @Override
-  public void init() {
-
-  }
-
-  @Override
   public Object terminate() {
-    return Integer.valueOf(count);
+    return count;
   }
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinct.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinct.java
index 792d98b..5fb7ca8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinct.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinct.java
@@ -15,16 +15,12 @@
 package org.apache.geode.cache.query.internal.aggregate;
 
 /**
- *
  * Computes the count of the distinct rows for replicated region based queries.
- *
  */
-
 public class CountDistinct extends DistinctAggregator {
 
   @Override
   public Object terminate() {
-    return Integer.valueOf(this.distinct.size());
+    return this.distinct.size();
   }
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
index d4fe2e1..196dd08 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
@@ -16,10 +16,10 @@ package org.apache.geode.cache.query.internal.aggregate;
 
 import java.util.Set;
 
+import org.apache.geode.cache.query.QueryService;
+
 /**
  * Computes the count of the distinct rows on the PR query node.
- *
- *
  */
 public class CountDistinctPRQueryNode extends DistinctAggregator {
 
@@ -28,13 +28,13 @@ public class CountDistinctPRQueryNode extends 
DistinctAggregator {
    */
   @Override
   public void accumulate(Object value) {
-    this.distinct.addAll((Set) value);
-
+    if (value != null && value != QueryService.UNDEFINED) {
+      this.distinct.addAll((Set) value);
+    }
   }
 
   @Override
   public Object terminate() {
-    return Integer.valueOf(this.distinct.size());
+    return this.distinct.size();
   }
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountPRQueryNode.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountPRQueryNode.java
index 73aa4f6..9f33c52 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountPRQueryNode.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountPRQueryNode.java
@@ -18,28 +18,27 @@ import org.apache.geode.cache.query.Aggregator;
 
 /**
  * Computes the count of the rows on the PR query node
- *
- *
  */
 public class CountPRQueryNode implements Aggregator {
   private int count = 0;
 
+  int getCount() {
+    return count;
+  }
+
+  @Override
+  public void init() {}
+
   /**
    * Receives the input of the individual counts from the bucket nodes.
    */
   @Override
   public void accumulate(Object value) {
-    this.count += ((Integer) value).intValue();
-  }
-
-  @Override
-  public void init() {
-
+    this.count += ((Integer) value);
   }
 
   @Override
   public Object terminate() {
-    return Integer.valueOf(count);
+    return count;
   }
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregator.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregator.java
index f44768d..baa5f90 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregator.java
@@ -22,17 +22,22 @@ import org.apache.geode.cache.query.QueryService;
 /**
  * The class used to hold the distinct values. This will get instantiated on 
the bucket node as part
  * of distinct queries for sum, count, average.
- *
- *
  */
 public class DistinctAggregator extends AbstractAggregator {
   protected final Set<Object> distinct;
 
+  Set<Object> getDistinct() {
+    return distinct;
+  }
+
   public DistinctAggregator() {
-    this.distinct = new HashSet<Object>();
+    this.distinct = new HashSet<>();
   }
 
   @Override
+  public void init() {}
+
+  @Override
   public void accumulate(Object value) {
     if (value != null && value != QueryService.UNDEFINED) {
       this.distinct.add(value);
@@ -40,14 +45,7 @@ public class DistinctAggregator extends AbstractAggregator {
   }
 
   @Override
-  public void init() {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
   public Object terminate() {
     return this.distinct;
   }
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/MaxMin.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/MaxMin.java
index 863aff4..cecddb9 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/MaxMin.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/MaxMin.java
@@ -19,47 +19,46 @@ import org.apache.geode.cache.query.QueryService;
 
 /**
  * Computes the Max or Min
- *
- *
  */
-
 public class MaxMin implements Aggregator {
   private final boolean findMax;
   private Comparable currentOptima;
 
+  Comparable getCurrentOptima() {
+    return currentOptima;
+  }
+
   public MaxMin(boolean findMax) {
     this.findMax = findMax;
   }
 
   @Override
+  public void init() {}
+
+  @Override
   public void accumulate(Object value) {
     if (value == null || value == QueryService.UNDEFINED) {
       return;
     }
+
     Comparable comparable = (Comparable) value;
 
     if (currentOptima == null) {
       currentOptima = comparable;
     } else {
+      @SuppressWarnings("unchecked")
       int compare = currentOptima.compareTo(comparable);
+
       if (findMax) {
         currentOptima = compare < 0 ? comparable : currentOptima;
       } else {
         currentOptima = compare > 0 ? comparable : currentOptima;
       }
     }
-
-  }
-
-  @Override
-  public void init() {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
   public Object terminate() {
     return currentOptima;
   }
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/Sum.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/Sum.java
index 4a0dc62..4754b8d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/Sum.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/Sum.java
@@ -18,13 +18,17 @@ import org.apache.geode.cache.query.QueryService;
 
 /**
  * Computes the sum for replicated & PR based queries.
- *
- *
  */
 public class Sum extends AbstractAggregator {
-
   private double result = 0;
 
+  double getResult() {
+    return result;
+  }
+
+  @Override
+  public void init() {}
+
   @Override
   public void accumulate(Object value) {
     if (value != null && value != QueryService.UNDEFINED) {
@@ -34,11 +38,6 @@ public class Sum extends AbstractAggregator {
   }
 
   @Override
-  public void init() {
-
-  }
-
-  @Override
   public Object terminate() {
     return downCast(result);
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/SumDistinct.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/SumDistinct.java
index 4732892..e5c1479 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/SumDistinct.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/SumDistinct.java
@@ -16,7 +16,6 @@ package org.apache.geode.cache.query.internal.aggregate;
 
 /**
  * Computes the sum of distinct values for replicated region based queries.
- *
  */
 public class SumDistinct extends DistinctAggregator {
 
@@ -26,7 +25,7 @@ public class SumDistinct extends DistinctAggregator {
     for (Object o : this.distinct) {
       sum += ((Number) o).doubleValue();
     }
+
     return downCast(sum);
   }
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/SumDistinctPRQueryNode.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/SumDistinctPRQueryNode.java
index 05ffcaa..b2c4e7e 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/SumDistinctPRQueryNode.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/SumDistinctPRQueryNode.java
@@ -16,19 +16,21 @@ package org.apache.geode.cache.query.internal.aggregate;
 
 import java.util.Set;
 
+import org.apache.geode.cache.query.QueryService;
+
 /**
  * Computes the sum of distinct values on the PR query node.
- *
- *
  */
 public class SumDistinctPRQueryNode extends DistinctAggregator {
 
   /**
-   * The input data is the Set of values(distinct) receieved from each of the 
bucket nodes.
+   * The input data is the Set of values(distinct) received from each of the 
bucket nodes.
    */
   @Override
   public void accumulate(Object value) {
-    this.distinct.addAll((Set) value);
+    if (value != null && value != QueryService.UNDEFINED) {
+      this.distinct.addAll((Set) value);
+    }
   }
 
   @Override
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AbstractAggregatorTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AbstractAggregatorTest.java
new file mode 100644
index 0000000..a54344d
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AbstractAggregatorTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal.aggregate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Random;
+
+import org.junit.Test;
+
+public class AbstractAggregatorTest {
+  private final Random random = new Random();
+
+  @Test
+  public void 
downcastShouldReturnIntegerWhenValueIsAnIntegerAndFitsWithinTheIntegerRange() {
+    int randomInteger = random.nextInt();
+    Number integerResult = AbstractAggregator.downCast(randomInteger);
+
+    assertThat(integerResult).isInstanceOf(Number.class);
+    assertThat(integerResult).isInstanceOf(Integer.class);
+  }
+
+  @Test
+  public void 
downcastShouldReturnLongWhenValueIsAnIntegerButDoesNotFitsWithinTheIntegerRange()
 {
+    long randomLong = random.nextLong();
+    Number longResult = AbstractAggregator.downCast(randomLong);
+
+    assertThat(longResult).isInstanceOf(Number.class);
+    assertThat(longResult).isInstanceOf(Long.class);
+  }
+
+  @Test
+  public void 
downcastShouldReturnFloatWhenValueIsNotAnIntegerAndFitsWithinTheFloatRange() {
+    float randomFloat = random.nextFloat();
+    Number floatResult = AbstractAggregator.downCast(randomFloat);
+
+    assertThat(floatResult).isInstanceOf(Number.class);
+    assertThat(floatResult).isInstanceOf(Float.class);
+  }
+
+  @Test
+  public void 
downcastShouldReturnDoubleWhenValueIsNotAnIntegerAndDoesNotFitsWithinTheFloatRange()
 {
+    double randomDouble = Float.MIN_VALUE - random.nextDouble();
+    Number doubleResult = AbstractAggregator.downCast(randomDouble);
+
+    assertThat(doubleResult).isInstanceOf(Number.class);
+    assertThat(doubleResult).isInstanceOf(Double.class);
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AggregatorJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AggregatorJUnitTest.java
deleted file mode 100644
index 66526ab..0000000
--- 
a/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AggregatorJUnitTest.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.cache.query.internal.aggregate;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.junit.Test;
-
-
-public class AggregatorJUnitTest {
-
-  @Test
-  public void testCount() throws Exception {
-    Count count = new Count();
-    count.accumulate(new Integer(5));
-    count.accumulate(new Integer(6));
-    count.accumulate(null);
-    assertEquals(2, ((Number) count.terminate()).intValue());
-
-    CountPRQueryNode countPrQ = new CountPRQueryNode();
-    countPrQ.accumulate(new Integer(5));
-    countPrQ.accumulate(new Integer(6));
-    assertEquals(11, ((Number) countPrQ.terminate()).intValue());
-  }
-
-  @Test
-  public void testCountDistinct() throws Exception {
-    CountDistinct count = new CountDistinct();
-    count.accumulate(new Integer(5));
-    count.accumulate(new Integer(6));
-    count.accumulate(new Integer(5));
-    count.accumulate(new Integer(6));
-    count.accumulate(null);
-    count.accumulate(null);
-    assertEquals(2, ((Number) count.terminate()).intValue());
-
-    CountDistinctPRQueryNode cdpr = new CountDistinctPRQueryNode();
-
-    Set<Integer> set1 = new HashSet<Integer>();
-    set1.add(1);
-    set1.add(2);
-    set1.add(3);
-
-    Set<Integer> set2 = new HashSet<Integer>();
-    set2.add(3);
-    set2.add(4);
-    set2.add(5);
-
-    cdpr.accumulate(set1);
-    cdpr.accumulate(set2);
-    assertEquals(5, ((Number) cdpr.terminate()).intValue());
-  }
-
-  @Test
-  public void testSum() throws Exception {
-    Sum sum = new Sum();
-    sum.accumulate(new Integer(5));
-    sum.accumulate(new Integer(6));
-    sum.accumulate(null);
-    assertEquals(11, ((Number) sum.terminate()).intValue());
-  }
-
-  @Test
-  public void testSumDistinct() throws Exception {
-    SumDistinct sum = new SumDistinct();
-    sum.accumulate(new Integer(5));
-    sum.accumulate(new Integer(6));
-    sum.accumulate(null);
-    sum.accumulate(new Integer(5));
-    sum.accumulate(new Integer(6));
-    assertEquals(11, ((Number) sum.terminate()).intValue());
-
-    SumDistinctPRQueryNode sdpr = new SumDistinctPRQueryNode();
-
-    Set<Integer> set1 = new HashSet<Integer>();
-    set1.add(5);
-    set1.add(6);
-    set1.add(3);
-
-    Set<Integer> set2 = new HashSet<Integer>();
-    set2.add(3);
-    set2.add(7);
-    set2.add(8);
-
-    sdpr.accumulate(set1);
-    sdpr.accumulate(set2);
-    assertEquals(29, ((Number) sdpr.terminate()).intValue());
-  }
-
-  @Test
-  public void testAvg() throws Exception {
-    Avg avg = new Avg();
-    avg.accumulate(new Integer(1));
-    avg.accumulate(new Integer(2));
-    avg.accumulate(new Integer(3));
-    avg.accumulate(new Integer(4));
-    avg.accumulate(new Integer(5));
-    avg.accumulate(new Integer(6));
-    avg.accumulate(new Integer(7));
-    avg.accumulate(new Integer(7));
-    avg.accumulate(null);
-    avg.accumulate(null);
-    float expected = (1 + 2 + 3 + 4 + 5 + 6 + 7 + 7) / 8.0f;
-    assertEquals(expected, ((Number) avg.terminate()).floatValue(), 0);
-
-    AvgBucketNode abn = new AvgBucketNode();
-    abn.accumulate(new Integer(1));
-    abn.accumulate(new Integer(2));
-    abn.accumulate(new Integer(3));
-    abn.accumulate(new Integer(4));
-    abn.accumulate(new Integer(5));
-    abn.accumulate(new Integer(6));
-    abn.accumulate(new Integer(7));
-    abn.accumulate(new Integer(7));
-    abn.accumulate(null);
-    abn.accumulate(null);
-    Object[] arr = (Object[]) abn.terminate();
-    assertEquals(8, ((Integer) arr[0]).intValue());
-    assertEquals(35, ((Number) arr[1]).intValue());
-
-    AvgPRQueryNode apqn = new AvgPRQueryNode();
-    Object[] val1 = new Object[] {new Integer(7), new Double(43)};
-    Object[] val2 = new Object[] {new Integer(5), new Double(273.86)};
-    apqn.accumulate(val1);
-    apqn.accumulate(val2);
-    expected = (43 + 273.86f) / 12.0f;
-    assertEquals(expected, ((Number) apqn.terminate()).floatValue(), 0);
-  }
-
-  @Test
-  public void testAvgDistinct() throws Exception {
-    AvgDistinct avg = new AvgDistinct();
-    avg.accumulate(new Integer(1));
-    avg.accumulate(new Integer(2));
-    avg.accumulate(new Integer(2));
-    avg.accumulate(new Integer(3));
-    avg.accumulate(new Integer(3));
-    avg.accumulate(new Integer(4));
-    avg.accumulate(new Integer(5));
-    avg.accumulate(new Integer(6));
-    avg.accumulate(new Integer(7));
-    avg.accumulate(new Integer(7));
-    avg.accumulate(new Integer(6));
-    avg.accumulate(null);
-    avg.accumulate(null);
-    float expected = (1 + 2 + 3 + 4 + 5 + 6 + 7) / 7.0f;
-    assertEquals(expected, ((Number) avg.terminate()).floatValue(), 0);
-
-    AvgDistinctPRQueryNode adpqn = new AvgDistinctPRQueryNode();
-
-    Set<Integer> set1 = new HashSet<Integer>();
-    set1.add(5);
-    set1.add(6);
-    set1.add(3);
-    set1.add(4);
-
-    Set<Integer> set2 = new HashSet<Integer>();
-    set2.add(3);
-    set2.add(7);
-    set2.add(8);
-    set2.add(4);
-
-    adpqn.accumulate(set1);
-    adpqn.accumulate(set2);
-
-    expected = (3 + 4 + 5 + 6 + 7 + 8) / 6.0f;
-    assertEquals(expected, ((Number) adpqn.terminate()).floatValue(), 0);
-  }
-
-  @Test
-  public void testMaxMin() throws Exception {
-    MaxMin max = new MaxMin(true);
-    max.accumulate(new Integer(1));
-    assertEquals(1, ((Integer) max.terminate()).intValue());
-    max.accumulate(new Integer(2));
-    max.accumulate(null);
-    assertEquals(2, ((Integer) max.terminate()).intValue());
-
-    MaxMin min = new MaxMin(false);
-    min.accumulate(new Integer(1));
-    assertEquals(1, ((Integer) min.terminate()).intValue());
-    min.accumulate(new Integer(2));
-    min.accumulate(null);
-    assertEquals(1, ((Integer) min.terminate()).intValue());
-  }
-
-}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgBucketNodeTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgBucketNodeTest.java
new file mode 100644
index 0000000..26fe0a0
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgBucketNodeTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal.aggregate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.query.QueryService;
+
+public class AvgBucketNodeTest {
+  private AvgBucketNode avgBucketNode;
+
+  @Before
+  public void setUp() {
+    avgBucketNode = new AvgBucketNode();
+  }
+
+  @Test
+  public void accumulateShouldIgnoreNull() {
+    avgBucketNode.accumulate(null);
+
+    assertThat(avgBucketNode.getCount()).isEqualTo(0);
+    assertThat(avgBucketNode.getResult()).isEqualTo(0);
+  }
+
+  @Test
+  public void accumulateShouldIgnoreUndefined() {
+    avgBucketNode.accumulate(QueryService.UNDEFINED);
+
+    assertThat(avgBucketNode.getCount()).isEqualTo(0);
+    assertThat(avgBucketNode.getResult()).isEqualTo(0);
+  }
+
+  @Test
+  public void accumulateShouldIncreaseAccumulatedCount() {
+    avgBucketNode.accumulate(1);
+    avgBucketNode.accumulate(25.78f);
+
+    assertThat(avgBucketNode.getCount()).isEqualTo(2);
+    assertThat(avgBucketNode.getResult()).isEqualTo(26.78f);
+  }
+
+  @Test
+  public void terminateShouldCorrectlyComputeAverageUponAccumulatedValues() {
+    avgBucketNode.accumulate(1);
+    avgBucketNode.accumulate(2);
+    avgBucketNode.accumulate(3);
+    avgBucketNode.accumulate(4);
+    avgBucketNode.accumulate(5);
+    avgBucketNode.accumulate(6);
+    avgBucketNode.accumulate(7);
+    avgBucketNode.accumulate(null);
+    avgBucketNode.accumulate(QueryService.UNDEFINED);
+
+    Object result = avgBucketNode.terminate();
+    assertThat(result).isInstanceOf(Object[].class);
+    assertThat(((Integer) ((Object[]) result)[0]).intValue()).isEqualTo(7);
+    assertThat(((Number) ((Object[]) result)[1]).intValue()).isEqualTo(28);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinctPRQueryNodeTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinctPRQueryNodeTest.java
new file mode 100644
index 0000000..1fd1f87
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinctPRQueryNodeTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal.aggregate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class AvgDistinctPRQueryNodeTest extends DistinctAggregatorTest {
+
+  @Before
+  public void setUp() {
+    distinctAggregator = new AvgDistinctPRQueryNode();
+  }
+
+  @Test
+  public void accumulateShouldComputeIntermediateAdditions() {
+    distinctAggregator.accumulate(new HashSet<>());
+    assertThat(distinctAggregator.getDistinct()).isEmpty();
+
+    distinctAggregator.accumulate(new HashSet<>(Arrays.asList(1, 80.00d)));
+    assertThat(distinctAggregator.getDistinct()).isNotEmpty().hasSize(2);
+
+    distinctAggregator.accumulate(new HashSet<>(Collections.singletonList(80.00d)));
+    assertThat(distinctAggregator.getDistinct()).isNotEmpty().hasSize(2);
+  }
+
+  @Test
+  public void terminateShouldCorrectlyComputeAverageUponDistinctAccumulatedValues() {
+    distinctAggregator.accumulate(new HashSet<>());
+    distinctAggregator.accumulate(new HashSet<>(Arrays.asList(5, 6, 3, 4)));
+    distinctAggregator.accumulate(new HashSet<>(Arrays.asList(3, 7, 8, 4)));
+
+    Object result = distinctAggregator.terminate();
+    assertThat(result).isInstanceOf(Number.class);
+    assertThat(((Number) result).floatValue()).isEqualTo((3 + 4 + 5 + 6 + 7 + 8) / 6.0f);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinctTest.java
similarity index 53%
copy from geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
copy to geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinctTest.java
index d4fe2e1..681ad00 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgDistinctTest.java
@@ -14,27 +14,29 @@
  */
 package org.apache.geode.cache.query.internal.aggregate;
 
-import java.util.Set;
+import static org.assertj.core.api.Assertions.assertThat;
 
-/**
- * Computes the count of the distinct rows on the PR query node.
- *
- *
- */
-public class CountDistinctPRQueryNode extends DistinctAggregator {
+import org.junit.Before;
+import org.junit.Test;
 
-  /**
-   * The input data is the Set containing distinct values from each of the bucket nodes.
-   */
-  @Override
-  public void accumulate(Object value) {
-    this.distinct.addAll((Set) value);
+public class AvgDistinctTest extends DistinctAggregatorTest {
 
+  @Before
+  public void setUp() {
+    distinctAggregator = new AvgDistinct();
   }
 
-  @Override
-  public Object terminate() {
-    return Integer.valueOf(this.distinct.size());
-  }
+  @Test
+  public void terminateShouldCorrectlyComputeAverageUponDistinctAccumulatedValues() {
+    distinctAggregator.accumulate(5);
+    distinctAggregator.accumulate(5);
+    distinctAggregator.accumulate(10);
+    distinctAggregator.accumulate(10);
+    distinctAggregator.accumulate(15);
+    distinctAggregator.accumulate(15);
 
+    Object result = distinctAggregator.terminate();
+    assertThat(result).isInstanceOf(Number.class);
+    assertThat(((Number) result).floatValue()).isEqualTo(10);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgPRQueryNodeTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgPRQueryNodeTest.java
new file mode 100644
index 0000000..dbcf4d5
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgPRQueryNodeTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal.aggregate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class AvgPRQueryNodeTest {
+  private AvgPRQueryNode avgPRQueryNode;
+
+  @Before
+  public void setUp() {
+    avgPRQueryNode = new AvgPRQueryNode();
+  }
+
+  @Test
+  public void accumulateShouldIncreaseAccumulatedCount() {
+    avgPRQueryNode.accumulate(new Integer[] {2, 10});
+    avgPRQueryNode.accumulate(new Integer[] {3, 30});
+
+    assertThat(avgPRQueryNode.getCount()).isEqualTo(5);
+    assertThat(avgPRQueryNode.getResult()).isEqualTo(40);
+  }
+
+  @Test
+  public void terminateShouldCorrectlyComputeAverageUponAccumulatedValues() {
+    avgPRQueryNode.accumulate(new Object[] {7, 43d});
+    avgPRQueryNode.accumulate(new Object[] {5, 273.86d});
+
+    Object result = avgPRQueryNode.terminate();
+    assertThat(avgPRQueryNode.getCount()).isEqualTo(12);
+    assertThat(result).isInstanceOf(Number.class);
+    assertThat(((Number) result).floatValue()).isEqualTo((43 + 273.86f) / 12.0f);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgTest.java
new file mode 100644
index 0000000..4c7a41f
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/AvgTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal.aggregate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.query.QueryService;
+
+public class AvgTest {
+  private Avg avg;
+
+  @Before
+  public void setUp() {
+    avg = new Avg();
+  }
+
+  @Test
+  public void accumulateShouldIgnoreNull() {
+    avg.accumulate(null);
+
+    assertThat(avg.getNum()).isEqualTo(0);
+    assertThat(avg.getResult()).isEqualTo(0);
+  }
+
+  @Test
+  public void accumulateShouldIgnoreUndefined() {
+    avg.accumulate(QueryService.UNDEFINED);
+
+    assertThat(avg.getNum()).isEqualTo(0);
+    assertThat(avg.getResult()).isEqualTo(0);
+  }
+
+  @Test
+  public void accumulateShouldIncreaseAccumulatedCount() {
+    avg.accumulate(1);
+    avg.accumulate(25.78f);
+
+    assertThat(avg.getNum()).isEqualTo(2);
+    assertThat(avg.getResult()).isEqualTo(26.78f);
+  }
+
+  @Test
+  public void terminateShouldCorrectlyComputeAverageUponAccumulatedValues() {
+    avg.accumulate(1);
+    avg.accumulate(2);
+    avg.accumulate(3);
+    avg.accumulate(4);
+    avg.accumulate(5);
+    avg.accumulate(6);
+    avg.accumulate(7);
+    avg.accumulate(null);
+    avg.accumulate(QueryService.UNDEFINED);
+    float expected = (1 + 2 + 3 + 4 + 5 + 6 + 7) / 7.0f;
+
+    Object result = avg.terminate();
+    assertThat(avg.getNum()).isEqualTo(7);
+    assertThat(result).isInstanceOf(Number.class);
+    assertThat(((Number) result).floatValue()).isEqualTo(expected);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNodeTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNodeTest.java
new file mode 100644
index 0000000..a6cc097
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNodeTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal.aggregate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class CountDistinctPRQueryNodeTest extends DistinctAggregatorTest {
+
+  @Before
+  public void setUp() {
+    distinctAggregator = new CountDistinctPRQueryNode();
+  }
+
+  @Test
+  public void accumulateShouldComputeIntermediateAdditions() {
+    distinctAggregator.accumulate(new HashSet<>());
+    assertThat(distinctAggregator.getDistinct()).isEmpty();
+
+    distinctAggregator.accumulate(new HashSet<>(Arrays.asList(1, 80.00d)));
+    assertThat(distinctAggregator.getDistinct()).isNotEmpty().hasSize(2);
+
+    distinctAggregator.accumulate(new HashSet<>(Collections.singletonList(80.00d)));
+    assertThat(distinctAggregator.getDistinct()).isNotEmpty().hasSize(2);
+  }
+
+  @Test
+  public void terminateShouldCorrectlyComputeDistinctAccumulatedValuesCount() {
+    distinctAggregator.accumulate(new HashSet<>());
+    distinctAggregator.accumulate(new HashSet<>(Arrays.asList(5, 6, 3, 4)));
+    distinctAggregator.accumulate(new HashSet<>(Arrays.asList(3, 7, 8, 4)));
+
+    Object result = distinctAggregator.terminate();
+    assertThat(result).isInstanceOf(Number.class);
+    assertThat(((Number) result).intValue()).isEqualTo(6);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctTest.java
similarity index 54%
copy from geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
copy to geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctTest.java
index d4fe2e1..cfc1838 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctTest.java
@@ -14,27 +14,29 @@
  */
 package org.apache.geode.cache.query.internal.aggregate;
 
-import java.util.Set;
+import static org.assertj.core.api.Assertions.assertThat;
 
-/**
- * Computes the count of the distinct rows on the PR query node.
- *
- *
- */
-public class CountDistinctPRQueryNode extends DistinctAggregator {
+import org.junit.Before;
+import org.junit.Test;
 
-  /**
-   * The input data is the Set containing distinct values from each of the bucket nodes.
-   */
-  @Override
-  public void accumulate(Object value) {
-    this.distinct.addAll((Set) value);
+public class CountDistinctTest extends DistinctAggregatorTest {
 
+  @Before
+  public void setUp() {
+    distinctAggregator = new CountDistinct();
   }
 
-  @Override
-  public Object terminate() {
-    return Integer.valueOf(this.distinct.size());
-  }
+  @Test
+  public void terminateShouldCorrectlyComputeDistinctAccumulatedValuesCount() {
+    distinctAggregator.accumulate(5);
+    distinctAggregator.accumulate(5);
+    distinctAggregator.accumulate(10);
+    distinctAggregator.accumulate(10);
+    distinctAggregator.accumulate(15);
+    distinctAggregator.accumulate(15);
 
+    Object result = distinctAggregator.terminate();
+    assertThat(result).isInstanceOf(Number.class);
+    assertThat(((Number) result).intValue()).isEqualTo(3);
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregator.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/CountPRQueryNodeTest.java
similarity index 51%
copy from geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregator.java
copy to geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/CountPRQueryNodeTest.java
index f44768d..f74530c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregator.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/CountPRQueryNodeTest.java
@@ -14,40 +14,35 @@
  */
 package org.apache.geode.cache.query.internal.aggregate;
 
-import java.util.HashSet;
-import java.util.Set;
+import static org.assertj.core.api.Assertions.assertThat;
 
-import org.apache.geode.cache.query.QueryService;
+import org.junit.Before;
+import org.junit.Test;
 
-/**
- * The class used to hold the distinct values. This will get instantiated on the bucket node as part
- * of distinct queries for sum, count, average.
- *
- *
- */
-public class DistinctAggregator extends AbstractAggregator {
-  protected final Set<Object> distinct;
+public class CountPRQueryNodeTest {
+  private CountPRQueryNode countPRQueryNode;
 
-  public DistinctAggregator() {
-    this.distinct = new HashSet<Object>();
+  @Before
+  public void setUp() {
+    countPRQueryNode = new CountPRQueryNode();
   }
 
-  @Override
-  public void accumulate(Object value) {
-    if (value != null && value != QueryService.UNDEFINED) {
-      this.distinct.add(value);
-    }
-  }
-
-  @Override
-  public void init() {
-    // TODO Auto-generated method stub
+  @Test
+  public void accumulateShouldComputeIntermediateAdditions() {
+    countPRQueryNode.accumulate(2);
+    countPRQueryNode.accumulate(3);
 
+    assertThat(countPRQueryNode.getCount()).isEqualTo(5);
   }
 
-  @Override
-  public Object terminate() {
-    return this.distinct;
-  }
+  @Test
+  public void terminateShouldCorrectlyComputeAggregatedValuesCount() {
+    countPRQueryNode.accumulate(50);
+    countPRQueryNode.accumulate(50);
+    countPRQueryNode.accumulate(23);
 
+    Object result = countPRQueryNode.terminate();
+    assertThat(result).isInstanceOf(Number.class);
+    assertThat(((Number) result).intValue()).isEqualTo(123);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/CountTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/CountTest.java
new file mode 100644
index 0000000..f5993f9
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/CountTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal.aggregate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.query.QueryService;
+
+public class CountTest {
+  private Count count;
+
+  @Before
+  public void setUp() {
+    count = new Count();
+  }
+
+  @Test
+  public void accumulateShouldIgnoreNull() {
+    count.accumulate(null);
+
+    assertThat(count.getCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void accumulateShouldIgnoreUndefined() {
+    count.accumulate(QueryService.UNDEFINED);
+
+    assertThat(count.getCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void accumulateShouldIncreaseAccumulatedCount() {
+    count.accumulate(1);
+    count.accumulate(25.78f);
+
+    assertThat(count.getCount()).isEqualTo(2);
+  }
+
+  @Test
+  public void terminateShouldCorrectlyComputeAverageUponAccumulatedValues() {
+    for (int i = 0; i < 10; i++) {
+      count.accumulate(i);
+    }
+
+    Object result = count.terminate();
+    assertThat(result).isInstanceOf(Number.class);
+    assertThat(((Number) result).intValue()).isEqualTo(10);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregator.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregatorTest.java
similarity index 50%
copy from geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregator.java
copy to geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregatorTest.java
index f44768d..4639caa 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregator.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/DistinctAggregatorTest.java
@@ -14,40 +14,41 @@
  */
 package org.apache.geode.cache.query.internal.aggregate;
 
-import java.util.HashSet;
-import java.util.Set;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
 
 import org.apache.geode.cache.query.QueryService;
 
-/**
- * The class used to hold the distinct values. This will get instantiated on the bucket node as part
- * of distinct queries for sum, count, average.
- *
- *
- */
-public class DistinctAggregator extends AbstractAggregator {
-  protected final Set<Object> distinct;
+public class DistinctAggregatorTest {
+  DistinctAggregator distinctAggregator;
 
-  public DistinctAggregator() {
-    this.distinct = new HashSet<Object>();
+  @Before
+  public void setUp() {
+    distinctAggregator = new DistinctAggregator();
   }
 
-  @Override
-  public void accumulate(Object value) {
-    if (value != null && value != QueryService.UNDEFINED) {
-      this.distinct.add(value);
-    }
+  @Test
+  public void accumulateShouldIgnoreNull() {
+    distinctAggregator.accumulate(null);
+
+    assertThat(distinctAggregator.getDistinct()).isEmpty();
   }
 
-  @Override
-  public void init() {
-    // TODO Auto-generated method stub
+  @Test
+  public void accumulateShouldIgnoreUndefined() {
+    distinctAggregator.accumulate(QueryService.UNDEFINED);
 
+    assertThat(distinctAggregator.getDistinct()).isEmpty();
   }
 
-  @Override
-  public Object terminate() {
-    return this.distinct;
-  }
+  @Test
+  public void accumulateShouldComputeIntermediateAdditions() {
+    distinctAggregator.accumulate(20);
+    assertThat(distinctAggregator.getDistinct()).isNotEmpty().hasSize(1);
 
+    distinctAggregator.accumulate(20);
+    assertThat(distinctAggregator.getDistinct()).isNotEmpty().hasSize(1);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/MaxMinTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/MaxMinTest.java
new file mode 100644
index 0000000..3612582
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/MaxMinTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal.aggregate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.AssertionsForClassTypes;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.query.QueryService;
+
+public class MaxMinTest {
+  private MaxMin min;
+  private MaxMin max;
+
+  @Before
+  public void setUp() {
+    max = new MaxMin(true);
+    min = new MaxMin(false);
+  }
+
+  @Test
+  public void accumulateShouldIgnoreNull() {
+    max.accumulate(null);
+    min.accumulate(null);
+
+    AssertionsForClassTypes.assertThat(max.getCurrentOptima()).isNull();
+    AssertionsForClassTypes.assertThat(min.getCurrentOptima()).isNull();
+  }
+
+  @Test
+  public void accumulateShouldIgnoreUndefined() {
+    max.accumulate(QueryService.UNDEFINED);
+    min.accumulate(QueryService.UNDEFINED);
+
+    AssertionsForClassTypes.assertThat(max.getCurrentOptima()).isNull();
+    AssertionsForClassTypes.assertThat(min.getCurrentOptima()).isNull();
+  }
+
+  @Test
+  public void accumulateShouldUpdateOptimaOnIntermediateAdditions() {
+    AssertionsForClassTypes.assertThat(max.getCurrentOptima()).isNull();
+    max.accumulate(10);
+    AssertionsForClassTypes.assertThat(max.getCurrentOptima()).isEqualTo(10);
+    max.accumulate(5);
+    AssertionsForClassTypes.assertThat(max.getCurrentOptima()).isEqualTo(10);
+    max.accumulate(15);
+    AssertionsForClassTypes.assertThat(max.getCurrentOptima()).isEqualTo(15);
+
+    AssertionsForClassTypes.assertThat(min.getCurrentOptima()).isNull();
+    min.accumulate(10);
+    AssertionsForClassTypes.assertThat(min.getCurrentOptima()).isEqualTo(10);
+    min.accumulate(5);
+    AssertionsForClassTypes.assertThat(min.getCurrentOptima()).isEqualTo(5);
+    min.accumulate(6);
+    AssertionsForClassTypes.assertThat(min.getCurrentOptima()).isEqualTo(5);
+  }
+
+  @Test
+  public void terminateShouldReturnOptimaFound() {
+    List<String> comparableStrings = Arrays.asList("B", "D", "Z", "E", "A");
+    comparableStrings.forEach(string -> {
+      max.accumulate(string);
+      min.accumulate(string);
+    });
+
+    Object maxResult = max.terminate();
+    assertThat(maxResult).isInstanceOf(String.class);
+    assertThat(maxResult).isEqualTo("Z");
+
+    Object minResult = min.terminate();
+    assertThat(minResult).isInstanceOf(String.class);
+    assertThat(minResult).isEqualTo("A");
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/SumDistinctPRQueryNodeTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/SumDistinctPRQueryNodeTest.java
new file mode 100644
index 0000000..2190c89
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/SumDistinctPRQueryNodeTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal.aggregate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class SumDistinctPRQueryNodeTest extends DistinctAggregatorTest {
+
+  @Before
+  public void setUp() {
+    distinctAggregator = new SumDistinctPRQueryNode();
+  }
+
+  @Test
+  public void accumulateShouldComputeIntermediateAdditions() {
+    distinctAggregator.accumulate(new HashSet<>());
+    assertThat(distinctAggregator.getDistinct()).isEmpty();
+
+    distinctAggregator.accumulate(new HashSet<>(Arrays.asList(1, 10.12f)));
+    assertThat(distinctAggregator.getDistinct()).isNotEmpty().hasSize(2);
+
+    distinctAggregator.accumulate(new HashSet<>(Collections.singletonList(10.12f)));
+    assertThat(distinctAggregator.getDistinct()).isNotEmpty().hasSize(2);
+  }
+
+  @Test
+  public void terminateShouldCorrectlyComputeDistinctValuesAddition() {
+    distinctAggregator.accumulate(new HashSet<>());
+    distinctAggregator.accumulate(new HashSet<>(Arrays.asList(5, 6, 3)));
+    distinctAggregator.accumulate(new HashSet<>(Arrays.asList(3, 7, 8)));
+
+    Object result = distinctAggregator.terminate();
+    assertThat(result).isInstanceOf(Number.class);
+    assertThat(((Number) result).intValue()).isEqualTo(29);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/SumDistinctTest.java
similarity index 57%
copy from geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
copy to geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/SumDistinctTest.java
index d4fe2e1..3235f62 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/SumDistinctTest.java
@@ -14,27 +14,27 @@
  */
 package org.apache.geode.cache.query.internal.aggregate;
 
-import java.util.Set;
+import static org.assertj.core.api.Assertions.assertThat;
 
-/**
- * Computes the count of the distinct rows on the PR query node.
- *
- *
- */
-public class CountDistinctPRQueryNode extends DistinctAggregator {
+import org.junit.Before;
+import org.junit.Test;
 
-  /**
-   * The input data is the Set containing distinct values from each of the bucket nodes.
-   */
-  @Override
-  public void accumulate(Object value) {
-    this.distinct.addAll((Set) value);
+public class SumDistinctTest extends DistinctAggregatorTest {
 
+  @Before
+  public void setUp() {
+    distinctAggregator = new SumDistinct();
   }
 
-  @Override
-  public Object terminate() {
-    return Integer.valueOf(this.distinct.size());
-  }
+  @Test
+  public void terminateShouldCorrectlyComputeDistinctValuesAddition() {
+    distinctAggregator.accumulate(5);
+    distinctAggregator.accumulate(10);
+    distinctAggregator.accumulate(10);
+    distinctAggregator.accumulate(15);
 
+    Object result = distinctAggregator.terminate();
+    assertThat(result).isInstanceOf(Number.class);
+    assertThat(((Number) result).intValue()).isEqualTo(30);
+  }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/SumTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/SumTest.java
new file mode 100644
index 0000000..da3ee28
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/query/internal/aggregate/SumTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal.aggregate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.query.QueryService;
+
+public class SumTest {
+  private Sum sum;
+
+  @Before
+  public void setUp() {
+    sum = new Sum();
+  }
+
+  @Test
+  public void accumulateShouldIgnoreNull() {
+    sum.accumulate(null);
+
+    assertThat(sum.getResult()).isEqualTo(0);
+  }
+
+  @Test
+  public void accumulateShouldIgnoreUndefined() {
+    sum.accumulate(QueryService.UNDEFINED);
+
+    assertThat(sum.getResult()).isEqualTo(0);
+  }
+
+  @Test
+  public void accumulateShouldComputeIntermediateAdditions() {
+    sum.accumulate(1);
+    assertThat(sum.getResult()).isEqualTo(1);
+
+    sum.accumulate(25.78);
+    assertThat(sum.getResult()).isEqualTo(26.78);
+
+    sum.accumulate(1.22);
+    assertThat(sum.getResult()).isEqualTo(28.00);
+  }
+
+  @Test
+  public void terminateShouldCorrectlyComputeValuesAddition() {
+    sum.accumulate(1);
+    sum.accumulate(2);
+    sum.accumulate(3);
+    sum.accumulate(4);
+    sum.accumulate(5);
+    sum.accumulate(6);
+    sum.accumulate(7);
+    sum.accumulate(null);
+    sum.accumulate(QueryService.UNDEFINED);
+
+    Object result = sum.terminate();
+    assertThat(result).isInstanceOf(Number.class);
+    assertThat(((Number) result).intValue()).isEqualTo(28);
+  }
+}

Reply via email to