Repository: phoenix Updated Branches: refs/heads/master 656acefd1 -> 49ec34be2
PHOENIX-1332 Support correlated subqueries in comparison with ANY/SOME/ALL Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/49ec34be Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/49ec34be Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/49ec34be Branch: refs/heads/master Commit: 49ec34be258ce12ca150c5c37a35e2c1cad0105c Parents: 656acef Author: maryannxue <[email protected]> Authored: Mon Oct 13 11:05:00 2014 -0400 Committer: maryannxue <[email protected]> Committed: Mon Oct 13 11:05:00 2014 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/SubqueryIT.java | 79 ++++++++ .../org/apache/phoenix/cache/HashCache.java | 4 +- .../apache/phoenix/compile/JoinCompiler.java | 15 +- .../apache/phoenix/compile/QueryCompiler.java | 4 +- .../phoenix/compile/SubqueryRewriter.java | 181 +++++++++++++++---- .../coprocessor/HashJoinRegionScanner.java | 1 + .../apache/phoenix/execute/HashJoinPlan.java | 5 +- .../expression/ArrayConstructorExpression.java | 16 +- .../phoenix/expression/ExpressionType.java | 2 + .../DistinctValueClientAggregator.java | 63 +++++++ .../DistinctValueWithCountServerAggregator.java | 2 +- .../DistinctValueAggregateFunction.java | 66 +++++++ .../apache/phoenix/join/HashCacheClient.java | 7 +- .../apache/phoenix/join/HashCacheFactory.java | 15 +- .../org/apache/phoenix/parse/JoinTableNode.java | 8 +- .../apache/phoenix/parse/ParseNodeFactory.java | 6 +- .../apache/phoenix/parse/ParseNodeRewriter.java | 2 +- 17 files changed, 417 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java index 58d92f3..e4b4c8b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java @@ -899,6 +899,85 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT { rs = conn.createStatement().executeQuery("EXPLAIN " + query); String plan = QueryUtil.getExplainPlan(rs); assertTrue("\"" + plan + "\" does not match \"" + plans[4] + "\"", Pattern.matches(plans[4], plan)); + + query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004')"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000001"); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000002"); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000003"); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000005"); + + assertFalse(rs.next()); + + query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003')"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + try { + while(rs.next()); + fail("Should have got exception."); + } catch (SQLException e) { + } + + query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004' GROUP BY \"order_id\")"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000001"); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000002"); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000003"); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000005"); + + assertFalse(rs.next()); + + query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003' GROUP BY \"order_id\")"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + try { + while(rs.next()); + fail("Should have got exception."); + } catch (SQLException e) { + } + } finally { + conn.close(); + } + } + + @Test + public void testAnyAllComparisonSubquery() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + String query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = ALL(SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")"; + PreparedStatement statement = conn.prepareStatement(query); + ResultSet rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000001"); + assertEquals(rs.getString(2), "T1"); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000003"); + assertEquals(rs.getString(2), "T2"); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000005"); + assertEquals(rs.getString(2), "T3"); + + assertFalse(rs.next()); + + query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity != ALL(SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000002"); + assertEquals(rs.getString(2), "T6"); + + assertFalse(rs.next()); } finally { conn.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java index e604f63..311f119 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java @@ -18,10 +18,10 @@ package org.apache.phoenix.cache; import java.io.Closeable; +import java.io.IOException; import java.util.List; import org.apache.http.annotation.Immutable; - import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.schema.tuple.Tuple; @@ -34,5 +34,5 @@ import org.apache.phoenix.schema.tuple.Tuple; */ @Immutable public interface HashCache extends Closeable { - public List<Tuple> get(ImmutableBytesPtr hashKey); + public List<Tuple> get(ImmutableBytesPtr hashKey) throws IOException; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index d473c97..7e5382e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -187,7 +187,7 @@ public class JoinCompiler { if (joinSpecs == null) { joinSpecs = new ArrayList<JoinSpec>(); } - joinSpecs.add(new JoinSpec(joinNode.getType(), joinNode.getOnNode(), joinTable, origResolver)); + joinSpecs.add(new JoinSpec(joinNode.getType(), joinNode.getOnNode(), joinTable, joinNode.isSingleValueOnly(), origResolver)); return new Pair<Table, List<JoinSpec>>(lhs.getFirst(), joinSpecs); } @@ -362,7 +362,8 @@ public class JoinCompiler { && count > 1 && joinSpecs.get(count - 1).getType() != JoinType.Left && joinSpecs.get(count - 1).getType() != JoinType.Semi - && joinSpecs.get(count - 1).getType() != JoinType.Anti)) + && joinSpecs.get(count - 1).getType() != JoinType.Anti + && !joinSpecs.get(count - 1).isSingleValueOnly())) return null; boolean[] vector = new boolean[count]; @@ -437,13 +438,15 @@ public class JoinCompiler { private final JoinType type; private final List<ComparisonParseNode> onConditions; private final JoinTable joinTable; + private final boolean singleValueOnly; private Set<TableRef> dependencies; private JoinSpec(JoinType type, ParseNode onNode, JoinTable joinTable, - ColumnResolver resolver) throws SQLException { + boolean singleValueOnly, ColumnResolver resolver) throws SQLException { this.type = type; this.onConditions = new ArrayList<ComparisonParseNode>(); this.joinTable = joinTable; + this.singleValueOnly = singleValueOnly; this.dependencies = new HashSet<TableRef>(); OnNodeVisitor visitor = new OnNodeVisitor(resolver, onConditions, dependencies, joinTable); onNode.accept(visitor); @@ -461,6 +464,10 @@ public class JoinCompiler { return joinTable; } + public boolean isSingleValueOnly() { + return singleValueOnly; + } + public Set<TableRef> getDependencies() { return dependencies; } @@ -1177,7 +1184,7 @@ public class JoinCompiler { if (lhs == lhsReplace && rhs == rhsReplace) return joinNode; - return NODE_FACTORY.join(joinNode.getType(), lhsReplace, rhsReplace, joinNode.getOnNode()); + return NODE_FACTORY.join(joinNode.getType(), lhsReplace, rhsReplace, joinNode.getOnNode(), joinNode.isSingleValueOnly()); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 52abb9e..a2dc5b3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -225,7 +225,7 @@ public class QueryCompiler { if (i < count - 1) { fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size())); } - subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, keyRangeLhsExpression, keyRangeRhsExpression, clientProjector, hasFilters); + subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression, clientProjector, hasFilters); } if (needsProject) { TupleProjector.serializeProjectorIntoScan(context.getScan(), initialProjectedTable.createTupleProjector()); @@ -296,7 +296,7 @@ public class QueryCompiler { HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection); Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null); getKeyExpressionCombinations(keyRangeExpressions, context, rhsTableRef, type, joinExpressions, hashExpressions); - return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), clientProjector, lhsJoin.hasFilters())}); + return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), clientProjector, lhsJoin.hasFilters())}); } // Do not support queries like "A right join B left join C" with hash-joins. http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java index 6428155..4b37259 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java @@ -25,9 +25,12 @@ import java.util.List; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.expression.function.DistinctValueAggregateFunction; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.AndParseNode; +import org.apache.phoenix.parse.ArrayAllComparisonNode; +import org.apache.phoenix.parse.ArrayAnyComparisonNode; import org.apache.phoenix.parse.BooleanParseNodeVisitor; import org.apache.phoenix.parse.ColumnParseNode; import org.apache.phoenix.parse.ComparisonParseNode; @@ -36,6 +39,7 @@ import org.apache.phoenix.parse.ExistsParseNode; import org.apache.phoenix.parse.InParseNode; import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.parse.LiteralParseNode; +import org.apache.phoenix.parse.OrderByNode; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.ParseNodeRewriter; @@ -129,26 +133,32 @@ public class SubqueryRewriter extends ParseNodeRewriter { @Override public ParseNode visitLeave(InParseNode node, List<ParseNode> l) throws SQLException { + boolean isTopNode = topNode == node; + if (isTopNode) { + topNode = null; + } + SubqueryParseNode subqueryNode = (SubqueryParseNode) l.get(1); SelectStatement subquery = subqueryNode.getSelectNode(); String rhsTableAlias = ParseNodeFactory.createTempAlias(); - List<AliasedNode> selectNodes = fixAliasedNodes(subquery.getSelect()); + List<AliasedNode> selectNodes = fixAliasedNodes(subquery.getSelect(), true); subquery = NODE_FACTORY.select(subquery, true, selectNodes); ParseNode onNode = getJoinConditionNode(l.get(0), selectNodes, rhsTableAlias); TableNode rhsTable = NODE_FACTORY.derivedTable(rhsTableAlias, subquery); - JoinType joinType = topNode == node ? (node.isNegate() ? JoinType.Anti : JoinType.Semi) : JoinType.Left; - ParseNode ret = topNode == node ? null : NODE_FACTORY.isNull(NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null), !node.isNegate()); - tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode); - - if (topNode == node) { - topNode = null; - } + JoinType joinType = isTopNode ? (node.isNegate() ? JoinType.Anti : JoinType.Semi) : JoinType.Left; + ParseNode ret = isTopNode ? null : NODE_FACTORY.isNull(NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null), !node.isNegate()); + tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode, false); return ret; } @Override public ParseNode visitLeave(ExistsParseNode node, List<ParseNode> l) throws SQLException { + boolean isTopNode = topNode == node; + if (isTopNode) { + topNode = null; + } + SubqueryParseNode subqueryNode = (SubqueryParseNode) l.get(0); SelectStatement subquery = subqueryNode.getSelectNode(); String rhsTableAlias = ParseNodeFactory.createTempAlias(); @@ -169,19 +179,20 @@ public class SubqueryRewriter extends ParseNodeRewriter { subquery = NODE_FACTORY.select(subquery, true, selectNodes, where); ParseNode onNode = conditionExtractor.getJoinCondition(); TableNode rhsTable = NODE_FACTORY.derivedTable(rhsTableAlias, subquery); - JoinType joinType = topNode == node ? (node.isNegate() ? JoinType.Anti : JoinType.Semi) : JoinType.Left; - ParseNode ret = topNode == node ? null : NODE_FACTORY.isNull(NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null), !node.isNegate()); - tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode); - - if (topNode == node) { - topNode = null; - } + JoinType joinType = isTopNode ? (node.isNegate() ? JoinType.Anti : JoinType.Semi) : JoinType.Left; + ParseNode ret = isTopNode ? null : NODE_FACTORY.isNull(NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null), !node.isNegate()); + tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode, false); return ret; } @Override public ParseNode visitLeave(ComparisonParseNode node, List<ParseNode> l) throws SQLException { + boolean isTopNode = topNode == node; + if (isTopNode) { + topNode = null; + } + ParseNode secondChild = l.get(1); if (!(secondChild instanceof SubqueryParseNode)) { return super.visitLeave(node, l); @@ -200,12 +211,9 @@ public class SubqueryRewriter extends ParseNodeRewriter { return super.visitLeave(node, l); } - if (!subquery.isAggregate() || !subquery.getGroupBy().isEmpty()) { - //TODO add runtime singleton check or add a "singleton" aggregate funtion - throw new SQLFeatureNotSupportedException("Do not support non-aggregate or groupby subquery in comparison."); - } - ParseNode rhsNode = null; + boolean isGroupby = !subquery.getGroupBy().isEmpty(); + boolean isAggregate = subquery.isAggregate(); List<AliasedNode> aliasedNodes = subquery.getSelect(); if (aliasedNodes.size() == 1) { rhsNode = aliasedNodes.get(0).getNode(); @@ -221,28 +229,139 @@ public class SubqueryRewriter extends ParseNodeRewriter { List<AliasedNode> selectNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size() + 1); selectNodes.add(NODE_FACTORY.aliasedNode(ParseNodeFactory.createTempAlias(), rhsNode)); selectNodes.addAll(additionalSelectNodes); - List<ParseNode> groupbyNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size()); - for (AliasedNode aliasedNode : additionalSelectNodes) { - groupbyNodes.add(aliasedNode.getNode()); + + if (!isAggregate) { + subquery = NODE_FACTORY.select(subquery, subquery.isDistinct(), selectNodes, where); + } else { + List<ParseNode> groupbyNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size() + subquery.getGroupBy().size()); + for (AliasedNode aliasedNode : additionalSelectNodes) { + groupbyNodes.add(aliasedNode.getNode()); + } + groupbyNodes.addAll(subquery.getGroupBy()); + subquery = NODE_FACTORY.select(subquery, selectNodes, where, groupbyNodes, true); } - subquery = NODE_FACTORY.select(subquery, selectNodes, where, groupbyNodes, true); ParseNode onNode = conditionExtractor.getJoinCondition(); TableNode rhsTable = NODE_FACTORY.derivedTable(rhsTableAlias, subquery); - JoinType joinType = topNode == node ? JoinType.Inner : JoinType.Left; + JoinType joinType = isTopNode ? JoinType.Inner : JoinType.Left; ParseNode ret = NODE_FACTORY.comparison(node.getFilterOp(), l.get(0), NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null)); - tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode); + tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode, !isAggregate || isGroupby); - if (topNode == node) { + return ret; + } + + @Override + public ParseNode visitLeave(ArrayAnyComparisonNode node, List<ParseNode> l) throws SQLException { + List<ParseNode> children = leaveArrayComparisonNode(node, l); + if (children == l) + return super.visitLeave(node, l); + + node = NODE_FACTORY.arrayAny(children.get(0), (ComparisonParseNode) children.get(1)); + return node; + } + + @Override + public ParseNode visitLeave(ArrayAllComparisonNode node, List<ParseNode> l) throws SQLException { + List<ParseNode> children = leaveArrayComparisonNode(node, l); + if (children == l) + return super.visitLeave(node, l); + + node = NODE_FACTORY.arrayAll(children.get(0), (ComparisonParseNode) children.get(1)); + return node; + } + + protected List<ParseNode> leaveArrayComparisonNode(ParseNode node, List<ParseNode> l) throws SQLException { + boolean isTopNode = topNode == node; + if (isTopNode) { topNode = null; } + + ParseNode firstChild = l.get(0); + if (!(firstChild instanceof SubqueryParseNode)) { + return l; + } - return ret; + SubqueryParseNode subqueryNode = (SubqueryParseNode) firstChild; + SelectStatement subquery = subqueryNode.getSelectNode(); + String rhsTableAlias = ParseNodeFactory.createTempAlias(); + JoinConditionExtractor conditionExtractor = new JoinConditionExtractor(subquery, resolver, connection, rhsTableAlias); + ParseNode where = subquery.getWhere() == null ? null : subquery.getWhere().accept(conditionExtractor); + if (where == subquery.getWhere()) { // non-correlated any/all comparison subquery + return l; + } + + ParseNode rhsNode = null; + boolean isNonGroupByAggregate = subquery.getGroupBy().isEmpty() && subquery.isAggregate(); + List<AliasedNode> aliasedNodes = subquery.getSelect(); + String derivedTableAlias = null; + if (!subquery.getGroupBy().isEmpty()) { + derivedTableAlias = ParseNodeFactory.createTempAlias(); + aliasedNodes = fixAliasedNodes(aliasedNodes, false); + } + + if (aliasedNodes.size() == 1) { + rhsNode = derivedTableAlias == null ? aliasedNodes.get(0).getNode() : NODE_FACTORY.column(NODE_FACTORY.table(null, derivedTableAlias), aliasedNodes.get(0).getAlias(), null); + } else { + List<ParseNode> nodes = Lists.<ParseNode> newArrayListWithExpectedSize(aliasedNodes.size()); + for (AliasedNode aliasedNode : aliasedNodes) { + nodes.add(derivedTableAlias == null ? aliasedNode.getNode() : NODE_FACTORY.column(NODE_FACTORY.table(null, derivedTableAlias), aliasedNode.getAlias(), null)); + } + rhsNode = NODE_FACTORY.rowValueConstructor(nodes); + } + + if (!isNonGroupByAggregate) { + rhsNode = NODE_FACTORY.function(DistinctValueAggregateFunction.NAME, Collections.singletonList(rhsNode)); + } + + List<AliasedNode> additionalSelectNodes = conditionExtractor.getAdditionalSelectNodes(); + List<AliasedNode> selectNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size() + 1); + selectNodes.add(NODE_FACTORY.aliasedNode(ParseNodeFactory.createTempAlias(), rhsNode)); + selectNodes.addAll(additionalSelectNodes); + List<ParseNode> groupbyNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size()); + for (AliasedNode aliasedNode : additionalSelectNodes) { + groupbyNodes.add(aliasedNode.getNode()); + } + + if (derivedTableAlias == null) { + subquery = NODE_FACTORY.select(subquery, selectNodes, where, groupbyNodes, true); + } else { + List<ParseNode> derivedTableGroupBy = Lists.newArrayListWithExpectedSize(subquery.getGroupBy().size() + groupbyNodes.size()); + derivedTableGroupBy.addAll(subquery.getGroupBy()); + derivedTableGroupBy.addAll(groupbyNodes); + List<AliasedNode> derivedTableSelect = Lists.newArrayListWithExpectedSize(aliasedNodes.size() + selectNodes.size() - 1); + derivedTableSelect.addAll(aliasedNodes); + for (int i = 1; i < selectNodes.size(); i++) { + AliasedNode aliasedNode = selectNodes.get(i); + String alias = ParseNodeFactory.createTempAlias(); + derivedTableSelect.add(NODE_FACTORY.aliasedNode(alias, aliasedNode.getNode())); + aliasedNode = NODE_FACTORY.aliasedNode(aliasedNode.getAlias(), NODE_FACTORY.column(NODE_FACTORY.table(null, derivedTableAlias), alias, null)); + selectNodes.set(i, aliasedNode); + groupbyNodes.set(i - 1, aliasedNode.getNode()); + } + SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, derivedTableSelect, where, derivedTableGroupBy, true); + subquery = NODE_FACTORY.select(Collections.singletonList(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt)), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, subquery.hasSequence()); + } + + ParseNode onNode = conditionExtractor.getJoinCondition(); + TableNode rhsTable = NODE_FACTORY.derivedTable(rhsTableAlias, subquery); + JoinType joinType = isTopNode ? JoinType.Inner : JoinType.Left; + tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode, false); + + firstChild = NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null); + if (isNonGroupByAggregate) { + firstChild = NODE_FACTORY.upsertStmtArrayNode(Collections.singletonList(firstChild)); + } + ComparisonParseNode secondChild = (ComparisonParseNode) l.get(1); + secondChild = NODE_FACTORY.comparison(secondChild.getFilterOp(), secondChild.getLHS(), NODE_FACTORY.elementRef(Lists.newArrayList(firstChild, NODE_FACTORY.literal(1)))); + + return Lists.newArrayList(firstChild, secondChild); } - private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes) { - List<AliasedNode> normNodes = Lists.<AliasedNode> newArrayListWithExpectedSize(nodes.size() + 1); - normNodes.add(NODE_FACTORY.aliasedNode(ParseNodeFactory.createTempAlias(), LiteralParseNode.ONE)); + private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes, boolean addSelectOne) { + List<AliasedNode> normNodes = Lists.<AliasedNode> newArrayListWithExpectedSize(nodes.size() + (addSelectOne ? 1 : 0)); + if (addSelectOne) { + normNodes.add(NODE_FACTORY.aliasedNode(ParseNodeFactory.createTempAlias(), LiteralParseNode.ONE)); + } for (int i = 0; i < nodes.size(); i++) { AliasedNode aliasedNode = nodes.get(i); normNodes.add(NODE_FACTORY.aliasedNode( http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index 02fc6e3..8e0d42d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -185,6 +185,7 @@ public class HashJoinRegionScanner implements RegionScanner { if (postFilter != null) { for (Iterator<Tuple> iter = resultQueue.iterator(); iter.hasNext();) { Tuple t = iter.next(); + postFilter.reset(); ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); try { if (!postFilter.evaluate(t, tempPtr)) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 1ac9d68..bb3940c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -395,6 +395,7 @@ public class HashJoinPlan implements QueryPlan { private final int index; private final QueryPlan plan; private final List<Expression> hashExpressions; + private final boolean singleValueOnly; private final Expression keyRangeLhsExpression; private final Expression keyRangeRhsExpression; private final TupleProjector clientProjector; @@ -402,12 +403,14 @@ public class HashJoinPlan implements QueryPlan { public HashSubPlan(int index, QueryPlan subPlan, List<Expression> hashExpressions, + boolean singleValueOnly, Expression keyRangeLhsExpression, Expression keyRangeRhsExpression, TupleProjector clientProjector, boolean hasFilters) { this.index = index; this.plan = subPlan; this.hashExpressions = hashExpressions; + this.singleValueOnly = singleValueOnly; this.keyRangeLhsExpression = keyRangeLhsExpression; this.keyRangeRhsExpression = keyRangeRhsExpression; this.clientProjector = clientProjector; @@ -424,7 +427,7 @@ public class HashJoinPlan implements QueryPlan { ServerCache cache = null; if (hashExpressions != null) { cache = parent.hashClient.addHashCache(ranges, plan.iterator(), - clientProjector, plan.getEstimatedSize(), hashExpressions, parent.plan.getTableRef(), keyRangeRhsExpression, keyRangeRhsValues); + clientProjector, plan.getEstimatedSize(), hashExpressions, singleValueOnly, parent.plan.getTableRef(), keyRangeRhsExpression, keyRangeRhsValues); long endTime = System.currentTimeMillis(); boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime); if (!isSet && (endTime - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java index 85eb735..dd23534 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java @@ -37,23 +37,25 @@ public class ArrayConstructorExpression extends BaseCompoundExpression { // store the offset postion in this. Later based on the total size move this to a byte[] // and serialize into byte stream private int[] offsetPos; + + public ArrayConstructorExpression() { + } public ArrayConstructorExpression(List<Expression> children, PDataType baseType) { super(children); init(baseType); + } + + private void init(PDataType baseType) { + this.baseType = baseType; + elements = new Object[getChildren().size()]; estimatedSize = PArrayDataType.estimateSize(this.children.size(), this.baseType); if (!this.baseType.isFixedWidth()) { offsetPos = new int[children.size()]; byteStream = new TrustedByteArrayOutputStream(estimatedSize); } else { byteStream = new TrustedByteArrayOutputStream(estimatedSize); - } - - } - - private void init(PDataType baseType) { - this.baseType = baseType; - elements = new Object[getChildren().size()]; + } } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java index 1566d64..3219cfe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java @@ -32,6 +32,7 @@ import org.apache.phoenix.expression.function.ConvertTimezoneFunction; import org.apache.phoenix.expression.function.CountAggregateFunction; import org.apache.phoenix.expression.function.DecodeFunction; import org.apache.phoenix.expression.function.DistinctCountAggregateFunction; +import org.apache.phoenix.expression.function.DistinctValueAggregateFunction; import org.apache.phoenix.expression.function.EncodeFunction; import org.apache.phoenix.expression.function.ExternalSqlTypeIdFunction; import org.apache.phoenix.expression.function.FirstValueFunction; @@ -148,6 +149,7 @@ public enum ExpressionType { LowerFunction(LowerFunction.class), TrimFunction(TrimFunction.class), DistinctCountAggregateFunction(DistinctCountAggregateFunction.class), + DistinctValueAggregateFunction(DistinctValueAggregateFunction.class), PercentileContAggregateFunction(PercentileContAggregateFunction.class), PercentRankAggregateFunction(PercentRankAggregateFunction.class), StddevPopFunction(StddevPopFunction.class), http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java new file mode 100644 index 0000000..fefb077 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.expression.aggregator; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.schema.PArrayDataType; +import org.apache.phoenix.schema.PDataType; +import org.apache.phoenix.schema.PhoenixArray; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.tuple.Tuple; + +public class DistinctValueClientAggregator extends DistinctValueWithCountClientAggregator { + private final PDataType valueType; + private final PDataType resultType; + + public DistinctValueClientAggregator(SortOrder sortOrder, PDataType valueType, PDataType resultType) { + super(sortOrder); + this.valueType = valueType; + this.resultType = resultType; + } + + @Override + public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { + if (buffer == null || buffer.length == 0) { + Object[] values = new Object[valueVsCount.size()]; + int i = 0; + for (ImmutableBytesPtr key : valueVsCount.keySet()) { + values[i++] = valueType.toObject(key, sortOrder); + } + PhoenixArray array = PArrayDataType.instantiatePhoenixArray(valueType, values); + buffer = resultType.toBytes(array, sortOrder); + } + ptr.set(buffer); + return true; + } + + @Override + protected PDataType getResultDataType() { + return resultType; + } + + @Override + protected int getBufferLength() { + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java index 4bd2a17..281879e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java @@ -54,7 +54,7 @@ public class DistinctValueWithCountServerAggregator extends BaseAggregator { private int compressThreshold; private byte[] buffer = null; - private Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>(); + protected Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>(); public DistinctValueWithCountServerAggregator(Configuration conf) { super(SortOrder.getDefault()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctValueAggregateFunction.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctValueAggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctValueAggregateFunction.java new file mode 100644 index 0000000..6877409 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctValueAggregateFunction.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.expression.function; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.DistinctValueClientAggregator; +import org.apache.phoenix.expression.aggregator.DistinctValueWithCountClientAggregator; +import org.apache.phoenix.expression.aggregator.DistinctValueWithCountServerAggregator; +import org.apache.phoenix.parse.FunctionParseNode.Argument; +import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction; +import org.apache.phoenix.schema.PDataType; + +@BuiltInFunction(name=DistinctValueAggregateFunction.NAME, args= {@Argument()} ) +public class DistinctValueAggregateFunction extends DistinctValueWithCountAggregateFunction { + public static final String NAME = "COLLECTDISTINCT"; + + public DistinctValueAggregateFunction() { + } + + public DistinctValueAggregateFunction(List<Expression> children) { + super(children); + } + + @Override + public Aggregator newServerAggregator(Configuration conf) { + return new DistinctValueWithCountServerAggregator(conf); + } + + @Override + public DistinctValueWithCountClientAggregator newClientAggregator() { + PDataType baseType = getAggregatorExpression().getDataType().isArrayType() ? PDataType.VARBINARY : getAggregatorExpression().getDataType(); + PDataType resultType = PDataType.fromTypeId(baseType.getSqlType() + PDataType.ARRAY_TYPE_BASE); + return new DistinctValueClientAggregator(getAggregatorExpression().getSortOrder(), baseType, resultType); + } + + @Override + public String getName() { + return NAME; + } + + @Override + public PDataType getDataType() { + PDataType baseType = getAggregatorExpression().getDataType().isArrayType() ? PDataType.VARBINARY : getAggregatorExpression().getDataType(); + return PDataType.fromTypeId(baseType.getSqlType() + PDataType.ARRAY_TYPE_BASE); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java index 863b535..b6245ac 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java @@ -69,16 +69,16 @@ public class HashCacheClient { * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed * size */ - public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException { + public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException { /** * Serialize and compress hashCacheTable */ ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - serialize(ptr, iterator, projector, estimatedSize, onExpressions, keyRangeRhsExpression, keyRangeRhsValues); + serialize(ptr, iterator, projector, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues); return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef); } - private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException { + private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException { long maxSize = serverCache.getConnection().getQueryServices().getProps().getLong(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE); estimatedSize = Math.min(estimatedSize, maxSize); if (estimatedSize > Integer.MAX_VALUE) { @@ -93,6 +93,7 @@ public class HashCacheClient { WritableUtils.writeVInt(out, ExpressionType.valueOf(expression).ordinal()); expression.write(out); } + out.writeBoolean(singleValueOnly); int exprSize = baOut.size() + Bytes.SIZEOF_INT; out.writeInt(exprSize); int nRows = 0; http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java index 257fa1f..697d862 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java @@ -27,9 +27,10 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.xerial.snappy.Snappy; - import org.apache.phoenix.cache.HashCache; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -67,6 +68,7 @@ public class HashCacheFactory implements ServerCacheFactory { private class HashCacheImpl implements HashCache { private final Map<ImmutableBytesPtr,List<Tuple>> hashCache; private final MemoryChunk memoryChunk; + private final boolean singleValueOnly; private HashCacheImpl(byte[] hashCacheBytes, MemoryChunk memoryChunk) { try { @@ -83,6 +85,7 @@ public class HashCacheFactory implements ServerCacheFactory { expression.readFields(dataInput); onExpressions.add(expression); } + this.singleValueOnly = dataInput.readBoolean(); int exprSize = dataInput.readInt(); offset += exprSize; int nRows = dataInput.readInt(); @@ -117,8 +120,14 @@ public class HashCacheFactory implements ServerCacheFactory { } @Override - public List<Tuple> get(ImmutableBytesPtr hashKey) { - return hashCache.get(hashKey); + public List<Tuple> get(ImmutableBytesPtr hashKey) throws IOException { + List<Tuple> ret = hashCache.get(hashKey); + if (singleValueOnly && ret != null && ret.size() > 1) { + SQLException ex = new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException(); + ServerUtil.throwIOException(ex.getMessage(), ex); + } + + return ret; } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java index a51ca5c..5dd13f0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java @@ -43,13 +43,15 @@ public class JoinTableNode extends TableNode { private final TableNode lhs; private final TableNode rhs; private final ParseNode onNode; + private final boolean singleValueOnly; - JoinTableNode(JoinType type, TableNode lhs, TableNode rhs, ParseNode onNode) { + JoinTableNode(JoinType type, TableNode lhs, TableNode rhs, ParseNode onNode, boolean singleValueOnly) { super(null); this.type = type; this.lhs = lhs; this.rhs = rhs; this.onNode = onNode; + this.singleValueOnly = singleValueOnly; } public JoinType getType() { @@ -67,6 +69,10 @@ public class JoinTableNode extends TableNode { public ParseNode getOnNode() { return onNode; } + + public boolean isSingleValueOnly() { + return singleValueOnly; + } @Override public <T> T accept(TableNodeVisitor<T> visitor) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index 85fd978..4f5e7ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -417,7 +417,7 @@ public class ParseNodeFactory { public TableNode table(TableNode table, List<JoinPartNode> parts) { for (JoinPartNode part : parts) { - table = new JoinTableNode(part.getType(), table, part.getTable(), part.getOnNode()); + table = new JoinTableNode(part.getType(), table, part.getTable(), part.getOnNode(), false); } return table; } @@ -426,8 +426,8 @@ public class ParseNodeFactory { return new JoinPartNode(type, onNode, table); } - public JoinTableNode join(JoinType type, TableNode lhs, TableNode rhs, ParseNode on) { - return new JoinTableNode(type, lhs, rhs, on); + public JoinTableNode join(JoinType type, TableNode lhs, TableNode rhs, ParseNode on, boolean singleValueOnly) { + return new JoinTableNode(type, lhs, rhs, on, singleValueOnly); } public DerivedTableNode derivedTable (String alias, SelectStatement select) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java index 06ac1c6..338a45b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java @@ -561,7 +561,7 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> { if (lhsNode == normLhsNode && rhsNode == normRhsNode && onNode == normOnNode) return joinNode; - return NODE_FACTORY.join(joinNode.getType(), normLhsNode, normRhsNode, normOnNode); + return NODE_FACTORY.join(joinNode.getType(), normLhsNode, normRhsNode, normOnNode, joinNode.isSingleValueOnly()); } @Override
