Repository: phoenix
Updated Branches:
  refs/heads/master d97eb4967 -> ae14e38cc


PHOENIX-2818 Optimize ORDERED group by


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d414505d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d414505d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d414505d

Branch: refs/heads/master
Commit: d414505df8afeba553734db688547c6a4e9e90d9
Parents: d97eb49
Author: James Taylor <[email protected]>
Authored: Thu May 12 14:29:29 2016 -0700
Committer: James Taylor <[email protected]>
Committed: Thu May 12 14:29:29 2016 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/GroupByCaseIT.java   |  92 +++++++++
 .../apache/phoenix/execute/AggregatePlan.java   |   3 +-
 .../iterate/AggregatingResultIterator.java      |   4 +-
 .../BaseGroupedAggregatingResultIterator.java   |   3 +-
 .../DistinctAggregatingResultIterator.java      |  15 +-
 .../FilterAggregatingResultIterator.java        |   8 +-
 .../OrderedAggregatingResultIterator.java       |   5 +-
 .../RowKeyOrderedAggregateResultIterator.java   | 190 +++++++++++++++++++
 .../iterate/AggregateResultScannerTest.java     | 109 +----------
 .../iterate/ConcatResultIteratorTest.java       |  31 +--
 .../iterate/MaterializedResultIterators.java    |  66 +++++++
 ...owKeyOrderedAggregateResultIteratorTest.java | 149 +++++++++++++++
 .../java/org/apache/phoenix/util/TestUtil.java  |  70 +++++++
 13 files changed, 596 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
index 44f43b7..b0524da 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
@@ -32,6 +32,9 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Properties;
 
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.junit.Test;
@@ -343,4 +346,93 @@ public class GroupByCaseIT extends BaseHBaseManagedTimeIT {
                 "    SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [K1]", 
QueryUtil.getExplainPlan(rs));
     }
     
