Repository: phoenix Updated Branches: refs/heads/master d97eb4967 -> ae14e38cc
PHOENIX-2818 Optimize ORDERED group by Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d414505d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d414505d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d414505d Branch: refs/heads/master Commit: d414505df8afeba553734db688547c6a4e9e90d9 Parents: d97eb49 Author: James Taylor <[email protected]> Authored: Thu May 12 14:29:29 2016 -0700 Committer: James Taylor <[email protected]> Committed: Thu May 12 14:29:29 2016 -0700 ---------------------------------------------------------------------- .../apache/phoenix/end2end/GroupByCaseIT.java | 92 +++++++++ .../apache/phoenix/execute/AggregatePlan.java | 3 +- .../iterate/AggregatingResultIterator.java | 4 +- .../BaseGroupedAggregatingResultIterator.java | 3 +- .../DistinctAggregatingResultIterator.java | 15 +- .../FilterAggregatingResultIterator.java | 8 +- .../OrderedAggregatingResultIterator.java | 5 +- .../RowKeyOrderedAggregateResultIterator.java | 190 +++++++++++++++++++ .../iterate/AggregateResultScannerTest.java | 109 +---------- .../iterate/ConcatResultIteratorTest.java | 31 +-- .../iterate/MaterializedResultIterators.java | 66 +++++++ ...owKeyOrderedAggregateResultIteratorTest.java | 149 +++++++++++++++ .../java/org/apache/phoenix/util/TestUtil.java | 70 +++++++ 13 files changed, 596 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java index 44f43b7..b0524da 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java @@ -32,6 +32,9 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; +import org.apache.phoenix.schema.types.PChar; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.junit.Test; @@ -343,4 +346,93 @@ public class GroupByCaseIT extends BaseHBaseManagedTimeIT { " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [K1]", QueryUtil.getExplainPlan(rs)); } + @Test + public void testSumGroupByOrderPreservingDesc() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement stmt = conn.prepareStatement("CREATE TABLE GROUP_BY_DESC (k1 char(1) not null, k2 integer not null, constraint pk primary key (k1,k2)) split on (?,?,?)"); + stmt.setBytes(1, ByteUtil.concat(PChar.INSTANCE.toBytes("a"), PInteger.INSTANCE.toBytes(3))); + stmt.setBytes(2, ByteUtil.concat(PChar.INSTANCE.toBytes("j"), PInteger.INSTANCE.toBytes(3))); + stmt.setBytes(3, ByteUtil.concat(PChar.INSTANCE.toBytes("n"), PInteger.INSTANCE.toBytes(3))); + stmt.execute(); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 1)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 2)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 3)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 4)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('b', 5)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 1)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 2)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 3)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 4)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 1)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 2)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 3)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 4)"); + conn.commit(); + String query = "SELECT k1,sum(k2) FROM GROUP_BY_DESC GROUP BY k1 ORDER BY k1 DESC"; + ResultSet rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("n", rs.getString(1)); + assertEquals(10, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("j", rs.getString(1)); + assertEquals(10, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("b", rs.getString(1)); + assertEquals(5, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals(10, rs.getInt(2)); + assertFalse(rs.next()); + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + assertEquals( + "CLIENT PARALLEL 1-WAY REVERSE FULL SCAN OVER GROUP_BY_DESC\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [K1]", QueryUtil.getExplainPlan(rs)); + } + + @Test + public void testAvgGroupByOrderPreserving() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement stmt = conn.prepareStatement("CREATE TABLE GROUP_BY_DESC (k1 char(1) not null, k2 integer not null, constraint pk primary key (k1,k2)) split on (?,?,?)"); + stmt.setBytes(1, ByteUtil.concat(PChar.INSTANCE.toBytes("a"), PInteger.INSTANCE.toBytes(3))); + stmt.setBytes(2, ByteUtil.concat(PChar.INSTANCE.toBytes("j"), PInteger.INSTANCE.toBytes(3))); + stmt.setBytes(3, ByteUtil.concat(PChar.INSTANCE.toBytes("n"), PInteger.INSTANCE.toBytes(3))); + stmt.execute(); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 1)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 2)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 3)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 6)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('b', 5)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 1)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 2)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 3)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 10)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 1)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 2)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 3)"); + conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 2)"); + conn.commit(); + String query = "SELECT k1,avg(k2) FROM GROUP_BY_DESC GROUP BY k1"; + ResultSet rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals(3, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("b", rs.getString(1)); + assertEquals(5, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("j", rs.getString(1)); + assertEquals(4, rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("n", rs.getString(1)); + assertEquals(2, rs.getInt(2)); + assertFalse(rs.next()); + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + assertEquals( + "CLIENT PARALLEL 1-WAY FULL SCAN OVER GROUP_BY_DESC\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [K1]", QueryUtil.getExplainPlan(rs)); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 94d1fc8..770cf71 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -41,6 +41,7 @@ import org.apache.phoenix.iterate.ConcatResultIterator; import org.apache.phoenix.iterate.DistinctAggregatingResultIterator; import org.apache.phoenix.iterate.FilterAggregatingResultIterator; import org.apache.phoenix.iterate.GroupedAggregatingResultIterator; +import org.apache.phoenix.iterate.RowKeyOrderedAggregateResultIterator; import org.apache.phoenix.iterate.LimitingResultIterator; import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator; import org.apache.phoenix.iterate.OffsetResultIterator; @@ -226,7 +227,7 @@ public class AggregatePlan extends BaseQueryPlan { aggResultIterator = new UngroupedAggregatingResultIterator(new ConcatResultIterator(iterators), aggregators); // If salted or local index we still need a merge sort as we'll potentially have multiple group by keys that aren't contiguous. } else if (groupBy.isOrderPreserving() && !(this.getTableRef().getTable().getBucketNum() != null || this.getTableRef().getTable().getIndexType() == IndexType.LOCAL)) { - aggResultIterator = new GroupedAggregatingResultIterator(new ConcatResultIterator(iterators), aggregators); + aggResultIterator = new RowKeyOrderedAggregateResultIterator(iterators, aggregators); } else { aggResultIterator = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregators); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java index abd0545..59a89ad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.iterate; +import org.apache.phoenix.expression.aggregator.Aggregator; import org.apache.phoenix.schema.tuple.Tuple; @@ -33,6 +34,7 @@ public interface AggregatingResultIterator extends ResultIterator { * Provides a means of re-aggregating a result row. For * scanners that need to look ahead (i.e. {@link org.apache.phoenix.iterate.OrderedAggregatingResultIterator} * @param result the row to re-aggregate + * @return Aggregator[] results */ - void aggregate(Tuple result); + Aggregator[] aggregate(Tuple result); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java index 8fd36b3..84d29ff 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java @@ -91,10 +91,11 @@ public abstract class BaseGroupedAggregatingResultIterator implements } @Override - public void aggregate(Tuple result) { + public Aggregator[] aggregate(Tuple result) { Aggregator[] rowAggregators = aggregators.getAggregators(); aggregators.reset(rowAggregators); aggregators.aggregate(rowAggregators, result); + return rowAggregators; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java index 1ba134b..cf3fb38 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java @@ -18,17 +18,20 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; -import java.util.*; +import java.util.Iterator; +import java.util.List; +import java.util.Set; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - -import com.google.common.collect.Iterators; -import com.google.common.collect.Sets; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; import org.apache.phoenix.schema.tuple.Tuple; +import com.google.common.collect.Iterators; +import com.google.common.collect.Sets; + /** * Result scanner that dedups the incoming tuples to make them distinct. * <p> @@ -155,8 +158,8 @@ public class DistinctAggregatingResultIterator implements AggregatingResultItera } @Override - public void aggregate(Tuple result) { - delegate.aggregate(result); + public Aggregator[] aggregate(Tuple result) { + return delegate.aggregate(result); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java index 4fa2011..5fd2028 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java @@ -21,10 +21,10 @@ import java.sql.SQLException; import java.util.List; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - import org.apache.phoenix.expression.Expression; -import org.apache.phoenix.schema.types.PBoolean; +import org.apache.phoenix.expression.aggregator.Aggregator; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PBoolean; /** @@ -66,8 +66,8 @@ public class FilterAggregatingResultIterator implements AggregatingResultIterat } @Override - public void aggregate(Tuple result) { - delegate.aggregate(result); + public Aggregator[] aggregate(Tuple result) { + return delegate.aggregate(result); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java index da2be48..51a7dd8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java @@ -21,6 +21,7 @@ import java.sql.SQLException; import java.util.List; import org.apache.phoenix.expression.OrderByExpression; +import org.apache.phoenix.expression.aggregator.Aggregator; import org.apache.phoenix.schema.tuple.Tuple; @@ -55,7 +56,7 @@ public class OrderedAggregatingResultIterator extends OrderedResultIterator impl } @Override - public void aggregate(Tuple result) { - getDelegate().aggregate(result); + public Aggregator[] aggregate(Tuple result) { + return getDelegate().aggregate(result); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java new file mode 100644 index 0000000..3c52e51 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.sql.SQLException; +import java.util.List; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.ServerUtil; + + +/** + * + * Client-side aggregate for key ordered aggregation. Prevents the comparison of + * row keys for rows returned unless we cross a scan boundary. + * + */ +public class RowKeyOrderedAggregateResultIterator extends LookAheadResultIterator implements AggregatingResultIterator { + private final ResultIterators resultIterators; + private List<PeekingResultIterator> iterators; + private final Aggregators aggregators; + private final ImmutableBytesWritable currentKey = new ImmutableBytesWritable(); + private final ImmutableBytesWritable previousKey = new ImmutableBytesWritable(); + private boolean traversedIterator = true; + private boolean nextTraversedIterators; + private Tuple next; + + private int index; + + public RowKeyOrderedAggregateResultIterator(ResultIterators iterators, Aggregators aggregators) { + this.resultIterators = iterators; + this.aggregators = aggregators; + } + + private List<PeekingResultIterator> getIterators() throws SQLException { + if (iterators == null && resultIterators != null) { + iterators = resultIterators.getIterators(); + } + return iterators; + } + + @Override + public void close() throws SQLException { + SQLException toThrow = null; + try { + if (resultIterators != null) { + resultIterators.close(); + } + } catch (Exception e) { + toThrow = ServerUtil.parseServerException(e); + } finally { + try { + if (iterators != null) { + for (;index < iterators.size(); index++) { + PeekingResultIterator iterator = iterators.get(index); + try { + iterator.close(); + } catch (Exception e) { + if (toThrow == null) { + toThrow = ServerUtil.parseServerException(e); + } else { + toThrow.setNextException(ServerUtil.parseServerException(e)); + } + } + } + } + } finally { + if (toThrow != null) { + throw toThrow; + } + } + } + } + + + @Override + public void explain(List<String> planSteps) { + if (resultIterators != null) { + resultIterators.explain(planSteps); + } + } + + private Tuple nextTuple() throws SQLException { + List<PeekingResultIterator> iterators = getIterators(); + while (index < iterators.size()) { + PeekingResultIterator iterator = iterators.get(index); + Tuple r = iterator.peek(); + if (r != null) { + return iterator.next(); + } + traversedIterator = true; + iterator.close(); + index++; + } + return null; + } + + private boolean continueAggregating(Tuple previous, Tuple next) { + if (next == null) { + return false; + } + next.getKey(currentKey); + previous.getKey(previousKey); + return (currentKey.compareTo(previousKey) == 0); + } + + @Override + public Tuple next() throws SQLException { + Tuple t = super.next(); + if (t == null) { + return null; + } + aggregate(t); + return t; + } + + @Override + protected Tuple advance() throws SQLException { + Tuple current = this.next; + boolean traversedIterators = nextTraversedIterators; + if (current == null) { + current = nextTuple(); + traversedIterators = this.traversedIterator; + } + if (current != null) { + Tuple previous = current; + Aggregator[] rowAggregators = null; + while (true) { + current = nextTuple(); + if (!traversedIterators || !continueAggregating(previous, current)) { + break; + } + if (rowAggregators == null) { + rowAggregators = aggregate(previous); + } + aggregators.aggregate(rowAggregators, current); + traversedIterators = this.traversedIterator; + } + this.next = current; + this.nextTraversedIterators = this.traversedIterator; + if (rowAggregators == null) { + current = previous; + } else { + byte[] value = aggregators.toBytes(rowAggregators); + current = new SingleKeyValueTuple(KeyValueUtil.newKeyValue(previousKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); + } + } + if (current == null) { + close(); // Close underlying ResultIterators to free resources sooner rather than later + } + return current; + } + + @Override + public String toString() { + return "RowKeyOrderedAggregateResultIterator [resultIterators=" + resultIterators + ", index=" + index + "]"; + } + + @Override + public Aggregator[] aggregate(Tuple result) { + Aggregator[] rowAggregators = aggregators.getAggregators(); + aggregators.reset(rowAggregators); + aggregators.aggregate(rowAggregators, result); + return rowAggregators; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java index f53e871..791eb23 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java @@ -19,40 +19,22 @@ package org.apache.phoenix.iterate; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; -import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY_NAME; -import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_NAME; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import java.sql.DriverManager; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.compile.AggregationManager; -import org.apache.phoenix.compile.SequenceManager; -import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.expression.Expression; -import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.aggregator.ClientAggregators; -import org.apache.phoenix.expression.function.SingleAggregateFunction; -import org.apache.phoenix.expression.function.SumAggregateFunction; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.BaseConnectionlessQueryTest; -import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.schema.types.PLong; -import org.apache.phoenix.schema.PLongColumn; -import org.apache.phoenix.schema.PName; -import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.AssertResults; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.TestUtil; import org.junit.Test; @@ -87,90 +69,9 @@ public class AggregateResultScannerTest extends BaseConnectionlessQueryTest { new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, PLong.INSTANCE.toBytes(2L))), }; - PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class); - PhoenixStatement statement = new PhoenixStatement(pconn); - StatementContext context = new StatementContext(statement, null, new Scan(), new SequenceManager(statement)); - AggregationManager aggregationManager = context.getAggregationManager(); - SumAggregateFunction func = new SumAggregateFunction(Arrays.<Expression>asList(new KeyValueColumnExpression(new PLongColumn() { - @Override - public PName getName() { - return SINGLE_COLUMN_NAME; - } - @Override - public PName getFamilyName() { - return SINGLE_COLUMN_FAMILY_NAME; - } - @Override - public int getPosition() { - return 0; - } - - @Override - public SortOrder getSortOrder() { - return SortOrder.getDefault(); - } - - @Override - public Integer getArraySize() { - return 0; - } - - @Override - public byte[] getViewConstant() { - return null; - } - - @Override - public boolean isViewReferenced() { - return false; - } - - @Override - public String getExpressionStr() { - return null; - } - @Override - public boolean isRowTimestamp() { - return false; - } - @Override - public boolean isDynamic() { - return false; - } - })), null); - aggregationManager.setAggregators(new ClientAggregators(Collections.<SingleAggregateFunction>singletonList(func), 1)); - ResultIterators iterators = new ResultIterators() { - - @Override - public List<PeekingResultIterator> getIterators() throws SQLException { - return results; - } - - @Override - public int size() { - return results.size(); - } - - @Override - public void explain(List<String> planSteps) { - } - - @Override - public List<KeyRange> getSplits() { - return Collections.emptyList(); - } - - @Override - public List<List<Scan>> getScans() { - return Collections.emptyList(); - } - - @Override - public void close() throws SQLException { - } - - }; - ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregationManager.getAggregators()); + ResultIterators iterators = new MaterializedResultIterators(results); + ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)); + ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregators); AssertResults.assertResults(scanner, expectedResults); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java index cf71724..67d5cd0 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java @@ -56,36 +56,7 @@ public class ConcatResultIteratorTest { new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4))), }; final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))}); - ResultIterators iterators = new ResultIterators() { - - @Override - public List<PeekingResultIterator> getIterators() throws SQLException { - return results; - } - - @Override - public int size() { - return results.size(); - } - - @Override - public void explain(List<String> planSteps) { - } - - @Override - public List<KeyRange> getSplits() { - return Collections.emptyList(); - } - - @Override - public List<List<Scan>> getScans() { - return Collections.emptyList(); - } - - @Override - public void close() throws SQLException { - } - }; + ResultIterators iterators = new MaterializedResultIterators(results); Tuple[] expectedResults = new Tuple[] { new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java new file mode 100644 index 0000000..c4b0265 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.query.KeyRange; + +/** + * + * ResultIteraors implementation backed by in-memory list of PeekingResultIterator + * + */ +public class MaterializedResultIterators implements ResultIterators { + private final List<PeekingResultIterator> results; + + public MaterializedResultIterators(List<PeekingResultIterator> results) { + this.results = results; + } + + @Override + public List<PeekingResultIterator> getIterators() throws SQLException { + return results; + } + + @Override + public int size() { + return results.size(); + } + + @Override + public void explain(List<String> planSteps) { + } + + @Override + public List<KeyRange> getSplits() { + return Collections.emptyList(); + } + + @Override + public List<List<Scan>> getScans() { + return Collections.emptyList(); + } + + @Override + public void close() throws SQLException { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java new file mode 100644 index 0000000..347de78 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.aggregator.ClientAggregators; +import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.AssertResults; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Test; + +public class RowKeyOrderedAggregateResultIteratorTest extends BaseConnectionlessQueryTest { + private final static byte[] A = Bytes.toBytes("a"); + private final static byte[] B = Bytes.toBytes("b"); + private final static byte[] C = Bytes.toBytes("c"); + private final static byte[] D = Bytes.toBytes("d"); + + @Test + public void testNoSpan() throws Exception { + Tuple[] results1 = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))), + }; + Tuple[] results2 = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L))) + }; + Tuple[] results3 = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))), + new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))), + }; + final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))}); + ResultIterators iterators = new MaterializedResultIterators(results); + + Tuple[] expectedResults = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))), + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L))), + new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))), + new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))), + }; + + ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES)); + ResultIterator scanner = new RowKeyOrderedAggregateResultIterator(iterators, aggregators); + AssertResults.assertResults(scanner, expectedResults); + } + + @Test + public void testSpanThree() throws Exception { + Tuple[] results1 = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))), + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L))) + }; + Tuple[] results2 = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))) + }; + Tuple[] results3 = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))), + new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(5L))), + }; + final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))}); + ResultIterators iterators = new MaterializedResultIterators(results); + + Tuple[] expectedResults = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))), + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(9L))), + new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(5L))), + }; + + ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES)); + ResultIterator scanner = new RowKeyOrderedAggregateResultIterator(iterators, aggregators); + AssertResults.assertResults(scanner, expectedResults); + } + + @Test + public void testSpanAll() throws Exception { + Tuple[] results1 = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L))) + }; + Tuple[] results2 = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))) + }; + Tuple[] results3 = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))), + }; + final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))}); + ResultIterators iterators = new MaterializedResultIterators(results); + + Tuple[] expectedResults = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(9L))), + }; + + ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES)); + ResultIterator scanner = new RowKeyOrderedAggregateResultIterator(iterators, aggregators); + AssertResults.assertResults(scanner, expectedResults); + } + + @Test + public void testSpanEnd() throws Exception { + Tuple[] results1 = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))), + }; + Tuple[] results2 = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L))), + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))), + new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))), + new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(5L))), + }; + Tuple[] results3 = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(6L))), + }; + final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))}); + ResultIterators iterators = new MaterializedResultIterators(results); + + Tuple[] expectedResults = new Tuple[] { + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))), + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))), + new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))), + new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(11L))), + }; + + ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES)); + ResultIterator scanner = new RowKeyOrderedAggregateResultIterator(iterators, aggregators); + AssertResults.assertResults(scanner, expectedResults); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index c73c160..872555c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -18,6 +18,8 @@ package org.apache.phoenix.util; import static org.apache.phoenix.query.QueryConstants.MILLIS_IN_DAY; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY_NAME; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_NAME; import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; @@ -42,6 +44,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Properties; @@ -55,6 +58,8 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.AggregationManager; +import org.apache.phoenix.compile.SequenceManager; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse; @@ -70,7 +75,10 @@ import org.apache.phoenix.expression.NotExpression; import org.apache.phoenix.expression.OrExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.expression.StringBasedLikeExpression; +import org.apache.phoenix.expression.aggregator.ClientAggregators; +import org.apache.phoenix.expression.function.SingleAggregateFunction; import org.apache.phoenix.expression.function.SubstrFunction; +import org.apache.phoenix.expression.function.SumAggregateFunction; import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter; import org.apache.phoenix.filter.MultiKeyValueComparisonFilter; import org.apache.phoenix.filter.RowKeyComparisonFilter; @@ -79,13 +87,17 @@ import org.apache.phoenix.filter.SingleKeyValueComparisonFilter; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixPreparedStatement; +import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.LikeParseNode.LikeType; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PLongColumn; +import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.RowKeyValueAccessor; +import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.PTableStats; @@ -622,5 +634,63 @@ public class TestUtil { tableNameBuilder.append(transactional ? "_TXN" : "_NON_TXN"); return tableNameBuilder.toString(); } + + public static ClientAggregators getSingleSumAggregator(String url, Properties props) throws SQLException { + try (PhoenixConnection pconn = DriverManager.getConnection(url, props).unwrap(PhoenixConnection.class)) { + PhoenixStatement statement = new PhoenixStatement(pconn); + StatementContext context = new StatementContext(statement, null, new Scan(), new SequenceManager(statement)); + AggregationManager aggregationManager = context.getAggregationManager(); + SumAggregateFunction func = new SumAggregateFunction(Arrays.<Expression>asList(new KeyValueColumnExpression(new PLongColumn() { + @Override + public PName getName() { + return SINGLE_COLUMN_NAME; + } + @Override + public PName getFamilyName() { + return SINGLE_COLUMN_FAMILY_NAME; + } + @Override + public int getPosition() { + return 0; + } + + @Override + public SortOrder getSortOrder() { + return SortOrder.getDefault(); + } + + @Override + public Integer getArraySize() { + return 0; + } + + @Override + public byte[] getViewConstant() { + return null; + } + + @Override + public boolean isViewReferenced() { + return false; + } + + @Override + public String getExpressionStr() { + return null; + } + @Override + public boolean isRowTimestamp() { + return false; + } + @Override + public boolean isDynamic() { + return false; + } + })), null); + aggregationManager.setAggregators(new ClientAggregators(Collections.<SingleAggregateFunction>singletonList(func), 1)); + ClientAggregators aggregators = aggregationManager.getAggregators(); + return aggregators; + } + } }
