Fix for merge
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8c19e1c1 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8c19e1c1 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8c19e1c1 Branch: refs/heads/calcite Commit: 8c19e1c13f2c59c4ff8a974cf351e5a3faae3eab Parents: 7167262 2acb38a Author: maryannxue <[email protected]> Authored: Thu Sep 10 09:58:30 2015 -0400 Committer: maryannxue <[email protected]> Committed: Thu Sep 10 09:58:30 2015 -0400 ---------------------------------------------------------------------- .../phoenix/end2end/RowValueConstructorIT.java | 69 ++++++ .../org/apache/phoenix/end2end/SortOrderIT.java | 27 ++ .../java/org/apache/phoenix/end2end/ViewIT.java | 24 ++ .../iterate/MockParallelIteratorFactory.java | 47 ++++ .../iterate/MockTableResultIterator.java | 66 +++++ .../iterate/RoundRobinResultIteratorIT.java | 40 ++- .../RoundRobinResultIteratorWithStatsIT.java | 104 ++++++++ .../phoenix/calcite/rel/PhoenixTableScan.java | 2 +- .../phoenix/calcite/rel/PhoenixValues.java | 2 +- .../phoenix/compile/OrderPreservingTracker.java | 21 +- .../org/apache/phoenix/compile/ScanRanges.java | 136 +++++++--- .../apache/phoenix/compile/WhereOptimizer.java | 72 +++--- .../coprocessor/MetaDataEndpointImpl.java | 7 +- .../apache/phoenix/execute/AggregatePlan.java | 13 +- .../apache/phoenix/execute/BaseQueryPlan.java | 22 +- .../apache/phoenix/execute/CorrelatePlan.java | 218 ++++++++++++++++ .../phoenix/execute/DegenerateQueryPlan.java | 2 +- .../apache/phoenix/execute/HashJoinPlan.java | 9 +- .../execute/LiteralResultIterationPlan.java | 13 +- .../apache/phoenix/execute/RuntimeContext.java | 33 +++ .../phoenix/execute/RuntimeContextImpl.java | 86 +++++++ .../org/apache/phoenix/execute/ScanPlan.java | 25 +- .../phoenix/execute/SortMergeJoinPlan.java | 5 +- .../apache/phoenix/execute/UnnestArrayPlan.java | 8 +- .../CorrelateVariableFieldAccessExpression.java | 75 ++++++ .../phoenix/expression/ExpressionType.java | 11 +- .../visitor/CloneExpressionVisitor.java | 6 + .../expression/visitor/ExpressionVisitor.java | 2 + .../StatelessTraverseAllExpressionVisitor.java | 7 + .../StatelessTraverseNoExpressionVisitor.java | 7 + .../apache/phoenix/filter/SkipScanFilter.java | 28 ++- .../phoenix/index/PhoenixIndexBuilder.java | 5 +- .../phoenix/iterate/BaseResultIterators.java | 14 +- .../apache/phoenix/iterate/ExplainTable.java | 7 +- .../UngroupedAggregatingResultIterator.java | 3 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 16 ++ .../apache/phoenix/jdbc/PhoenixResultSet.java | 6 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 3 +- .../apache/phoenix/optimize/QueryOptimizer.java | 6 +- .../org/apache/phoenix/util/KeyValueUtil.java | 91 ++----- .../java/org/apache/phoenix/util/ScanUtil.java | 49 ++-- .../phoenix/compile/QueryCompilerTest.java | 18 +- .../compile/ScanRangesIntersectTest.java | 37 +-- .../apache/phoenix/compile/ScanRangesTest.java | 3 +- .../TenantSpecificViewIndexCompileTest.java | 172 +++++++++++++ .../phoenix/compile/ViewCompilerTest.java | 1 - .../phoenix/compile/WhereOptimizerTest.java | 22 +- .../phoenix/execute/CorrelatePlanTest.java | 248 +++++++++++++++++++ .../phoenix/execute/UnnestArrayPlanTest.java | 6 +- .../query/ParallelIteratorsSplitTest.java | 3 +- .../org/apache/phoenix/query/QueryPlanTest.java | 10 +- .../org/apache/phoenix/util/ScanUtilTest.java | 10 +- 52 files changed, 1617 insertions(+), 300 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java index b8e97ed,0000000..171dd26 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java @@@ -1,229 -1,0 +1,229 @@@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.WhereCompiler; +import org.apache.phoenix.compile.WhereOptimizer; +import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.iterate.ParallelIteratorFactory; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +/** + * Scan of a Phoenix table. + */ +public class PhoenixTableScan extends TableScan implements PhoenixRel { + public final RexNode filter; + + private final ScanRanges scanRanges; + + /** + * This will not make a difference in implement(), but rather give a more accurate + * estimate of the row count. + */ + public final Integer statelessFetch; + + public static PhoenixTableScan create(RelOptCluster cluster, final RelOptTable table, + RexNode filter, Integer statelessFetch) { + final RelTraitSet traits = + cluster.traitSetOf(PhoenixRel.SERVER_CONVENTION) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + if (table != null) { + return table.unwrap(PhoenixTable.class).getStatistic().getCollations(); + } + return ImmutableList.of(); + } + }); + return new PhoenixTableScan(cluster, traits, table, filter, statelessFetch); + } + + private PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, RexNode filter, Integer statelessFetch) { + super(cluster, traits, table); + this.filter = filter; + this.statelessFetch = statelessFetch; + + ScanRanges scanRanges = null; + if (filter != null) { + try { + // TODO simplify this code + final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); + PTable pTable = phoenixTable.getTable(); + TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false); + Implementor tmpImplementor = new PhoenixRelImplementorImpl(); + tmpImplementor.setTableRef(tableRef); + SelectStatement select = SelectStatement.SELECT_ONE; + PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); + ColumnResolver resolver = FromCompiler.getResolver(tableRef); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + Expression filterExpr = CalciteUtils.toExpression(filter, tmpImplementor); + filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); + scanRanges = context.getScanRanges(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + this.scanRanges = scanRanges; + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + assert inputs.isEmpty(); + return this; + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .itemIf("filter", filter, filter != null) + .itemIf("statelessFetch", statelessFetch, statelessFetch != null); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + double rowCount = super.getRows(); + Double filteredRowCount = null; + if (scanRanges != null) { + if (scanRanges.isPointLookup()) { + filteredRowCount = 1.0; - } else if (scanRanges.getPkColumnSpan() > 0) { ++ } else if (scanRanges.getBoundPkColumnCount() > 0) { + // TODO + filteredRowCount = rowCount * RelMetadataQuery.getSelectivity(this, filter); + } + } + if (filteredRowCount != null) { + rowCount = filteredRowCount; + } else if (table.unwrap(PhoenixTable.class).getTable().getParentName() != null){ + rowCount = addEpsilon(rowCount); + } + int fieldCount = this.table.getRowType().getFieldCount(); + return planner.getCostFactory() + .makeCost(rowCount * 2 * fieldCount / (fieldCount + 1), rowCount + 1, 0) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public double getRows() { + double rows = super.getRows(); + if (filter != null && !filter.isAlwaysTrue()) { + rows = rows * RelMetadataQuery.getSelectivity(this, filter); + } + if (statelessFetch == null) + return rows; + + return Math.min(statelessFetch, rows); + } + + @Override + public QueryPlan implement(Implementor implementor) { + final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); + PTable pTable = phoenixTable.getTable(); + TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false); + implementor.setTableRef(tableRef); + try { + PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); + ColumnResolver resolver = FromCompiler.getResolver(tableRef); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + SelectStatement select = SelectStatement.SELECT_ONE; + if (filter != null) { + Expression filterExpr = CalciteUtils.toExpression(filter, implementor); + filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); + WhereCompiler.setScanFilter(context, select, filterExpr, true, false); + } + projectAllColumnFamilies(context.getScan(), phoenixTable.getTable()); + if (implementor.getCurrentContext().forceProject()) { + TupleProjector tupleProjector = createTupleProjector(implementor, phoenixTable.getTable()); + TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); + PTable projectedTable = implementor.createProjectedTable(); + implementor.setTableRef(new TableRef(projectedTable)); + } + Integer limit = null; + OrderBy orderBy = OrderBy.EMPTY_ORDER_BY; + ParallelIteratorFactory iteratorFactory = null; + return new ScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private TupleProjector createTupleProjector(Implementor implementor, PTable table) { + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + List<Expression> exprs = Lists.<Expression> newArrayList(); + for (PColumn column : table.getColumns()) { + if (!SchemaUtil.isPKColumn(column) || !implementor.getCurrentContext().isRetainPKColumns()) { + Expression expr = implementor.newColumnExpression(column.getPosition()); + exprs.add(expr); + builder.addField(expr); + } + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } + + // TODO only project needed columns + private void projectAllColumnFamilies(Scan scan, PTable table) { + scan.getFamilyMap().clear(); + for (PColumnFamily family : table.getColumnFamilies()) { + scan.addFamily(family.getName().getBytes()); + } + } + + private double addEpsilon(double d) { + assert d >= 0d; + final double d0 = d; + if (d < 10) { + // For small d, adding 1 would change the value significantly. + d *= 1.001d; + if (d != d0) { + return d; + } + } + // For medium d, add 1. Keeps integral values integral. + ++d; + if (d != d0) { + return d; + } + // For large d, adding 1 might not change the value. Add .1%. + // If d is NaN, this still will probably not change the value. That's OK. + d *= 1.001d; + return d; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java index b65b1b8,0000000..89aaa07 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java @@@ -1,129 -1,0 +1,129 @@@ +package org.apache.phoenix.calcite.rel; + +import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelDistributionTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.metadata.RelMdCollation; +import org.apache.calcite.rel.metadata.RelMdDistribution; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.LiteralResultIterationPlan; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Values} + * relational expression in Phoenix. + */ +public class PhoenixValues extends Values implements PhoenixRel { + + private static final PhoenixConnection phoenixConnection; + static { + try { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + final Connection connection = + DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS); + phoenixConnection = + connection.unwrap(PhoenixConnection.class); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static PhoenixValues create(RelOptCluster cluster, final RelDataType rowType, final ImmutableList<ImmutableList<RexLiteral>> tuples) { + final RelTraitSet traits = + cluster.traitSetOf(PhoenixRel.CLIENT_CONVENTION) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return RelMdCollation.values(rowType, tuples); + } + }) + .replaceIf(RelDistributionTraitDef.INSTANCE, + new Supplier<RelDistribution>() { + public RelDistribution get() { + return RelMdDistribution.values(rowType, tuples); + } + }); + return new PhoenixValues(cluster, rowType, tuples, traits); + } + + private PhoenixValues(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples, RelTraitSet traits) { + super(cluster, rowType, tuples, traits); + } + + @Override + public PhoenixValues copy(RelTraitSet traitSet, List<RelNode> inputs) { + assert inputs.isEmpty(); + return create(getCluster(), rowType, tuples); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + List<Tuple> literalResult = Lists.newArrayList(); + Iterator<ImmutableList<RexLiteral>> iter = getTuples().iterator(); + Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY); + while (iter.hasNext()) { + ImmutableList<RexLiteral> row = iter.next(); + List<Expression> exprs = Lists.newArrayListWithExpectedSize(row.size()); + for (RexLiteral rexLiteral : row) { + exprs.add(CalciteUtils.toExpression(rexLiteral, implementor)); + } + TupleProjector projector = implementor.project(exprs); + literalResult.add(projector.projectResults(baseTuple)); + } + + try { + PhoenixStatement stmt = new PhoenixStatement(phoenixConnection); + ColumnResolver resolver = FromCompiler.getResolver(implementor.getTableRef()); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); - return new LiteralResultIterationPlan(literalResult.iterator(), context, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null); ++ return new LiteralResultIterationPlan(literalResult, context, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 598ead2,9a415b9..da55fb5 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@@ -73,10 -72,6 +73,10 @@@ public class AggregatePlan extends Base private final Expression having; private List<KeyRange> splits; private List<List<Scan>> scans; + + public static AggregatePlan create(AggregatePlan plan, OrderBy newOrderBy) { - return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving()); ++ return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving(), plan.dynamicFilter); + } public AggregatePlan( StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, @@@ -221,13 -223,4 +228,13 @@@ public boolean useRoundRobinIterator() throws SQLException { return false; } + + @Override + public QueryPlan limit(Integer limit) { + if (limit == this.limit || (limit != null && limit.equals(this.limit))) + return this; + + return new AggregatePlan(this.context, this.statement, this.tableRef, this.projection, - limit, this.orderBy, this.parallelIteratorFactory, this.groupBy, this.having); ++ limit, this.orderBy, this.parallelIteratorFactory, this.groupBy, this.having, this.dynamicFilter); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java index 0000000,1b0af8c..1981c4b mode 000000,100644..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java @@@ -1,0 -1,208 +1,218 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.phoenix.execute; + + import java.io.IOException; + import java.sql.SQLException; + import java.util.List; + + import org.apache.hadoop.hbase.io.ImmutableBytesWritable; + import org.apache.phoenix.compile.ExplainPlan; ++import org.apache.phoenix.compile.OrderByCompiler.OrderBy; + import org.apache.phoenix.compile.QueryPlan; + import org.apache.phoenix.exception.SQLExceptionCode; + import org.apache.phoenix.exception.SQLExceptionInfo; + import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple; + import org.apache.phoenix.iterate.DefaultParallelScanGrouper; + import org.apache.phoenix.iterate.ParallelScanGrouper; + import org.apache.phoenix.iterate.ResultIterator; + import org.apache.phoenix.parse.JoinTableNode.JoinType; + import org.apache.phoenix.schema.KeyValueSchema; + import org.apache.phoenix.schema.PColumn; + import org.apache.phoenix.schema.PTable; + import org.apache.phoenix.schema.ValueBitSet; + import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; + import org.apache.phoenix.schema.tuple.Tuple; + import org.apache.phoenix.util.SchemaUtil; + + import com.google.common.collect.Lists; + + public class CorrelatePlan extends DelegateQueryPlan { + private final QueryPlan rhs; + private final String variableId; + private final JoinType joinType; + private final boolean isSingleValueOnly; + private final RuntimeContext runtimeContext; + private final KeyValueSchema joinedSchema; + private final KeyValueSchema lhsSchema; + private final KeyValueSchema rhsSchema; + private final int rhsFieldPosition; + + public CorrelatePlan(QueryPlan lhs, QueryPlan rhs, String variableId, + JoinType joinType, boolean isSingleValueOnly, + RuntimeContext runtimeContext, PTable joinedTable, + PTable lhsTable, PTable rhsTable, int rhsFieldPosition) { + super(lhs); + if (joinType != JoinType.Inner && joinType != JoinType.Left && joinType != JoinType.Semi && joinType != JoinType.Anti) + throw new IllegalArgumentException("Unsupported join type '" + joinType + "' by CorrelatePlan"); + + this.rhs = rhs; + this.variableId = variableId; + this.joinType = joinType; + this.isSingleValueOnly = isSingleValueOnly; + this.runtimeContext = runtimeContext; + this.joinedSchema = buildSchema(joinedTable); + this.lhsSchema = buildSchema(lhsTable); + this.rhsSchema = buildSchema(rhsTable); + this.rhsFieldPosition = rhsFieldPosition; + } + + private static KeyValueSchema buildSchema(PTable table) { + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + if (table != null) { + for (PColumn column : table.getColumns()) { + if (!SchemaUtil.isPKColumn(column)) { + builder.addField(column); + } + } + } + return builder.build(); + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> steps = Lists.newArrayList(); + steps.add("NESTED-LOOP-JOIN (" + joinType.toString().toUpperCase() + ") TABLES"); + for (String step : delegate.getExplainPlan().getPlanSteps()) { + steps.add(" " + step); + } + steps.add("AND" + (rhsSchema.getFieldCount() == 0 ? " (SKIP MERGE)" : "")); + for (String step : rhs.getExplainPlan().getPlanSteps()) { + steps.add(" " + step); + } + return new ExplainPlan(steps); + } + + @Override + public ResultIterator iterator() throws SQLException { + return iterator(DefaultParallelScanGrouper.getInstance()); + } + + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper) + throws SQLException { + return new ResultIterator() { + private final ValueBitSet destBitSet = ValueBitSet.newInstance(joinedSchema); + private final ValueBitSet lhsBitSet = ValueBitSet.newInstance(lhsSchema); + private final ValueBitSet rhsBitSet = + (joinType == JoinType.Semi || joinType == JoinType.Anti) ? + ValueBitSet.EMPTY_VALUE_BITSET + : ValueBitSet.newInstance(rhsSchema); + private final ResultIterator iter = delegate.iterator(); + private ResultIterator rhsIter = null; + private Tuple current = null; + private boolean closed = false; + + @Override + public void close() throws SQLException { + if (!closed) { + closed = true; + iter.close(); + if (rhsIter != null) { + rhsIter.close(); + } + } + } + + @Override + public Tuple next() throws SQLException { + if (closed) + return null; + + Tuple rhsCurrent = null; + if (rhsIter != null) { + rhsCurrent = rhsIter.next(); + if (rhsCurrent == null) { + rhsIter.close(); + rhsIter = null; + } else if (isSingleValueOnly) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException(); + } + } + while (rhsIter == null) { + current = iter.next(); + if (current == null) { + close(); + return null; + } + runtimeContext.setCorrelateVariableValue(variableId, current); + rhsIter = rhs.iterator(); + rhsCurrent = rhsIter.next(); + if ((rhsCurrent == null && (joinType == JoinType.Inner || joinType == JoinType.Semi)) + || (rhsCurrent != null && joinType == JoinType.Anti)) { + rhsIter.close(); + rhsIter = null; + } + } + + Tuple joined; + try { + joined = rhsBitSet == ValueBitSet.EMPTY_VALUE_BITSET ? + current : TupleProjector.mergeProjectedValue( + convertLhs(current), joinedSchema, destBitSet, + rhsCurrent, rhsSchema, rhsBitSet, rhsFieldPosition); + } catch (IOException e) { + throw new SQLException(e); + } + + if ((joinType == JoinType.Semi || rhsCurrent == null) && rhsIter != null) { + rhsIter.close(); + rhsIter = null; + } + + return joined; + } + + @Override + public void explain(List<String> planSteps) { + } + + private ProjectedValueTuple convertLhs(Tuple lhs) throws IOException { + ProjectedValueTuple t; + if (lhs instanceof ProjectedValueTuple) { + t = (ProjectedValueTuple) lhs; + } else { + ImmutableBytesWritable ptr = getContext().getTempPtr(); + TupleProjector.decodeProjectedValue(lhs, ptr); + lhsBitSet.clear(); + lhsBitSet.or(ptr); + int bitSetLen = lhsBitSet.getEstimatedLength(); + t = new ProjectedValueTuple(lhs, lhs.getValue(0).getTimestamp(), + ptr.get(), ptr.getOffset(), ptr.getLength(), bitSetLen); + + } + return t; + } + }; + } + + @Override + public Integer getLimit() { + return null; + } + ++ @Override ++ public QueryPlan limit(Integer limit) { ++ if (limit == null) ++ return this; ++ ++ return new ClientScanPlan(this.getContext(), this.getStatement(), this.getTableRef(), ++ this.getProjector(), limit, null, OrderBy.EMPTY_ORDER_BY, this); ++ } ++ + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 8cf39ba,72920b2..bbac5a5 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@@ -113,16 -114,10 +113,18 @@@ public class HashJoinPlan extends Deleg this.joinInfo = joinInfo; this.subPlans = subPlans; this.recompileWhereClause = recompileWhereClause; + this.maxServerCacheTimeToLive = plan.getContext().getConnection().getQueryServices().getProps().getInt( + QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS); } + public HashJoinInfo getJoinInfo() { + return this.joinInfo; + } + + public SubPlan[] getSubPlans() { + return this.subPlans; + } + @Override public ResultIterator iterator() throws SQLException { return iterator(DefaultParallelScanGrouper.getInstance()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java index 58c78d2,ab13e6c..e9fac80 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java @@@ -106,13 -106,4 +107,13 @@@ public class LiteralResultIterationPla return scanner; } + @Override + public QueryPlan limit(Integer limit) { + if (limit == this.limit || (limit != null && limit.equals(this.limit))) + return this; + - return new LiteralResultIterationPlan(this.tupleIterator, this.context, this.statement, this.tableRef, ++ return new LiteralResultIterationPlan(this.tuples, this.context, this.statement, this.tableRef, + this.projection, limit, this.orderBy, this.parallelIteratorFactory); + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 1e18aa6,9f7e482..2d408bc --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@@ -77,20 -76,15 +77,19 @@@ public class ScanPlan extends BaseQuery private List<KeyRange> splits; private List<List<Scan>> scans; private boolean allowPageFilter; - + + public static ScanPlan create(ScanPlan plan, OrderBy newOrderBy) throws SQLException { - return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter); ++ return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter, plan.dynamicFilter); + } - ++ public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException { - this(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, - parallelIteratorFactory != null ? parallelIteratorFactory : - buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter), - allowPageFilter); + this(context, statement, table, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter, null); } - private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) { - super(context, statement, table, projector, paramMetaData, limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory); + private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException { + super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, + parallelIteratorFactory != null ? parallelIteratorFactory : + buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter), dynamicFilter); this.allowPageFilter = allowPageFilter; if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( @@@ -229,13 -223,4 +228,17 @@@ return ScanUtil.isRoundRobinPossible(orderBy, context); } + @Override + public QueryPlan limit(Integer limit) { + if (limit == this.limit || (limit != null && limit.equals(this.limit))) + return this; + - return new ScanPlan(this.context, this.statement, this.tableRef, this.projection, - this.paramMetaData, limit, this.orderBy, this.parallelIteratorFactory, this.allowPageFilter); ++ try { ++ return new ScanPlan(this.context, this.statement, this.tableRef, this.projection, ++ limit, this.orderBy, this.parallelIteratorFactory, this.allowPageFilter, this.dynamicFilter); ++ } catch (SQLException e) { ++ throw new RuntimeException(e); ++ } + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java ----------------------------------------------------------------------