+    @Test
+    public void testSumGroupByOrderPreservingDesc() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        PreparedStatement stmt = conn.prepareStatement("CREATE TABLE 
GROUP_BY_DESC (k1 char(1) not null, k2 integer not null, constraint pk primary 
key (k1,k2)) split on (?,?,?)");
+        stmt.setBytes(1, ByteUtil.concat(PChar.INSTANCE.toBytes("a"), 
PInteger.INSTANCE.toBytes(3)));
+        stmt.setBytes(2, ByteUtil.concat(PChar.INSTANCE.toBytes("j"), 
PInteger.INSTANCE.toBytes(3)));
+        stmt.setBytes(3, ByteUtil.concat(PChar.INSTANCE.toBytes("n"), 
PInteger.INSTANCE.toBytes(3)));
+        stmt.execute();
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 
1)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 
2)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 
3)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 
4)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('b', 
5)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 
1)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 
2)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 
3)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 
4)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 
1)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 
2)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 
3)");
+        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 
4)");
+        conn.commit();
+        String query = "SELECT k1,sum(k2) FROM GROUP_BY_DESC GROUP BY k1 ORDER 
BY k1 DESC";
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        assertTrue(rs.next());
+        assertEquals("n", rs.getString(1));
+        assertEquals(10, rs.getInt(2));
+        assertTrue(rs.next());
+        assertEquals("j", rs.getString(1));
+        assertEquals(10, rs.getInt(2));
+        assertTrue(rs.next());
+        assertEquals("b", rs.getString(1));
+        assertEquals(5, rs.getInt(2));
+        assertTrue(rs.next());
+        assertEquals("a", rs.getString(1));
+        assertEquals(10, rs.getInt(2));
+        assertFalse(rs.next());
+        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+        assertEquals(
+                "CLIENT PARALLEL 1-WAY REVERSE FULL SCAN OVER GROUP_BY_DESC\n" 
+ 
+                "    SERVER FILTER BY FIRST KEY ONLY\n" + 
+                "    SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [K1]", 
QueryUtil.getExplainPlan(rs));
+    }
+
    /**
     * Verifies AVG over an order-preserving GROUP BY on the leading PK column.
     * The table is pre-split on (k1, 3) boundaries so that a single k1 group
     * spans multiple regions, exercising client-side merging of per-scan
     * aggregates. Integer AVG results are read via getInt.
     */
    @Test
    public void testAvgGroupByOrderPreserving() throws Exception {
        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
        Connection conn = DriverManager.getConnection(getUrl(), props);
        // Split points are composite (k1, k2=3) keys, placing part of each
        // group on each side of a region boundary.
        PreparedStatement stmt = conn.prepareStatement("CREATE TABLE GROUP_BY_DESC (k1 char(1) not null, k2 integer not null, constraint pk primary key (k1,k2)) split on (?,?,?)");
        stmt.setBytes(1, ByteUtil.concat(PChar.INSTANCE.toBytes("a"), PInteger.INSTANCE.toBytes(3)));
        stmt.setBytes(2, ByteUtil.concat(PChar.INSTANCE.toBytes("j"), PInteger.INSTANCE.toBytes(3)));
        stmt.setBytes(3, ByteUtil.concat(PChar.INSTANCE.toBytes("n"), PInteger.INSTANCE.toBytes(3)));
        stmt.execute();
        // Group values chosen so the averages are exact integers:
        // a: (1+2+3+6)/4 = 3, b: 5/1 = 5, j: (1+2+3+10)/4 = 4, n: (1+2+3+2)/4 = 2
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 1)");
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 2)");
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 3)");
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 6)");
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('b', 5)");
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 1)");
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 2)");
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 3)");
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 10)");
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 1)");
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 2)");
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 3)");
        conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 2)");
        conn.commit();
        String query = "SELECT k1,avg(k2) FROM GROUP_BY_DESC GROUP BY k1";
        ResultSet rs = conn.createStatement().executeQuery(query);
        // No ORDER BY: rows come back in ascending row key order (a, b, j, n).
        assertTrue(rs.next());
        assertEquals("a", rs.getString(1));
        assertEquals(3, rs.getInt(2));
        assertTrue(rs.next());
        assertEquals("b", rs.getString(1));
        assertEquals(5, rs.getInt(2));
        assertTrue(rs.next());
        assertEquals("j", rs.getString(1));
        assertEquals(4, rs.getInt(2));
        assertTrue(rs.next());
        assertEquals("n", rs.getString(1));
        assertEquals(2, rs.getInt(2));
        assertFalse(rs.next());
        // The plan must show an order-preserving (ORDERED) server aggregation.
        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
        assertEquals(
                "CLIENT PARALLEL 1-WAY FULL SCAN OVER GROUP_BY_DESC\n" + 
                "    SERVER FILTER BY FIRST KEY ONLY\n" + 
                "    SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [K1]", QueryUtil.getExplainPlan(rs));
    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 94d1fc8..770cf71 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.DistinctAggregatingResultIterator;
 import org.apache.phoenix.iterate.FilterAggregatingResultIterator;
 import org.apache.phoenix.iterate.GroupedAggregatingResultIterator;
+import org.apache.phoenix.iterate.RowKeyOrderedAggregateResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
 import org.apache.phoenix.iterate.OffsetResultIterator;
