Repository: phoenix Updated Branches: refs/heads/calcite f04eaf172 -> ae0c234fa
PHOENIX-1841 Implement PhoenixSort in Phoenix/Calcite Integration Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ae0c234f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ae0c234f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ae0c234f Branch: refs/heads/calcite Commit: ae0c234faf9bd4edac23a7511790104cf86c097e Parents: f04eaf1 Author: maryannxue <[email protected]> Authored: Sun Apr 12 19:40:22 2015 -0400 Committer: maryannxue <[email protected]> Committed: Sun Apr 12 19:40:22 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteTest.java | 86 +++++++++++++- .../apache/phoenix/calcite/CalciteUtils.java | 10 ++ .../phoenix/calcite/PhoenixAggregate.java | 15 +-- .../apache/phoenix/calcite/PhoenixFilter.java | 3 - .../org/apache/phoenix/calcite/PhoenixJoin.java | 12 +- .../apache/phoenix/calcite/PhoenixProject.java | 13 +-- .../apache/phoenix/calcite/PhoenixRules.java | 2 +- .../org/apache/phoenix/calcite/PhoenixSort.java | 113 ++++++++++++++++++- .../apache/phoenix/compile/OrderByCompiler.java | 2 +- .../apache/phoenix/execute/AggregatePlan.java | 4 + .../org/apache/phoenix/execute/ScanPlan.java | 4 + 11 files changed, 230 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java index 8d2be0d..3f39768 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java @@ -345,7 +345,91 @@ public class CalciteTest extends BaseClientManagedTimeIT { {"a"}, {"b"}, {"c"}}) - .close();; + .close(); + } + + @Test public void testSort() { + start().sql("select organization_id, entity_id, a_string from aTable order by a_string, entity_id") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixSort(sort0=[$2], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" + + " PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + .resultIs(new Object[][] { + {"00D300000000XHP", "00A123122312312", "a"}, + {"00D300000000XHP", "00A223122312312", "a"}, + {"00D300000000XHP", "00A323122312312", "a"}, + {"00D300000000XHP", "00A423122312312", "a"}, + {"00D300000000XHP", "00B523122312312", "b"}, + {"00D300000000XHP", "00B623122312312", "b"}, + {"00D300000000XHP", "00B723122312312", "b"}, + {"00D300000000XHP", "00B823122312312", "b"}, + {"00D300000000XHP", "00C923122312312", "c"}}) + .close(); + + start().sql("select organization_id, entity_id, a_string from aTable order by organization_id, entity_id") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" + + " PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + .resultIs(new Object[][] { + {"00D300000000XHP", "00A123122312312", "a"}, + {"00D300000000XHP", "00A223122312312", "a"}, + {"00D300000000XHP", "00A323122312312", "a"}, + {"00D300000000XHP", "00A423122312312", "a"}, + {"00D300000000XHP", "00B523122312312", "b"}, + {"00D300000000XHP", "00B623122312312", "b"}, + {"00D300000000XHP", "00B723122312312", "b"}, + {"00D300000000XHP", "00B823122312312", "b"}, + {"00D300000000XHP", "00C923122312312", "c"}}) + .close(); + + start().sql("select count(entity_id), a_string from atable group by a_string order by count(entity_id), a_string desc") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixProject(EXPR$0=[$1], A_STRING=[$0])\n" + + " PhoenixSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" + + " PhoenixAggregate(group=[{0}], EXPR$0=[COUNT()])\n" + + " PhoenixProject(A_STRING=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + .resultIs(new Object[][] { + {1L, "c"}, + {4L, "b"}, + {4L, "a"}}) + .close(); + + start().sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name order by count(\"item_id\"), s.name desc") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" + + " PhoenixAggregate(group=[{0}], EXPR$1=[COUNT()])\n" + + " PhoenixProject(NAME=[$1])\n" + + " PhoenixJoin(condition=[=($0, $2)], joinType=[inner])\n" + + " PhoenixProject(supplier_id=[$0], NAME=[$1])\n" + + " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n" + + " PhoenixProject(supplier_id=[$5])\n" + + " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n") + .resultIs(new Object[][] { + {"S6", 1L}, + {"S5", 1L}, + {"S2", 2L}, + {"S1", 2L}}) + .close(); + + start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" order by item.name desc") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixSort(sort0=[$1], dir0=[DESC])\n" + + " PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" + + " PhoenixJoin(condition=[=($2, $3)], joinType=[inner])\n" + + " PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" + + " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" + + " PhoenixProject(supplier_id=[$0], NAME=[$1])\n" + + " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n") + .resultIs(new Object[][] { + {"0000000006", "T6", "0000000006", "S6"}, + {"0000000005", "T5", "0000000005", "S5"}, + {"0000000004", "T4", "0000000002", "S2"}, + {"0000000003", "T3", "0000000002", "S2"}, + {"0000000002", "T2", "0000000001", "S1"}, + {"0000000001", "T1", "0000000001", "S1"}}) + .close(); } @Test public void testSubquery() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java index 4110b5e..d2a4a31 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java @@ -5,6 +5,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; @@ -15,6 +17,7 @@ import org.apache.calcite.sql.SqlKind; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.calcite.PhoenixRel.Implementor; +import org.apache.phoenix.calcite.PhoenixRel.PlanType; import org.apache.phoenix.expression.ComparisonExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; @@ -39,6 +42,13 @@ public class CalciteUtils { public static String createTempAlias() { return "$" + tempAliasCounter.incrementAndGet(); } + + public static RelNode getBestRel(RelNode rel) { + if (rel instanceof RelSubset) + return ((RelSubset) rel).getBest(); + + return rel; + } private static final Map<SqlKind, ExpressionFactory> EXPRESSION_MAP = Maps .newHashMapWithExpectedSize(ExpressionType.values().length); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java index c3d4982..ca3a34c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java @@ -7,7 +7,6 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; @@ -103,8 +102,9 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel { basePlan = (ScanPlan) delegate; } } - // TopN, we can not merge with the base plan. - if (!plan.getOrderBy().getOrderByExpressions().isEmpty() && plan.getLimit() != null) { + // We can not merge with the base plan that has a limit already. + // But if there is order-by without a limit, we can simply ignore the order-by. + if (plan.getLimit() != null) { basePlan = null; } PhoenixStatement stmt = plan.getContext().getStatement(); @@ -183,12 +183,9 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel { } private boolean isServerAggregateDoable() { - RelNode rel = getInput(); - if (rel instanceof RelSubset) { - rel = ((RelSubset) rel).getBest(); - } - - return rel instanceof PhoenixRel && ((PhoenixRel) rel).getPlanType() != PlanType.CLIENT_SERVER; + RelNode rel = CalciteUtils.getBestRel(getInput()); + return rel instanceof PhoenixRel + && ((PhoenixRel) rel).getPlanType() != PlanType.CLIENT_SERVER; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java index 8925ead..0a49477 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java @@ -34,9 +34,6 @@ public class PhoenixFilter extends Filter implements PhoenixRel { public QueryPlan implement(Implementor implementor) { assert getConvention() == getInput().getConvention(); QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); - // TODO: what to do with the Expression? - // Already determined this filter cannot be pushed down, so - // this will be run Expression expr = CalciteUtils.toExpression(condition, implementor); return new ClientScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, expr, OrderBy.EMPTY_ORDER_BY, plan); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java index c316b5d..0b85217 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java @@ -9,7 +9,6 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinInfo; @@ -125,11 +124,10 @@ public class PhoenixJoin extends Join implements PhoenixRel { private boolean isHashJoinDoable() { // TODO check memory limit - RelNode rel = getLeft(); - if (rel instanceof RelSubset) { - rel = ((RelSubset) rel).getBest(); - } - return (rel instanceof PhoenixRel && ((PhoenixRel) rel).getPlanType() == PlanType.SERVER_ONLY_FLAT) && getJoinType() != JoinRelType.RIGHT; + RelNode rel = CalciteUtils.getBestRel(getLeft()); + return rel instanceof PhoenixRel + && ((PhoenixRel) rel).getPlanType() == PlanType.SERVER_ONLY_FLAT + && getJoinType() != JoinRelType.RIGHT; } private JoinType convertJoinType(JoinRelType type) { @@ -155,6 +153,6 @@ public class PhoenixJoin extends Join implements PhoenixRel { @Override public PlanType getPlanType() { - return PlanType.SERVER_ONLY_COMPLEX; + return isHashJoinDoable() ? PlanType.SERVER_ONLY_COMPLEX : PlanType.CLIENT_SERVER; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java index 4f08968..53793d7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java @@ -6,13 +6,11 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.execute.ScanPlan; import org.apache.phoenix.execute.TupleProjectionPlan; import org.apache.phoenix.execute.TupleProjector; @@ -71,13 +69,8 @@ public class PhoenixProject extends Project implements PhoenixRel { @Override public PlanType getPlanType() { - RelNode rel = getInput(); - if (rel instanceof RelSubset) { - rel = ((RelSubset) rel).getBest(); - } - // TODO this is based on the assumption that there is no two Project - // in a row and Project can be pushed down to the input node if it is - // a server plan. - return !(rel instanceof PhoenixRel) ? PlanType.CLIENT_SERVER : ((PhoenixRel) rel).getPlanType(); + RelNode rel = CalciteUtils.getBestRel(getInput()); + return rel instanceof PhoenixRel && !(rel instanceof PhoenixProject) ? + ((PhoenixRel) rel).getPlanType() : PlanType.CLIENT_SERVER; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java index e98ee73..843e6de 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java @@ -68,7 +68,7 @@ public class PhoenixRules { sort.getTraitSet().replace(out) .replace(sort.getCollation()); return new PhoenixSort(rel.getCluster(), traitSet, - convert(sort.getInput(), sort.getInput().getTraitSet().replace(RelCollationImpl.EMPTY)), + convert(sort.getInput(), sort.getInput().getTraitSet().replace(out)), sort.getCollation(), sort.offset, sort.fetch); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java index 6d11231..978e264 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java @@ -1,12 +1,39 @@ package org.apache.phoenix.calcite; +import java.sql.SQLException; +import java.util.List; + import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelFieldCollation.Direction; +import org.apache.calcite.rel.RelFieldCollation.NullDirection; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.AggregatePlan; +import org.apache.phoenix.execute.BaseQueryPlan; +import org.apache.phoenix.execute.ClientScanPlan; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.OrderByExpression; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableRef; + +import com.google.common.collect.Lists; /** * Implementation of {@link org.apache.calcite.rel.core.Sort} @@ -15,6 +42,8 @@ import org.apache.phoenix.compile.QueryPlan; * <p>Like {@code Sort}, it also supports LIMIT and OFFSET. */ public class PhoenixSort extends Sort implements PhoenixRel { + private static final double CLIENT_MERGE_FACTOR = 0.5; + public PhoenixSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation, RexNode offset, RexNode fetch) { super(cluster, traits, child, collation, offset, fetch); } @@ -23,11 +52,91 @@ public class PhoenixSort extends Sort implements PhoenixRel { public PhoenixSort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, RexNode offset, RexNode fetch) { return new PhoenixSort(getCluster(), traitSet, newInput, newCollation, offset, fetch); } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + RelOptCost cost = super.computeSelfCost(planner); + if (isServerSortDoable()) { + cost = cost.multiplyBy(SERVER_FACTOR); + } else if (isClientSortMergable()) { + cost = cost.multiplyBy(CLIENT_MERGE_FACTOR); + } + return cost.multiplyBy(PHOENIX_FACTOR); + } @Override public QueryPlan implement(Implementor implementor) { - implementor.visitInput(0, (PhoenixRel) getInput()); - throw new UnsupportedOperationException(); + assert getConvention() == getInput().getConvention(); + if (this.fetch != null || this.offset != null) + throw new UnsupportedOperationException(); + + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + TableRef tableRef = implementor.getTableRef(); + BaseQueryPlan basePlan = null; + if (plan instanceof BaseQueryPlan) { + basePlan = (BaseQueryPlan) plan; + } else if (plan instanceof HashJoinPlan) { + QueryPlan delegate = ((HashJoinPlan) plan).getDelegate(); + if (delegate instanceof BaseQueryPlan) { + basePlan = (BaseQueryPlan) delegate; + } + } + // We can not merge with the base plan that has a limit already. + // But if there is order-by without a limit, we can simply ignore the order-by. + if (plan.getLimit() != null) { + basePlan = null; + } + PhoenixStatement stmt = plan.getContext().getStatement(); + StatementContext context; + try { + context = basePlan == null ? new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt)) : basePlan.getContext(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + List<OrderByExpression> orderByExpressions = Lists.newArrayList(); + for (RelFieldCollation fieldCollation : getCollation().getFieldCollations()) { + Expression expr = implementor.newColumnExpression(fieldCollation.getFieldIndex()); + boolean isAscending = fieldCollation.getDirection() == Direction.ASCENDING; + if (expr.getSortOrder() == SortOrder.DESC) { + isAscending = !isAscending; + } + orderByExpressions.add(new OrderByExpression(expr, fieldCollation.nullDirection == NullDirection.LAST, isAscending)); + } + OrderBy orderBy = new OrderBy(orderByExpressions); + + SelectStatement select = SelectStatement.SELECT_STAR; + if (basePlan == null) { + return new ClientScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, null, null, orderBy, plan); + } + + QueryPlan newPlan; + try { + if (basePlan instanceof ScanPlan) { + newPlan = ScanPlan.create((ScanPlan) basePlan, orderBy); + } else { + newPlan = AggregatePlan.create((AggregatePlan) basePlan, orderBy); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + + if (plan instanceof HashJoinPlan) { + HashJoinPlan hashJoinPlan = (HashJoinPlan) plan; + newPlan = HashJoinPlan.create(select, newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans()); + } + return newPlan; + } + + private boolean isServerSortDoable() { + RelNode rel = CalciteUtils.getBestRel(getInput()); + return rel instanceof PhoenixRel + && ((PhoenixRel) rel).getPlanType() != PlanType.CLIENT_SERVER; + } + + private boolean isClientSortMergable() { + RelNode rel = CalciteUtils.getBestRel(getInput()); + return rel instanceof PhoenixAggregate; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java index 215f59e..70b3b97 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java @@ -58,7 +58,7 @@ public class OrderByCompiler { private final List<OrderByExpression> orderByExpressions; - private OrderBy(List<OrderByExpression> orderByExpressions) { + public OrderBy(List<OrderByExpression> orderByExpressions) { this.orderByExpressions = ImmutableList.copyOf(orderByExpressions); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 617cc48..241814c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -71,6 +71,10 @@ public class AggregatePlan extends BaseQueryPlan { private final Expression having; private List<KeyRange> splits; private List<List<Scan>> scans; + + public static AggregatePlan create(AggregatePlan plan, OrderBy newOrderBy) { + return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), plan.getLimit(), newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving()); + } public AggregatePlan( StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index d0a71f4..0dfbcbf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -73,6 +73,10 @@ public class ScanPlan extends BaseQueryPlan { private List<List<Scan>> scans; private boolean allowPageFilter; + public static ScanPlan create(ScanPlan plan, OrderBy newOrderBy) throws SQLException { + return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), plan.getLimit(), newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter); + } + public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException { super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory != null ? parallelIteratorFactory :
