Repository: phoenix Updated Branches: refs/heads/calcite 8c19e1c13 -> 1a18e8f87
PHOENIX-2225 Support Correlate (Nested-loop Join) in Phoenix/Calcite Integration; PHOENIX-2226 Support Semi-Join in Phoenix/Calcite Integration Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1a18e8f8 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1a18e8f8 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1a18e8f8 Branch: refs/heads/calcite Commit: 1a18e8f87aeb88ecca78134fb98a994f1caddf63 Parents: 8c19e1c Author: maryannxue <[email protected]> Authored: Thu Sep 10 10:46:37 2015 -0400 Committer: maryannxue <[email protected]> Committed: Thu Sep 10 10:46:37 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteIT.java | 237 +++++++++++++++++-- .../apache/phoenix/calcite/CalciteUtils.java | 64 +++++ .../calcite/jdbc/PhoenixPrepareImpl.java | 2 + .../calcite/metadata/PhoenixRelMdCollation.java | 11 + .../calcite/rel/PhoenixAbstractJoin.java | 22 -- .../calcite/rel/PhoenixAbstractSemiJoin.java | 41 ++++ .../phoenix/calcite/rel/PhoenixClientJoin.java | 2 +- .../calcite/rel/PhoenixClientSemiJoin.java | 119 ++++++++++ .../phoenix/calcite/rel/PhoenixCorrelate.java | 98 ++++++++ .../apache/phoenix/calcite/rel/PhoenixRel.java | 5 + .../calcite/rel/PhoenixRelImplementorImpl.java | 21 +- .../calcite/rel/PhoenixServerAggregate.java | 2 +- .../phoenix/calcite/rel/PhoenixServerJoin.java | 2 +- .../calcite/rel/PhoenixServerSemiJoin.java | 120 ++++++++++ .../phoenix/calcite/rel/PhoenixTableScan.java | 31 ++- .../rel/PhoenixToEnumerableConverter.java | 3 +- .../phoenix/calcite/rel/PhoenixUncollect.java | 4 + .../phoenix/calcite/rel/PhoenixValues.java | 3 + .../calcite/rules/PhoenixConverterRules.java | 112 ++++++++- .../rules/PhoenixFilterScanMergeRule.java | 23 +- .../apache/phoenix/execute/AggregatePlan.java | 2 +- .../org/apache/phoenix/execute/ScanPlan.java | 2 +- .../phoenix/execute/SortMergeJoinPlan.java | 8 +- .../phoenix/calcite/ToExpressionTest.java | 3 +- 24 files changed, 867 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java index 2b8352d..cfec6fb 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java @@ -328,18 +328,21 @@ public class CalciteIT extends BaseClientManagedTimeIT { try { conn.createStatement().execute( "CREATE TABLE " + SCORES_TABLE_NAME - + "(student_id INTEGER PRIMARY KEY, scores INTEGER[])"); + + "(student_id INTEGER NOT NULL, subject_id INTEGER NOT NULL, scores INTEGER[] CONSTRAINT pk PRIMARY KEY (student_id, subject_id))"); PreparedStatement stmt = conn.prepareStatement( "UPSERT INTO " + SCORES_TABLE_NAME - + " VALUES(?, ?)"); + + " VALUES(?, ?, ?)"); stmt.setInt(1, 1); - stmt.setArray(2, conn.createArrayOf("INTEGER", new Integer[] {85, 80, 82})); + stmt.setInt(2, 1); + stmt.setArray(3, conn.createArrayOf("INTEGER", new Integer[] {85, 80, 82})); stmt.execute(); stmt.setInt(1, 2); - stmt.setArray(2, null); + stmt.setInt(2, 1); + stmt.setArray(3, null); stmt.execute(); stmt.setInt(1, 3); - stmt.setArray(2, conn.createArrayOf("INTEGER", new Integer[] {87, 88, 80})); + stmt.setInt(2, 2); + stmt.setArray(3, conn.createArrayOf("INTEGER", new Integer[] {87, 88, 80})); stmt.execute(); conn.commit(); } catch (TableAlreadyExistsException e) { @@ -860,26 +863,6 @@ public class CalciteIT extends BaseClientManagedTimeIT { .resultIs(new Object[][] {{1}, {2}}) .close(); } - - @Test public void testSubquery() { - start().sql("SELECT \"order_id\", quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")") - .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixClientProject(order_id=[$0], QUANTITY=[$4])\n" + - " PhoenixToClientConverter\n" + - " PhoenixServerJoin(condition=[AND(=($2, $7), =($4, $8))], joinType=[inner])\n" + - " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + - " PhoenixServerAggregate(group=[{7}], EXPR$0=[MAX($4)])\n" + - " PhoenixServerJoin(condition=[=($7, $2)], joinType=[inner])\n" + - " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + - " PhoenixServerAggregate(group=[{2}])\n" + - " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n") - .resultIs(new Object[][]{ - {"000000000000001", 1000}, - {"000000000000003", 3000}, - {"000000000000004", 4000}, - {"000000000000005", 5000}}) - .close(); - } @Test public void testScalarSubquery() { start().sql("select \"item_id\", name, (select max(quantity) sq \n" @@ -1039,7 +1022,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixUncollect\n" + " PhoenixToClientConverter\n" + - " PhoenixServerProject(EXPR$0=[$1])\n" + + " PhoenixServerProject(EXPR$0=[$2])\n" + " PhoenixTableScan(table=[[phoenix, SCORES]])\n") .resultIs(new Object[][] { {85}, @@ -1049,6 +1032,208 @@ public class CalciteIT extends BaseClientManagedTimeIT { {88}, {80}}) .close(); + start().sql("SELECT s.student_id, t.score FROM " + SCORES_TABLE_NAME + " s, UNNEST((SELECT scores FROM " + SCORES_TABLE_NAME + " s2 where s.student_id = s2.student_id)) AS t(score)") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(STUDENT_ID=[$0], SCORE=[$3])\n" + + " PhoenixCorrelate(correlation=[$cor0], joinType=[INNER], requiredColumns=[{0}])\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, SCORES]])\n" + + " PhoenixUncollect\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerProject(EXPR$0=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, SCORES]], filter=[=($cor0.STUDENT_ID, $0)])\n") + .resultIs(new Object[][] { + {1, 85}, + {1, 80}, + {1, 82}, + {3, 87}, + {3, 88}, + {3, 80}}) + .close(); + } + + @Test public void testCorrelateAndDecorrelation() { + Properties correlProps = new Properties(); + correlProps.setProperty("forceDecorrelate", Boolean.FALSE.toString()); + Properties decorrelProps = new Properties(); + decorrelProps.setProperty("forceDecorrelate", Boolean.TRUE.toString()); + + String q1 = "select \"order_id\", quantity from " + JOIN_ORDER_TABLE_FULL_NAME + " o where quantity = (select max(quantity) from " + JOIN_ORDER_TABLE_FULL_NAME + " o2 where o.\"item_id\" = o2.\"item_id\")"; + Object[][] r1 = new Object[][] { + {"000000000000001", 1000}, + {"000000000000003", 3000}, + {"000000000000004", 4000}, + {"000000000000005", 5000}}; + String p1Correlate = + "PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(order_id=[$0], QUANTITY=[$4])\n" + + " PhoenixFilter(condition=[=($4, $7)])\n" + + " PhoenixCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{2}])\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[MAX($4)])\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]], filter=[=($cor0.item_id, $2)])\n"; + String p1Decorrelated = + "PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(order_id=[$0], QUANTITY=[$4])\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerJoin(condition=[AND(=($2, $7), =($4, $8))], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + + " PhoenixServerAggregate(group=[{7}], EXPR$0=[MAX($4)])\n" + + " PhoenixServerJoin(condition=[=($7, $2)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + + " PhoenixServerAggregate(group=[{2}])\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n"; + start(correlProps, false).sql(q1).explainIs(p1Correlate).resultIs(r1).close(); + start(decorrelProps, false).sql(q1).explainIs(p1Decorrelated).resultIs(r1).close(); + + String q2 = "select name from " + JOIN_ITEM_TABLE_FULL_NAME + " i where price = (select max(price) from " + JOIN_ITEM_TABLE_FULL_NAME + " i2 where i.\"item_id\" = i2.\"item_id\" and i.name = i2.name and i2.\"item_id\" <> 'invalid001')"; + Object[][] r2 = new Object[][]{ + {"T1"}, + {"T2"}, + {"T3"}, + {"T4"}, + {"T5"}, + {"T6"}}; + String p2Correlate = + "PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(NAME=[$1])\n" + + " PhoenixFilter(condition=[=($2, $7)])\n" + + " PhoenixCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{0, 1}])\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[MAX($2)])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]], filter=[AND(=($cor0.item_id, $0), =($cor0.NAME, $1), <>($0, 'invalid001'))])\n"; + String p2Decorrelated = + "PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(NAME=[$1])\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerJoin(condition=[AND(=($0, $7), =($1, $8), =($2, $9))], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixServerAggregate(group=[{0, 1}], EXPR$0=[MAX($4)])\n" + + " PhoenixServerJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner])\n" + + " PhoenixServerProject(item_id=[$0], NAME=[$1])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]], filter=[<>($0, 'invalid001')])\n"; + start(correlProps, false).sql(q2).explainIs(p2Correlate).resultIs(r2).close(); + start(decorrelProps, false).sql(q2).explainIs(p2Decorrelated).resultIs(r2).close(); + + String q3a = "select \"item_id\", name from " + JOIN_ITEM_TABLE_FULL_NAME + " i where exists (select 1 from " + JOIN_ORDER_TABLE_FULL_NAME + " o where i.\"item_id\" = o.\"item_id\")"; + Object[][] r3a = new Object[][] { + {"0000000001", "T1"}, + {"0000000002", "T2"}, + {"0000000003", "T3"}, + {"0000000006", "T6"}}; + String p3aCorrelate = + "PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(item_id=[$0], NAME=[$1])\n" + + " PhoenixFilter(condition=[IS NOT NULL($7)])\n" + + " PhoenixCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{0}])\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixServerAggregate(group=[{}], agg#0=[MIN($0)])\n" + + " PhoenixServerProject($f0=[true])\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]], filter=[=($cor0.item_id, $2)])\n"; + String p3aDecorrelated = + "PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(item_id=[$0], NAME=[$1])\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerSemiJoin(condition=[=($0, $8)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixClientProject($f0=[true], item_id0=[$7])\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerJoin(condition=[=($7, $2)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerProject(item_id=[$0])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n"; + start(correlProps, false).sql(q3a).explainIs(p3aCorrelate).resultIs(r3a).close(); + start(decorrelProps, false).sql(q3a).explainIs(p3aDecorrelated).resultIs(r3a).close(); + // Test PhoenixClientSemiJoin + String q3b = "select \"item_id\", name from " + JOIN_ITEM_TABLE_FULL_NAME + " i where exists (select 1 from " + JOIN_ITEM_TABLE_FULL_NAME + " o where i.\"item_id\" = o.\"item_id\" and name <> 'INVALID-1')"; + Object[][] r3b = new Object[][] { + {"0000000001", "T1"}, + {"0000000002", "T2"}, + {"0000000003", "T3"}, + {"0000000004", "T4"}, + {"0000000005", "T5"}, + {"0000000006", "T6"}}; + String p3bDecorrelated = + "PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(item_id=[$0], NAME=[$1])\n" + + " PhoenixClientSemiJoin(condition=[=($0, $8)], joinType=[inner])\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixClientProject($f0=[true], item_id0=[$0])\n" + + " PhoenixClientJoin(condition=[=($0, $1)], joinType=[inner])\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerProject(item_id=[$0])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]], filter=[<>(CAST($1):VARCHAR(9) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", 'INVALID-1')])\n"; + start(decorrelProps, false).sql(q3b).explainIs(p3bDecorrelated).resultIs(r3b).close(); + + String q4 = "select \"item_id\", name from " + JOIN_ITEM_TABLE_FULL_NAME + " i where \"item_id\" in (select \"item_id\" from " + JOIN_ORDER_TABLE_FULL_NAME + ")"; + Object[][] r4 = new Object[][] { + {"0000000001", "T1"}, + {"0000000002", "T2"}, + {"0000000003", "T3"}, + {"0000000006", "T6"}}; + String p4Decorrelated = + "PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(item_id=[$0], NAME=[$1])\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerSemiJoin(condition=[=($2, $5)], joinType=[inner])\n" + + " PhoenixServerProject($f0=[$0], $f1=[$1], $f7=[$0])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n"; + start(decorrelProps, false).sql(q4).explainIs(p4Decorrelated).resultIs(r4).close(); + + // CALCITE-864: switching orders and items in the first join wouldn't work. + String q5 = "select \"order_id\" from " + JOIN_ITEM_TABLE_FULL_NAME + " i JOIN " + JOIN_ORDER_TABLE_FULL_NAME + " o on o.\"item_id\" = i.\"item_id\" where quantity = (select max(quantity) from " + JOIN_ORDER_TABLE_FULL_NAME + " o2 JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i2 on o2.\"item_id\" = i2.\"item_id\" where i.\"supplier_id\" = i2.\"supplier_id\")"; + Object [][] r5 = new Object[][] { + {"000000000000003"}, + {"000000000000005"}, + {"000000000000004"}}; + String p5Correlate = + "PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(order_id=[$7])\n" + + " PhoenixFilter(condition=[=($11, $14)])\n" + + " PhoenixCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{5}])\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerJoin(condition=[=($9, $0)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[MAX($4)])\n" + + " PhoenixServerJoin(condition=[=($2, $7)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]], filter=[=($cor0.supplier_id, $5)])\n"; + String p5Decorrelated = + "PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(order_id=[$7])\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerJoin(condition=[AND(=($9, $0), =($5, $14))], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerJoin(condition=[=($4, $8)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + + " PhoenixServerAggregate(group=[{14}], EXPR$0=[MAX($4)])\n" + + " PhoenixServerJoin(condition=[=($2, $7)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerJoin(condition=[=($7, $5)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixServerAggregate(group=[{5}])\n" + + " PhoenixServerJoin(condition=[=($9, $0)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n"; + start(correlProps, false).sql(q5).explainIs(p5Correlate).resultIs(r5).close(); + start(decorrelProps, false).sql(q5).explainIs(p5Decorrelated).resultIs(r5).close(); } @Test public void testSelectFromView() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java index d3666d2..ef67de0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java @@ -7,10 +7,14 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.calcite.avatica.util.ByteString; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexFieldAccess; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SemiJoinType; import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlKind; @@ -71,6 +75,7 @@ import org.apache.phoenix.expression.function.RoundTimestampExpression; import org.apache.phoenix.expression.function.SqrtFunction; import org.apache.phoenix.expression.function.TrimFunction; import org.apache.phoenix.expression.function.UpperFunction; +import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TypeMismatchException; import org.apache.phoenix.schema.types.PDataType; @@ -100,6 +105,48 @@ public class CalciteUtils { public static PDataType sqlTypeNameToPDataType(SqlTypeName sqlTypeName) { return PDataType.fromTypeId(sqlTypeName.getJdbcOrdinal()); } + + public static JoinType convertJoinType(JoinRelType type) { + JoinType ret = null; + switch (type) { + case INNER: + ret = JoinType.Inner; + break; + case LEFT: + ret = JoinType.Left; + break; + case RIGHT: + ret = JoinType.Right; + break; + case FULL: + ret = JoinType.Full; + break; + default: + } + + return ret; + } + + public static JoinType convertSemiJoinType(SemiJoinType type) { + JoinType ret = null; + switch (type) { + case INNER: + ret = JoinType.Inner; + break; + case LEFT: + ret = JoinType.Left; + break; + case SEMI: + ret = JoinType.Semi; + break; + case ANTI: + ret = JoinType.Anti; + break; + default: + } + + return ret; + } private static final Map<SqlKind, ExpressionFactory> EXPRESSION_MAP = Maps .newHashMapWithExpectedSize(ExpressionType.values().length); @@ -513,6 +560,21 @@ public class CalciteUtils { } }); + EXPRESSION_MAP.put(SqlKind.FIELD_ACCESS, new ExpressionFactory() { + @SuppressWarnings("rawtypes") + @Override + public Expression newExpression(RexNode node, Implementor implementor) { + RexFieldAccess fieldAccess = (RexFieldAccess) node; + RexNode refExpr = fieldAccess.getReferenceExpr(); + if (refExpr.getKind() != SqlKind.CORREL_VARIABLE) { + throw new UnsupportedOperationException("Non-correl-variable as reference expression of RexFieldAccess."); + } + String varId = ((RexCorrelVariable) refExpr).getName(); + int index = fieldAccess.getField().getIndex(); + PDataType type = sqlTypeNameToPDataType(node.getType().getSqlTypeName()); + return implementor.newFieldAccessExpression(varId, index, type); + } + }); EXPRESSION_MAP.put(SqlKind.CAST, new ExpressionFactory() { @SuppressWarnings("rawtypes") @@ -629,6 +691,7 @@ public class CalciteUtils { } }); EXPRESSION_MAP.put(SqlKind.CEIL, new ExpressionFactory() { + @SuppressWarnings("rawtypes") @Override public Expression newExpression(RexNode node, Implementor implementor) { //TODO Phoenix only support separate arguments. @@ -652,6 +715,7 @@ public class CalciteUtils { } }); EXPRESSION_MAP.put(SqlKind.FLOOR, new ExpressionFactory() { + @SuppressWarnings("rawtypes") @Override public Expression newExpression(RexNode node, Implementor implementor) { // TODO Phoenix only support separate arguments. http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java index 226cf70..c0d3383 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java @@ -1,5 +1,6 @@ package org.apache.phoenix.calcite.jdbc; +import org.apache.calcite.adapter.enumerable.EnumerableRules; import org.apache.calcite.jdbc.CalcitePrepare; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.plan.RelOptCostFactory; @@ -48,6 +49,7 @@ public class PhoenixPrepareImpl extends CalcitePrepareImpl { RelOptCostFactory costFactory) { RelOptPlanner planner = super.createPlanner(prepareContext, externalContext, costFactory); + planner.removeRule(EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE); planner.removeRule(JoinCommuteRule.INSTANCE); planner.addRule(JoinCommuteRule.SWAP_OUTER); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java index 821d7b9..1b559f0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java @@ -10,9 +10,11 @@ import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMdCollation; import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.sql.SemiJoinType; import org.apache.calcite.util.BuiltInMethod; import org.apache.calcite.util.ImmutableIntList; import org.apache.phoenix.calcite.rel.PhoenixClientJoin; +import org.apache.phoenix.calcite.rel.PhoenixCorrelate; import org.apache.phoenix.calcite.rel.PhoenixLimit; import org.apache.phoenix.calcite.rel.PhoenixServerJoin; @@ -25,6 +27,10 @@ public class PhoenixRelMdCollation { private PhoenixRelMdCollation() { } + public ImmutableList<RelCollation> collations(PhoenixCorrelate correlate) { + return ImmutableList.copyOf(correlate(correlate.getLeft(), correlate.getRight(), correlate.getJoinType())); + } + public ImmutableList<RelCollation> collations(PhoenixLimit limit) { return ImmutableList.copyOf(RelMdCollation.limit(limit.getInput())); } @@ -37,6 +43,11 @@ public class PhoenixRelMdCollation { return ImmutableList.copyOf(PhoenixRelMdCollation.mergeJoin(join.getLeft(), join.getRight(), join.joinInfo.leftKeys, join.joinInfo.rightKeys)); } + /** Helper method to determine a {@link PhoenixCorrelate}'s collation. */ + public static List<RelCollation> correlate(RelNode left, RelNode right, SemiJoinType joinType) { + return RelMetadataQuery.collations(left); + } + /** Helper method to determine a {@link PhoenixServerJoin}'s collation. */ public static List<RelCollation> hashJoin(RelNode left, RelNode right, JoinRelType joinType) { return RelMetadataQuery.collations(left); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java index 829c401..3355ee2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java @@ -16,7 +16,6 @@ import org.apache.calcite.util.ImmutableIntList; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; -import org.apache.phoenix.parse.JoinTableNode.JoinType; /** * Implementation of {@link org.apache.calcite.rel.core.Join} @@ -57,25 +56,4 @@ abstract public class PhoenixAbstractJoin extends Join implements PhoenixRel { return plan; } - - protected static JoinType convertJoinType(JoinRelType type) { - JoinType ret = null; - switch (type) { - case INNER: - ret = JoinType.Inner; - break; - case LEFT: - ret = JoinType.Left; - break; - case RIGHT: - ret = JoinType.Right; - break; - case FULL: - ret = JoinType.Full; - break; - default: - } - - return ret; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java new file mode 100644 index 0000000..e788a75 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java @@ -0,0 +1,41 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.Iterator; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.SemiJoin; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; + +abstract public class PhoenixAbstractSemiJoin extends SemiJoin implements PhoenixRel { + + protected PhoenixAbstractSemiJoin(RelOptCluster cluster, RelTraitSet traitSet, + RelNode left, RelNode right, RexNode condition, + ImmutableIntList leftKeys, ImmutableIntList rightKeys) { + super(cluster, traitSet, left, right, condition, leftKeys, rightKeys); + } + + protected QueryPlan implementInput(Implementor implementor, int index, List<Expression> conditionExprs) { + assert index <= 1; + + PhoenixRel input = index == 0 ? (PhoenixRel) left : (PhoenixRel) right; + ImmutableIntList keys = index == 0 ? leftKeys : rightKeys; + QueryPlan plan = implementor.visitInput(0, input); + for (Iterator<Integer> iter = keys.iterator(); iter.hasNext();) { + Integer i = iter.next(); + conditionExprs.add(implementor.newColumnExpression(i)); + } + if (conditionExprs.isEmpty()) { + conditionExprs.add(LiteralExpression.newConstant(0)); + } + + return plan; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java index c6cf214..599661c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java @@ -120,7 +120,7 @@ public class PhoenixClientJoin extends PhoenixAbstractJoin { PTable rightTable = implementor.getTableRef().getTable(); implementor.popContext(); - JoinType type = convertJoinType(getJoinType()); + JoinType type = CalciteUtils.convertJoinType(getJoinType()); PTable joinedTable; try { joinedTable = JoinCompiler.joinProjectedTables(leftTable, rightTable, type); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java new file mode 100644 index 0000000..87dc44d --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java @@ -0,0 +1,119 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.SemiJoin; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.SortMergeJoinPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.schema.TableRef; + +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; + +public class PhoenixClientSemiJoin extends PhoenixAbstractSemiJoin implements + PhoenixRel { + + public static PhoenixClientSemiJoin create( + final RelNode left, final RelNode right, RexNode condition) { + RelOptCluster cluster = left.getCluster(); + final JoinInfo joinInfo = JoinInfo.of(left, right, condition); + final RelTraitSet traits = + cluster.traitSet().replace(PhoenixRel.CLIENT_CONVENTION) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return PhoenixRelMdCollation.mergeJoin(left, right, joinInfo.leftKeys, joinInfo.rightKeys); + } + }); + return new PhoenixClientSemiJoin(cluster, traits, left, right, condition, + joinInfo.leftKeys, joinInfo.rightKeys); + } + + private PhoenixClientSemiJoin(RelOptCluster cluster, RelTraitSet traitSet, + RelNode left, RelNode right, RexNode condition, + ImmutableIntList leftKeys, ImmutableIntList rightKeys) { + super(cluster, traitSet, left, right, condition, leftKeys, rightKeys); + } + + @Override + public SemiJoin copy(RelTraitSet traitSet, RexNode condition, + RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { + assert joinType == JoinRelType.INNER; + return create(left, right, condition); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + if (getLeft().getConvention() != PhoenixRel.CLIENT_CONVENTION + || getRight().getConvention() != PhoenixRel.CLIENT_CONVENTION) + return planner.getCostFactory().makeInfiniteCost(); + + if ((!leftKeys.isEmpty() && !RelCollations.contains(RelMetadataQuery.collations(getLeft()), leftKeys)) + || (!rightKeys.isEmpty() && !RelCollations.contains(RelMetadataQuery.collations(getRight()), rightKeys))) + return planner.getCostFactory().makeInfiniteCost(); + + double rowCount = RelMetadataQuery.getRowCount(this); + + double leftRowCount = RelMetadataQuery.getRowCount(getLeft()); + if (Double.isInfinite(leftRowCount)) { + rowCount = leftRowCount; + } else { + rowCount += leftRowCount; + double rightRowCount = RelMetadataQuery.getRowCount(getRight()); + if (Double.isInfinite(rightRowCount)) { + rowCount = rightRowCount; + } else { + rowCount += rightRowCount; + } + } + RelOptCost cost = planner.getCostFactory().makeCost(rowCount, 0, 0); + + return cost.multiplyBy(SERVER_FACTOR).multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + List<Expression> leftExprs = Lists.<Expression> newArrayList(); + List<Expression> rightExprs = Lists.<Expression> newArrayList(); + + implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns() && getJoinType() != JoinRelType.FULL, true)); + QueryPlan leftPlan = implementInput(implementor, 0, leftExprs); + TableRef joinedTable = implementor.getTableRef(); + implementor.popContext(); + + implementor.pushContext(new ImplementorContext(false, true)); + QueryPlan rightPlan = implementInput(implementor, 1, rightExprs); + implementor.popContext(); + + JoinType type = JoinType.Semi; + implementor.setTableRef(joinedTable); + PhoenixStatement stmt = leftPlan.getContext().getStatement(); + ColumnResolver resolver = leftPlan.getContext().getResolver(); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + + return new SortMergeJoinPlan(context, leftPlan.getStatement(), + joinedTable, type, leftPlan, rightPlan, leftExprs, rightExprs, + joinedTable.getTable(), joinedTable.getTable(), null, 0, false); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java new file mode 100644 index 0000000..430e282 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java @@ -0,0 +1,98 @@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Correlate; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.sql.SemiJoinType; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; +import org.apache.phoenix.compile.JoinCompiler; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.CorrelatePlan; +import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; + +import com.google.common.base.Supplier; + +public class PhoenixCorrelate extends Correlate implements PhoenixRel { + + public static PhoenixCorrelate create(final RelNode left, final RelNode right, + CorrelationId correlationId, ImmutableBitSet requiredColumns, + final SemiJoinType joinType) { + RelOptCluster cluster = left.getCluster(); + final RelTraitSet traits = + cluster.traitSet().replace(PhoenixRel.CLIENT_CONVENTION) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return PhoenixRelMdCollation.correlate(left, right, joinType); + } + }); + return new PhoenixCorrelate(cluster, traits, left, right, correlationId, + requiredColumns, joinType); + } + + private PhoenixCorrelate(RelOptCluster cluster, RelTraitSet traits, + RelNode left, RelNode right, CorrelationId correlationId, + ImmutableBitSet requiredColumns, SemiJoinType joinType) { + super(cluster, traits, left, right, correlationId, requiredColumns, + joinType); + } + + @Override + public Correlate copy(RelTraitSet traitSet, RelNode left, RelNode right, + CorrelationId correlationId, ImmutableBitSet requiredColumns, + SemiJoinType joinType) { + return create(left, right, correlationId, requiredColumns, joinType); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + if (getLeft().getConvention() != PhoenixRel.CLIENT_CONVENTION + || getRight().getConvention() != PhoenixRel.CLIENT_CONVENTION) + return planner.getCostFactory().makeInfiniteCost(); + + return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), true)); + QueryPlan leftPlan = implementor.visitInput(0, (PhoenixRel) getLeft()); + PTable leftTable = implementor.getTableRef().getTable(); + implementor.popContext(); + + implementor.getRuntimeContext().defineCorrelateVariable(getCorrelVariable(), implementor.getTableRef()); + + implementor.pushContext(new ImplementorContext(false, true)); + QueryPlan rightPlan = implementor.visitInput(1, (PhoenixRel) getRight()); + PTable rightTable = implementor.getTableRef().getTable(); + implementor.popContext(); + + JoinType type = CalciteUtils.convertSemiJoinType(getJoinType()); + PTable joinedTable; + try { + joinedTable = JoinCompiler.joinProjectedTables(leftTable, rightTable, type); + } catch (SQLException e) { + throw new RuntimeException(e); + } + TableRef tableRef = new TableRef(joinedTable); + implementor.setTableRef(tableRef); + + return new CorrelatePlan(leftPlan, rightPlan, getCorrelVariable(), + type, false, implementor.getRuntimeContext(), joinedTable, + leftTable, rightTable, leftTable.getColumns().size() - leftTable.getPKColumns().size()); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java index 1136ea6..305af62 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java @@ -8,11 +8,13 @@ import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.execute.RuntimeContext; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.ColumnExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PDataType; /** * Relational expression in Phoenix. @@ -68,6 +70,9 @@ public interface PhoenixRel extends RelNode { interface Implementor { QueryPlan visitInput(int i, PhoenixRel input); ColumnExpression newColumnExpression(int index); + @SuppressWarnings("rawtypes") + Expression newFieldAccessExpression(String variableId, int index, PDataType type); + RuntimeContext getRuntimeContext(); void setTableRef(TableRef tableRef); TableRef getTableRef(); void pushContext(ImplementorContext context); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java index 28b4f51..d4b304a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java @@ -14,8 +14,10 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.TupleProjectionCompiler; import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.execute.RuntimeContext; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.ColumnExpression; +import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.schema.ColumnRef; @@ -28,14 +30,17 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PDataType; import com.google.common.collect.Lists; public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { + private final RuntimeContext runtimeContext; private TableRef tableRef; private Stack<ImplementorContext> contextStack; - public PhoenixRelImplementorImpl() { + public PhoenixRelImplementorImpl(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; this.contextStack = new Stack<ImplementorContext>(); pushContext(new ImplementorContext(true, false)); } @@ -50,7 +55,19 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { ColumnRef colRef = new ColumnRef(this.tableRef, index); return colRef.newColumnExpression(); } - + + @SuppressWarnings("rawtypes") + @Override + public Expression newFieldAccessExpression(String variableId, int index, PDataType type) { + TableRef variableDef = runtimeContext.getCorrelateVariableDef(variableId); + Expression fieldAccessExpr = new ColumnRef(variableDef, index).newColumnExpression(); + return new CorrelateVariableFieldAccessExpression(runtimeContext, variableId, fieldAccessExpr); + } + + @Override + public RuntimeContext getRuntimeContext() { + return runtimeContext; + } @Override public void setTableRef(TableRef tableRef) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java index 8c0c83a..a707b14 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java @@ -74,7 +74,7 @@ public class PhoenixServerAggregate extends PhoenixAbstractAggregate { GroupBy groupBy = super.getGroupBy(implementor); super.serializeAggregators(implementor, context, groupBy.isEmpty()); - QueryPlan aggPlan = new AggregatePlan(context, basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null); + QueryPlan aggPlan = new AggregatePlan(context, basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null, basePlan.getDynamicFilter()); if (hashJoinPlan != null) { aggPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans()); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java index 1935251..047deac 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java @@ -114,7 +114,7 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin { PTable rightTable = implementor.getTableRef().getTable(); implementor.popContext(); - JoinType type = convertJoinType(getJoinType()); + JoinType type = CalciteUtils.convertJoinType(getJoinType()); PTable joinedTable; try { joinedTable = JoinCompiler.joinProjectedTables(leftTable, rightTable, type); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java new file mode 100644 index 0000000..4593bdb --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java @@ -0,0 +1,120 @@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.SemiJoin; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; +import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.join.HashJoinInfo; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; + +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; + +public class PhoenixServerSemiJoin extends PhoenixAbstractSemiJoin { + + public static PhoenixServerSemiJoin create( + final RelNode left, final RelNode right, RexNode condition) { + RelOptCluster cluster = left.getCluster(); + final RelTraitSet traits = + cluster.traitSet().replace(PhoenixRel.SERVERJOIN_CONVENTION) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return PhoenixRelMdCollation.hashJoin(left, right, JoinRelType.INNER); + } + }); + final JoinInfo joinInfo = JoinInfo.of(left, right, condition); + assert joinInfo.isEqui(); + return new PhoenixServerSemiJoin(cluster, traits, left, right, condition, + joinInfo.leftKeys, joinInfo.rightKeys); + } + + private PhoenixServerSemiJoin(RelOptCluster cluster, RelTraitSet traitSet, + RelNode left, RelNode right, RexNode condition, + ImmutableIntList leftKeys, ImmutableIntList rightKeys) { + super(cluster, traitSet, left, right, condition, leftKeys, rightKeys); + } + + @Override + public SemiJoin copy(RelTraitSet traitSet, RexNode condition, + RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { + assert joinType == JoinRelType.INNER; + return create(left, right, condition); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + if (getLeft().getConvention() != PhoenixRel.SERVER_CONVENTION + || getRight().getConvention() != PhoenixRel.CLIENT_CONVENTION) + return planner.getCostFactory().makeInfiniteCost(); + + //TODO return infinite cost if RHS size exceeds memory limit. + + double rowCount = RelMetadataQuery.getRowCount(this); + + double leftRowCount = RelMetadataQuery.getRowCount(getLeft()); + if (Double.isInfinite(leftRowCount)) { + rowCount = leftRowCount; + } else { + rowCount += leftRowCount; + double rightRowCount = RelMetadataQuery.getRowCount(getRight()); + if (Double.isInfinite(rightRowCount)) { + rowCount = rightRowCount; + } else { + rowCount += Util.nLogN(rightRowCount); + } + } + + RelOptCost cost = planner.getCostFactory().makeCost(rowCount, 0, 0); + + return cost.multiplyBy(SERVER_FACTOR).multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + List<Expression> leftExprs = Lists.<Expression> newArrayList(); + List<Expression> rightExprs = Lists.<Expression> newArrayList(); + + implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), true)); + QueryPlan leftPlan = implementInput(implementor, 0, leftExprs); + TableRef joinedTable = implementor.getTableRef(); + implementor.popContext(); + + implementor.pushContext(new ImplementorContext(false, true)); + QueryPlan rightPlan = implementInput(implementor, 1, rightExprs); + implementor.popContext(); + + JoinType type = JoinType.Semi; + implementor.setTableRef(joinedTable); + @SuppressWarnings("unchecked") + HashJoinInfo hashJoinInfo = new HashJoinInfo( + joinedTable.getTable(), + new ImmutableBytesPtr[] {new ImmutableBytesPtr()}, + (List<Expression>[]) (new List[] {leftExprs}), + new JoinType[] {type}, new boolean[] {true}, + new PTable[] {null}, new int[] {0}, null, null); + + return HashJoinPlan.create((SelectStatement) (leftPlan.getStatement()), leftPlan, hashJoinInfo, new HashJoinPlan.HashSubPlan[] {new HashJoinPlan.HashSubPlan(0, rightPlan, rightExprs, false, null, null)}); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java index 171dd26..5bf9569 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java @@ -32,6 +32,7 @@ import org.apache.phoenix.compile.WhereOptimizer; import org.apache.phoenix.execute.ScanPlan; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.SelectStatement; @@ -40,6 +41,7 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.SchemaUtil; import com.google.common.base.Supplier; @@ -88,7 +90,24 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); PTable pTable = phoenixTable.getTable(); TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false); - Implementor tmpImplementor = new PhoenixRelImplementorImpl(); + // We use a implementor with a special implementation for field access + // here, which translates RexFieldAccess into a LiteralExpression + // with a sample value. This will achieve 3 goals at a time: + // 1) avoid getting exception when translating RexFieldAccess at this + // time when the correlate variable has not been defined yet. + // 2) get a guess of ScanRange even if the runtime value is absent. + // 3) test whether this dynamic filter is worth a recompile at runtime. + Implementor tmpImplementor = new PhoenixRelImplementorImpl(null) { + @SuppressWarnings("rawtypes") + @Override + public Expression newFieldAccessExpression(String variableId, int index, PDataType type) { + try { + return LiteralExpression.newConstant(type.getSampleValue(), type); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }; tmpImplementor.setTableRef(tableRef); SelectStatement select = SelectStatement.SELECT_ONE; PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); @@ -163,10 +182,18 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { ColumnResolver resolver = FromCompiler.getResolver(tableRef); StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); SelectStatement select = SelectStatement.SELECT_ONE; + Expression dynamicFilter = null; if (filter != null) { Expression filterExpr = CalciteUtils.toExpression(filter, implementor); filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); WhereCompiler.setScanFilter(context, select, filterExpr, true, false); + // TODO This is not absolutely strict. We may have a filter like: + // pk = '0' and pk = $cor0 where $cor0 happens to get a sample value + // as '0', thus making the below test return false and adding an + // unnecessary dynamic filter. This would only be a performance bug though. + if (!context.getScanRanges().equals(this.scanRanges)) { + dynamicFilter = filterExpr; + } } projectAllColumnFamilies(context.getScan(), phoenixTable.getTable()); if (implementor.getCurrentContext().forceProject()) { @@ -178,7 +205,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { Integer limit = null; OrderBy orderBy = OrderBy.EMPTY_ORDER_BY; ParallelIteratorFactory iteratorFactory = null; - return new ScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true); + return new ScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true, dynamicFilter); } catch (SQLException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java index e4cd07d..b171dac 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java @@ -27,6 +27,7 @@ import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.execute.DelegateQueryPlan; +import org.apache.phoenix.execute.RuntimeContextImpl; import org.apache.phoenix.iterate.DefaultParallelScanGrouper; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; @@ -83,7 +84,7 @@ public class PhoenixToEnumerableConverter extends ConverterImpl implements Enume } static QueryPlan makePlan(PhoenixRel rel) { - final PhoenixRel.Implementor phoenixImplementor = new PhoenixRelImplementorImpl(); + final PhoenixRel.Implementor phoenixImplementor = new PhoenixRelImplementorImpl(new RuntimeContextImpl()); final QueryPlan plan = phoenixImplementor.visitInput(0, rel); return new DelegateQueryPlan(plan) { @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java index aa53ae4..d39f3a6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java @@ -13,6 +13,8 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.UnnestArrayPlan; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; public class PhoenixUncollect extends Uncollect implements PhoenixRel { @@ -53,6 +55,8 @@ public class PhoenixUncollect extends Uncollect implements PhoenixRel { } catch (SQLException e) { throw new RuntimeException(e); } + PTable projectedTable = implementor.createProjectedTable(); + implementor.setTableRef(new TableRef(projectedTable)); return new UnnestArrayPlan(plan, arrayExpression, false); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java index 89aaa07..22284e6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java @@ -40,6 +40,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; @@ -116,6 +117,8 @@ public class PhoenixValues extends Values implements PhoenixRel { TupleProjector projector = implementor.project(exprs); literalResult.add(projector.projectResults(baseTuple)); } + PTable projectedTable = implementor.createProjectedTable(); + implementor.setTableRef(new TableRef(projectedTable)); try { PhoenixStatement stmt = new PhoenixStatement(phoenixConnection); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java index 210306d..c60c27b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java @@ -18,15 +18,18 @@ import org.apache.calcite.rel.convert.ConverterRule; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.Aggregate.Group; import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Correlate; import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.SemiJoin; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.core.Uncollect; import org.apache.calcite.rel.core.Union; import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.logical.LogicalCorrelate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; @@ -40,13 +43,16 @@ import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate; import org.apache.phoenix.calcite.rel.PhoenixClientAggregate; import org.apache.phoenix.calcite.rel.PhoenixClientJoin; import org.apache.phoenix.calcite.rel.PhoenixClientProject; +import org.apache.phoenix.calcite.rel.PhoenixClientSemiJoin; import org.apache.phoenix.calcite.rel.PhoenixClientSort; +import org.apache.phoenix.calcite.rel.PhoenixCorrelate; import org.apache.phoenix.calcite.rel.PhoenixFilter; import org.apache.phoenix.calcite.rel.PhoenixLimit; import org.apache.phoenix.calcite.rel.PhoenixRel; import org.apache.phoenix.calcite.rel.PhoenixServerAggregate; import org.apache.phoenix.calcite.rel.PhoenixServerJoin; import org.apache.phoenix.calcite.rel.PhoenixServerProject; +import org.apache.phoenix.calcite.rel.PhoenixServerSemiJoin; import org.apache.phoenix.calcite.rel.PhoenixServerSort; import org.apache.phoenix.calcite.rel.PhoenixToClientConverter; import org.apache.phoenix.calcite.rel.PhoenixToEnumerableConverter; @@ -85,8 +91,11 @@ public class PhoenixConverterRules { PhoenixUnionRule.INSTANCE, PhoenixClientJoinRule.INSTANCE, PhoenixServerJoinRule.INSTANCE, + PhoenixClientSemiJoinRule.INSTANCE, + PhoenixServerSemiJoinRule.INSTANCE, PhoenixValuesRule.INSTANCE, PhoenixUncollectRule.INSTANCE, + PhoenixCorrelateRule.INSTANCE, }; public static final RelOptRule[] CONVERTIBLE_RULES = { @@ -106,8 +115,11 @@ public class PhoenixConverterRules { PhoenixUnionRule.CONVERTIBLE, PhoenixClientJoinRule.CONVERTIBLE, PhoenixServerJoinRule.CONVERTIBLE, + PhoenixClientSemiJoinRule.INSTANCE, + PhoenixServerSemiJoinRule.INSTANCE, PhoenixValuesRule.INSTANCE, PhoenixUncollectRule.INSTANCE, + PhoenixCorrelateRule.INSTANCE, }; /** Base class for planner rules that convert a relational expression to @@ -275,7 +287,7 @@ public class PhoenixConverterRules { * {@link PhoenixFilter}. */ public static class PhoenixFilterRule extends PhoenixConverterRule { - private static Predicate<LogicalFilter> IS_CONVERTIBLE = new Predicate<LogicalFilter>() { + protected static Predicate<LogicalFilter> IS_CONVERTIBLE = new Predicate<LogicalFilter>() { @Override public boolean apply(LogicalFilter input) { return isConvertible(input); @@ -582,6 +594,78 @@ public class PhoenixConverterRules { } /** + * Rule to convert a {@link org.apache.calcite.rel.core.SemiJoin} to a + * {@link PhoenixClientSemiJoin}. + */ + public static class PhoenixClientSemiJoinRule extends PhoenixConverterRule { + + public static final PhoenixClientSemiJoinRule INSTANCE = new PhoenixClientSemiJoinRule(); + + private PhoenixClientSemiJoinRule() { + super(SemiJoin.class, Convention.NONE, + PhoenixRel.CLIENT_CONVENTION, "PhoenixClientSemiJoinRule"); + } + + public RelNode convert(RelNode rel) { + final SemiJoin join = (SemiJoin) rel; + RelNode left = join.getLeft(); + RelNode right = join.getRight(); + + JoinInfo joinInfo = JoinInfo.of(join.getLeft(), join.getRight(), join.getCondition()); + if (!joinInfo.leftKeys.isEmpty()) { + List<RelFieldCollation> leftFieldCollations = Lists.newArrayList(); + for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); iter.hasNext();) { + leftFieldCollations.add(new RelFieldCollation(iter.next(), Direction.ASCENDING)); + } + RelCollation leftCollation = RelCollations.of(leftFieldCollations); + left = LogicalSort.create(left, leftCollation, null, null); + + List<RelFieldCollation> rightFieldCollations = Lists.newArrayList(); + for (Iterator<Integer> iter = joinInfo.rightKeys.iterator(); iter.hasNext();) { + rightFieldCollations.add(new RelFieldCollation(iter.next(), Direction.ASCENDING)); + } + RelCollation rightCollation = RelCollations.of(rightFieldCollations); + right = LogicalSort.create(right, rightCollation, null, null); + } + + return PhoenixClientSemiJoin.create( + convert( + left, + left.getTraitSet().replace(PhoenixRel.CLIENT_CONVENTION)), + convert( + right, + right.getTraitSet().replace(PhoenixRel.CLIENT_CONVENTION)), + join.getCondition()); + } + } + + /** + * Rule to convert a {@link org.apache.calcite.rel.core.SemiJoin} to a + * {@link PhoenixServerSemiJoin}. + */ + public static class PhoenixServerSemiJoinRule extends PhoenixConverterRule { + + public static final PhoenixServerSemiJoinRule INSTANCE = new PhoenixServerSemiJoinRule(); + + private PhoenixServerSemiJoinRule() { + super(SemiJoin.class, Convention.NONE, + PhoenixRel.SERVERJOIN_CONVENTION, "PhoenixServerSemiJoinRule"); + } + + public RelNode convert(RelNode rel) { + final SemiJoin join = (SemiJoin) rel; + return PhoenixServerSemiJoin.create( + convert( + join.getLeft(), + join.getLeft().getTraitSet().replace(PhoenixRel.SERVER_CONVENTION)), + convert( + join.getRight(), + join.getRight().getTraitSet().replace(PhoenixRel.CLIENT_CONVENTION)), + join.getCondition()); + } + } + + /** * Rule to convert a {@link org.apache.calcite.rel.core.Values} to a * {@link PhoenixValues}. */ @@ -624,6 +708,32 @@ public class PhoenixConverterRules { } /** + * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalCorrelate} to a + * {@link PhoenixCorrelate}. + */ + public static class PhoenixCorrelateRule extends PhoenixConverterRule { + + private static final PhoenixCorrelateRule INSTANCE = new PhoenixCorrelateRule(); + + private PhoenixCorrelateRule() { + super(LogicalCorrelate.class, Convention.NONE, + PhoenixRel.CLIENT_CONVENTION, "PhoenixCorrelateRule"); + } + + public RelNode convert(RelNode rel) { + final Correlate correlate = (Correlate) rel; + return PhoenixCorrelate.create( + convert(correlate.getLeft(), + correlate.getLeft().getTraitSet().replace(out)), + convert(correlate.getRight(), + correlate.getRight().getTraitSet().replace(out)), + correlate.getCorrelationId(), + correlate.getRequiredColumns(), + correlate.getJoinType()); + } + } + + /** * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalIntersect} * to an {@link PhoenixIntersectRel}. o/ http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java index d717a1e..f846d45 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java @@ -11,18 +11,27 @@ public class PhoenixFilterScanMergeRule extends RelOptRule { /** Predicate that returns true if a table scan has no filter. */ private static final Predicate<PhoenixTableScan> NO_FILTER = - new Predicate<PhoenixTableScan>() { - @Override - public boolean apply(PhoenixTableScan phoenixTableScan) { - return phoenixTableScan.filter == null; - } - }; + new Predicate<PhoenixTableScan>() { + @Override + public boolean apply(PhoenixTableScan phoenixTableScan) { + return phoenixTableScan.filter == null; + } + }; + + /** Predicate that returns true if a filter is Phoenix implementable. */ + private static Predicate<Filter> IS_CONVERTIBLE = + new Predicate<Filter>() { + @Override + public boolean apply(Filter input) { + return PhoenixConverterRules.isConvertible(input); + } + }; public static final PhoenixFilterScanMergeRule INSTANCE = new PhoenixFilterScanMergeRule(); private PhoenixFilterScanMergeRule() { super( - operand(Filter.class, + operand(Filter.class, null, IS_CONVERTIBLE, operand(PhoenixTableScan.class, null, NO_FILTER, any()))); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index da55fb5..f38c637 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -85,7 +85,7 @@ public class AggregatePlan extends BaseQueryPlan { this(context, statement, table, projector, limit, orderBy, parallelIteratorFactory, groupBy, having, null); } - private AggregatePlan( + public AggregatePlan( StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having, Expression dynamicFilter) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 2d408bc..cf4dce9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -86,7 +86,7 @@ public class ScanPlan extends BaseQueryPlan { this(context, statement, table, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter, null); } - private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException { + public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException { super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory != null ? parallelIteratorFactory : buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter), dynamicFilter); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index da6e541..c582cd2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -647,9 +647,11 @@ public class SortMergeJoinPlan implements QueryPlan { @Override public QueryPlan limit(Integer limit) { - // This should never be reached, since SortMergeJoinPlan should always be - // wrapped inside a ClientProcessingPlan. - throw new UnsupportedOperationException(); + if (limit == null) + return this; + + return new ClientScanPlan(this.getContext(), this.getStatement(), this.getTableRef(), + this.getProjector(), limit, null, OrderBy.EMPTY_ORDER_BY, this); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a18e8f8/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java b/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java index 89578f6..3734b4c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java @@ -28,6 +28,7 @@ import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.ExpressionCompiler; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.RuntimeContextImpl; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; @@ -79,7 +80,7 @@ public class ToExpressionTest extends BaseConnectionlessQueryTest { } public ExpressionChecker checkExpressionEquality() { - Implementor implementor = new PhoenixRelImplementorImpl(); + Implementor implementor = new PhoenixRelImplementorImpl(new RuntimeContextImpl()); implementor.setTableRef(new TableRef(table)); Expression e = CalciteUtils.toExpression(this.calciteExpr, implementor); assertEquals(this.phoenixExpr,e);
