http://git-wip-us.apache.org/repos/asf/phoenix/blob/c647c6a2/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java index a561a47..016cd52 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java @@ -136,7 +136,7 @@ public class GroupByCompiler { * @throws ColumnNotFoundException if column name could not be resolved * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables */ - public static GroupBy compile(StatementContext context, SelectStatement statement, TupleProjector tupleProjector) throws SQLException { + public static GroupBy compile(StatementContext context, SelectStatement statement, TupleProjector tupleProjector, boolean isInRowKeyOrder) throws SQLException { List<ParseNode> groupByNodes = statement.getGroupBy(); /** * Distinct can use an aggregate plan if there's no group by. @@ -179,7 +179,7 @@ public class GroupByCompiler { return GroupBy.EMPTY_GROUP_BY; } - boolean isRowKeyOrderedGrouping = groupByVisitor.isOrderPreserving(); + boolean isRowKeyOrderedGrouping = isInRowKeyOrder && groupByVisitor.isOrderPreserving(); List<Expression> expressions = Lists.newArrayListWithCapacity(groupByEntries.size()); List<Expression> keyExpressions = expressions; String groupExprAttribName;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c647c6a2/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index a609f60..f3d353b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -20,7 +20,6 @@ package org.apache.phoenix.compile; import static org.apache.phoenix.schema.SaltingUtil.SALTING_COLUMN; import java.sql.SQLException; -import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -293,6 +292,10 @@ public class JoinCompiler { return columnRefs; } + public ParseNode getPostFiltersCombined() { + return combine(postFilters); + } + public void addFilter(ParseNode filter) throws SQLException { if (joinSpecs.isEmpty()) { table.addFilter(filter); @@ -320,7 +323,7 @@ public class JoinCompiler { for (JoinSpec joinSpec : joinSpecs) { JoinTable joinTable = joinSpec.getJoinTable(); boolean hasSubJoin = !joinTable.getJoinSpecs().isEmpty(); - for (ComparisonParseNode node : joinSpec.getOnConditions()) { + for (EqualParseNode node : joinSpec.getOnConditions()) { node.getLHS().accept(generalRefVisitor); if (hasSubJoin) { node.getRHS().accept(generalRefVisitor); @@ -384,13 +387,12 @@ public class JoinCompiler { } public SelectStatement getAsSingleSubquery(SelectStatement query, boolean asSubquery) throws SQLException { - if (!isFlat(query)) - throw new SQLFeatureNotSupportedException("Complex subqueries not supported as left join table."); + assert (isFlat(query)); if (asSubquery) return query; - return NODE_FACTORY.select(query.getFrom(), select.getHint(), select.isDistinct(), select.getSelect(), query.getWhere(), 
select.getGroupBy(), select.getHaving(), select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence()); + return NODE_FACTORY.select(select, query.getFrom(), query.getWhere()); } public boolean hasPostReference() { @@ -427,7 +429,7 @@ public class JoinCompiler { public static class JoinSpec { private final JoinType type; - private final List<ComparisonParseNode> onConditions; + private final List<EqualParseNode> onConditions; private final JoinTable joinTable; private final boolean singleValueOnly; private Set<TableRef> dependencies; @@ -436,7 +438,7 @@ public class JoinCompiler { private JoinSpec(JoinType type, ParseNode onNode, JoinTable joinTable, boolean singleValueOnly, ColumnResolver resolver) throws SQLException { this.type = type; - this.onConditions = new ArrayList<ComparisonParseNode>(); + this.onConditions = new ArrayList<EqualParseNode>(); this.joinTable = joinTable; this.singleValueOnly = singleValueOnly; this.dependencies = new HashSet<TableRef>(); @@ -454,7 +456,7 @@ public class JoinCompiler { return type; } - public List<ComparisonParseNode> getOnConditions() { + public List<EqualParseNode> getOnConditions() { return onConditions; } @@ -470,75 +472,63 @@ public class JoinCompiler { return dependencies; } - public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext context, ColumnResolver leftResolver, ColumnResolver rightResolver) throws SQLException { + public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext lhsCtx, StatementContext rhsCtx, boolean sortExpressions) throws SQLException { if (onConditions.isEmpty()) { return new Pair<List<Expression>, List<Expression>>( Collections.<Expression> singletonList(LiteralExpression.newConstant(1)), Collections.<Expression> singletonList(LiteralExpression.newConstant(1))); } - ColumnResolver resolver = context.getResolver(); - List<Pair<Expression, Expression>> compiled = new ArrayList<Pair<Expression, 
Expression>>(onConditions.size()); - context.setResolver(leftResolver); - ExpressionCompiler expressionCompiler = new ExpressionCompiler(context); - for (ParseNode condition : onConditions) { - assert (condition instanceof EqualParseNode); - EqualParseNode equalNode = (EqualParseNode) condition; - expressionCompiler.reset(); - Expression left = equalNode.getLHS().accept(expressionCompiler); - compiled.add(new Pair<Expression, Expression>(left, null)); - } - context.setResolver(rightResolver); - expressionCompiler = new ExpressionCompiler(context); - Iterator<Pair<Expression, Expression>> iter = compiled.iterator(); - for (ParseNode condition : onConditions) { - Pair<Expression, Expression> p = iter.next(); - EqualParseNode equalNode = (EqualParseNode) condition; - expressionCompiler.reset(); - Expression right = equalNode.getRHS().accept(expressionCompiler); - Expression left = p.getFirst(); + List<Pair<Expression, Expression>> compiled = Lists.<Pair<Expression, Expression>> newArrayListWithExpectedSize(onConditions.size()); + ExpressionCompiler lhsCompiler = new ExpressionCompiler(lhsCtx); + ExpressionCompiler rhsCompiler = new ExpressionCompiler(rhsCtx); + for (EqualParseNode condition : onConditions) { + lhsCompiler.reset(); + Expression left = condition.getLHS().accept(lhsCompiler); + rhsCompiler.reset(); + Expression right = condition.getRHS().accept(rhsCompiler); PDataType toType = getCommonType(left.getDataType(), right.getDataType()); if (left.getDataType() != toType) { left = CoerceExpression.create(left, toType); - p.setFirst(left); } if (right.getDataType() != toType) { right = CoerceExpression.create(right, toType); } - p.setSecond(right); + compiled.add(new Pair<Expression, Expression>(left, right)); } - context.setResolver(resolver); // recover the resolver - Collections.sort(compiled, new Comparator<Pair<Expression, Expression>>() { - @Override - public int compare(Pair<Expression, Expression> o1, Pair<Expression, Expression> o2) { - Expression e1 = 
o1.getFirst(); - Expression e2 = o2.getFirst(); - boolean isFixed1 = e1.getDataType().isFixedWidth(); - boolean isFixed2 = e2.getDataType().isFixedWidth(); - boolean isFixedNullable1 = e1.isNullable() &&isFixed1; - boolean isFixedNullable2 = e2.isNullable() && isFixed2; - if (isFixedNullable1 == isFixedNullable2) { - if (isFixed1 == isFixed2) { - return 0; - } else if (isFixed1) { - return -1; - } else { + if (sortExpressions) { + Collections.sort(compiled, new Comparator<Pair<Expression, Expression>>() { + @Override + public int compare(Pair<Expression, Expression> o1, Pair<Expression, Expression> o2) { + Expression e1 = o1.getFirst(); + Expression e2 = o2.getFirst(); + boolean isFixed1 = e1.getDataType().isFixedWidth(); + boolean isFixed2 = e2.getDataType().isFixedWidth(); + boolean isFixedNullable1 = e1.isNullable() &&isFixed1; + boolean isFixedNullable2 = e2.isNullable() && isFixed2; + if (isFixedNullable1 == isFixedNullable2) { + if (isFixed1 == isFixed2) { + return 0; + } else if (isFixed1) { + return -1; + } else { + return 1; + } + } else if (isFixedNullable1) { return 1; + } else { + return -1; } - } else if (isFixedNullable1) { - return 1; - } else { - return -1; } - } - }); - List<Expression> lConditions = new ArrayList<Expression>(compiled.size()); - List<Expression> rConditions = new ArrayList<Expression>(compiled.size()); + }); + } + List<Expression> lConditions = Lists.<Expression> newArrayListWithExpectedSize(compiled.size()); + List<Expression> rConditions = Lists.<Expression> newArrayListWithExpectedSize(compiled.size()); for (Pair<Expression, Expression> pair : compiled) { lConditions.add(pair.getFirst()); rConditions.add(pair.getSecond()); } - + return new Pair<List<Expression>, List<Expression>>(lConditions, rConditions); } @@ -683,11 +673,11 @@ public class JoinCompiler { return JoinCompiler.compilePostFilterExpression(context, postFilters); } - public SelectStatement getAsSubquery() throws SQLException { + public SelectStatement 
getAsSubquery(List<OrderByNode> orderBy) throws SQLException { if (isSubselect()) - return SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias()); + return SubselectRewriter.applyOrderBy(SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias()), orderBy, tableNode.getAlias()); - return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, null, null, 0, false, select.hasSequence()); + return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, select.hasSequence()); } public boolean hasFilters() { @@ -912,12 +902,12 @@ public class JoinCompiler { } private static class OnNodeVisitor extends BooleanParseNodeVisitor<Void> { - private List<ComparisonParseNode> onConditions; + private List<EqualParseNode> onConditions; private Set<TableRef> dependencies; private JoinTable joinTable; private ColumnRefParseNodeVisitor columnRefVisitor; - public OnNodeVisitor(ColumnResolver resolver, List<ComparisonParseNode> onConditions, + public OnNodeVisitor(ColumnResolver resolver, List<EqualParseNode> onConditions, Set<TableRef> dependencies, JoinTable joinTable) { this.onConditions = onConditions; this.dependencies = dependencies; @@ -981,7 +971,7 @@ public class JoinCompiler { joinTable.addFilter(node); } else if (lhsType == ColumnRefParseNodeVisitor.ColumnRefType.FOREIGN_ONLY && rhsType == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) { - onConditions.add(node); + onConditions.add((EqualParseNode) node); dependencies.addAll(lhsTableRefSet); } else if (rhsType == ColumnRefParseNodeVisitor.ColumnRefType.FOREIGN_ONLY && lhsType == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) { @@ -1069,9 +1059,9 @@ public class JoinCompiler { } } - private static String PROJECTED_TABLE_SCHEMA = "."; + private static final String PROJECTED_TABLE_SCHEMA = "."; // for creation of new statements - private static 
ParseNodeFactory NODE_FACTORY = new ParseNodeFactory(); + private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory(); private static boolean isFlat(SelectStatement select) { return !select.isJoin() @@ -1167,7 +1157,7 @@ public class JoinCompiler { QueryCompiler compiler = new QueryCompiler(statement, select, resolver); List<Object> binds = statement.getParameters(); StatementContext ctx = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement)); - QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false); + QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false, false, null); TableRef table = plan.getTableRef(); if (groupByTableRef != null && !groupByTableRef.equals(table)) { groupByTableRef = null; @@ -1303,17 +1293,30 @@ public class JoinCompiler { return new JoinedTableColumnResolver(this, origResolver); } - public PTableWrapper mergeProjectedTables(PTableWrapper rWrapper, boolean innerJoin) throws SQLException { + public PTableWrapper mergeProjectedTables(PTableWrapper rWrapper, JoinType type) throws SQLException { PTable left = this.getTable(); PTable right = rWrapper.getTable(); - List<PColumn> merged = new ArrayList<PColumn>(); - merged.addAll(left.getColumns()); + List<PColumn> merged = Lists.<PColumn> newArrayList(); + if (type != JoinType.Full) { + merged.addAll(left.getColumns()); + } else { + for (PColumn c : left.getColumns()) { + if (SchemaUtil.isPKColumn(c)) { + merged.add(c); + } else { + PColumnImpl column = new PColumnImpl(c.getName(), c.getFamilyName(), c.getDataType(), + c.getMaxLength(), c.getScale(), true, c.getPosition(), + c.getSortOrder(), c.getArraySize(), c.getViewConstant(), c.isViewReferenced()); + merged.add(column); + } + } + } int position = merged.size(); for (PColumn c : right.getColumns()) { if (!SchemaUtil.isPKColumn(c)) { PColumnImpl column = new PColumnImpl(c.getName(), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), c.getDataType(), - 
c.getMaxLength(), c.getScale(), innerJoin ? c.isNullable() : true, position++, + c.getMaxLength(), c.getScale(), type == JoinType.Inner ? c.isNullable() : true, position++, c.getSortOrder(), c.getArraySize(), c.getViewConstant(), c.isViewReferenced()); merged.add(column); } @@ -1358,12 +1361,16 @@ public class JoinCompiler { private JoinedTableColumnResolver(PTableWrapper table, ColumnResolver tableResolver) { this.table = table; this.tableResolver = tableResolver; - this.tableRef = new TableRef(null, table.getTable(), 0, false); + this.tableRef = new TableRef(ParseNodeFactory.createTempAlias(), table.getTable(), 0, false); } public PTableWrapper getPTableWrapper() { return table; } + + public TableRef getTableRef() { + return tableRef; + } @Override public List<TableRef> getTables() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c647c6a2/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java index d33d93a..de356d2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java @@ -32,6 +32,7 @@ import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.OrderByNode; import org.apache.phoenix.query.ConnectionQueryServices.Feature; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SortOrder; import com.google.common.collect.ImmutableList; @@ -76,7 +77,8 @@ public class OrderByCompiler { */ public static OrderBy compile(StatementContext context, FilterableStatement statement, - GroupBy groupBy, Integer limit) throws SQLException { + GroupBy groupBy, Integer limit, + boolean isInRowKeyOrder) throws 
SQLException { List<OrderByNode> orderByNodes = statement.getOrderBy(); if (orderByNodes.isEmpty()) { return OrderBy.EMPTY_ORDER_BY; @@ -114,9 +116,12 @@ public class OrderByCompiler { return OrderBy.EMPTY_ORDER_BY; } // If we're ordering by the order returned by the scan, we don't need an order by - if (visitor.isOrderPreserving()) { + if (isInRowKeyOrder && visitor.isOrderPreserving()) { if (visitor.isReverse()) { - if (context.getConnection().getQueryServices().supportsFeature(Feature.REVERSE_SCAN)) { + // REV_ROW_KEY_ORDER_BY scan would not take effect for a projected table, so don't return it for such table types. + if (context.getConnection().getQueryServices().supportsFeature(Feature.REVERSE_SCAN) + && context.getCurrentTable().getTable().getType() != PTableType.JOIN + && context.getCurrentTable().getTable().getType() != PTableType.SUBQUERY) { return OrderBy.REV_ROW_KEY_ORDER_BY; } } else { @@ -130,4 +135,4 @@ public class OrderByCompiler { private OrderByCompiler() { } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c647c6a2/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index cf96562..0bad266 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -40,6 +40,7 @@ import org.apache.phoenix.execute.HashJoinPlan; import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan; import org.apache.phoenix.execute.HashJoinPlan.WhereClauseSubPlan; import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.execute.SortMergeJoinPlan; import org.apache.phoenix.execute.TupleProjectionPlan; import org.apache.phoenix.execute.TupleProjector; import 
org.apache.phoenix.expression.Expression; @@ -51,12 +52,17 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.join.HashJoinInfo; +import org.apache.phoenix.parse.AliasedNode; +import org.apache.phoenix.parse.EqualParseNode; import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.parse.OrderByNode; import org.apache.phoenix.parse.ParseNode; +import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.parse.SubqueryParseNode; +import org.apache.phoenix.parse.TableNode; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnNotFoundException; @@ -78,6 +84,7 @@ import com.google.common.collect.Sets; * @since 0.1 */ public class QueryCompiler { + private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory(); /* * Not using Scan.setLoadColumnFamiliesOnDemand(true) because we don't * want to introduce a dependency on 0.94.5 (where this feature was @@ -93,6 +100,7 @@ public class QueryCompiler { private final List<? 
extends PDatum> targetColumns; private final ParallelIteratorFactory parallelIteratorFactory; private final SequenceManager sequenceManager; + private final boolean useSortMergeJoin; public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException { this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement)); @@ -106,6 +114,7 @@ public class QueryCompiler { this.targetColumns = targetColumns; this.parallelIteratorFactory = parallelIteratorFactory; this.sequenceManager = sequenceManager; + this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN); if (statement.getConnection().getQueryServices().getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) { this.scan.setAttribute(LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR, QueryConstants.TRUE); } @@ -137,26 +146,56 @@ public class QueryCompiler { context = new StatementContext(statement, resolver, scan, sequenceManager); } JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver()); - return compileJoinQuery(context, binds, joinTable, false); + return compileJoinQuery(context, binds, joinTable, false, false, null); } else { return compileSingleQuery(context, select, binds, false, true); } } + /* + * Call compileJoinQuery() for join queries recursively down to the leaf JoinTable nodes. + * This matches the input JoinTable node against patterns in the following order: + * 1. A (leaf JoinTable node, which can be a named table reference or a subquery of any kind.) + * Returns the compilation result of a single table scan or of an independent subquery. + * 2. Matching either of (when hint USE_SORT_MERGE_JOIN not specified): + * 1) A LEFT/INNER JOIN B + * 2) A LEFT/INNER JOIN B (LEFT/INNER JOIN C)+, if hint NO_STAR_JOIN not specified + * where A can be a named table reference or a flat subquery, and B, C, ... 
can be a named + * table reference, a sub-join or a subquery of any kind. + * Returns a HashJoinPlan{scan: A, hash: B, C, ...}. + * 3. Matching pattern: + * A RIGHT/INNER JOIN B (when hint USE_SORT_MERGE_JOIN not specified) + * where B can be a named table reference or a flat subquery, and A can be a named table + * reference, a sub-join or a subquery of any kind. + * Returns a HashJoinPlan{scan: B, hash: A}. + * NOTE that "A LEFT/RIGHT/INNER/FULL JOIN B RIGHT/INNER JOIN C" is viewed as + * "(A LEFT/RIGHT/INNER/FULL JOIN B) RIGHT/INNER JOIN C" here, which means the left part in the + * parentheses is + * viewed as a sub-join. + * 4. All the rest that do not qualify for previous patterns or conditions, including FULL joins. + * Returns a SortMergeJoinPlan, the sorting part of which is pushed down to the JoinTable nodes + * of both sides as order-by clauses. + * NOTE that SEMI or ANTI joins are treated the same way as LEFT joins in JoinTable pattern matching. + * + * If no join algorithm hint is provided, according to the above compilation process, a join query + * plan can probably consist of both HashJoinPlan and SortMergeJoinPlan which may enclose each other. + * TODO 1) Use table statistics to guide the choice of join plans. + * 2) Make it possible to hint a certain join algorithm for a specific join step. 
+ */ @SuppressWarnings("unchecked") - protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery) throws SQLException { + protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException { byte[] emptyByteArray = new byte[0]; List<JoinSpec> joinSpecs = joinTable.getJoinSpecs(); if (joinSpecs.isEmpty()) { Table table = joinTable.getTable(); - SelectStatement subquery = table.getAsSubquery(); + SelectStatement subquery = table.getAsSubquery(orderBy); if (!table.isSubselect()) { - ProjectedPTableWrapper projectedTable = table.createProjectedTable(!asSubquery); + ProjectedPTableWrapper projectedTable = table.createProjectedTable(!projectPKColumns); TupleProjector.serializeProjectorIntoScan(context.getScan(), projectedTable.createTupleProjector()); context.setCurrentTable(table.getTableRef()); context.setResolver(projectedTable.createColumnResolver()); table.projectColumns(context.getScan()); - return compileSingleQuery(context, subquery, binds, asSubquery, true); + return compileSingleQuery(context, subquery, binds, asSubquery, !asSubquery); } QueryPlan plan = compileSubquery(subquery); ProjectedPTableWrapper projectedTable = table.createProjectedTable(plan.getProjector()); @@ -164,25 +203,26 @@ public class QueryCompiler { return new TupleProjectionPlan(plan, projectedTable.createTupleProjector(), table.compilePostFilterExpression(context)); } - boolean[] starJoinVector = joinTable.getStarJoinVector(); - if (starJoinVector != null) { + boolean[] starJoinVector; + if (!this.useSortMergeJoin && (starJoinVector = joinTable.getStarJoinVector()) != null) { Table table = joinTable.getTable(); ProjectedPTableWrapper initialProjectedTable; TableRef tableRef; SelectStatement query; if (!table.isSubselect()) { - initialProjectedTable = table.createProjectedTable(!asSubquery); + 
initialProjectedTable = table.createProjectedTable(!projectPKColumns); tableRef = table.getTableRef(); table.projectColumns(context.getScan()); - query = joinTable.getAsSingleSubquery(table.getAsSubquery(), asSubquery); + query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery); } else { - SelectStatement subquery = table.getAsSubquery(); + SelectStatement subquery = table.getAsSubquery(orderBy); QueryPlan plan = compileSubquery(subquery); initialProjectedTable = table.createProjectedTable(plan.getProjector()); tableRef = plan.getTableRef(); context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap()); query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery); } + context.setCurrentTable(tableRef); PTableWrapper projectedTable = initialProjectedTable; int count = joinSpecs.size(); ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count]; @@ -198,13 +238,12 @@ public class QueryCompiler { JoinSpec joinSpec = joinSpecs.get(i); Scan subScan = ScanUtil.newScan(originalScan); StatementContext subContext = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement)); - QueryPlan joinPlan = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true); - ColumnResolver resolver = subContext.getResolver(); + QueryPlan joinPlan = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true, true, null); boolean hasPostReference = joinSpec.getJoinTable().hasPostReference(); if (hasPostReference) { - PTableWrapper subProjTable = ((JoinedTableColumnResolver) (resolver)).getPTableWrapper(); + PTableWrapper subProjTable = ((JoinedTableColumnResolver) subContext.getResolver()).getPTableWrapper(); tables[i] = subProjTable.getTable(); - projectedTable = projectedTable.mergeProjectedTables(subProjTable, joinSpec.getType() == JoinType.Inner); + projectedTable = projectedTable.mergeProjectedTables(subProjTable, joinSpec.getType()); needsProject = true; } else { tables[i] 
= null; @@ -212,13 +251,13 @@ public class QueryCompiler { if (!starJoinVector[i]) { needsProject = true; } - ColumnResolver leftResolver = (!forceProjection && starJoinVector[i]) ? joinTable.getOriginalResolver() : projectedTable.createColumnResolver(); + context.setResolver((!forceProjection && starJoinVector[i]) ? joinTable.getOriginalResolver() : projectedTable.createColumnResolver()); joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder - Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, leftResolver, resolver); + Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContext, true); joinExpressions[i] = joinConditions.getFirst(); List<Expression> hashExpressions = joinConditions.getSecond(); Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null); - boolean complete = getKeyExpressionCombinations(keyRangeExpressions, context, tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions); + boolean complete = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions); Expression keyRangeLhsExpression = keyRangeExpressions.getFirst(); Expression keyRangeRhsExpression = keyRangeExpressions.getSecond(); boolean hasFilters = joinSpec.getJoinTable().hasFilters(); @@ -232,9 +271,8 @@ public class QueryCompiler { if (needsProject) { TupleProjector.serializeProjectorIntoScan(context.getScan(), initialProjectedTable.createTupleProjector()); } - context.setCurrentTable(tableRef); context.setResolver(needsProject ? 
projectedTable.createColumnResolver() : joinTable.getOriginalResolver()); - QueryPlan plan = compileSingleQuery(context, query, binds, asSubquery, joinTable.isAllLeftJoin()); + QueryPlan plan = compileSingleQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin()); Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table); Integer limit = null; if (query.getLimit() != null && !query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) { @@ -246,65 +284,114 @@ public class QueryCompiler { JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1); JoinType type = lastJoinSpec.getType(); - if (type == JoinType.Full) - throw new SQLFeatureNotSupportedException(type + " joins not supported."); - - if (type == JoinType.Right || type == JoinType.Inner) { - if (!lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty()) - throw new SQLFeatureNotSupportedException("Right join followed by sub-join is not supported."); - + if (!this.useSortMergeJoin + && (type == JoinType.Right || type == JoinType.Inner) + && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty() + && lastJoinSpec.getJoinTable().getTable().isFlat()) { JoinTable rhsJoinTable = lastJoinSpec.getJoinTable(); Table rhsTable = rhsJoinTable.getTable(); JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters(); Scan subScan = ScanUtil.newScan(originalScan); StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement)); - QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true); - ColumnResolver lhsResolver = lhsCtx.getResolver(); - PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) (lhsResolver)).getPTableWrapper(); + QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null); + PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) lhsCtx.getResolver()).getPTableWrapper(); ProjectedPTableWrapper rhsProjTable; TableRef rhsTableRef; 
SelectStatement rhs; if (!rhsTable.isSubselect()) { - rhsProjTable = rhsTable.createProjectedTable(!asSubquery); + rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns); rhsTableRef = rhsTable.getTableRef(); rhsTable.projectColumns(context.getScan()); - rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(), asSubquery); + rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery); } else { - SelectStatement subquery = rhsTable.getAsSubquery(); + SelectStatement subquery = rhsTable.getAsSubquery(orderBy); QueryPlan plan = compileSubquery(subquery); rhsProjTable = rhsTable.createProjectedTable(plan.getProjector()); rhsTableRef = plan.getTableRef(); context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap()); rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery); } + context.setCurrentTable(rhsTableRef); boolean forceProjection = rhsTable.isSubselect(); - ColumnResolver rhsResolver = forceProjection ? rhsProjTable.createColumnResolver() : joinTable.getOriginalResolver(); + context.setResolver(forceProjection ? 
rhsProjTable.createColumnResolver() : joinTable.getOriginalResolver()); ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)}; - Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(context, lhsResolver, rhsResolver); + Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, true); List<Expression> joinExpressions = joinConditions.getSecond(); List<Expression> hashExpressions = joinConditions.getFirst(); - int fieldPosition = rhsProjTable.getTable().getColumns().size() - rhsProjTable.getTable().getPKColumns().size(); - PTableWrapper projectedTable = rhsProjTable.mergeProjectedTables(lhsProjTable, type == JoinType.Inner); - TupleProjector.serializeProjectorIntoScan(context.getScan(), rhsProjTable.createTupleProjector()); - context.setCurrentTable(rhsTableRef); - context.setResolver(projectedTable.createColumnResolver()); - QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, type == JoinType.Right); + boolean needsMerge = lhsJoin.hasPostReference(); + boolean needsProject = forceProjection || asSubquery || needsMerge; + PTable lhsTable = needsMerge ? lhsProjTable.getTable() : null; + int fieldPosition = needsMerge ? rhsProjTable.getTable().getColumns().size() - rhsProjTable.getTable().getPKColumns().size() : 0; + PTableWrapper projectedTable = needsMerge ? rhsProjTable.mergeProjectedTables(lhsProjTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable; + if (needsProject) { + TupleProjector.serializeProjectorIntoScan(context.getScan(), rhsProjTable.createTupleProjector()); + } + context.setResolver(needsProject ? 
projectedTable.createColumnResolver() : joinTable.getOriginalResolver()); + QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right); Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable); Integer limit = null; if (rhs.getLimit() != null && !rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) { limit = LimitCompiler.compile(context, rhs); } - HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Right ? JoinType.Left : type}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection); + HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Right ? JoinType.Left : type}, new boolean[] {true}, new PTable[] {lhsTable}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection); Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null); - getKeyExpressionCombinations(keyRangeExpressions, context, rhsTableRef, type, joinExpressions, hashExpressions); + getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions); return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), lhsJoin.hasFilters())}); } - - // Do not support queries like "A right join B left join C" with hash-joins. 
- throw new SQLFeatureNotSupportedException("Joins with pattern 'A right join B left join C' not supported."); + + JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters(); + JoinTable rhsJoin = lastJoinSpec.getJoinTable(); + if (type == JoinType.Right) { + JoinTable temp = lhsJoin; + lhsJoin = rhsJoin; + rhsJoin = temp; + } + + List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions(); + List<OrderByNode> lhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size()); + List<OrderByNode> rhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size()); + for (EqualParseNode condition : joinConditionNodes) { + lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true)); + rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getLHS() : condition.getRHS(), false, true)); + } + + Scan lhsScan = ScanUtil.newScan(originalScan); + StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement)); + boolean preserveRowkey = !projectPKColumns && type != JoinType.Full; + QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy); + PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) lhsCtx.getResolver()).getPTableWrapper(); + boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty(); + + Scan rhsScan = ScanUtil.newScan(originalScan); + StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement)); + QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy); + PTableWrapper rhsProjTable = ((JoinedTableColumnResolver) rhsCtx.getResolver()).getPTableWrapper(); + + Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? 
rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, false); + List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst(); + List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond(); + + boolean needsMerge = rhsJoin.hasPostReference(); + PTable rhsTable = needsMerge ? rhsProjTable.getTable() : null; + int fieldPosition = needsMerge ? lhsProjTable.getTable().getColumns().size() - lhsProjTable.getTable().getPKColumns().size() : 0; + PTableWrapper projectedTable = needsMerge ? lhsProjTable.mergeProjectedTables(rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable; + + ColumnResolver resolver = projectedTable.createColumnResolver(); + TableRef tableRef = ((JoinedTableColumnResolver) resolver).getTableRef(); + StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement)); + subCtx.setCurrentTable(tableRef); + QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable.getTable(), lhsProjTable.getTable(), rhsTable, fieldPosition); + context.setCurrentTable(tableRef); + context.setResolver(resolver); + TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString())); + ParseNode where = joinTable.getPostFiltersCombined(); + SelectStatement select = asSubquery ? 
NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, 0, false, joinTable.getStatement().hasSequence()) + : NODE_FACTORY.select(joinTable.getStatement(), from, where); + + return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder); } - private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException { + private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, SelectStatement select, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException { if (type != JoinType.Inner && type != JoinType.Semi) return false; @@ -312,7 +399,7 @@ public class QueryCompiler { StatementContext contextCopy = new StatementContext(statement, context.getResolver(), scanCopy, new SequenceManager(statement)); contextCopy.setCurrentTable(table); List<Expression> lhsCombination = Lists.<Expression> newArrayList(); - boolean complete = WhereOptimizer.getKeyExpressionCombination(lhsCombination, contextCopy, this.select, joinExpressions); + boolean complete = WhereOptimizer.getKeyExpressionCombination(lhsCombination, contextCopy, select, joinExpressions); if (lhsCombination.isEmpty()) return false; @@ -354,7 +441,7 @@ public class QueryCompiler { protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{ SelectStatement innerSelect = select.getInnerSelectStatement(); if (innerSelect == null) { - return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null); + return compileSingleFlatQuery(context, select, binds, 
asSubquery, allowPageFilter, null, null, true); } QueryPlan innerPlan = compileSubquery(innerSelect); @@ -368,10 +455,10 @@ public class QueryCompiler { tableRef = resolver.getTables().get(0); context.setCurrentTable(tableRef); - return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, innerPlan.getOrderBy().getOrderByExpressions().isEmpty() ? tupleProjector : null); + return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, tupleProjector, innerPlan.getOrderBy().getOrderByExpressions().isEmpty()); } - protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector) throws SQLException{ + protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector, boolean isInRowKeyOrder) throws SQLException{ PhoenixConnection connection = statement.getConnection(); ColumnResolver resolver = context.getResolver(); TableRef tableRef = context.getCurrentTable(); @@ -383,7 +470,7 @@ public class QueryCompiler { } Integer limit = LimitCompiler.compile(context, select); - GroupBy groupBy = GroupByCompiler.compile(context, select, innerPlanTupleProjector); + GroupBy groupBy = GroupByCompiler.compile(context, select, innerPlanTupleProjector, isInRowKeyOrder); // Optimize the HAVING clause by finding any group by expressions that can be moved // to the WHERE clause select = HavingCompiler.rewrite(context, select, groupBy); @@ -396,7 +483,7 @@ public class QueryCompiler { Set<SubqueryParseNode> subqueries = Sets.<SubqueryParseNode> newHashSet(); Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries); context.setResolver(resolver); // recover resolver - OrderBy orderBy = 
OrderByCompiler.compile(context, select, groupBy, limit); + OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, isInRowKeyOrder); RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns); // Final step is to build the query plan http://git-wip-us.apache.org/repos/asf/phoenix/blob/c647c6a2/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java index d229478..805894f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java @@ -58,6 +58,13 @@ public class SubselectRewriter extends ParseNodeRewriter { return statement.getLimit() == null && (!statement.isAggregate() || !statement.getGroupBy().isEmpty()); } + public static SelectStatement applyOrderBy(SelectStatement statement, List<OrderByNode> orderBy, String subqueryAlias) throws SQLException { + if (orderBy == null) + return statement; + + return new SubselectRewriter(null, statement.getSelect(), subqueryAlias).applyOrderBy(statement, orderBy); + } + public static SelectStatement flatten(SelectStatement select, PhoenixConnection connection) throws SQLException { TableNode from = select.getFrom(); while (from != null && from instanceof DerivedTableNode) { @@ -209,16 +216,24 @@ public class SubselectRewriter extends ParseNodeRewriter { if (where != null) { postFiltersRewrite.add(where); } - return NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), combine(postFiltersRewrite), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(), - 
statement.getBindCount(), statement.isAggregate(), statement.hasSequence()); + return NODE_FACTORY.select(statement, combine(postFiltersRewrite)); } ParseNode having = statement.getHaving(); if (having != null) { postFiltersRewrite.add(having); } - return NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), statement.getWhere(), statement.getGroupBy(), combine(postFiltersRewrite), statement.getOrderBy(), statement.getLimit(), - statement.getBindCount(), statement.isAggregate(), statement.hasSequence()); + return NODE_FACTORY.select(statement, statement.getWhere(), combine(postFiltersRewrite)); + } + + private SelectStatement applyOrderBy(SelectStatement statement, List<OrderByNode> orderBy) throws SQLException { + List<OrderByNode> orderByRewrite = Lists.<OrderByNode> newArrayListWithExpectedSize(orderBy.size()); + for (OrderByNode orderByNode : orderBy) { + ParseNode node = orderByNode.getNode(); + orderByRewrite.add(NODE_FACTORY.orderBy(node.accept(this), orderByNode.isNullsLast(), orderByNode.isAscending())); + } + + return NODE_FACTORY.select(statement, orderByRewrite); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/c647c6a2/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 62feb12..8731359 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -419,6 +419,11 @@ public class UpsertCompiler { try { QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement)); queryPlanToBe = compiler.compile(); + // This is post-fix: if the 
tableRef is a projected table, this means there are post-processing + // steps and parallelIteratorFactory did not take effect. + if (queryPlanToBe.getTableRef().getTable().getType() == PTableType.JOIN || queryPlanToBe.getTableRef().getTable().getType() == PTableType.SUBQUERY) { + parallelIteratorFactoryToBe = null; + } } catch (MetaDataEntityNotFoundException e) { retryOnce = false; // don't retry if select clause has meta data entities that aren't found, as we already updated the cache throw e; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c647c6a2/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java index 1bf26f2..6a8f52a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java @@ -139,7 +139,9 @@ public class WhereCompiler { expression = AndExpression.create(filters); } - expression = WhereOptimizer.pushKeyExpressionsToScan(context, statement, expression, extractedNodes); + if (context.getCurrentTable().getTable().getType() != PTableType.JOIN && context.getCurrentTable().getTable().getType() != PTableType.SUBQUERY) { + expression = WhereOptimizer.pushKeyExpressionsToScan(context, statement, expression, extractedNodes); + } setScanFilter(context, statement, expression, whereCompiler.disambiguateWithFamily, hashJoinOptimization); return expression; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c647c6a2/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index 1aecb7a..40bfd9d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -154,7 +154,7 @@ public class HashJoinRegionScanner implements RegionScanner { for (int i = 0; i < count; i++) { boolean earlyEvaluation = joinInfo.earlyEvaluation()[i]; JoinType type = joinInfo.getJoinTypes()[i]; - if (earlyEvaluation && (tempTuples[i] == null || type == JoinType.Semi)) + if (earlyEvaluation && (type == JoinType.Semi || type == JoinType.Anti)) continue; int j = resultQueue.size(); while (j-- > 0) { @@ -163,12 +163,23 @@ public class HashJoinRegionScanner implements RegionScanner { ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(lhs, joinInfo.getJoinExpressions()[i]); tempTuples[i] = hashCaches[i].get(key); if (tempTuples[i] == null) { - if (type != JoinType.Inner && type != JoinType.Semi) { + if (type == JoinType.Inner || type == JoinType.Semi) { + continue; + } else if (type == JoinType.Anti) { resultQueue.offer(lhs); + continue; } - continue; } } + if (tempTuples[i] == null) { + Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ? + lhs : TupleProjector.mergeProjectedValue( + (ProjectedValueTuple) lhs, schema, tempDestBitSet, + null, joinInfo.getSchemas()[i], tempSrcBitSet[i], + joinInfo.getFieldPositions()[i]); + resultQueue.offer(joined); + continue; + } for (Tuple t : tempTuples[i]) { Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ? 
lhs : TupleProjector.mergeProjectedValue( http://git-wip-us.apache.org/repos/asf/phoenix/blob/c647c6a2/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java new file mode 100644 index 0000000..fdbd23f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -0,0 +1,626 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.execute; + +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.sql.ParameterMetaData; +import java.sql.SQLException; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.ExplainPlan; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.iterate.MappedByteBufferQueue; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.jdbc.PhoenixParameterMetaData; +import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.KeyValueSchema; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.ValueBitSet; +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; +import org.apache.phoenix.schema.tuple.ResultTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.collect.Lists; + +public class SortMergeJoinPlan implements QueryPlan { + private 
static final byte[] EMPTY_PTR = new byte[0]; + + private final StatementContext context; + private final FilterableStatement statement; + private final TableRef table; + private final JoinType type; + private final QueryPlan lhsPlan; + private final QueryPlan rhsPlan; + private final List<Expression> lhsKeyExpressions; + private final List<Expression> rhsKeyExpressions; + private final KeyValueSchema joinedSchema; + private final KeyValueSchema lhsSchema; + private final KeyValueSchema rhsSchema; + private final int rhsFieldPosition; + + public SortMergeJoinPlan(StatementContext context, FilterableStatement statement, TableRef table, + JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions, + PTable joinedTable, PTable lhsTable, PTable rhsTable, int rhsFieldPosition) { + if (type == JoinType.Right) throw new IllegalArgumentException("JoinType should not be " + type); + this.context = context; + this.statement = statement; + this.table = table; + this.type = type; + this.lhsPlan = lhsPlan; + this.rhsPlan = rhsPlan; + this.lhsKeyExpressions = lhsKeyExpressions; + this.rhsKeyExpressions = rhsKeyExpressions; + this.joinedSchema = buildSchema(joinedTable); + this.lhsSchema = buildSchema(lhsTable); + this.rhsSchema = buildSchema(rhsTable); + this.rhsFieldPosition = rhsFieldPosition; + } + + private static KeyValueSchema buildSchema(PTable table) { + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + if (table != null) { + for (PColumn column : table.getColumns()) { + if (!SchemaUtil.isPKColumn(column)) { + builder.addField(column); + } + } + } + return builder.build(); + } + + @Override + public ResultIterator iterator() throws SQLException { + return type == JoinType.Semi || type == JoinType.Anti ? 
+ new SemiAntiJoinIterator(lhsPlan.iterator(), rhsPlan.iterator()) : + new BasicJoinIterator(lhsPlan.iterator(), rhsPlan.iterator()); + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> steps = Lists.newArrayList(); + steps.add("SORT-MERGE-JOIN (" + type.toString().toUpperCase() + ") TABLES"); + for (String step : lhsPlan.getExplainPlan().getPlanSteps()) { + steps.add(" " + step); + } + steps.add("AND" + (rhsSchema.getFieldCount() == 0 ? " (SKIP MERGE)" : "")); + for (String step : rhsPlan.getExplainPlan().getPlanSteps()) { + steps.add(" " + step); + } + return new ExplainPlan(steps); + } + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public ParameterMetaData getParameterMetaData() { + return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; + } + + @Override + public long getEstimatedSize() { + return lhsPlan.getEstimatedSize() + rhsPlan.getEstimatedSize(); + } + + @Override + public TableRef getTableRef() { + return table; + } + + @Override + public RowProjector getProjector() { + return null; + } + + @Override + public Integer getLimit() { + return null; + } + + @Override + public OrderBy getOrderBy() { + return null; + } + + @Override + public GroupBy getGroupBy() { + return null; + } + + @Override + public List<KeyRange> getSplits() { + return Collections.<KeyRange> emptyList(); + } + + @Override + public List<List<Scan>> getScans() { + return Collections.<List<Scan>> emptyList(); + } + + @Override + public FilterableStatement getStatement() { + return statement; + } + + @Override + public boolean isDegenerate() { + return false; + } + + @Override + public boolean isRowKeyOrdered() { + return false; + } + + private class BasicJoinIterator implements ResultIterator { + private final ResultIterator lhsIterator; + private final ResultIterator rhsIterator; + private boolean initialized; + private Tuple lhsTuple; + private Tuple rhsTuple; + private JoinKey lhsKey; + private 
JoinKey rhsKey; + private Tuple nextLhsTuple; + private Tuple nextRhsTuple; + private JoinKey nextLhsKey; + private JoinKey nextRhsKey; + private ValueBitSet destBitSet; + private ValueBitSet lhsBitSet; + private ValueBitSet rhsBitSet; + private byte[] emptyProjectedValue; + private MappedByteBufferTupleQueue queue; + private Iterator<Tuple> queueIterator; + + public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) { + this.lhsIterator = lhsIterator; + this.rhsIterator = rhsIterator; + this.initialized = false; + this.lhsTuple = null; + this.rhsTuple = null; + this.lhsKey = new JoinKey(lhsKeyExpressions); + this.rhsKey = new JoinKey(rhsKeyExpressions); + this.nextLhsTuple = null; + this.nextRhsTuple = null; + this.nextLhsKey = new JoinKey(lhsKeyExpressions); + this.nextRhsKey = new JoinKey(rhsKeyExpressions); + this.destBitSet = ValueBitSet.newInstance(joinedSchema); + this.lhsBitSet = ValueBitSet.newInstance(lhsSchema); + this.rhsBitSet = ValueBitSet.newInstance(rhsSchema); + lhsBitSet.clear(); + int len = lhsBitSet.getEstimatedLength(); + this.emptyProjectedValue = new byte[len]; + lhsBitSet.toBytes(emptyProjectedValue, 0); + int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( + QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); + this.queue = new MappedByteBufferTupleQueue(thresholdBytes); + this.queueIterator = null; + } + + @Override + public void close() throws SQLException { + lhsIterator.close(); + rhsIterator.close(); + queue.close(); + } + + @Override + public Tuple next() throws SQLException { + if (!initialized) { + init(); + } + + Tuple next = null; + while (next == null && !isEnd()) { + if (queueIterator != null) { + if (queueIterator.hasNext()) { + next = join(lhsTuple, queueIterator.next()); + } else { + boolean eq = nextLhsTuple != null && lhsKey.equals(nextLhsKey); + advance(true); + if (eq) { + queueIterator = queue.iterator(); + } else { + 
queue.clear(); + queueIterator = null; + } + } + } else if (lhsTuple != null) { + if (rhsTuple != null) { + if (lhsKey.equals(rhsKey)) { + next = join(lhsTuple, rhsTuple); + if (nextLhsTuple != null && lhsKey.equals(nextLhsKey)) { + queue.offer(rhsTuple); + if (nextRhsTuple == null || !rhsKey.equals(nextRhsKey)) { + queueIterator = queue.iterator(); + advance(true); + } + } else if (nextRhsTuple == null || !rhsKey.equals(nextRhsKey)) { + advance(true); + } + advance(false); + } else if (lhsKey.compareTo(rhsKey) < 0) { + if (type == JoinType.Full || type == JoinType.Left) { + next = join(lhsTuple, null); + } + advance(true); + } else { + if (type == JoinType.Full) { + next = join(null, rhsTuple); + } + advance(false); + } + } else { // left-join or full-join + next = join(lhsTuple, null); + advance(true); + } + } else { // full-join + next = join(null, rhsTuple); + advance(false); + } + } + + return next; + } + + @Override + public void explain(List<String> planSteps) { + } + + private void init() throws SQLException { + nextLhsTuple = lhsIterator.next(); + if (nextLhsTuple != null) { + nextLhsKey.evaluate(nextLhsTuple); + } + advance(true); + nextRhsTuple = rhsIterator.next(); + if (nextRhsTuple != null) { + nextRhsKey.evaluate(nextRhsTuple); + } + advance(false); + initialized = true; + } + + private void advance(boolean lhs) throws SQLException { + if (lhs) { + lhsTuple = nextLhsTuple; + lhsKey.set(nextLhsKey); + if (lhsTuple != null) { + nextLhsTuple = lhsIterator.next(); + if (nextLhsTuple != null) { + nextLhsKey.evaluate(nextLhsTuple); + } else { + nextLhsKey.clear(); + } + } + } else { + rhsTuple = nextRhsTuple; + rhsKey.set(nextRhsKey); + if (rhsTuple != null) { + nextRhsTuple = rhsIterator.next(); + if (nextRhsTuple != null) { + nextRhsKey.evaluate(nextRhsTuple); + } else { + nextRhsKey.clear(); + } + } + } + } + + private boolean isEnd() { + return (lhsTuple == null && (rhsTuple == null || type != JoinType.Full)) + || (queueIterator == null && rhsTuple == 
null && type == JoinType.Inner); + } + + private Tuple join(Tuple lhs, Tuple rhs) throws SQLException { + try { + ProjectedValueTuple t = null; + if (lhs == null) { + t = new ProjectedValueTuple(rhs, rhs.getValue(0).getTimestamp(), + this.emptyProjectedValue, 0, this.emptyProjectedValue.length, + this.emptyProjectedValue.length); + } else if (lhs instanceof ProjectedValueTuple) { + t = (ProjectedValueTuple) lhs; + } else { + ImmutableBytesWritable ptr = context.getTempPtr(); + TupleProjector.decodeProjectedValue(lhs, ptr); + lhsBitSet.clear(); + lhsBitSet.or(ptr); + int bitSetLen = lhsBitSet.getEstimatedLength(); + t = new ProjectedValueTuple(lhs, lhs.getValue(0).getTimestamp(), + ptr.get(), ptr.getOffset(), ptr.getLength(), bitSetLen); + + } + return rhsBitSet == ValueBitSet.EMPTY_VALUE_BITSET ? + t : TupleProjector.mergeProjectedValue( + t, joinedSchema, destBitSet, + rhs, rhsSchema, rhsBitSet, rhsFieldPosition); + } catch (IOException e) { + throw new SQLException(e); + } + } + } + + private class SemiAntiJoinIterator implements ResultIterator { + private final ResultIterator lhsIterator; + private final ResultIterator rhsIterator; + private final boolean isSemi; + private boolean initialized; + private Tuple lhsTuple; + private Tuple rhsTuple; + private JoinKey lhsKey; + private JoinKey rhsKey; + + public SemiAntiJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) { + if (type != JoinType.Semi && type != JoinType.Anti) throw new IllegalArgumentException("Type " + type + " is not allowed by " + SemiAntiJoinIterator.class.getName()); + this.lhsIterator = lhsIterator; + this.rhsIterator = rhsIterator; + this.isSemi = type == JoinType.Semi; + this.initialized = false; + this.lhsTuple = null; + this.rhsTuple = null; + this.lhsKey = new JoinKey(lhsKeyExpressions); + this.rhsKey = new JoinKey(rhsKeyExpressions); + } + + @Override + public void close() throws SQLException { + lhsIterator.close(); + rhsIterator.close(); + } + + @Override + public Tuple 
next() throws SQLException { + if (!initialized) { + advance(true); + advance(false); + initialized = true; + } + + Tuple next = null; + while (lhsTuple != null && next == null) { + if (rhsTuple != null) { + if (lhsKey.equals(rhsKey)) { + if (isSemi) { + next = lhsTuple; + } + advance(true); + } else if (lhsKey.compareTo(rhsKey) < 0) { + if (!isSemi) { + next = lhsTuple; + } + advance(true); + } else { + advance(false); + } + } else { + if (!isSemi) { + next = lhsTuple; + } + advance(true); + } + } + + return next; + } + + /** No-op: this implementation adds nothing to planSteps. */ @Override + public void explain(List<String> planSteps) { + } + + /** Pulls the next tuple from one input and refreshes that input's join key: the left side when lhs is true, the right side otherwise. When the input's iterator returns null, the key is cleared instead of evaluated. */ private void advance(boolean lhs) throws SQLException { + if (lhs) { + lhsTuple = lhsIterator.next(); + if (lhsTuple != null) { + lhsKey.evaluate(lhsTuple); + } else { + lhsKey.clear(); + } + } else { + rhsTuple = rhsIterator.next(); + if (rhsTuple != null) { + rhsKey.evaluate(rhsTuple); + } else { + rhsKey.clear(); + } + } + } + } + + /** Composite join key holding one ImmutableBytesWritable slot per expression; keys are compared slot by slot in list order. NOTE(review): equals is overridden without a matching hashCode; confirm instances are never used as hash keys. */ private static class JoinKey implements Comparable<JoinKey> { + private final List<Expression> expressions; + private final List<ImmutableBytesWritable> keys; + + /** Stores the expression list and allocates one new ImmutableBytesWritable slot per expression. */ public JoinKey(List<Expression> expressions) { + this.expressions = expressions; + this.keys = Lists.newArrayListWithExpectedSize(expressions.size()); + for (int i = 0; i < expressions.size(); i++) { + this.keys.add(new ImmutableBytesWritable()); + } + } + + /** Evaluates each expression against tuple into its slot; a slot whose expression's evaluate call returns false is set to EMPTY_PTR. */ public void evaluate(Tuple tuple) { + for (int i = 0; i < keys.size(); i++) { + if (!expressions.get(i).evaluate(tuple, keys.get(i))) { + keys.get(i).set(EMPTY_PTR); + } + } + } + + /** Sets each slot from the corresponding slot of other, passing along other's array, offset and length. Iterates over this key's slot count, so other is presumably expected to have at least as many slots; verify against callers. */ public void set(JoinKey other) { + for (int i = 0; i < keys.size(); i++) { + ImmutableBytesWritable key = other.keys.get(i); + this.keys.get(i).set(key.get(), key.getOffset(), key.getLength()); + } + } + + /** Resets every slot to EMPTY_PTR. */ public void clear() { + for (int i = 0; i < keys.size(); i++) { + this.keys.get(i).set(EMPTY_PTR); + } + } + + /** Returns true only when other is a JoinKey and compareTo(other) yields 0. */ @Override + public boolean equals(Object other) { + if (!(other instanceof JoinKey)) + return false; + return this.compareTo((JoinKey) other) == 
0; + } + + /** Compares the two keys slot by slot in list order and returns the first non-zero slot comparison, or 0 when every slot compares equal. The loop bound is this key's slot count. */ @Override + public int compareTo(JoinKey other) { + for (int i = 0; i < keys.size(); i++) { + int comp = this.keys.get(i).compareTo(other.keys.get(i)); + if (comp != 0) + return comp; + } + + return 0; + } + } + + /** MappedByteBufferQueue of Tuple whose segment queues are ordered by index and whose iterator walks them one after another. */ private static class MappedByteBufferTupleQueue extends MappedByteBufferQueue<Tuple> { + + /** Forwards thresholdBytes to the superclass constructor. */ public MappedByteBufferTupleQueue(int thresholdBytes) { + super(thresholdBytes); + } + + /** Creates a MappedByteBufferTupleSegmentQueue for the given index and threshold, always passing false for hasMaxQueueSize. */ @Override + protected MappedByteBufferSegmentQueue<Tuple> createSegmentQueue( + int index, int thresholdBytes) { + return new MappedByteBufferTupleSegmentQueue(index, thresholdBytes, false); + } + + /** Returns a comparator ordering segment queues by ascending index(), computed by int subtraction. NOTE(review): subtraction can overflow for operands of opposite sign; presumably indexes are small and non-negative, confirm. */ @Override + protected Comparator<MappedByteBufferSegmentQueue<Tuple>> getSegmentQueueComparator() { + return new Comparator<MappedByteBufferSegmentQueue<Tuple>>() { + @Override + public int compare(MappedByteBufferSegmentQueue<Tuple> q1, + MappedByteBufferSegmentQueue<Tuple> q2) { + return q1.index() - q2.index(); + } + }; + } + + /** Returns an iterator that reads from the current segment queue's iterator and switches to the next entry of getSegmentQueues() once the current one is exhausted. */ @Override + public Iterator<Tuple> iterator() { + return new Iterator<Tuple>() { + private Iterator<MappedByteBufferSegmentQueue<Tuple>> queueIter; + private Iterator<Tuple> currentIter; + { + this.queueIter = getSegmentQueues().iterator(); + this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null; + } + + /** Consults only the current segment iterator. NOTE(review): next() advances a single segment at a time, so an empty segment followed by a non-empty one would end iteration early; confirm that cannot occur. */ @Override + public boolean hasNext() { + return currentIter != null && currentIter.hasNext(); + } + + /** Returns the next tuple, or null (rather than throwing) when no element remains; moves to the next segment's iterator as soon as the current one is drained. */ @Override + public Tuple next() { + if (!hasNext()) + return null; + + Tuple ret = currentIter.next(); + if (!currentIter.hasNext()) { + this.currentIter = queueIter.hasNext() ? 
queueIter.next().iterator() : null; + } + + return ret; + } + + /** Always throws UnsupportedOperationException. */ @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + + /** Segment queue of Tuple that keeps in-memory entries in a LinkedList; sizing and buffer writes use only the KeyValue at index 0 of each tuple. */ private static class MappedByteBufferTupleSegmentQueue extends MappedByteBufferSegmentQueue<Tuple> { + private LinkedList<Tuple> results; + + /** Forwards all three arguments to the superclass constructor and creates the in-memory LinkedList. */ public MappedByteBufferTupleSegmentQueue(int index, + int thresholdBytes, boolean hasMaxQueueSize) { + super(index, thresholdBytes, hasMaxQueueSize); + this.results = Lists.newLinkedList(); + } + + /** Returns the LinkedList created in the constructor. */ @Override + protected Queue<Tuple> getInMemoryQueue() { + return results; + } + + /** Returns two ints plus the length of the tuple's first KeyValue, which is the number of bytes writeToBuffer emits for the same tuple. */ @Override + protected int sizeOf(Tuple e) { + KeyValue kv = e.getValue(0); + return Bytes.SIZEOF_INT * 2 + kv.getLength(); + } + + /** Writes an int holding (KeyValue length + SIZEOF_INT), then an int holding the KeyValue length, then the KeyValue bytes. */ @SuppressWarnings("deprecation") + @Override + protected void writeToBuffer(MappedByteBuffer buffer, Tuple e) { + KeyValue kv = e.getValue(0); + buffer.putInt(kv.getLength() + Bytes.SIZEOF_INT); + buffer.putInt(kv.getLength()); + buffer.put(kv.getBuffer(), kv.getOffset(), kv.getLength()); + } + + /** Reads a length int and returns null when it is negative; otherwise reads that many bytes and wraps them in a Result and then a ResultTuple. */ @Override + protected Tuple readFromBuffer(MappedByteBuffer buffer) { + int length = buffer.getInt(); + if (length < 0) + return null; + + byte[] b = new byte[length]; + buffer.get(b); + Result result = new Result(new ImmutableBytesWritable(b)); + return new ResultTuple(result); + } + + } + } + +} +