@@ -226,7 +227,7 @@ public class AggregatePlan extends BaseQueryPlan {
             aggResultIterator = new UngroupedAggregatingResultIterator(new 
ConcatResultIterator(iterators), aggregators);
         // If salted or local index we still need a merge sort as we'll 
potentially have multiple group by keys that aren't contiguous.
         } else if (groupBy.isOrderPreserving() && 
!(this.getTableRef().getTable().getBucketNum() != null || 
this.getTableRef().getTable().getIndexType() == IndexType.LOCAL)) {
-            aggResultIterator = new GroupedAggregatingResultIterator(new 
ConcatResultIterator(iterators), aggregators);
+            aggResultIterator = new 
RowKeyOrderedAggregateResultIterator(iterators, aggregators);
         } else {
             aggResultIterator = new GroupedAggregatingResultIterator(new 
MergeSortRowKeyResultIterator(iterators), aggregators);            
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
index abd0545..59a89ad 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.iterate;
 
+import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 
@@ -33,6 +34,7 @@ public interface AggregatingResultIterator extends 
ResultIterator {
      * Provides a means of re-aggregating a result row. For
      * scanners that need to look ahead (i.e. {@link 
org.apache.phoenix.iterate.OrderedAggregatingResultIterator}
      * @param result the row to re-aggregate
+     * @return Aggregator[] results
      */
-    void aggregate(Tuple result);
+    Aggregator[] aggregate(Tuple result);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
index 8fd36b3..84d29ff 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
@@ -91,10 +91,11 @@ public abstract class BaseGroupedAggregatingResultIterator 
implements
     }
     
     @Override
-    public void aggregate(Tuple result) {
+    public Aggregator[] aggregate(Tuple result) {
         Aggregator[] rowAggregators = aggregators.getAggregators();
         aggregators.reset(rowAggregators);
         aggregators.aggregate(rowAggregators, result);
+        return rowAggregators;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
index 1ba134b..cf3fb38 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
@@ -18,17 +18,20 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
-import java.util.*;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.schema.tuple.Tuple;
 
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+
 /**
  * Result scanner that dedups the incoming tuples to make them distinct.
  * <p>
@@ -155,8 +158,8 @@ public class DistinctAggregatingResultIterator implements 
AggregatingResultItera
     }
 
     @Override
-    public void aggregate(Tuple result) {
-        delegate.aggregate(result);
+    public Aggregator[] aggregate(Tuple result) {
+        return delegate.aggregate(result);
     }
 
        @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
index 4fa2011..5fd2028 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
@@ -21,10 +21,10 @@ import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBoolean;
 
 
 /**
@@ -66,8 +66,8 @@ public class FilterAggregatingResultIterator  implements 
AggregatingResultIterat
     }
 
     @Override
-    public void aggregate(Tuple result) {
-        delegate.aggregate(result);
+    public Aggregator[] aggregate(Tuple result) {
+        return delegate.aggregate(result);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
index da2be48..51a7dd8 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 
@@ -55,7 +56,7 @@ public class OrderedAggregatingResultIterator extends 
OrderedResultIterator impl
     }
     
     @Override
-    public void aggregate(Tuple result) {
-        getDelegate().aggregate(result);
+    public Aggregator[] aggregate(Tuple result) {
+        return getDelegate().aggregate(result);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
new file mode 100644
index 0000000..3c52e51
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+
/**
 * Client-side aggregating iterator for row-key-ordered (order-preserving)
 * GROUP BY. Because the underlying scans return rows in row key order, rows
 * belonging to the same group are adjacent; row keys of successive rows only
 * need to be compared after we cross a scan boundary, where the same group key
 * may continue in the next scan's results.
 */
public class RowKeyOrderedAggregateResultIterator extends LookAheadResultIterator implements AggregatingResultIterator {
    private final ResultIterators resultIterators;
    // Lazily materialized from resultIterators on first use; see getIterators().
    private List<PeekingResultIterator> iterators;
    private final Aggregators aggregators;
    // Scratch key holders reused across calls to avoid per-row allocation.
    private final ImmutableBytesWritable currentKey = new ImmutableBytesWritable();
    private final ImmutableBytesWritable previousKey = new ImmutableBytesWritable();
    // Set by nextTuple() when an underlying iterator is exhausted and we move
    // to the next one. Only after such a crossing can the same group key show
    // up again and require client-side merging of partial aggregates.
    // NOTE(review): initialized to true and never visibly set back to false in
    // this class, so the key comparison in advance() appears to always run —
    // confirm whether a reset to false was intended somewhere.
    private boolean traversedIterator = true;
    // Snapshot of traversedIterator taken when `next` was buffered.
    private boolean nextTraversedIterators;
    // One-tuple look-ahead buffered by advance() between invocations.
    private Tuple next;
    // Index of the iterator currently being drained within `iterators`.
    private int index;

    public RowKeyOrderedAggregateResultIterator(ResultIterators iterators, Aggregators aggregators) {
        this.resultIterators = iterators;
        this.aggregators = aggregators;
    }

    /** Materializes and caches the underlying per-scan iterators on first call. */
    private List<PeekingResultIterator> getIterators() throws SQLException {
        if (iterators == null && resultIterators != null) {
            iterators = resultIterators.getIterators();
        }
        return iterators;
    }

    /**
     * Closes the ResultIterators and every not-yet-closed underlying iterator
     * (those from `index` onward; earlier ones were closed as they drained).
     * The first failure is rethrown; subsequent failures are chained onto it
     * via setNextException.
     */
    @Override
    public void close() throws SQLException {
        SQLException toThrow = null;
        try {
            if (resultIterators != null) {
                resultIterators.close();
            }
        } catch (Exception e) {
            toThrow = ServerUtil.parseServerException(e);
        } finally {
            try {
                if (iterators != null) {
                    for (;index < iterators.size(); index++) {
                        PeekingResultIterator iterator = iterators.get(index);
                        try {
                            iterator.close();
                        } catch (Exception e) {
                            if (toThrow == null) {
                                toThrow = ServerUtil.parseServerException(e);
                            } else {
                                toThrow.setNextException(ServerUtil.parseServerException(e));
                            }
                        }
                    }
                }
            } finally {
                if (toThrow != null) {
                    throw toThrow;
                }
            }
        }
    }


    @Override
    public void explain(List<String> planSteps) {
        if (resultIterators != null) {
            resultIterators.explain(planSteps);
        }
    }

    /**
     * Returns the next tuple across all underlying iterators, draining them in
     * order. When an iterator is exhausted it is closed, `index` advances, and
     * traversedIterator is flagged so advance() knows a scan boundary was
     * crossed. Returns null when all iterators are exhausted.
     */
    private Tuple nextTuple() throws SQLException {
        List<PeekingResultIterator> iterators = getIterators();
        while (index < iterators.size()) {
            PeekingResultIterator iterator = iterators.get(index);
            Tuple r = iterator.peek();
            if (r != null) {
                return iterator.next();
            }
            traversedIterator = true;
            iterator.close();
            index++;
        }
        return null;
    }

    /**
     * Returns true if `next` belongs to the same group as `previous`, i.e.
     * their row keys are equal. Side effect: loads the two keys into
     * currentKey/previousKey; previousKey is later used by advance() as the
     * row key of a merged aggregate tuple.
     */
    private boolean continueAggregating(Tuple previous, Tuple next) {
        if (next == null) {
            return false;
        }
        next.getKey(currentKey);
        previous.getKey(previousKey);
        return (currentKey.compareTo(previousKey) == 0);
    }

    @Override
    public Tuple next() throws SQLException {
        Tuple t = super.next();
        if (t == null) {
            return null;
        }
        // Load the returned row into the shared aggregators so callers can
        // evaluate aggregate expressions against it.
        aggregate(t);
        return t;
    }

    /**
     * Produces the next fully-aggregated group tuple. Consumes consecutive
     * tuples with the same row key (which can only occur after crossing a scan
     * boundary) and merges their partial server-side aggregates. If a group
     * consisted of a single tuple, that tuple is returned unchanged
     * (rowAggregators stays null); otherwise a new tuple is built from the
     * merged aggregate bytes, keyed by the group's row key (previousKey). The
     * first tuple of the following group is buffered in `this.next`.
     */
    @Override
    protected Tuple advance() throws SQLException {
        Tuple current = this.next;
        boolean traversedIterators = nextTraversedIterators;
        if (current == null) {
            current = nextTuple();
            traversedIterators = this.traversedIterator;
        }
        if (current != null) {
            Tuple previous = current;
            Aggregator[] rowAggregators = null;
            while (true) {
                current = nextTuple();
                // Stop unless we crossed a scan boundary AND the next tuple
                // continues the same group.
                if (!traversedIterators || !continueAggregating(previous, current)) {
                    break;
                }
                // Lazily initialize the aggregators from the first tuple of
                // the group only when a merge is actually needed.
                if (rowAggregators == null) {
                    rowAggregators = aggregate(previous);
                }
                aggregators.aggregate(rowAggregators, current); 
                traversedIterators = this.traversedIterator;
            }
            this.next = current;
            this.nextTraversedIterators = this.traversedIterator;
            if (rowAggregators == null) {
                current = previous;
            } else {
                byte[] value = aggregators.toBytes(rowAggregators);
                current = new SingleKeyValueTuple(KeyValueUtil.newKeyValue(previousKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
            }
        }
        if (current == null) {
            close(); // Close underlying ResultIterators to free resources sooner rather than later
        }
        return current;
    }

    @Override
    public String toString() {
        return "RowKeyOrderedAggregateResultIterator [resultIterators=" + resultIterators + ", index=" + index + "]";
    }

    /**
     * Resets the shared aggregator array and aggregates the given row into it.
     * @param result the row to re-aggregate
     * @return the shared Aggregator[] holding the row's aggregate state
     */
    @Override
    public Aggregator[] aggregate(Tuple result) {
        Aggregator[] rowAggregators = aggregators.getAggregators();
        aggregators.reset(rowAggregators);
        aggregators.aggregate(rowAggregators, result);
        return rowAggregators;
    }
}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
index f53e871..791eb23 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
@@ -19,40 +19,22 @@ package org.apache.phoenix.iterate;
 
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
-import static 
org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY_NAME;
-import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_NAME;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 
-import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.AggregationManager;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.aggregator.ClientAggregators;
-import org.apache.phoenix.expression.function.SingleAggregateFunction;
-import org.apache.phoenix.expression.function.SumAggregateFunction;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.schema.PLongColumn;
-import org.apache.phoenix.schema.PName;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.AssertResults;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 
 
@@ -87,90 +69,9 @@ public class AggregateResultScannerTest extends 
BaseConnectionlessQueryTest {
                 new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, PLong.INSTANCE.toBytes(2L))),
             };
 
-        PhoenixConnection pconn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
-        PhoenixStatement statement = new PhoenixStatement(pconn);
-        StatementContext context = new StatementContext(statement, null, new 
Scan(), new SequenceManager(statement));
-        AggregationManager aggregationManager = 
context.getAggregationManager();
-        SumAggregateFunction func = new 
SumAggregateFunction(Arrays.<Expression>asList(new KeyValueColumnExpression(new 
PLongColumn() {
-            @Override
-            public PName getName() {
-                return SINGLE_COLUMN_NAME;
-            }
-            @Override
-            public PName getFamilyName() {
-                return SINGLE_COLUMN_FAMILY_NAME;
-            }
-            @Override
-            public int getPosition() {
-                return 0;
-            }
-            
-            @Override
-            public SortOrder getSortOrder() {
-               return SortOrder.getDefault();
-            }
-            
-            @Override
-            public Integer getArraySize() {
-                return 0;
-            }
-            
-            @Override
-            public byte[] getViewConstant() {
-                return null;
-            }
-            
-            @Override
-            public boolean isViewReferenced() {
-                return false;
-            }
-            
-            @Override
-            public String getExpressionStr() {
-                return null;
-            }
-            @Override
-            public boolean isRowTimestamp() {
-                return false;
-            }
-                       @Override
-                       public boolean isDynamic() {
-                               return false;
-                       }
-        })), null);
-        aggregationManager.setAggregators(new 
ClientAggregators(Collections.<SingleAggregateFunction>singletonList(func), 1));
-        ResultIterators iterators = new ResultIterators() {
-
-            @Override
-            public List<PeekingResultIterator> getIterators() throws 
SQLException {
-                return results;
-            }
-
-            @Override
-            public int size() {
-                return results.size();
-            }
-
-            @Override
-            public void explain(List<String> planSteps) {
-            }
-
-                       @Override
-                       public List<KeyRange> getSplits() {
-                               return Collections.emptyList();
-                       }
-
-                       @Override
-                       public List<List<Scan>> getScans() {
-                               return Collections.emptyList();
-                       }
-
-            @Override
-            public void close() throws SQLException {
-            }
-            
-        };
-        ResultIterator scanner = new GroupedAggregatingResultIterator(new 
MergeSortRowKeyResultIterator(iterators), aggregationManager.getAggregators());
+        ResultIterators iterators = new MaterializedResultIterators(results);
+        ClientAggregators aggregators = 
TestUtil.getSingleSumAggregator(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES));
+        ResultIterator scanner = new GroupedAggregatingResultIterator(new 
MergeSortRowKeyResultIterator(iterators), aggregators);
         AssertResults.assertResults(scanner, expectedResults);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
index cf71724..67d5cd0 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
@@ -56,36 +56,7 @@ public class ConcatResultIteratorTest {
                 new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(4))),
             };
         final List<PeekingResultIterator>results = Arrays.asList(new 
PeekingResultIterator[] {new 
MaterializedResultIterator(Arrays.asList(results1)), new 
MaterializedResultIterator(Arrays.asList(results2)), new 
MaterializedResultIterator(Arrays.asList(results3))});
-        ResultIterators iterators = new ResultIterators() {
-
-            @Override
-            public List<PeekingResultIterator> getIterators() throws 
SQLException {
-                return results;
-            }
-
-            @Override
-            public int size() {
-                return results.size();
-            }
-
-            @Override
-            public void explain(List<String> planSteps) {
-            }
-            
-                       @Override
-                       public List<KeyRange> getSplits() {
-                               return Collections.emptyList();
-                       }
-
-                       @Override
-                       public List<List<Scan>> getScans() {
-                               return Collections.emptyList();
-                       }
-
-            @Override
-            public void close() throws SQLException {
-            }
-        };
+        ResultIterators iterators = new MaterializedResultIterators(results);
 
         Tuple[] expectedResults = new Tuple[] {
                 new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(1))),

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java
new file mode 100644
index 0000000..c4b0265
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.query.KeyRange;
+
+/**
+ * 
+ * ResultIteraors implementation backed by in-memory list of 
PeekingResultIterator
+ *
+ */
+public class MaterializedResultIterators implements ResultIterators {
+    private final List<PeekingResultIterator> results;
+
+    public MaterializedResultIterators(List<PeekingResultIterator> results) {
+        this.results = results;
+    }
+
+    @Override
+    public List<PeekingResultIterator> getIterators() throws SQLException {
+        return results;
+    }
+
+    @Override
+    public int size() {
+        return results.size();
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+    }
+
+    @Override
+    public List<KeyRange> getSplits() {
+       return Collections.emptyList();
+    }
+
+    @Override
+    public List<List<Scan>> getScans() {
+       return Collections.emptyList();
+    }
+
+    @Override
+    public void close() throws SQLException {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java
new file mode 100644
index 0000000..347de78
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.AssertResults;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+
+public class RowKeyOrderedAggregateResultIteratorTest extends 
BaseConnectionlessQueryTest {
+    private final static byte[] A = Bytes.toBytes("a");
+    private final static byte[] B = Bytes.toBytes("b");
+    private final static byte[] C = Bytes.toBytes("c");
+    private final static byte[] D = Bytes.toBytes("d");
+
+    @Test
+    public void testNoSpan() throws Exception {
+        Tuple[] results1 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(1L))),
+            };
+        Tuple[] results2 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(2L)))
+            };
+        Tuple[] results3 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(3L))),
+                new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(4L))),
+            };
+        final List<PeekingResultIterator>results = Arrays.asList(new 
PeekingResultIterator[] {new 
MaterializedResultIterator(Arrays.asList(results1)), new 
MaterializedResultIterator(Arrays.asList(results2)), new 
MaterializedResultIterator(Arrays.asList(results3))});
+        ResultIterators iterators = new MaterializedResultIterators(results);
+
+        Tuple[] expectedResults = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(1L))),
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(2L))),
+                new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(3L))),
+                new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(4L))),
+            };
+
+        ClientAggregators aggregators = 
TestUtil.getSingleSumAggregator(getUrl(), 
PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        ResultIterator scanner = new 
RowKeyOrderedAggregateResultIterator(iterators, aggregators);
+        AssertResults.assertResults(scanner, expectedResults);
+    }
+    
+    @Test
+    public void testSpanThree() throws Exception {
+        Tuple[] results1 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(1L))),
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(2L)))
+            };
+        Tuple[] results2 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(3L)))
+            };
+        Tuple[] results3 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(4L))),
+                new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(5L))),
+            };
+        final List<PeekingResultIterator>results = Arrays.asList(new 
PeekingResultIterator[] {new 
MaterializedResultIterator(Arrays.asList(results1)), new 
MaterializedResultIterator(Arrays.asList(results2)), new 
MaterializedResultIterator(Arrays.asList(results3))});
+        ResultIterators iterators = new MaterializedResultIterators(results);
+
+        Tuple[] expectedResults = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(1L))),
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(9L))),
+                new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(5L))),
+            };
+
+        ClientAggregators aggregators = 
TestUtil.getSingleSumAggregator(getUrl(), 
PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        ResultIterator scanner = new 
RowKeyOrderedAggregateResultIterator(iterators, aggregators);
+        AssertResults.assertResults(scanner, expectedResults);
+    }
+
+    @Test
+    public void testSpanAll() throws Exception {
+        Tuple[] results1 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(2L)))
+            };
+        Tuple[] results2 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(3L)))
+            };
+        Tuple[] results3 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(4L))),
+            };
+        final List<PeekingResultIterator>results = Arrays.asList(new 
PeekingResultIterator[] {new 
MaterializedResultIterator(Arrays.asList(results1)), new 
MaterializedResultIterator(Arrays.asList(results2)), new 
MaterializedResultIterator(Arrays.asList(results3))});
+        ResultIterators iterators = new MaterializedResultIterators(results);
+
+        Tuple[] expectedResults = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(9L))),
+            };
+
+        ClientAggregators aggregators = 
TestUtil.getSingleSumAggregator(getUrl(), 
PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        ResultIterator scanner = new 
RowKeyOrderedAggregateResultIterator(iterators, aggregators);
+        AssertResults.assertResults(scanner, expectedResults);
+    }
+    
+    @Test
+    public void testSpanEnd() throws Exception {
+        Tuple[] results1 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(1L))),
+            };
+        Tuple[] results2 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(2L))),
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(3L))),
+                new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(4L))),
+                new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(5L))),
+            };
+        Tuple[] results3 = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(6L))),
+            };
+        final List<PeekingResultIterator>results = Arrays.asList(new 
PeekingResultIterator[] {new 
MaterializedResultIterator(Arrays.asList(results1)), new 
MaterializedResultIterator(Arrays.asList(results2)), new 
MaterializedResultIterator(Arrays.asList(results3))});
+        ResultIterators iterators = new MaterializedResultIterators(results);
+
+        Tuple[] expectedResults = new Tuple[] {
+                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(3L))),
+                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(3L))),
+                new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(4L))),
+                new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(11L))),
+            };
+
+        ClientAggregators aggregators = 
TestUtil.getSingleSumAggregator(getUrl(), 
PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        ResultIterator scanner = new 
RowKeyOrderedAggregateResultIterator(iterators, aggregators);
+        AssertResults.assertResults(scanner, expectedResults);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index c73c160..872555c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.util;
 
 import static org.apache.phoenix.query.QueryConstants.MILLIS_IN_DAY;
+import static 
org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_NAME;
 import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
@@ -42,6 +44,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -55,6 +58,8 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.AggregationManager;
+import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
@@ -70,7 +75,10 @@ import org.apache.phoenix.expression.NotExpression;
 import org.apache.phoenix.expression.OrExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.StringBasedLikeExpression;
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
 import org.apache.phoenix.expression.function.SubstrFunction;
+import org.apache.phoenix.expression.function.SumAggregateFunction;
 import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.MultiKeyValueComparisonFilter;
 import org.apache.phoenix.filter.RowKeyComparisonFilter;
@@ -79,13 +87,17 @@ import 
org.apache.phoenix.filter.SingleKeyValueComparisonFilter;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.LikeParseNode.LikeType;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PLongColumn;
+import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.PTableStats;
@@ -622,5 +634,63 @@ public class TestUtil {
             tableNameBuilder.append(transactional ? "_TXN" : "_NON_TXN");
         return tableNameBuilder.toString();
     }
+
    /**
     * Builds a {@link ClientAggregators} containing a single SUM aggregator over the
     * standard single-column/single-column-family KeyValue used by aggregate result
     * iterator tests.
     * <p>
     * A throw-away {@link PhoenixConnection} is opened only to construct a
     * {@link StatementContext}; it is closed before returning (try-with-resources).
     *
     * @param url   JDBC connection URL used to create the temporary connection
     * @param props connection properties
     * @return the aggregators registered on the fresh statement context
     * @throws SQLException if the connection cannot be established
     */
    public static ClientAggregators getSingleSumAggregator(String url, Properties props) throws SQLException {
        try (PhoenixConnection pconn = DriverManager.getConnection(url, props).unwrap(PhoenixConnection.class)) {
            PhoenixStatement statement = new PhoenixStatement(pconn);
            StatementContext context = new StatementContext(statement, null, new Scan(), new SequenceManager(statement));
            AggregationManager aggregationManager = context.getAggregationManager();
            // SUM over a stub PLongColumn: only the name, family name, and position
            // are meaningful here; the remaining metadata is stubbed to defaults.
            SumAggregateFunction func = new SumAggregateFunction(Arrays.<Expression>asList(new KeyValueColumnExpression(new PLongColumn() {
                @Override
                public PName getName() {
                    return SINGLE_COLUMN_NAME;
                }
                @Override
                public PName getFamilyName() {
                    return SINGLE_COLUMN_FAMILY_NAME;
                }
                @Override
                public int getPosition() {
                    return 0;
                }

                @Override
                public SortOrder getSortOrder() {
                    return SortOrder.getDefault();
                }

                @Override
                public Integer getArraySize() {
                    return 0;
                }

                @Override
                public byte[] getViewConstant() {
                    return null;
                }

                @Override
                public boolean isViewReferenced() {
                    return false;
                }

                @Override
                public String getExpressionStr() {
                    return null;
                }
                @Override
                public boolean isRowTimestamp() {
                    return false;
                }
                @Override
                public boolean isDynamic() {
                    return false;
                }
            })), null);
            // Register the single SUM function (1 = number of aggregators) and hand
            // back the aggregators the manager materialized from it.
            aggregationManager.setAggregators(new ClientAggregators(Collections.<SingleAggregateFunction>singletonList(func), 1));
            ClientAggregators aggregators = aggregationManager.getAggregators();
            return aggregators;
        }
    }
 }
 

Reply via email to