Repository: phoenix Updated Branches: refs/heads/calcite 73d7f9621 -> 9434d5e6e
First join query worked Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9434d5e6 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9434d5e6 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9434d5e6 Branch: refs/heads/calcite Commit: 9434d5e6e45dbe100a0f0b3e5aaa172f01145aa2 Parents: 73d7f96 Author: maryannxue <[email protected]> Authored: Fri Mar 13 19:28:07 2015 -0400 Committer: maryannxue <[email protected]> Committed: Fri Mar 13 19:28:07 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteTest.java | 20 +++++ .../org/apache/phoenix/calcite/PhoenixJoin.java | 88 +++++++++++++++++++- .../org/apache/phoenix/calcite/PhoenixRel.java | 1 + .../calcite/PhoenixRelImplementorImpl.java | 5 ++ 4 files changed, 113 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/9434d5e6/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java index afd8d83..cf2a3a0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java @@ -16,6 +16,8 @@ import java.sql.*; import java.util.Arrays; import java.util.List; +import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_FULL_NAME; +import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.*; @@ -179,6 +181,9 @@ public class CalciteTest extends BaseClientManagedTimeIT { DriverManager.getConnection(url).unwrap(PhoenixConnection.class); ensureTableCreated(url, ATABLE_NAME); initATableValues(getOrganizationId(), null, url); + ensureTableCreated(url, JOIN_ITEM_TABLE_FULL_NAME); + ensureTableCreated(url, JOIN_SUPPLIER_TABLE_FULL_NAME); + initJoinTableValues(url, null, null); calciteConnection.getRootSchema().add("phoenix", new PhoenixSchema(phoenixConnection)); calciteConnection.setSchema("phoenix"); @@ -214,6 +219,21 @@ public class CalciteTest extends BaseClientManagedTimeIT { {"00A323122312312", "a", "00D300000000XHP"}, {"00A423122312312", "a", "00D300000000XHP"}}); } + + @Test public void testJoin() throws Exception { + testConnect("select t1.entity_id, t2.a_string, t1.organization_id from aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id = t2.organization_id where t1.a_string = 'a'", + new Object[][] {{"00A123122312312", "a", "00D300000000XHP"}, + {"00A223122312312", "a", "00D300000000XHP"}, + {"00A323122312312", "a", "00D300000000XHP"}, + {"00A423122312312", "a", "00D300000000XHP"}}); +// testConnect("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\"", +// new Object[][] {{"0000000001", "T1", "0000000001", "S1"}, +// {"0000000002", "T2", "0000000001", "S1"}, +// {"0000000003", "T3", "0000000002", "S2"}, +// {"0000000004", "T4", "0000000002", "S2"}, +// {"0000000005", "T5", "0000000005", "S5"}, +// {"0000000006", "T6", "0000000006", "S6"}}); + } @Test public void testExplainPlanForSelectWhereQuery() { start() http://git-wip-us.apache.org/repos/asf/phoenix/blob/9434d5e6/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java index bf31d97..d0b0777 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java @@ -1,16 +1,31 @@ package org.apache.phoenix.calcite; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; import java.util.Set; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexNode; +import org.apache.phoenix.compile.JoinCompiler; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.join.HashJoinInfo; +import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; /** * Implementation of {@link org.apache.calcite.rel.core.Join} @@ -31,6 +46,77 @@ public class PhoenixJoin extends Join implements PhoenixRel { @Override public QueryPlan implement(Implementor implementor) { - throw new UnsupportedOperationException(); + PhoenixRel left = (PhoenixRel) getLeft(); + PhoenixRel right = (PhoenixRel) getRight(); + boolean hashRHS = (left instanceof PhoenixTableScan) && getJoinType() != JoinRelType.RIGHT; + if (!hashRHS) + throw new UnsupportedOperationException(); + + JoinInfo joinInfo = JoinInfo.of(left, right, getCondition()); + List<Expression> leftExprs = Lists.<Expression> newArrayList(); + List<Expression> rightExprs = Lists.<Expression> newArrayList(); + implementor.pushContext(new ImplementorContext(true)); + QueryPlan leftPlan = implementor.visitInput(0, left); + PTable leftTable = implementor.getTableRef().getTable(); + for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); iter.hasNext();) { + Integer index = iter.next(); + leftExprs.add(implementor.newColumnExpression(index)); + } + if (leftExprs.isEmpty()) { + leftExprs.add(LiteralExpression.newConstant(0)); + } + implementor.popContext(); + implementor.pushContext(new ImplementorContext(false)); + QueryPlan rightPlan = implementor.visitInput(1, right); + PTable rightTable = implementor.getTableRef().getTable(); + for (Iterator<Integer> iter = joinInfo.rightKeys.iterator(); iter.hasNext();) { + Integer index = iter.next(); + rightExprs.add(implementor.newColumnExpression(index)); + } + if (rightExprs.isEmpty()) { + rightExprs.add(LiteralExpression.newConstant(0)); + } + implementor.popContext(); + + JoinType type = convertJoinType(getJoinType()); + PTable joinedTable; + try { + joinedTable = JoinCompiler.joinProjectedTables(leftTable, rightTable, type); + } catch (SQLException e) { + throw new RuntimeException(e); + } + implementor.setTableRef(new TableRef(joinedTable)); + Expression postFilterExpr = CalciteUtils.toExpression(joinInfo.getRemaining(getCluster().getRexBuilder()), implementor); + @SuppressWarnings("unchecked") + HashJoinInfo hashJoinInfo = new HashJoinInfo( + joinedTable, new ImmutableBytesPtr[] {new ImmutableBytesPtr()}, + (List<Expression>[]) (new List[] {leftExprs}), + new JoinType[] {type}, new boolean[] {true}, + new PTable[] {rightTable}, + new int[] {leftTable.getColumns().size() - leftTable.getPKColumns().size()}, + postFilterExpr, null); + + return HashJoinPlan.create(SelectStatement.SELECT_STAR, leftPlan, hashJoinInfo, new HashJoinPlan.HashSubPlan[] {new HashJoinPlan.HashSubPlan(0, rightPlan, rightExprs, false, null, null)}); + } + + private JoinType convertJoinType(JoinRelType type) { + JoinType ret = null; + switch (type) { + case INNER: + ret = JoinType.Inner; + break; + case LEFT: + ret = JoinType.Left; + break; + case RIGHT: + ret = JoinType.Right; + break; + case FULL: + ret = JoinType.Full; + break; + default: + } + + return ret; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9434d5e6/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java index a1f15d3..4909d64 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java @@ -43,6 +43,7 @@ public interface PhoenixRel extends RelNode { QueryPlan visitInput(int i, PhoenixRel input); ColumnExpression newColumnExpression(int index); void setTableRef(TableRef tableRef); + TableRef getTableRef(); void pushContext(ImplementorContext context); ImplementorContext popContext(); ImplementorContext getCurrentContext(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9434d5e6/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java index 2eafbf8..2a403ad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java @@ -33,6 +33,11 @@ class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { public void setTableRef(TableRef tableRef) { this.tableRef = tableRef; } + + @Override + public TableRef getTableRef() { + return this.tableRef; + } @Override public void pushContext(ImplementorContext context) {
