PHOENIX-2876 Using aggregation function in ORDER BY (Sergey Soldatov)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/af8d3b65 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/af8d3b65 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/af8d3b65 Branch: refs/heads/master Commit: af8d3b65c84fd57b91b99ff36de2194149c5a94e Parents: d414505 Author: James Taylor <[email protected]> Authored: Thu May 12 14:32:34 2016 -0700 Committer: James Taylor <[email protected]> Committed: Thu May 12 14:32:34 2016 -0700 ---------------------------------------------------------------------- .../phoenix/compile/AggregationManager.java | 60 ++++++++++++++++++++ .../apache/phoenix/compile/DeleteCompiler.java | 1 + .../apache/phoenix/compile/PostDDLCompiler.java | 1 + .../phoenix/compile/ProjectionCompiler.java | 53 +---------------- .../apache/phoenix/compile/QueryCompiler.java | 1 + .../apache/phoenix/compile/UpsertCompiler.java | 6 +- .../phoenix/compile/QueryCompilerTest.java | 16 +++++- 7 files changed, 86 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java index ee2497b..c8e672e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java @@ -17,7 +17,21 @@ */ package org.apache.phoenix.compile; +import com.google.common.collect.Iterators; +import com.google.common.collect.Sets; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.aggregator.ClientAggregators; +import org.apache.phoenix.expression.aggregator.ServerAggregators; +import org.apache.phoenix.expression.function.SingleAggregateFunction; +import org.apache.phoenix.expression.visitor.SingleAggregateFunctionVisitor; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; /** * @@ -52,4 +66,50 @@ public class AggregationManager { public void setAggregators(ClientAggregators clientAggregator) { this.aggregators = clientAggregator; } + /** + * Compiles projection by: + * 1) Adding RowCount aggregate function if not present when limiting rows. We need this + * to track how many rows have been scanned. + * 2) Reordering aggregation functions (by putting fixed length aggregates first) to + * optimize the positional access of the aggregated value. + */ + public void compile(StatementContext context, GroupByCompiler.GroupBy groupBy) throws + SQLException { + final Set<SingleAggregateFunction> aggFuncSet = Sets.newHashSetWithExpectedSize(context.getExpressionManager().getExpressionCount()); + + Iterator<Expression> expressions = context.getExpressionManager().getExpressions(); + while (expressions.hasNext()) { + Expression expression = expressions.next(); + expression.accept(new SingleAggregateFunctionVisitor() { + @Override + public Iterator<Expression> visitEnter(SingleAggregateFunction function) { + aggFuncSet.add(function); + return Iterators.emptyIterator(); + } + }); + } + if (aggFuncSet.isEmpty() && groupBy.isEmpty()) { + return; + } + List<SingleAggregateFunction> aggFuncs = new ArrayList<SingleAggregateFunction>(aggFuncSet); + Collections.sort(aggFuncs, SingleAggregateFunction.SCHEMA_COMPARATOR); + + int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty()); + context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex)); + ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex); + context.getAggregationManager().setAggregators(clientAggregators); + } + + private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) { + int minNullableIndex = aggFuncs.size(); + for (int i = 0; i < aggFuncs.size(); i++) { + SingleAggregateFunction aggFunc = aggFuncs.get(i); + if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) { + minNullableIndex = i; + break; + } + } + return minNullableIndex; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 2a97686..fa3dd62 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -508,6 +508,7 @@ public class DeleteCompiler { // Ignoring ORDER BY, since with auto commit on and no limit makes no difference SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint()); RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY); + context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY); if (plan.getProjector().projectEveryRow()) { projectorToBe = new RowProjector(projectorToBe,true); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index a786438..e43b596 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -253,6 +253,7 @@ public class PostDDLCompiler { } // Need to project all column families into the scan, since we haven't yet created our empty key value RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY); + context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY); // Explicitly project these column families and don't project the empty key value, // since at this point we haven't added the empty key value everywhere. if (columnFamilies != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java index 3cf3934..8d7d7cf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java @@ -188,8 +188,8 @@ public class ProjectionCompiler { try { dataTable = conn.getTable(new PTableKey(tenantId, tableName)); } catch (TableNotFoundException e) { - if (tenantId != null) { - // Check with null tenantId + if (tenantId != null) { + // Check with null tenantId dataTable = conn.getTable(new PTableKey(null, tableName)); } else { @@ -483,8 +483,6 @@ public class ProjectionCompiler { } } } - - selectVisitor.compile(); boolean isProjectEmptyKeyValue = false; if (isWildcard) { projectAllColumnFamilies(table, scan); @@ -576,18 +574,7 @@ public class ProjectionCompiler { } private static class SelectClauseVisitor extends ExpressionCompiler { - private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) { - int minNullableIndex = aggFuncs.size(); - for (int i = 0; i < aggFuncs.size(); i++) { - SingleAggregateFunction aggFunc = aggFuncs.get(i); - if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) { - minNullableIndex = i; - break; - } - } - return minNullableIndex; - } - + /** * Track whether or not the projection expression is case sensitive. We use this * information to determine whether or not we normalize the column name passed @@ -613,40 +600,6 @@ public class ProjectionCompiler { reset(); } - - /** - * Compiles projection by: - * 1) Adding RowCount aggregate function if not present when limiting rows. We need this - * to track how many rows have been scanned. - * 2) Reordering aggregation functions (by putting fixed length aggregates first) to - * optimize the positional access of the aggregated value. - */ - private void compile() throws SQLException { - final Set<SingleAggregateFunction> aggFuncSet = Sets.newHashSetWithExpectedSize(context.getExpressionManager().getExpressionCount()); - - Iterator<Expression> expressions = context.getExpressionManager().getExpressions(); - while (expressions.hasNext()) { - Expression expression = expressions.next(); - expression.accept(new SingleAggregateFunctionVisitor() { - @Override - public Iterator<Expression> visitEnter(SingleAggregateFunction function) { - aggFuncSet.add(function); - return Iterators.emptyIterator(); - } - }); - } - if (aggFuncSet.isEmpty() && groupBy.isEmpty()) { - return; - } - List<SingleAggregateFunction> aggFuncs = new ArrayList<SingleAggregateFunction>(aggFuncSet); - Collections.sort(aggFuncs, SingleAggregateFunction.SCHEMA_COMPARATOR); - - int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty()); - context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex)); - ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex); - context.getAggregationManager().setAggregators(clientAggregators); - } - @Override public void reset() { super.reset(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 82c9731..4488aff 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -564,6 +564,7 @@ public class QueryCompiler { RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns, where); OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, offset, projector, groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : null, isInRowKeyOrder); + context.getAggregationManager().compile(context, groupBy); // Final step is to build the query plan if (!asSubquery) { int maxRows = statement.getMaxRows(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 7c6347f..e2fc2ca 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -636,7 +636,11 @@ public class UpsertCompiler { PTable projectedTable = PTableImpl.makePTable(table, projectedColumns); SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint()); - RowProjector aggProjectorToBe = ProjectionCompiler.compile(queryPlan.getContext(), select, GroupBy.EMPTY_GROUP_BY); + StatementContext statementContext = queryPlan.getContext(); + RowProjector aggProjectorToBe = ProjectionCompiler.compile(statementContext, select, GroupBy + .EMPTY_GROUP_BY); + statementContext.getAggregationManager().compile(queryPlan.getContext() + ,GroupBy.EMPTY_GROUP_BY); if (queryPlan.getProjector().projectEveryRow()) { aggProjectorToBe = new RowProjector(aggProjectorToBe,true); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index 4b756fa..1db90a9 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java @@ -2275,5 +2275,19 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { conn.close(); } } - + @Test + public void testOrderByWithNoProjection() throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + try { + conn.createStatement().execute("create table x (id integer primary key, A.i1 integer," + + " B.i2 integer)"); + Scan scan = projectQuery("select A.i1 from X group by i1 order by avg(B.i2) " + + "desc"); + ServerAggregators aggregators = ServerAggregators.deserialize(scan.getAttribute + (BaseScannerRegionObserver.AGGREGATORS), null); + assertEquals(2,aggregators.getAggregatorCount()); + } finally { + conn.close(); + } + } }
