Repository: phoenix Updated Branches: refs/heads/calcite c967b7962 -> 4c65ef2b5
PHOENIX-1843 Implement getCollations() in PhoenixTable.getStatistics() and all other PhoenixRel nodes Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4c65ef2b Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4c65ef2b Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4c65ef2b Branch: refs/heads/calcite Commit: 4c65ef2b51ae7ac49010ba89b393fb3c5e2bdcca Parents: c967b79 Author: maryannxue <[email protected]> Authored: Tue Apr 21 17:14:54 2015 -0400 Committer: maryannxue <[email protected]> Committed: Tue Apr 21 17:14:54 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteTest.java | 64 +++++++------- .../apache/phoenix/calcite/PhoenixTable.java | 23 +++-- .../calcite/metadata/PhoenixRelMdCollation.java | 93 ++++++++++++++++++++ .../metadata/PhoenixRelMetadataProvider.java | 1 + .../calcite/rel/PhoenixAbstractJoin.java | 3 + .../calcite/rel/PhoenixAbstractSort.java | 42 +-------- .../calcite/rel/PhoenixClientAggregate.java | 2 +- .../phoenix/calcite/rel/PhoenixClientJoin.java | 47 +++++++++- .../calcite/rel/PhoenixClientProject.java | 2 +- .../phoenix/calcite/rel/PhoenixClientSort.java | 14 ++- .../calcite/rel/PhoenixCompactClientSort.java | 14 ++- .../phoenix/calcite/rel/PhoenixFilter.java | 20 ++++- .../apache/phoenix/calcite/rel/PhoenixJoin.java | 2 +- .../phoenix/calcite/rel/PhoenixLimit.java | 20 +++-- .../calcite/rel/PhoenixServerAggregate.java | 2 +- .../phoenix/calcite/rel/PhoenixServerJoin.java | 22 +++-- .../calcite/rel/PhoenixServerProject.java | 2 +- .../phoenix/calcite/rel/PhoenixServerSort.java | 14 ++- .../rel/PhoenixToEnumerableConverter.java | 2 +- .../phoenix/calcite/rel/PhoenixUnion.java | 2 +- .../phoenix/calcite/rel/PhoenixValues.java | 4 +- .../calcite/rules/PhoenixClientJoinRule.java | 32 +------ .../rules/PhoenixCompactClientSortRule.java | 2 +- .../calcite/rules/PhoenixConverterRules.java | 29 +++--- .../rules/PhoenixInnerSortRemoveRule.java | 29 ++++++ .../calcite/rules/PhoenixServerSortRule.java | 2 +- .../apache/phoenix/execute/AggregatePlan.java | 4 +- .../org/apache/phoenix/execute/ScanPlan.java | 4 +- 28 files changed, 320 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java index 05dcc9e..acd230b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java @@ -261,12 +261,11 @@ public class CalciteTest extends BaseClientManagedTimeIT { .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixClientProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" + " PhoenixClientJoin(condition=[=($2, $3)], joinType=[full])\n" + - " PhoenixServerSort(sort0=[$2], dir0=[ASC-nulls-first])\n" + + " PhoenixServerSort(sort0=[$2], dir0=[ASC])\n" + " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" + " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" + - " PhoenixServerSort(sort0=[$0], dir0=[ASC-nulls-first])\n" + - " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" + - " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n") + " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" + + " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n") .close(); } @@ -382,9 +381,8 @@ public class CalciteTest extends BaseClientManagedTimeIT { start().sql("select organization_id, entity_id, a_string from aTable order by organization_id, entity_id") .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixServerSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" + - " PhoenixServerProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" + - " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + " PhoenixServerProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") .resultIs(new Object[][] { {"00D300000000XHP", "00A123122312312", "a"}, {"00D300000000XHP", "00A223122312312", "a"}, @@ -449,9 +447,10 @@ public class CalciteTest extends BaseClientManagedTimeIT { @Test public void testSortWithLimit() { start().sql("select organization_id, entity_id, a_string from aTable order by a_string, entity_id limit 5") .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixServerSort(sort0=[$2], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[5])\n" + - " PhoenixServerProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" + - " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + " PhoenixLimit(fetch=[5])\n" + + " PhoenixServerSort(sort0=[$2], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" + + " PhoenixServerProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") .resultIs(new Object[][] { {"00D300000000XHP", "00A123122312312", "a"}, {"00D300000000XHP", "00A223122312312", "a"}, @@ -462,9 +461,9 @@ public class CalciteTest extends BaseClientManagedTimeIT { start().sql("select organization_id, entity_id, a_string from aTable order by organization_id, entity_id limit 5") .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixServerSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[5])\n" + + " PhoenixLimit(fetch=[5])\n" + " PhoenixServerProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" + - " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + " PhoenixTableScan(table=[[phoenix, ATABLE]], statelessFetch=[5])\n") .resultIs(new Object[][] { {"00D300000000XHP", "00A123122312312", "a"}, {"00D300000000XHP", "00A223122312312", "a"}, @@ -476,10 +475,11 @@ public class CalciteTest extends BaseClientManagedTimeIT { start().sql("select count(entity_id), a_string from atable group by a_string order by count(entity_id), a_string desc limit 2") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" + - " PhoenixCompactClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC], fetch=[2])\n" + - " PhoenixServerAggregate(group=[{0}], EXPR$0=[COUNT()])\n" + - " PhoenixServerProject(A_STRING=[$2])\n" + - " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + " PhoenixLimit(fetch=[2])\n" + + " PhoenixCompactClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" + + " PhoenixServerAggregate(group=[{0}], EXPR$0=[COUNT()])\n" + + " PhoenixServerProject(A_STRING=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") .resultIs(new Object[][] { {1L, "c"}, {4L, "b"}}) @@ -487,14 +487,15 @@ public class CalciteTest extends BaseClientManagedTimeIT { start().sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name order by count(\"item_id\"), s.name desc limit 3") .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixCompactClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC], fetch=[3])\n" + - " PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()])\n" + - " PhoenixServerProject(NAME=[$2])\n" + - " PhoenixServerJoin(condition=[=($1, $0)], joinType=[inner])\n" + - " PhoenixServerProject(supplier_id=[$5])\n" + - " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" + - " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" + - " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n") + " PhoenixLimit(fetch=[3])\n" + + " PhoenixCompactClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" + + " PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()])\n" + + " PhoenixServerProject(NAME=[$2])\n" + + " PhoenixServerJoin(condition=[=($1, $0)], joinType=[inner])\n" + + " PhoenixServerProject(supplier_id=[$5])\n" + + " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" + + " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" + + " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n") .resultIs(new Object[][] { {"S6", 1L}, {"S5", 1L}, @@ -503,13 +504,14 @@ public class CalciteTest extends BaseClientManagedTimeIT { start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" order by item.name desc limit 3") .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixServerSort(sort0=[$1], dir0=[DESC], fetch=[3])\n" + - " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" + - " PhoenixServerJoin(condition=[=($2, $3)], joinType=[inner])\n" + - " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" + - " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" + - " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" + - " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n") + " PhoenixLimit(fetch=[3])\n" + + " PhoenixServerSort(sort0=[$1], dir0=[DESC])\n" + + " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" + + " PhoenixServerJoin(condition=[=($2, $3)], joinType=[inner])\n" + + " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" + + " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" + + " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" + + " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n") .resultIs(new Object[][] { {"0000000006", "T6", "0000000006", "S6"}, {"0000000005", "T5", "0000000005", "S5"}, http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java index a70602e..9938a30 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java @@ -1,16 +1,16 @@ package org.apache.phoenix.calcite; -import java.util.Collections; import java.util.List; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelDistributions; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelFieldCollation.Direction; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -24,8 +24,13 @@ import org.apache.phoenix.calcite.rel.PhoenixTableScan; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.types.PDataType; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + /** * Implementation of Calcite {@link org.apache.calcite.schema.Table} SPI for * Phoenix. @@ -33,16 +38,22 @@ import org.apache.phoenix.schema.types.PDataType; public class PhoenixTable extends AbstractTable implements TranslatableTable { public final PTable pTable; public final ImmutableBitSet pkBitSet; + public final RelCollation collation; public final PhoenixConnection pc; public PhoenixTable(PhoenixConnection pc, PTable pTable) { this.pc = Preconditions.checkNotNull(pc); this.pTable = Preconditions.checkNotNull(pTable); List<Integer> pkPositions = Lists.<Integer> newArrayList(); + List<RelFieldCollation> fieldCollations = Lists.<RelFieldCollation> newArrayList(); for (PColumn column : pTable.getPKColumns()) { - pkPositions.add(column.getPosition()); + int position = column.getPosition(); + SortOrder sortOrder = column.getSortOrder(); + pkPositions.add(position); + fieldCollations.add(new RelFieldCollation(position, sortOrder == SortOrder.ASC ? Direction.ASCENDING : Direction.DESCENDING)); } this.pkBitSet = ImmutableBitSet.of(pkPositions); + this.collation = RelCollationTraitDef.INSTANCE.canonize(RelCollations.of(fieldCollations)); } public PTable getTable() { @@ -94,7 +105,7 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable { @Override public List<RelCollation> getCollations() { - return Collections.<RelCollation> emptyList(); + return ImmutableList.<RelCollation> of(collation); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java new file mode 100644 index 0000000..09e6239 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java @@ -0,0 +1,93 @@ +package org.apache.phoenix.calcite.metadata; + +import java.util.List; + +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMdCollation; +import org.apache.calcite.rel.metadata.RelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.util.BuiltInMethod; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.phoenix.calcite.rel.PhoenixClientJoin; +import org.apache.phoenix.calcite.rel.PhoenixLimit; +import org.apache.phoenix.calcite.rel.PhoenixServerJoin; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class PhoenixRelMdCollation { + public static final RelMetadataProvider SOURCE = + ReflectiveRelMetadataProvider.reflectiveSource( + BuiltInMethod.COLLATIONS.method, new PhoenixRelMdCollation()); + + private PhoenixRelMdCollation() { } + + public ImmutableList<RelCollation> collations(PhoenixLimit limit) { + return ImmutableList.copyOf(RelMdCollation.limit(limit.getInput())); + } + + public ImmutableList<RelCollation> collations(PhoenixServerJoin join) { + return ImmutableList.copyOf(hashJoin(join.getLeft(), join.getRight(), join.getJoinType())); + } + + public ImmutableList<RelCollation> collations(PhoenixClientJoin join) { + return ImmutableList.copyOf(PhoenixRelMdCollation.mergeJoin(join.getLeft(), join.getRight(), join.joinInfo.leftKeys, join.joinInfo.rightKeys)); + } + + /** Helper method to determine a {@link PhoenixServerJoin}'s collation. */ + public static List<RelCollation> hashJoin(RelNode left, RelNode right, JoinRelType joinType) { + if (joinType != JoinRelType.FULL) + return ImmutableList.of(); + + // TODO enable the following code, right now would cause some unexpected behaviors. + if (joinType == JoinRelType.RIGHT) { + final ImmutableList<RelCollation> rightCollations = + RelMetadataQuery.collations(right); + if (rightCollations.isEmpty()) + return rightCollations; + + List<RelCollation> newCollations = Lists.<RelCollation> newArrayList(); + final int leftFieldCount = left.getRowType().getFieldCount(); + for (RelCollation collation : rightCollations) { + if (!collation.getFieldCollations().isEmpty()) { + newCollations.add(RelCollations.shift(collation, leftFieldCount)); + } + } + return ImmutableList.copyOf(newCollations); + } + + return RelMetadataQuery.collations(left); + } + + public static List<RelCollation> mergeJoin(RelNode left, RelNode right, + ImmutableIntList leftKeys, ImmutableIntList rightKeys) { + final ImmutableList.Builder<RelCollation> builder = ImmutableList.builder(); + + final ImmutableList<RelCollation> leftCollations = + RelMetadataQuery.collations(left); + assert RelCollations.contains(leftCollations, leftKeys) + : "cannot merge join: left input is not sorted on left keys"; + for (RelCollation collation : leftCollations) { + if (!collation.getFieldCollations().isEmpty()) { + builder.add(collation); + } + } + + final ImmutableList<RelCollation> rightCollations = + RelMetadataQuery.collations(right); + assert RelCollations.contains(rightCollations, rightKeys) + : "cannot merge join: right input is not sorted on right keys"; + final int leftFieldCount = left.getRowType().getFieldCount(); + for (RelCollation collation : rightCollations) { + if (!collation.getFieldCollations().isEmpty()) { + builder.add(RelCollations.shift(collation, leftFieldCount)); + } + } + return builder.build(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java index ea37251..c9412c6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java @@ -9,6 +9,7 @@ public class PhoenixRelMetadataProvider extends ChainedRelMetadataProvider { public PhoenixRelMetadataProvider() { super(ImmutableList.of( PhoenixRelMdRowCount.SOURCE, + PhoenixRelMdCollation.SOURCE, new DefaultRelMetadataProvider())); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java index de5f464..5e42ab3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java @@ -6,6 +6,7 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexNode; import org.apache.phoenix.parse.JoinTableNode.JoinType; @@ -15,9 +16,11 @@ import org.apache.phoenix.parse.JoinTableNode.JoinType; * relational expression in Phoenix. */ abstract public class PhoenixAbstractJoin extends Join implements PhoenixRel { + public final JoinInfo joinInfo; protected PhoenixAbstractJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, JoinRelType joinType, Set<String> variablesStopped) { super( cluster, traits, left, right, condition, joinType, variablesStopped); + joinInfo = JoinInfo.of(left, right, condition); assert getConvention() == PhoenixRel.CONVENTION; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java index 294defc..5a43269 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java @@ -3,8 +3,6 @@ package org.apache.phoenix.calcite.rel; import java.util.List; import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptCost; -import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelFieldCollation; @@ -12,10 +10,6 @@ import org.apache.calcite.rel.RelFieldCollation.Direction; import org.apache.calcite.rel.RelFieldCollation.NullDirection; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Sort; -import org.apache.calcite.rel.metadata.RelMetadataQuery; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.util.Util; -import org.apache.phoenix.calcite.CalciteUtils; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; @@ -33,33 +27,11 @@ import com.google.common.collect.Lists; abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel { protected static final double CLIENT_MERGE_FACTOR = 0.5; - private final Integer statelessFetch; - - protected PhoenixAbstractSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation, RexNode offset, RexNode fetch) { - super(cluster, traits, child, collation, offset, fetch); - Object value = fetch == null ? null : CalciteUtils.evaluateStatelessExpression(fetch); - this.statelessFetch = value == null ? null : ((Number) value).intValue(); + protected PhoenixAbstractSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation) { + super(cluster, traits, child, collation, null, null); assert getConvention() == PhoenixRel.CONVENTION; assert !getCollation().getFieldCollations().isEmpty(); } - - @Override - public RelOptCost computeSelfCost(RelOptPlanner planner) { - // Fix rowCount for super class's computeSelfCost() with input's row count. - double rowCount = RelMetadataQuery.getRowCount(getInput()); - double bytesPerRow = getRowType().getFieldCount() * 4; - return planner.getCostFactory().makeCost( - Util.nLogN(rowCount) * bytesPerRow, rowCount, 0); - } - - @Override - public double getRows() { - double rows = super.getRows(); - if (this.statelessFetch == null) - return rows; - - return Math.min(this.statelessFetch, rows); - } protected OrderBy getOrderBy(Implementor implementor, TupleProjector tupleProjector) { List<OrderByExpression> orderByExpressions = Lists.newArrayList(); @@ -76,14 +48,4 @@ abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel { return new OrderBy(orderByExpressions); } - - protected Integer getLimit(Implementor implementor) { - if (this.fetch == null) - return null; - - if (this.statelessFetch == null) - throw new UnsupportedOperationException("Stateful limit expression not supported"); - - return this.statelessFetch; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java index 27179b7..360c9b8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java @@ -44,7 +44,7 @@ public class PhoenixClientAggregate extends PhoenixAbstractAggregate { public PhoenixClientAggregate copy(RelTraitSet traits, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) { - return new PhoenixClientAggregate(getCluster(), traits, input, indicator, groupSet, groupSets, aggregateCalls); + return create(input, indicator, groupSet, groupSets, aggregateCalls); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java index 4426b73..b7e917d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java @@ -1,24 +1,65 @@ package org.apache.phoenix.calcite.rel; +import java.util.Iterator; +import java.util.List; import java.util.Set; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelFieldCollation.Direction; +import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; import org.apache.phoenix.compile.QueryPlan; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; + public class PhoenixClientJoin extends PhoenixAbstractJoin { public static PhoenixClientJoin create(RelNode left, RelNode right, RexNode condition, JoinRelType joinType, Set<String> variablesStopped) { RelOptCluster cluster = left.getCluster(); - RelTraitSet traits = cluster.traitSetOf(PhoenixRel.CONVENTION); - return new PhoenixClientJoin(cluster, traits, left, right, condition, joinType, variablesStopped); + final JoinInfo joinInfo = JoinInfo.of(left, right, condition); + final RelNode sortedLeft = sortInput(left, joinInfo.leftKeys); + final RelNode sortedRight = sortInput(right, joinInfo.rightKeys); + final RelTraitSet traits = + cluster.traitSet().replace(PhoenixRel.CONVENTION) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return PhoenixRelMdCollation.mergeJoin(sortedLeft, sortedRight, joinInfo.leftKeys, joinInfo.rightKeys); + } + }); + return new PhoenixClientJoin(cluster, traits, sortedLeft, sortedRight, condition, joinType, variablesStopped); + } + + private static RelNode sortInput(RelNode input, ImmutableIntList sortKeys) { + if (sortKeys.isEmpty()) { + return input; + } + + List<RelFieldCollation> fieldCollations = Lists.newArrayList(); + for (Iterator<Integer> iter = sortKeys.iterator(); iter.hasNext();) { + fieldCollations.add(new RelFieldCollation(iter.next(), Direction.ASCENDING)); + } + RelCollation collation = RelCollations.of(fieldCollations); + List<RelCollation> collations = input.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE); + if (collations.contains(collation)) { + return input; + } + + return PhoenixClientSort.create(input, collation); } private PhoenixClientJoin(RelOptCluster cluster, RelTraitSet traits, @@ -31,7 +72,7 @@ public class PhoenixClientJoin extends PhoenixAbstractJoin { @Override public PhoenixClientJoin copy(RelTraitSet traits, RexNode condition, RelNode left, RelNode right, JoinRelType joinRelType, boolean semiJoinDone) { - return new PhoenixClientJoin(getCluster(), traits, left, right, condition, joinRelType, variablesStopped); + return create(left, right, condition, joinRelType, variablesStopped); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java index dd900e2..593fba7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java @@ -42,7 +42,7 @@ public class PhoenixClientProject extends PhoenixAbstractProject { @Override public PhoenixClientProject copy(RelTraitSet traits, RelNode input, List<RexNode> projects, RelDataType rowType) { - return new PhoenixClientProject(getCluster(), traits, input, projects, rowType); + return create(input, projects, rowType); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java index ed441ed..bee20e4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java @@ -23,24 +23,23 @@ import org.apache.phoenix.schema.TableRef; public class PhoenixClientSort extends PhoenixAbstractSort { - public static PhoenixClientSort create(RelNode input, RelCollation collation, - RexNode offset, RexNode fetch) { + public static PhoenixClientSort create(RelNode input, RelCollation collation) { RelOptCluster cluster = input.getCluster(); collation = RelCollationTraitDef.INSTANCE.canonize(collation); RelTraitSet traits = input.getTraitSet().replace(PhoenixRel.CONVENTION).replace(collation); - return new PhoenixClientSort(cluster, traits, input, collation, offset, fetch); + return new PhoenixClientSort(cluster, traits, input, collation); } private PhoenixClientSort(RelOptCluster cluster, RelTraitSet traits, - RelNode child, RelCollation collation, RexNode offset, RexNode fetch) { - super(cluster, traits, child, collation, offset, fetch); + RelNode child, RelCollation collation) { + super(cluster, traits, child, collation); } @Override public PhoenixClientSort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, RexNode offset, RexNode fetch) { - return new PhoenixClientSort(getCluster(), traitSet, newInput, newCollation, offset, fetch); + return create(newInput, newCollation); } @Override @@ -67,9 +66,8 @@ public class PhoenixClientSort extends PhoenixAbstractSort { } OrderBy orderBy = super.getOrderBy(implementor, null); - Integer limit = super.getLimit(implementor); - return new ClientScanPlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, limit, null, orderBy, plan); + return new ClientScanPlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, orderBy, plan); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java index 94d0cdb..863cd22 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java @@ -17,24 +17,23 @@ import org.apache.phoenix.execute.TupleProjector; public class PhoenixCompactClientSort extends PhoenixAbstractSort { - public static PhoenixCompactClientSort create(RelNode input, RelCollation collation, - RexNode offset, RexNode fetch) { + public static PhoenixCompactClientSort create(RelNode input, RelCollation collation) { RelOptCluster cluster = input.getCluster(); collation = RelCollationTraitDef.INSTANCE.canonize(collation); RelTraitSet traits = input.getTraitSet().replace(PhoenixRel.CONVENTION).replace(collation); - return new PhoenixCompactClientSort(cluster, traits, input, collation, offset, fetch); + return new PhoenixCompactClientSort(cluster, traits, input, collation); } private PhoenixCompactClientSort(RelOptCluster cluster, RelTraitSet traits, - RelNode child, RelCollation collation, RexNode offset, RexNode fetch) { - super(cluster, traits, child, collation, offset, fetch); + RelNode child, RelCollation collation) { + super(cluster, traits, child, collation); } @Override public PhoenixCompactClientSort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, RexNode offset, RexNode fetch) { - return new PhoenixCompactClientSort(getCluster(), traitSet, newInput, newCollation, offset, fetch); + return create(newInput, newCollation); } @Override @@ -75,8 +74,7 @@ public class PhoenixCompactClientSort extends PhoenixAbstractSort { } OrderBy orderBy = super.getOrderBy(implementor, tupleProjector); - Integer limit = super.getLimit(implementor); - QueryPlan newPlan = AggregatePlan.create((AggregatePlan) basePlan, orderBy, limit); + QueryPlan newPlan = AggregatePlan.create((AggregatePlan) basePlan, orderBy); if (hashJoinPlan != null) { newPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java index 2a58a42..f54744d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java @@ -1,11 +1,16 @@ package org.apache.phoenix.calcite.rel; +import java.util.List; + import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.metadata.RelMdCollation; import org.apache.calcite.rex.RexNode; import org.apache.phoenix.calcite.CalciteUtils; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; @@ -13,15 +18,24 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.ClientScanPlan; import org.apache.phoenix.expression.Expression; +import com.google.common.base.Supplier; + /** * Implementation of {@link org.apache.calcite.rel.core.Filter} * relational expression in Phoenix. */ public class PhoenixFilter extends Filter implements PhoenixRel { - public static PhoenixFilter create(RelNode input, RexNode condition) { + public static PhoenixFilter create(final RelNode input, final RexNode condition) { RelOptCluster cluster = input.getCluster(); - RelTraitSet traits = input.getTraitSet().replace(PhoenixRel.CONVENTION); + final RelTraitSet traits = + cluster.traitSet().replace(PhoenixRel.CONVENTION) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return RelMdCollation.filter(input); + } + }); return new PhoenixFilter(cluster, traits, input, condition); } @@ -31,7 +45,7 @@ public class PhoenixFilter extends Filter implements PhoenixRel { } public PhoenixFilter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { - return new PhoenixFilter(getCluster(), traitSet, input, condition); + return create(input, condition); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java index 9f31612..ff8ef29 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java @@ -33,7 +33,7 @@ public class PhoenixJoin extends Join implements PhoenixRel { @Override public Join copy(RelTraitSet traits, RexNode condition, RelNode left, RelNode right, JoinRelType joinRelType, boolean semiJoinDone) { - return new PhoenixJoin(getCluster(), traits, left, right, condition, joinRelType, ImmutableSet.<String>of()); + return create(left, right, condition, joinRelType, ImmutableSet.<String>of()); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java index 02776a5..d09c3c6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java @@ -6,9 +6,12 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.metadata.RelMdCollation; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; import org.apache.phoenix.calcite.CalciteUtils; @@ -17,14 +20,23 @@ import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.execute.ClientScanPlan; +import com.google.common.base.Supplier; + public class PhoenixLimit extends SingleRel implements PhoenixRel { public final RexNode offset; public final RexNode fetch; public final Integer statelessFetch; - public static PhoenixLimit create(RelNode input, RexNode offset, RexNode fetch) { + public static PhoenixLimit create(final RelNode input, RexNode offset, RexNode fetch) { RelOptCluster cluster = input.getCluster(); - RelTraitSet traits = input.getTraitSet().replace(PhoenixRel.CONVENTION); + final RelTraitSet traits = + cluster.traitSet().replace(PhoenixRel.CONVENTION) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return RelMdCollation.limit(input); + } + }); return new PhoenixLimit(cluster, traits, input, offset, fetch); } @@ -41,9 +53,7 @@ public class PhoenixLimit extends SingleRel implements PhoenixRel { public PhoenixLimit copy( RelTraitSet traitSet, List<RelNode> newInputs) { - return new PhoenixLimit( - getCluster(), - traitSet, + return create( sole(newInputs), offset, fetch); http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java index 8657a36..0eb2808 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java @@ -38,7 +38,7 @@ public class PhoenixServerAggregate extends PhoenixAbstractAggregate { @Override public PhoenixServerAggregate copy(RelTraitSet traits, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) { - return new PhoenixServerAggregate(getCluster(), traits, input, indicator, groupSet, groupSets, aggregateCalls); + return create(input, indicator, groupSet, groupSets, aggregateCalls); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java index fe84ce7..7a94c0e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java @@ -9,13 +9,15 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.Util; import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; import org.apache.phoenix.compile.JoinCompiler; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.HashJoinPlan; @@ -28,14 +30,23 @@ import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; +import com.google.common.base.Supplier; import com.google.common.collect.Lists; public class PhoenixServerJoin extends PhoenixAbstractJoin { - public static PhoenixServerJoin create(RelNode left, RelNode right, - RexNode condition, JoinRelType joinType, Set<String> variablesStopped) { + public static PhoenixServerJoin create(final RelNode left, final RelNode right, + RexNode condition, final JoinRelType joinType, + Set<String> variablesStopped) { RelOptCluster cluster = left.getCluster(); - RelTraitSet traits = left.getTraitSet().replace(PhoenixRel.CONVENTION); + final RelTraitSet traits = + cluster.traitSet().replace(PhoenixRel.CONVENTION) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return PhoenixRelMdCollation.hashJoin(left, right, joinType); + } + }); return new PhoenixServerJoin(cluster, traits, left, right, condition, joinType, variablesStopped); } @@ -49,7 +60,7 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin { @Override public PhoenixServerJoin copy(RelTraitSet traits, RexNode condition, RelNode left, RelNode right, JoinRelType joinRelType, boolean semiJoinDone) { - return new PhoenixServerJoin(getCluster(), traits, left, right, condition, joinRelType, variablesStopped); + return create(left, right, condition, joinRelType, variablesStopped); } @Override @@ -80,7 +91,6 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin { PhoenixRel left = (PhoenixRel) getLeft(); PhoenixRel right = (PhoenixRel) getRight(); - JoinInfo joinInfo = JoinInfo.of(left, right, getCondition()); List<Expression> leftExprs = Lists.<Expression> newArrayList(); List<Expression> rightExprs = Lists.<Expression> newArrayList(); implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), true)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java index 710712d..2f201fc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java @@ -43,7 +43,7 @@ public class PhoenixServerProject extends PhoenixAbstractProject { @Override public PhoenixServerProject copy(RelTraitSet traits, RelNode input, List<RexNode> projects, RelDataType rowType) { - return new PhoenixServerProject(getCluster(), traits, input, projects, rowType); + return create(input, projects, rowType); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java index 950a730..4f40182 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java @@ -17,24 +17,23 @@ import org.apache.phoenix.execute.ScanPlan; public class PhoenixServerSort extends PhoenixAbstractSort { - public static PhoenixServerSort create(RelNode input, RelCollation collation, - RexNode offset, RexNode fetch) { + public static PhoenixServerSort create(RelNode input, RelCollation collation) { RelOptCluster cluster = input.getCluster(); collation = RelCollationTraitDef.INSTANCE.canonize(collation); RelTraitSet traits = input.getTraitSet().replace(PhoenixRel.CONVENTION).replace(collation); - return new PhoenixServerSort(cluster, traits, input, collation, offset, fetch); + return new PhoenixServerSort(cluster, traits, input, collation); } private PhoenixServerSort(RelOptCluster cluster, RelTraitSet traits, - RelNode child, RelCollation collation, RexNode offset, RexNode fetch) { - super(cluster, traits, child, collation, offset, fetch); + RelNode child, RelCollation collation) { + super(cluster, traits, child, collation); } @Override public PhoenixServerSort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, RexNode offset, RexNode fetch) { - return new PhoenixServerSort(getCluster(), traitSet, newInput, newCollation, offset, fetch); + return create(newInput, newCollation); } @Override @@ -67,10 +66,9 @@ public class PhoenixServerSort extends PhoenixAbstractSort { } OrderBy orderBy = super.getOrderBy(implementor, null); - Integer limit = super.getLimit(implementor); QueryPlan newPlan; try { - newPlan = ScanPlan.create((ScanPlan) basePlan, orderBy, limit); + newPlan = ScanPlan.create((ScanPlan) basePlan, orderBy); } catch (SQLException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java index dd6420e..058922c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java @@ -48,7 +48,7 @@ public class PhoenixToEnumerableConverter extends ConverterImpl implements Enume } @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { - return new PhoenixToEnumerableConverter(getCluster(), traitSet, sole(inputs)); + return create(sole(inputs)); } @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java index 2885ae5..b961679 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java @@ -28,7 +28,7 @@ public class PhoenixUnion extends Union implements PhoenixRel { @Override public PhoenixUnion copy(RelTraitSet traits, List<RelNode> inputs, boolean all) { - return new PhoenixUnion(getCluster(), traits, inputs, all); + return create(inputs, all); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java index 41320f1..52cd5a4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java @@ -2,7 +2,6 @@ package org.apache.phoenix.calcite.rel; import java.util.List; -import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; @@ -31,9 +30,8 @@ public class PhoenixValues extends Values implements PhoenixRel { @Override public PhoenixValues copy(RelTraitSet traitSet, List<RelNode> inputs) { - assert traitSet.containsIfApplicable(Convention.NONE); assert inputs.isEmpty(); - return new PhoenixValues(getCluster(), rowType, tuples, traitSet); + return create(getCluster(), rowType, tuples); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java index 83812d6..7b627ba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java @@ -1,21 +1,10 @@ package org.apache.phoenix.calcite.rules; -import java.util.Iterator; -import java.util.List; - import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.rel.RelCollation; -import org.apache.calcite.rel.RelCollations; -import org.apache.calcite.rel.RelFieldCollation; -import org.apache.calcite.rel.RelFieldCollation.Direction; -import org.apache.calcite.rel.RelFieldCollation.NullDirection; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.JoinInfo; import org.apache.phoenix.calcite.rel.PhoenixClientJoin; -import org.apache.phoenix.calcite.rel.PhoenixClientSort; import org.apache.phoenix.calcite.rel.PhoenixJoin; -import com.google.common.collect.Lists; public class PhoenixClientJoinRule extends RelOptRule { @@ -30,28 +19,9 @@ public class PhoenixClientJoinRule extends RelOptRule { PhoenixJoin join = call.rel(0); RelNode left = join.getLeft(); RelNode right = join.getRight(); - JoinInfo joinInfo = JoinInfo.of(left, right, join.getCondition()); - - RelNode newLeft = left; - RelNode newRight = right; - if (!joinInfo.leftKeys.isEmpty()) { - List<RelFieldCollation> leftFieldCollations = Lists.newArrayList(); - for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); iter.hasNext();) { - leftFieldCollations.add(new RelFieldCollation(iter.next(), Direction.ASCENDING,NullDirection.FIRST)); - } - RelCollation leftCollation = RelCollations.of(leftFieldCollations); - newLeft = PhoenixClientSort.create(left, leftCollation, null, null); - - List<RelFieldCollation> rightFieldCollations = Lists.newArrayList(); - for (Iterator<Integer> iter = joinInfo.rightKeys.iterator(); iter.hasNext();) { - rightFieldCollations.add(new RelFieldCollation(iter.next(), Direction.ASCENDING,NullDirection.FIRST)); - } - RelCollation rightCollation = RelCollations.of(rightFieldCollations); - newRight = PhoenixClientSort.create(right, rightCollation, null, null); - } call.transformTo(PhoenixClientJoin.create( - newLeft, newRight, join.getCondition(), + left, right, join.getCondition(), join.getJoinType(), join.getVariablesStopped())); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java index 7a840ba..b0f3a9f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java @@ -24,7 +24,7 @@ public class PhoenixCompactClientSortRule extends RelOptRule { PhoenixClientSort sort = call.rel(0); PhoenixRel input = call.rel(1); call.transformTo(PhoenixCompactClientSort.create( - input, sort.getCollation(), sort.offset, sort.fetch)); + input, sort.getCollation())); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java index 093966d..016ad0b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java @@ -75,17 +75,19 @@ public class PhoenixConverterRules { * {@link PhoenixClientSort}. */ private static class PhoenixSortRule extends PhoenixConverterRule { - private static Predicate<LogicalSort> NON_EMPTY_COLLATION = new Predicate<LogicalSort>() { + private static Predicate<LogicalSort> SORT_ONLY = new Predicate<LogicalSort>() { @Override public boolean apply(LogicalSort input) { - return !input.getCollation().getFieldCollations().isEmpty(); + return !input.getCollation().getFieldCollations().isEmpty() + && input.offset == null + && input.fetch == null; } }; public static final PhoenixSortRule INSTANCE = new PhoenixSortRule(); private PhoenixSortRule() { - super(LogicalSort.class, NON_EMPTY_COLLATION, Convention.NONE, PhoenixRel.CONVENTION, + super(LogicalSort.class, SORT_ONLY, Convention.NONE, PhoenixRel.CONVENTION, "PhoenixSortRule"); } @@ -95,9 +97,7 @@ public class PhoenixConverterRules { convert( sort.getInput(), sort.getInput().getTraitSet().replace(out)), - sort.getCollation(), - sort.offset, - sort.fetch); + sort.getCollation()); } } @@ -106,26 +106,31 @@ public class PhoenixConverterRules { * {@link PhoenixLimit}. */ private static class PhoenixLimitRule extends PhoenixConverterRule { - private static Predicate<LogicalSort> EMPTY_COLLATION = new Predicate<LogicalSort>() { + private static Predicate<LogicalSort> OFFSET_OR_FETCH = new Predicate<LogicalSort>() { @Override public boolean apply(LogicalSort input) { - return input.getCollation().getFieldCollations().isEmpty(); + return input.offset != null + || input.fetch != null; } }; public static final PhoenixLimitRule INSTANCE = new PhoenixLimitRule(); private PhoenixLimitRule() { - super(LogicalSort.class, EMPTY_COLLATION, Convention.NONE, PhoenixRel.CONVENTION, + super(LogicalSort.class, OFFSET_OR_FETCH, Convention.NONE, PhoenixRel.CONVENTION, "PhoenixLimitRule"); } public RelNode convert(RelNode rel) { final LogicalSort sort = (LogicalSort) rel; + RelNode input = convert( + sort.getInput(), + sort.getInput().getTraitSet().replace(out)); + if (!sort.getCollation().getFieldCollations().isEmpty()) { + input = PhoenixClientSort.create(input, sort.getCollation()); + } return PhoenixLimit.create( - convert( - sort.getInput(), - sort.getInput().getTraitSet().replace(out)), + input, sort.offset, sort.fetch); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixInnerSortRemoveRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixInnerSortRemoveRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixInnerSortRemoveRule.java new file mode 100644 index 0000000..f888055 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixInnerSortRemoveRule.java @@ -0,0 +1,29 @@ +package org.apache.phoenix.calcite.rules; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.phoenix.calcite.rel.PhoenixAbstractSort; + +public class PhoenixInnerSortRemoveRule extends RelOptRule { + + public static PhoenixInnerSortRemoveRule INSTANCE = new PhoenixInnerSortRemoveRule(); + + private PhoenixInnerSortRemoveRule() { + super( + operand( + PhoenixAbstractSort.class, + operand( + PhoenixAbstractSort.class, any())), + "PhoenixInnerSortRemoveRule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + PhoenixAbstractSort sort = call.rel(0); + PhoenixAbstractSort innerSort = call.rel(1); + call.transformTo(sort.copy(sort.getTraitSet(), + innerSort.getInput(), sort.getCollation(), + sort.offset, sort.fetch)); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerSortRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerSortRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerSortRule.java index f09f53a..21959eb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerSortRule.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerSortRule.java @@ -32,7 +32,7 @@ public class PhoenixServerSortRule extends RelOptRule { PhoenixClientSort sort = call.rel(0); PhoenixRel input = call.rel(1); call.transformTo(PhoenixServerSort.create( - input, sort.getCollation(), sort.offset, sort.fetch)); + input, sort.getCollation())); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index de3a6f0..e19c9e7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -72,8 +72,8 @@ public class AggregatePlan extends BaseQueryPlan { private List<KeyRange> splits; private List<List<Scan>> scans; - public static AggregatePlan create(AggregatePlan plan, OrderBy newOrderBy, Integer newLimit) { - return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), newLimit, newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving()); + public static AggregatePlan create(AggregatePlan plan, OrderBy newOrderBy) { + return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving()); } public AggregatePlan( http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c65ef2b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 021da04..cc8f630 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -73,8 +73,8 @@ public class ScanPlan extends BaseQueryPlan { private List<List<Scan>> scans; private boolean allowPageFilter; - public static ScanPlan create(ScanPlan plan, OrderBy newOrderBy, Integer newLimit) throws SQLException { - return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), newLimit, newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter); + public static ScanPlan create(ScanPlan plan, OrderBy newOrderBy) throws SQLException { + return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter); } public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
