http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java deleted file mode 100644 index 6e01abb..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.apache.phoenix.calcite; - -import java.util.List; - -import org.apache.calcite.plan.Convention; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Values; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexLiteral; -import org.apache.phoenix.compile.QueryPlan; - -import com.google.common.collect.ImmutableList; - -/** - * Implementation of {@link org.apache.calcite.rel.core.Values} - * relational expression in Phoenix. - */ -public class PhoenixValues extends Values implements PhoenixRel { - public PhoenixValues(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples, RelTraitSet traits) { - super(cluster, rowType, tuples, traits); - assert getConvention() == PhoenixRel.CONVENTION; - } - - @Override - public PhoenixValues copy(RelTraitSet traitSet, List<RelNode> inputs) { - assert traitSet.containsIfApplicable(Convention.NONE); - assert inputs.isEmpty(); - return new PhoenixValues(getCluster(), rowType, tuples, traitSet); - } - - @Override - public QueryPlan implement(Implementor implementor) { - throw new UnsupportedOperationException(); - } -}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java new file mode 100644 index 0000000..adc9b63 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java @@ -0,0 +1,119 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.execute.TupleProjectionPlan; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.RowKeyColumnExpression; +import org.apache.phoenix.expression.aggregator.ClientAggregators; +import org.apache.phoenix.expression.aggregator.ServerAggregators; +import org.apache.phoenix.expression.function.AggregateFunction; +import org.apache.phoenix.expression.function.SingleAggregateFunction; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.RowKeyValueAccessor; +import org.apache.phoenix.schema.TableRef; + +import com.google.common.collect.Lists; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Aggregate} + * relational expression in Phoenix. 
+ */ +abstract public class PhoenixAbstractAggregate extends Aggregate implements PhoenixRel { + + protected PhoenixAbstractAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { + super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); + assert getConvention() == PhoenixRel.CONVENTION; + + for (AggregateCall aggCall : aggCalls) { + if (aggCall.isDistinct()) { + throw new UnsupportedOperationException( "distinct aggregation not supported"); + } + } + switch (getGroupType()) { + case SIMPLE: + break; + default: + throw new UnsupportedOperationException("unsupported group type: " + getGroupType()); + } + } + + protected GroupBy getGroupBy(Implementor implementor) { + if (groupSets.size() > 1) { + throw new UnsupportedOperationException(); + } + + List<Integer> ordinals = groupSet.asList(); + // TODO check order-preserving + String groupExprAttribName = BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS; + // TODO sort group by keys. not sure if there is a way to avoid this sorting, + // otherwise we would have to add an extra projection. + // TODO convert key types. can be avoided? + List<Expression> keyExprs = Lists.newArrayListWithExpectedSize(ordinals.size()); + for (int i = 0; i < ordinals.size(); i++) { + Expression expr = implementor.newColumnExpression(ordinals.get(i)); + keyExprs.add(expr); + } + + return new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(keyExprs).setKeyExpressions(keyExprs).build(); + } + + protected void serializeAggregators(Implementor implementor, StatementContext context, boolean isEmptyGroupBy) { + // TODO sort aggFuncs. same problem with group by key sorting. 
+ List<SingleAggregateFunction> aggFuncs = Lists.newArrayList(); + for (AggregateCall call : aggCalls) { + AggregateFunction aggFunc = CalciteUtils.toAggregateFunction(call.getAggregation(), call.getArgList(), implementor); + if (!(aggFunc instanceof SingleAggregateFunction)) { + throw new UnsupportedOperationException(); + } + aggFuncs.add((SingleAggregateFunction) aggFunc); + } + int minNullableIndex = getMinNullableIndex(aggFuncs, isEmptyGroupBy); + context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex)); + ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex); + context.getAggregationManager().setAggregators(clientAggregators); + } + + protected static QueryPlan wrapWithProject(Implementor implementor, QueryPlan plan, List<Expression> keyExpressions, List<SingleAggregateFunction> aggFuncs) { + List<Expression> exprs = Lists.newArrayList(); + for (int i = 0; i < keyExpressions.size(); i++) { + Expression keyExpr = keyExpressions.get(i); + RowKeyValueAccessor accessor = new RowKeyValueAccessor(keyExpressions, i); + Expression expr = new RowKeyColumnExpression(keyExpr, accessor, keyExpr.getDataType()); + exprs.add(expr); + } + for (SingleAggregateFunction aggFunc : aggFuncs) { + exprs.add(aggFunc); + } + + TupleProjector tupleProjector = implementor.project(exprs); + PTable projectedTable = implementor.createProjectedTable(); + implementor.setTableRef(new TableRef(projectedTable)); + return new TupleProjectionPlan(plan, tupleProjector, null); + } + + private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) { + int minNullableIndex = aggFuncs.size(); + for (int i = 0; i < aggFuncs.size(); i++) { + SingleAggregateFunction aggFunc = aggFuncs.get(i); + if (isUngroupedAggregation ? 
aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) { + minNullableIndex = i; + break; + } + } + return minNullableIndex; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java new file mode 100644 index 0000000..86ad41f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java @@ -0,0 +1,43 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.Set; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexNode; +import org.apache.phoenix.parse.JoinTableNode.JoinType; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Join} + * relational expression in Phoenix. 
+ */ +abstract public class PhoenixAbstractJoin extends Join implements PhoenixRel { + public PhoenixAbstractJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, JoinRelType joinType, Set<String> variablesStopped) { + super( cluster, traits, left, right, condition, joinType, variablesStopped); + assert getConvention() == PhoenixRel.CONVENTION; + } + + /** + * Maps a Calcite {@link JoinRelType} to the corresponding Phoenix {@link JoinType}. + * + * @throws IllegalArgumentException if {@code type} is not INNER, LEFT, RIGHT or FULL + */ + protected static JoinType convertJoinType(JoinRelType type) { + JoinType ret = null; + switch (type) { + case INNER: + ret = JoinType.Inner; + break; + case LEFT: + ret = JoinType.Left; + break; + case RIGHT: + ret = JoinType.Right; + break; + case FULL: + ret = JoinType.Full; + break; + default: + // Reject unknown join types here instead of handing null back to the caller. + throw new IllegalArgumentException("Unsupported join type: " + type); + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java new file mode 100644 index 0000000..2c77e9f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java @@ -0,0 +1,40 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; + +import com.google.common.collect.Lists; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Project} + * relational 
expression in Phoenix. + */ +abstract public class PhoenixAbstractProject extends Project implements PhoenixRel { + public PhoenixAbstractProject(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) { + super(cluster, traits, input, projects, rowType); + assert getConvention() == PhoenixRel.CONVENTION; + } + + protected TupleProjector project(Implementor implementor) { + List<Expression> exprs = Lists.newArrayList(); + for (RexNode project : getProjects()) { + exprs.add(CalciteUtils.toExpression(project, implementor)); + } + TupleProjector tupleProjector = implementor.project(exprs); + PTable projectedTable = implementor.createProjectedTable(); + implementor.setTableRef(new TableRef(projectedTable)); + + return tupleProjector; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java new file mode 100644 index 0000000..708b5ae --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java @@ -0,0 +1,68 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelFieldCollation.Direction; +import org.apache.calcite.rel.RelFieldCollation.NullDirection; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.calcite.CalciteUtils; +import 
org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.OrderByExpression; +import org.apache.phoenix.schema.SortOrder; + +import com.google.common.collect.Lists; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Sort} + * relational expression in Phoenix. + * + * <p>Like {@code Sort}, it also supports LIMIT and OFFSET. + */ +abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel { + protected static final double CLIENT_MERGE_FACTOR = 0.5; + + public PhoenixAbstractSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation, RexNode offset, RexNode fetch) { + super(cluster, traits, child, collation, offset, fetch); + assert getConvention() == PhoenixRel.CONVENTION; + } + + protected OrderBy getOrderBy(Implementor implementor, TupleProjector tupleProjector) { + assert !getCollation().getFieldCollations().isEmpty(); + + List<OrderByExpression> orderByExpressions = Lists.newArrayList(); + for (RelFieldCollation fieldCollation : getCollation().getFieldCollations()) { + Expression expr = tupleProjector == null ? 
+ implementor.newColumnExpression(fieldCollation.getFieldIndex()) + : tupleProjector.getExpressions()[fieldCollation.getFieldIndex()]; + boolean isAscending = fieldCollation.getDirection() == Direction.ASCENDING; + if (expr.getSortOrder() == SortOrder.DESC) { + isAscending = !isAscending; + } + orderByExpressions.add(new OrderByExpression(expr, fieldCollation.nullDirection == NullDirection.LAST, isAscending)); + } + + return new OrderBy(orderByExpressions); + } + + protected Integer getLimit(Implementor implementor) { + if (this.fetch == null) + return null; + + Expression expr = CalciteUtils.toExpression(this.fetch, implementor); + if (!expr.isStateless()) + throw new UnsupportedOperationException("Stateful limit expression not supported"); + + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + expr.evaluate(null, ptr); + return ((Number) (expr.getDataType().toObject(ptr))).intValue(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java new file mode 100644 index 0000000..d66294b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java @@ -0,0 +1,69 @@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.hadoop.hbase.client.Scan; +import 
org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.ClientAggregatePlan; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.schema.TableRef; + +public class PhoenixClientAggregate extends PhoenixAbstractAggregate { + + public PhoenixClientAggregate(RelOptCluster cluster, RelTraitSet traits, + RelNode child, boolean indicator, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { + super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); + } + + @Override + public PhoenixClientAggregate copy(RelTraitSet traits, RelNode input, + boolean indicator, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) { + return new PhoenixClientAggregate(getCluster(), traits, input, indicator, groupSet, groupSets, aggregateCalls); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + assert getConvention() == getInput().getConvention(); + + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + + TableRef tableRef = implementor.getTableRef(); + PhoenixStatement stmt = plan.getContext().getStatement(); + StatementContext context; + try { + context = new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + GroupBy groupBy = super.getGroupBy(implementor); + super.serializeAggregators(implementor, context, groupBy.isEmpty()); + + 
QueryPlan aggPlan = new ClientAggregatePlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan); + + return PhoenixAbstractAggregate.wrapWithProject(implementor, aggPlan, groupBy.getKeyExpressions(), Arrays.asList(context.getAggregationManager().getAggregators().getFunctions())); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java new file mode 100644 index 0000000..4c7c6b9 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java @@ -0,0 +1,54 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.Set; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.phoenix.compile.QueryPlan; + +import com.google.common.collect.ImmutableSet; + +public class PhoenixClientJoin extends PhoenixAbstractJoin { + + public PhoenixClientJoin(RelOptCluster cluster, RelTraitSet traits, + RelNode left, RelNode right, RexNode condition, + JoinRelType joinType, Set<String> variablesStopped) { + super(cluster, traits, left, right, condition, joinType, + variablesStopped); + } + + @Override + public PhoenixClientJoin copy(RelTraitSet traits, RexNode condition, RelNode left, + RelNode right, JoinRelType joinRelType, boolean semiJoinDone) { + // Forward this join's variablesStopped so the copy does not reset it to an empty set. + return new PhoenixClientJoin(getCluster(), traits, 
left, right, condition, joinRelType, variablesStopped); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + double rowCount = RelMetadataQuery.getRowCount(this); + + for (RelNode input : getInputs()) { + double inputRowCount = input.getRows(); + if (Double.isInfinite(inputRowCount)) { + rowCount = inputRowCount; + } else { + rowCount += inputRowCount; + } + } + RelOptCost cost = planner.getCostFactory().makeCost(rowCount, 0, 0); + + return cost.multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + throw new UnsupportedOperationException(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java new file mode 100644 index 0000000..2557b43 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java @@ -0,0 +1,45 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.TupleProjectionPlan; +import org.apache.phoenix.execute.TupleProjector; + +public class PhoenixClientProject extends PhoenixAbstractProject { + + public PhoenixClientProject(RelOptCluster cluster, RelTraitSet traits, + RelNode input, List<? 
extends RexNode> projects, RelDataType rowType) { + super(cluster, traits, input, projects, rowType); + } + + @Override + public PhoenixClientProject copy(RelTraitSet traits, RelNode input, + List<RexNode> projects, RelDataType rowType) { + return new PhoenixClientProject(getCluster(), traits, input, projects, rowType); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + assert getConvention() == getInput().getConvention(); + + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + TupleProjector tupleProjector = project(implementor); + + return new TupleProjectionPlan(plan, tupleProjector, null); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java new file mode 100644 index 0000000..a36d9d0 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java @@ -0,0 +1,65 @@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import 
org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.ClientScanPlan; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.schema.TableRef; + +public class PhoenixClientSort extends PhoenixAbstractSort { + + public PhoenixClientSort(RelOptCluster cluster, RelTraitSet traits, + RelNode child, RelCollation collation, RexNode offset, RexNode fetch) { + super(cluster, traits, child, collation, offset, fetch); + } + + @Override + public PhoenixClientSort copy(RelTraitSet traitSet, RelNode newInput, + RelCollation newCollation, RexNode offset, RexNode fetch) { + return new PhoenixClientSort(getCluster(), traitSet, newInput, newCollation, offset, fetch); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + assert getConvention() == getInput().getConvention(); + if (this.offset != null) + throw new UnsupportedOperationException(); + + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + + TableRef tableRef = implementor.getTableRef(); + PhoenixStatement stmt = plan.getContext().getStatement(); + StatementContext context; + try { + context = new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + OrderBy orderBy = super.getOrderBy(implementor, null); + Integer limit = super.getLimit(implementor); + + return new ClientScanPlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, limit, null, orderBy, plan); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java new file mode 100644 index 0000000..6e93905 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java @@ -0,0 +1,79 @@ +package org.apache.phoenix.calcite.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.execute.AggregatePlan; +import org.apache.phoenix.execute.TupleProjectionPlan; +import org.apache.phoenix.execute.TupleProjector; + +public class PhoenixCompactClientSort extends PhoenixAbstractSort { + + public PhoenixCompactClientSort(RelOptCluster cluster, RelTraitSet traits, + RelNode child, RelCollation collation, RexNode offset, RexNode fetch) { + super(cluster, traits, child, collation, offset, fetch); + } + + @Override + public PhoenixCompactClientSort copy(RelTraitSet traitSet, RelNode newInput, + RelCollation newCollation, RexNode offset, RexNode fetch) { + return new PhoenixCompactClientSort(getCluster(), traitSet, newInput, newCollation, offset, fetch); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner) + .multiplyBy(CLIENT_MERGE_FACTOR) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + assert getConvention() == getInput().getConvention(); + if (this.offset != null) + throw new UnsupportedOperationException(); + + QueryPlan plan = implementor.visitInput(0, 
(PhoenixRel) getInput()); + assert plan instanceof TupleProjectionPlan; + + // PhoenixServerAggregate wraps the AggregatePlan with a TupleProjectionPlan, + // so we need to unwrap the TupleProjectionPlan. + TupleProjectionPlan tupleProjectionPlan = (TupleProjectionPlan) plan; + assert tupleProjectionPlan.getPostFilter() == null; + QueryPlan innerPlan = tupleProjectionPlan.getDelegate(); + TupleProjector tupleProjector = tupleProjectionPlan.getTupleProjector(); + assert (innerPlan instanceof AggregatePlan + || innerPlan instanceof HashJoinPlan) + && innerPlan.getLimit() == null; + + AggregatePlan basePlan; + HashJoinPlan hashJoinPlan = null; + if (innerPlan instanceof AggregatePlan) { + basePlan = (AggregatePlan) innerPlan; + } else { + hashJoinPlan = (HashJoinPlan) innerPlan; + QueryPlan delegate = hashJoinPlan.getDelegate(); + assert delegate instanceof AggregatePlan; + basePlan = (AggregatePlan) delegate; + } + + OrderBy orderBy = super.getOrderBy(implementor, tupleProjector); + Integer limit = super.getLimit(implementor); + QueryPlan newPlan = AggregatePlan.create((AggregatePlan) basePlan, orderBy, limit); + + if (hashJoinPlan != null) { + newPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans()); + } + // Recover the wrapping of TupleProjectionPlan + newPlan = new TupleProjectionPlan(newPlan, tupleProjector, null); + return newPlan; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java new file mode 100644 index 0000000..0827d74 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java @@ -0,0 +1,42 @@ +package 
org.apache.phoenix.calcite.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rex.RexNode; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.ClientScanPlan; +import org.apache.phoenix.expression.Expression; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Filter} + * relational expression in Phoenix. + */ +public class PhoenixFilter extends Filter implements PhoenixRel { + public PhoenixFilter(RelOptCluster cluster, RelTraitSet traits, RelNode input, RexNode condition) { + super(cluster, traits, input, condition); + assert getConvention() == PhoenixRel.CONVENTION; + } + + public PhoenixFilter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { + return new PhoenixFilter(getCluster(), traitSet, input, condition); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR); + } + + public QueryPlan implement(Implementor implementor) { + assert getConvention() == getInput().getConvention(); + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + Expression expr = CalciteUtils.toExpression(condition, implementor); + return new ClientScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), + plan.getProjector(), null, expr, OrderBy.EMPTY_ORDER_BY, plan); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java new file mode 100644 index 0000000..142fb35 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java @@ -0,0 +1,42 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.Set; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexNode; +import org.apache.phoenix.compile.QueryPlan; + +import com.google.common.collect.ImmutableSet; + +/** + * Join in the Phoenix convention that reports an infinite self cost and + * does not support {@code implement}. + */ +public class PhoenixJoin extends Join implements PhoenixRel { + + public PhoenixJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left, + RelNode right, RexNode condition, JoinRelType joinType, + Set<String> variablesStopped) { + super(cluster, traits, left, right, condition, joinType, + variablesStopped); + } + + @Override + public Join copy(RelTraitSet traits, RexNode condition, RelNode left, + RelNode right, JoinRelType joinRelType, boolean semiJoinDone) { + // Forward this join's variablesStopped so the copy does not reset it to an empty set. + return new PhoenixJoin(getCluster(), traits, left, right, condition, joinRelType, variablesStopped); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return planner.getCostFactory().makeCost(Double.POSITIVE_INFINITY, 0, 0); + } + + @Override + public QueryPlan implement(Implementor implementor) { + throw new UnsupportedOperationException(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java new file mode 100644 index 0000000..c7cc60d --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java @@ -0,0 +1,73 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.ColumnExpression; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; + +/** + * Relational expression in Phoenix. + * + * <p>Phoenix evaluates relational expressions using {@link java.util.Iterator}s + * over streams of {@link org.apache.phoenix.schema.tuple.Tuple}s.</p> + */ +public interface PhoenixRel extends RelNode { + /** Calling convention for relational operations that occur in Phoenix. */ + Convention CONVENTION = new Convention.Impl("PHOENIX", PhoenixRel.class); + + /** Relative cost of Phoenix versus Enumerable convention. + * + * <p>Multiply by the value (which is less than unity), and you will get a cheaper cost. + * Phoenix is cheaper. + */ + double PHOENIX_FACTOR = 0.5; + + /** Relative cost of server plan versus client plan. + * + * <p>Multiply by the value (which is less than unity), and you will get a cheaper cost. + * Server is cheaper. 
+ */ + double SERVER_FACTOR = 0.2; + + QueryPlan implement(Implementor implementor); + + class ImplementorContext { + private boolean retainPKColumns; + private boolean forceProject; + + public ImplementorContext(boolean retainPKColumns, boolean forceProject) { + this.retainPKColumns = retainPKColumns; + this.forceProject = forceProject; + } + + public boolean isRetainPKColumns() { + return this.retainPKColumns; + } + + public boolean forceProject() { + return this.forceProject; + } + } + + /** Holds context for a traversal over a tree of relational expressions + * to convert it to an executable plan. */ + interface Implementor { + QueryPlan visitInput(int i, PhoenixRel input); + ColumnExpression newColumnExpression(int index); + void setTableRef(TableRef tableRef); + TableRef getTableRef(); + void pushContext(ImplementorContext context); + ImplementorContext popContext(); + ImplementorContext getCurrentContext(); + PTable createProjectedTable(); + RowProjector createRowProjector(); + TupleProjector project(List<Expression> exprs); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java new file mode 100644 index 0000000..2ae3838 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java @@ -0,0 +1,131 @@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Stack; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.rel.PhoenixRel.ImplementorContext; +import 
org.apache.phoenix.compile.ColumnProjector; +import org.apache.phoenix.compile.ExpressionProjector; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.TupleProjectionCompiler; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.ColumnExpression; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.parse.ParseNodeFactory; +import org.apache.phoenix.schema.ColumnRef; +import org.apache.phoenix.schema.KeyValueSchema; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.TableRef; + +import com.google.common.collect.Lists; + +public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { + private TableRef tableRef; + private Stack<ImplementorContext> contextStack; + + public PhoenixRelImplementorImpl() { + this.contextStack = new Stack<ImplementorContext>(); + pushContext(new ImplementorContext(true, false)); + } + + @Override + public QueryPlan visitInput(int i, PhoenixRel input) { + return input.implement(this); + } + + @Override + public ColumnExpression newColumnExpression(int index) { + ColumnRef colRef = new ColumnRef(this.tableRef, index); + return colRef.newColumnExpression(); + } + + + @Override + public void setTableRef(TableRef tableRef) { + this.tableRef = tableRef; + } + + @Override + public TableRef getTableRef() { + return this.tableRef; + } + + @Override + public void pushContext(ImplementorContext context) { + this.contextStack.push(context); + } + + @Override + public ImplementorContext popContext() { + return contextStack.pop(); + } + + @Override + public 
ImplementorContext getCurrentContext() { + return contextStack.peek(); + } + + @Override + public PTable createProjectedTable() { + List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); + for (PColumn column : getTableRef().getTable().getColumns()) { + sourceColumnRefs.add(new ColumnRef(getTableRef(), column.getPosition())); + } + + try { + return TupleProjectionCompiler.createProjectedTable(getTableRef(), sourceColumnRefs, getCurrentContext().isRetainPKColumns()); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public RowProjector createRowProjector() { + List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); + for (PColumn column : getTableRef().getTable().getColumns()) { + Expression expr = newColumnExpression(column.getPosition()); + columnProjectors.add(new ExpressionProjector(column.getName().getString(), getTableRef().getTable().getName().getString(), expr, false)); + } + // TODO get estimate row size + return new RowProjector(columnProjectors, 0, false); + } + + @Override + public TupleProjector project(List<Expression> exprs) { + KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0); + List<PColumn> columns = Lists.<PColumn>newArrayList(); + for (int i = 0; i < exprs.size(); i++) { + String name = ParseNodeFactory.createTempAlias(); + Expression expr = exprs.get(i); + builder.addField(expr); + columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), + expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(), + i, expr.getSortOrder(), null, null, false, name)); + } + try { + PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, + PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, + null, null, columns, null, null, Collections.<PTable>emptyList(), + false, Collections.<PName>emptyList(), null, null, false, 
false, false, null, + null, null); + this.setTableRef(new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java new file mode 100644 index 0000000..3511699 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java @@ -0,0 +1,74 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.Arrays; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.AggregatePlan; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.execute.ScanPlan; + +public class PhoenixServerAggregate extends PhoenixAbstractAggregate { + + public PhoenixServerAggregate(RelOptCluster cluster, RelTraitSet traits, + RelNode child, boolean indicator, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { + 
super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); + } + + @Override + public PhoenixServerAggregate copy(RelTraitSet traits, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) { + return new PhoenixServerAggregate(getCluster(), traits, input, indicator, groupSet, groupSets, aggregateCalls); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner) + .multiplyBy(SERVER_FACTOR) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + assert getConvention() == getInput().getConvention(); + + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + assert (plan instanceof ScanPlan + || plan instanceof HashJoinPlan) + && plan.getLimit() == null; + + ScanPlan basePlan; + HashJoinPlan hashJoinPlan = null; + if (plan instanceof ScanPlan) { + basePlan = (ScanPlan) plan; + } else { + hashJoinPlan = (HashJoinPlan) plan; + QueryPlan delegate = hashJoinPlan.getDelegate(); + assert delegate instanceof ScanPlan; + basePlan = (ScanPlan) delegate; + } + + StatementContext context = basePlan.getContext(); + GroupBy groupBy = super.getGroupBy(implementor); + super.serializeAggregators(implementor, context, groupBy.isEmpty()); + + QueryPlan aggPlan = new AggregatePlan(context, basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null); + if (hashJoinPlan != null) { + aggPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans()); + } + + return PhoenixAbstractAggregate.wrapWithProject(implementor, aggPlan, groupBy.getKeyExpressions(), Arrays.asList(context.getAggregationManager().getAggregators().getFunctions())); + } + +} 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java new file mode 100644 index 0000000..8a4811a --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java @@ -0,0 +1,125 @@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Util; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.compile.JoinCompiler; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.join.HashJoinInfo; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +public class PhoenixServerJoin extends PhoenixAbstractJoin { + + public PhoenixServerJoin(RelOptCluster cluster, RelTraitSet traits, + RelNode 
left, RelNode right, RexNode condition, + JoinRelType joinType, Set<String> variablesStopped) { + super(cluster, traits, left, right, condition, joinType, + variablesStopped); + } + + @Override + public PhoenixServerJoin copy(RelTraitSet traits, RexNode condition, RelNode left, + RelNode right, JoinRelType joinRelType, boolean semiJoinDone) { + return new PhoenixServerJoin(getCluster(), traits, left, right, condition, joinRelType, ImmutableSet.<String>of()); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + //TODO return infinite cost if RHS size exceeds memory limit. + + double rowCount = RelMetadataQuery.getRowCount(this); + + for (RelNode input : getInputs()) { + double inputRowCount = input.getRows(); + if (Double.isInfinite(inputRowCount)) { + rowCount = inputRowCount; + } else if (input == getLeft()) { + rowCount += inputRowCount; + } else { + rowCount += Util.nLogN(inputRowCount); + } + } + RelOptCost cost = planner.getCostFactory().makeCost(rowCount, 0, 0); + + return cost.multiplyBy(SERVER_FACTOR).multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + assert getLeft().getConvention() == PhoenixRel.CONVENTION; + assert getRight().getConvention() == PhoenixRel.CONVENTION; + PhoenixRel left = (PhoenixRel) getLeft(); + PhoenixRel right = (PhoenixRel) getRight(); + + JoinInfo joinInfo = JoinInfo.of(left, right, getCondition()); + List<Expression> leftExprs = Lists.<Expression> newArrayList(); + List<Expression> rightExprs = Lists.<Expression> newArrayList(); + implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), true)); + QueryPlan leftPlan = implementor.visitInput(0, left); + PTable leftTable = implementor.getTableRef().getTable(); + for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); iter.hasNext();) { + Integer index = iter.next(); + leftExprs.add(implementor.newColumnExpression(index)); + } + if (leftExprs.isEmpty()) { + 
leftExprs.add(LiteralExpression.newConstant(0)); + } + implementor.popContext(); + implementor.pushContext(new ImplementorContext(false, true)); + QueryPlan rightPlan = implementor.visitInput(1, right); + PTable rightTable = implementor.getTableRef().getTable(); + for (Iterator<Integer> iter = joinInfo.rightKeys.iterator(); iter.hasNext();) { + Integer index = iter.next(); + rightExprs.add(implementor.newColumnExpression(index)); + } + if (rightExprs.isEmpty()) { + rightExprs.add(LiteralExpression.newConstant(0)); + } + implementor.popContext(); + + JoinType type = convertJoinType(getJoinType()); + PTable joinedTable; + try { + joinedTable = JoinCompiler.joinProjectedTables(leftTable, rightTable, type); + } catch (SQLException e) { + throw new RuntimeException(e); + } + implementor.setTableRef(new TableRef(joinedTable)); + RexNode postFilter = joinInfo.getRemaining(getCluster().getRexBuilder()); + Expression postFilterExpr = postFilter.isAlwaysTrue() ? null : CalciteUtils.toExpression(postFilter, implementor); + @SuppressWarnings("unchecked") + HashJoinInfo hashJoinInfo = new HashJoinInfo( + joinedTable, new ImmutableBytesPtr[] {new ImmutableBytesPtr()}, + (List<Expression>[]) (new List[] {leftExprs}), + new JoinType[] {type}, new boolean[] {true}, + new PTable[] {rightTable}, + new int[] {leftTable.getColumns().size() - leftTable.getPKColumns().size()}, + postFilterExpr, null); + + return HashJoinPlan.create(SelectStatement.SELECT_STAR, leftPlan, hashJoinInfo, new HashJoinPlan.HashSubPlan[] {new HashJoinPlan.HashSubPlan(0, rightPlan, rightExprs, false, null, null)}); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java new file mode 100644 index 0000000..f9de2ee --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java @@ -0,0 +1,51 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.execute.TupleProjector; + +public class PhoenixServerProject extends PhoenixAbstractProject { + + public PhoenixServerProject(RelOptCluster cluster, RelTraitSet traits, + RelNode input, List<? extends RexNode> projects, RelDataType rowType) { + super(cluster, traits, input, projects, rowType); + } + + @Override + public PhoenixServerProject copy(RelTraitSet traits, RelNode input, + List<RexNode> projects, RelDataType rowType) { + return new PhoenixServerProject(getCluster(), traits, input, projects, rowType); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner) + .multiplyBy(SERVER_FACTOR) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + assert getConvention() == getInput().getConvention(); + + implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), false)); + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + implementor.popContext(); + assert (plan instanceof ScanPlan || plan instanceof HashJoinPlan) + && !TupleProjector.hasProjector(plan.getContext().getScan(), plan instanceof ScanPlan); + + 
TupleProjector tupleProjector = super.project(implementor); + TupleProjector.serializeProjectorIntoScan(plan.getContext().getScan(), tupleProjector, plan instanceof ScanPlan); + return plan; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java new file mode 100644 index 0000000..eb4c315 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java @@ -0,0 +1,74 @@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.execute.ScanPlan; + +public class PhoenixServerSort extends PhoenixAbstractSort { + + public PhoenixServerSort(RelOptCluster cluster, RelTraitSet traits, + RelNode child, RelCollation collation, RexNode offset, RexNode fetch) { + super(cluster, traits, child, collation, offset, fetch); + } + + @Override + public PhoenixServerSort copy(RelTraitSet traitSet, RelNode newInput, + RelCollation newCollation, RexNode offset, RexNode fetch) { + return new PhoenixServerSort(getCluster(), traitSet, newInput, newCollation, offset, fetch); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner) + .multiplyBy(SERVER_FACTOR) + 
.multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + assert getConvention() == getInput().getConvention(); + if (this.offset != null) + throw new UnsupportedOperationException(); + + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + assert (plan instanceof ScanPlan + || plan instanceof HashJoinPlan) + && plan.getLimit() == null; + + ScanPlan basePlan; + HashJoinPlan hashJoinPlan = null; + if (plan instanceof ScanPlan) { + basePlan = (ScanPlan) plan; + } else { + hashJoinPlan = (HashJoinPlan) plan; + QueryPlan delegate = hashJoinPlan.getDelegate(); + assert delegate instanceof ScanPlan; + basePlan = (ScanPlan) delegate; + } + + OrderBy orderBy = super.getOrderBy(implementor, null); + Integer limit = super.getLimit(implementor); + QueryPlan newPlan; + try { + newPlan = ScanPlan.create((ScanPlan) basePlan, orderBy, limit); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + if (hashJoinPlan != null) { + newPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans()); + } + return newPlan; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java new file mode 100644 index 0000000..7902e27 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java @@ -0,0 +1,166 @@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import 
org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.calcite.rules.PhoenixClientJoinRule; +import org.apache.phoenix.calcite.rules.PhoenixCompactClientSortRule; +import org.apache.phoenix.calcite.rules.PhoenixFilterScanMergeRule; +import org.apache.phoenix.calcite.rules.PhoenixConverterRules; +import org.apache.phoenix.calcite.rules.PhoenixServerAggregateRule; +import org.apache.phoenix.calcite.rules.PhoenixServerJoinRule; +import org.apache.phoenix.calcite.rules.PhoenixServerProjectRule; +import org.apache.phoenix.calcite.rules.PhoenixServerSortRule; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.WhereCompiler; +import org.apache.phoenix.compile.WhereOptimizer; +import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.iterate.ParallelIteratorFactory; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnFamily; +import 
org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.collect.Lists; + +/** + * Scan of a Phoenix table. + */ +public class PhoenixTableScan extends TableScan implements PhoenixRel { + public final RexNode filter; + + public PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, RexNode filter) { + super(cluster, traits, table); + this.filter = filter; + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + assert inputs.isEmpty(); + return this; + } + + @Override + public void register(RelOptPlanner planner) { + RelOptRule[] rules = PhoenixConverterRules.RULES; + for (RelOptRule rule : rules) { + planner.addRule(rule); + } + planner.addRule(PhoenixFilterScanMergeRule.INSTANCE); + planner.addRule(PhoenixServerProjectRule.PROJECT_SCAN); + planner.addRule(PhoenixServerProjectRule.PROJECT_SERVERJOIN); + planner.addRule(PhoenixServerJoinRule.JOIN_SCAN); + planner.addRule(PhoenixServerJoinRule.JOIN_SERVERPROJECT_SCAN); + planner.addRule(PhoenixServerAggregateRule.AGGREGATE_SCAN); + planner.addRule(PhoenixServerAggregateRule.AGGREGATE_SERVERJOIN); + planner.addRule(PhoenixServerAggregateRule.AGGREGATE_SERVERPROJECT); + planner.addRule(PhoenixServerSortRule.SORT_SCAN); + planner.addRule(PhoenixServerSortRule.SORT_SERVERJOIN); + planner.addRule(PhoenixServerSortRule.SORT_SERVERPROJECT); + planner.addRule(PhoenixCompactClientSortRule.SORT_SERVERAGGREGATE); + planner.addRule(PhoenixClientJoinRule.INSTANCE); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .itemIf("filter", filter, filter != null); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + RelOptCost cost = super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR); + if (filter != null && !filter.isAlwaysTrue()) { + final Double selectivity = RelMetadataQuery.getSelectivity(this, filter); 
+ cost = cost.multiplyBy(selectivity); + } + return cost; + } + + @Override + public double getRows() { + return super.getRows() + * RelMetadataQuery.getSelectivity(this, filter); + } + + @Override + public QueryPlan implement(Implementor implementor) { + final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); + PTable pTable = phoenixTable.getTable(); + TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false); + implementor.setTableRef(tableRef); + try { + PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); + ColumnResolver resolver = FromCompiler.getResolver(tableRef); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + SelectStatement select = SelectStatement.SELECT_STAR; + if (filter != null) { + Expression filterExpr = CalciteUtils.toExpression(filter, implementor); + filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); + WhereCompiler.setScanFilter(context, select, filterExpr, true, false); + } + projectAllColumnFamilies(context.getScan(), phoenixTable.getTable()); + if (implementor.getCurrentContext().forceProject()) { + TupleProjector tupleProjector = createTupleProjector(implementor, phoenixTable.getTable()); + TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); + PTable projectedTable = implementor.createProjectedTable(); + implementor.setTableRef(new TableRef(projectedTable)); + } + Integer limit = null; + OrderBy orderBy = OrderBy.EMPTY_ORDER_BY; + ParallelIteratorFactory iteratorFactory = null; + return new ScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private TupleProjector createTupleProjector(Implementor implementor, PTable table) { + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + List<Expression> exprs = 
Lists.<Expression> newArrayList(); + for (PColumn column : table.getColumns()) { + if (!SchemaUtil.isPKColumn(column) || !implementor.getCurrentContext().isRetainPKColumns()) { + Expression expr = implementor.newColumnExpression(column.getPosition()); + exprs.add(expr); + builder.addField(expr); + } + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } + + // TODO only project needed columns + private void projectAllColumnFamilies(Scan scan, PTable table) { + scan.getFamilyMap().clear(); + for (PColumnFamily family : table.getColumnFamilies()) { + scan.addFamily(family.getName().getBytes()); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java new file mode 100644 index 0000000..3916102 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java @@ -0,0 +1,102 @@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.List; + +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.JavaRowFormat; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.MethodCallExpression; +import org.apache.calcite.linq4j.tree.ParameterExpression; +import org.apache.calcite.plan.ConventionTraitDef; 
+import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.phoenix.calcite.BuiltInMethod; +import org.apache.phoenix.compile.ExplainPlan; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.execute.DelegateQueryPlan; +import org.apache.phoenix.iterate.ResultIterator; + +/** + * Relational expression that converts input in the Phoenix calling convention into an enumerable. + */ +public class PhoenixToEnumerableConverter extends ConverterImpl implements EnumerableRel { + public PhoenixToEnumerableConverter( + RelOptCluster cluster, + RelTraitSet traits, + RelNode input) { + super(cluster, ConventionTraitDef.INSTANCE, traits, input); + } + + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new PhoenixToEnumerableConverter(getCluster(), traitSet, sole(inputs)); + } + + @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner).multiplyBy(.1); + } + + @Override + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + // Generates code that instantiates a result iterator, then converts it + // to an enumerable. 
+ // + // ResultIterator iterator = root.get("x"); + // return CalciteRuntime.toEnumerable(iterator); + final BlockBuilder list = new BlockBuilder(); + QueryPlan plan = makePlan((PhoenixRel)getInput()); + Expression var = stash(implementor, plan, QueryPlan.class); + final RelDataType rowType = getRowType(); + final PhysType physType = + PhysTypeImpl.of( + implementor.getTypeFactory(), rowType, + pref.prefer(JavaRowFormat.ARRAY)); + final Expression iterator_ = + list.append("iterator", var); + final Expression enumerable_ = + list.append("enumerable", + Expressions.call(BuiltInMethod.TO_ENUMERABLE.method, + iterator_)); + list.add(Expressions.return_(null, enumerable_)); + return implementor.result(physType, list.toBlock()); + } + + static QueryPlan makePlan(PhoenixRel rel) { + final PhoenixRel.Implementor phoenixImplementor = new PhoenixRelImplementorImpl(); + final QueryPlan plan = phoenixImplementor.visitInput(0, rel); + return new DelegateQueryPlan(plan) { + @Override + public ResultIterator iterator() throws SQLException { + return delegate.iterator(); + } + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return delegate.getExplainPlan(); + } + @Override + public RowProjector getProjector() { + return phoenixImplementor.createRowProjector(); + } + }; + } + + static Expression stash(EnumerableRelImplementor implementor, Object o, Class clazz) { + ParameterExpression x = (ParameterExpression) implementor.stash(o, clazz); + MethodCallExpression e = + Expressions.call(implementor.getRootExpression(), + org.apache.calcite.util.BuiltInMethod.DATA_CONTEXT_GET.method, + Expressions.constant(x.name)); + return Expressions.convert_(e, clazz); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java new file mode 100644 index 0000000..787b2f1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java @@ -0,0 +1,37 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Union; +import org.apache.phoenix.compile.QueryPlan; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Union} + * relational expression in Phoenix. + */ +public class PhoenixUnion extends Union implements PhoenixRel { + public PhoenixUnion(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) { + super(cluster, traits, inputs, all); + assert getConvention() == PhoenixRel.CONVENTION; + } + + @Override + public PhoenixUnion copy(RelTraitSet traits, List<RelNode> inputs, boolean all) { + return new PhoenixUnion(getCluster(), traits, inputs, all); + } + + @Override + public QueryPlan implement(Implementor implementor) { + for (RelNode input : getInputs()) { + assert getConvention() == input.getConvention(); + } + for (Ord<RelNode> input : Ord.zip(inputs)) { + implementor.visitInput(input.i, (PhoenixRel) input.e); + } + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java new file mode 100644 index 0000000..f1a626b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java @@ -0,0 +1,37 @@ +package 
org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.apache.phoenix.compile.QueryPlan; + +import com.google.common.collect.ImmutableList; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Values} + * relational expression in Phoenix. + */ +public class PhoenixValues extends Values implements PhoenixRel { + public PhoenixValues(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples, RelTraitSet traits) { + super(cluster, rowType, tuples, traits); + assert getConvention() == PhoenixRel.CONVENTION; + } + + @Override + public PhoenixValues copy(RelTraitSet traitSet, List<RelNode> inputs) { + assert traitSet.containsIfApplicable(Convention.NONE); + assert inputs.isEmpty(); + return new PhoenixValues(getCluster(), rowType, tuples, traitSet); + } + + @Override + public QueryPlan implement(Implementor implementor) { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java new file mode 100644 index 0000000..99ba81f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java @@ -0,0 +1,55 @@ +package org.apache.phoenix.calcite.rules; + +import java.util.Iterator; +import java.util.List; + +import org.apache.calcite.plan.RelOptRule; +import 
org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelFieldCollation.Direction; +import org.apache.calcite.rel.RelFieldCollation.NullDirection; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.phoenix.calcite.rel.PhoenixClientJoin; +import org.apache.phoenix.calcite.rel.PhoenixClientSort; +import org.apache.phoenix.calcite.rel.PhoenixJoin; +import org.apache.phoenix.calcite.rel.PhoenixRel; +import com.google.common.collect.Lists; + +public class PhoenixClientJoinRule extends RelOptRule { + + public static PhoenixClientJoinRule INSTANCE = new PhoenixClientJoinRule(); + + public PhoenixClientJoinRule() { + super(operand(PhoenixJoin.class, any()), "PhoenixClientJoinRule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + PhoenixJoin join = call.rel(0); + RelNode left = join.getLeft(); + RelNode right = join.getRight(); + JoinInfo joinInfo = JoinInfo.of(left, right, join.getCondition()); + + List<RelFieldCollation> leftFieldCollations = Lists.newArrayList(); + for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); iter.hasNext();) { + leftFieldCollations.add(new RelFieldCollation(iter.next(), Direction.ASCENDING,NullDirection.FIRST)); + } + RelCollation leftCollation = RelCollations.of(leftFieldCollations); + RelNode newLeft = new PhoenixClientSort(left.getCluster(), left.getTraitSet().replace(PhoenixRel.CONVENTION).replace(leftCollation), left, leftCollation, null, null); + + List<RelFieldCollation> rightFieldCollations = Lists.newArrayList(); + for (Iterator<Integer> iter = joinInfo.rightKeys.iterator(); iter.hasNext();) { + rightFieldCollations.add(new RelFieldCollation(iter.next(), Direction.ASCENDING,NullDirection.FIRST)); + } + RelCollation rightCollation = RelCollations.of(rightFieldCollations); + RelNode newRight = new 
PhoenixClientSort(right.getCluster(), right.getTraitSet().replace(PhoenixRel.CONVENTION).replace(rightCollation), right, rightCollation, null, null); + + call.transformTo(new PhoenixClientJoin(join.getCluster(), + join.getTraitSet(), newLeft, newRight, join.getCondition(), + join.getJoinType(), join.getVariablesStopped())); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java index 63cd60e..d1f4ec7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java @@ -2,10 +2,10 @@ package org.apache.phoenix.calcite.rules; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.phoenix.calcite.PhoenixClientSort; -import org.apache.phoenix.calcite.PhoenixCompactClientSort; -import org.apache.phoenix.calcite.PhoenixRel; -import org.apache.phoenix.calcite.PhoenixServerAggregate; +import org.apache.phoenix.calcite.rel.PhoenixClientSort; +import org.apache.phoenix.calcite.rel.PhoenixCompactClientSort; +import org.apache.phoenix.calcite.rel.PhoenixRel; +import org.apache.phoenix.calcite.rel.PhoenixServerAggregate; public class PhoenixCompactClientSortRule extends RelOptRule { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java index e426637..c6a5d36 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java @@ -11,17 +11,19 @@ import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.rel.logical.LogicalUnion; import org.apache.calcite.util.trace.CalciteTrace; -import org.apache.phoenix.calcite.PhoenixAggregate; -import org.apache.phoenix.calcite.PhoenixClientAggregate; -import org.apache.phoenix.calcite.PhoenixClientJoin; -import org.apache.phoenix.calcite.PhoenixClientProject; -import org.apache.phoenix.calcite.PhoenixClientSort; -import org.apache.phoenix.calcite.PhoenixFilter; -import org.apache.phoenix.calcite.PhoenixProject; -import org.apache.phoenix.calcite.PhoenixRel; -import org.apache.phoenix.calcite.PhoenixSort; -import org.apache.phoenix.calcite.PhoenixToEnumerableConverter; -import org.apache.phoenix.calcite.PhoenixUnion; +import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate; +import org.apache.phoenix.calcite.rel.PhoenixClientAggregate; +import org.apache.phoenix.calcite.rel.PhoenixClientProject; +import org.apache.phoenix.calcite.rel.PhoenixClientSort; +import org.apache.phoenix.calcite.rel.PhoenixFilter; +import org.apache.phoenix.calcite.rel.PhoenixAbstractProject; +import org.apache.phoenix.calcite.rel.PhoenixJoin; +import org.apache.phoenix.calcite.rel.PhoenixRel; +import org.apache.phoenix.calcite.rel.PhoenixAbstractSort; +import org.apache.phoenix.calcite.rel.PhoenixToEnumerableConverter; +import org.apache.phoenix.calcite.rel.PhoenixUnion; + +import com.google.common.base.Predicate; import java.util.logging.Logger; @@ -50,24 +52,41 @@ public class PhoenixConverterRules { abstract static class PhoenixConverterRule extends ConverterRule { protected final Convention 
out; public PhoenixConverterRule( - Class<? extends RelNode> clazz, - RelTrait in, - Convention out, - String description) { + Class<? extends RelNode> clazz, + RelTrait in, + Convention out, + String description) { super(clazz, in, out, description); this.out = out; } + + public <R extends RelNode> PhoenixConverterRule( + Class<R> clazz, + Predicate<? super R> predicate, + RelTrait in, + Convention out, + String description) { + super(clazz, predicate, in, out, description); + this.out = out; + } } /** * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a - * {@link PhoenixSort}. + * {@link PhoenixAbstractSort}. */ private static class PhoenixSortRule extends PhoenixConverterRule { + private static Predicate<LogicalSort> NON_EMPTY_COLLATION = new Predicate<LogicalSort>() { + @Override + public boolean apply(LogicalSort input) { + return !input.getCollation().getFieldCollations().isEmpty(); + } + }; + public static final PhoenixSortRule INSTANCE = new PhoenixSortRule(); private PhoenixSortRule() { - super(LogicalSort.class, Convention.NONE, PhoenixRel.CONVENTION, + super(LogicalSort.class, NON_EMPTY_COLLATION, Convention.NONE, PhoenixRel.CONVENTION, "PhoenixSortRule"); } @@ -107,7 +126,7 @@ public class PhoenixConverterRules { /** * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject} - * to a {@link PhoenixProject}. + * to a {@link PhoenixAbstractProject}. */ private static class PhoenixProjectRule extends PhoenixConverterRule { private static final PhoenixProjectRule INSTANCE = new PhoenixProjectRule(); @@ -128,7 +147,7 @@ public class PhoenixConverterRules { /** * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalAggregate} - * to an {@link PhoenixAggregate}. + * to an {@link PhoenixAbstractAggregate}. 
*/ private static class PhoenixAggregateRule extends PhoenixConverterRule { public static final RelOptRule INSTANCE = new PhoenixAggregateRule(); @@ -175,7 +194,7 @@ public class PhoenixConverterRules { /** * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a - * {@link PhoenixSort}. + * {@link PhoenixAbstractSort}. */ private static class PhoenixJoinRule extends PhoenixConverterRule { public static final PhoenixJoinRule INSTANCE = new PhoenixJoinRule(); @@ -189,7 +208,7 @@ public class PhoenixConverterRules { final LogicalJoin join = (LogicalJoin) rel; final RelTraitSet traitSet = join.getTraitSet().replace(out); - return new PhoenixClientJoin(rel.getCluster(), traitSet, + return new PhoenixJoin(rel.getCluster(), traitSet, convert(join.getLeft(), join.getLeft().getTraitSet().replace(out)), convert(join.getRight(), join.getRight().getTraitSet().replace(out)), join.getCondition(), @@ -343,7 +362,7 @@ public class PhoenixConverterRules { /** * Rule to convert a relational expression from - * {@link org.apache.phoenix.calcite.PhoenixRel#CONVENTION} to + * {@link org.apache.phoenix.calcite.rel.PhoenixRel#CONVENTION} to * {@link org.apache.calcite.adapter.enumerable.EnumerableConvention}. 
*/ public static class PhoenixToEnumerableConverterRule extends ConverterRule { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java index a456c7a..dd0f119 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java @@ -5,7 +5,7 @@ import com.google.common.base.Predicate; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.core.Filter; -import org.apache.phoenix.calcite.PhoenixTableScan; +import org.apache.phoenix.calcite.rel.PhoenixTableScan; public class PhoenixFilterScanMergeRule extends RelOptRule {
