TAJO-795: PlannerUtil::joinJoinKeyForEachTable need to handle theta-join. (jaehwa)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/70567fcf Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/70567fcf Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/70567fcf Branch: refs/heads/window_function Commit: 70567fcf47b5fdcc33a21be51c6de03e016ae4ff Parents: 5db4746 Author: blrunner <[email protected]> Authored: Wed May 7 12:05:02 2014 +0900 Committer: blrunner <[email protected]> Committed: Wed May 7 12:05:02 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../apache/tajo/engine/planner/PlannerUtil.java | 5 +- .../engine/planner/global/GlobalPlanner.java | 3 +- .../apache/tajo/master/TestGlobalPlanner.java | 88 ++++++++++++++++++++ 4 files changed, 95 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/70567fcf/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index ce98a7a..0abdf12 100644 --- a/CHANGES +++ b/CHANGES @@ -31,6 +31,8 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-795: PlannerUtil::joinJoinKeyForEachTable need to handle theta-join. (jaehwa) + TAJO-792: Insert table error with database name. 
(Hyoungjun Kim via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/70567fcf/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java index 0cd8f7d..1f97d14 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java @@ -556,8 +556,9 @@ public class PlannerUtil { /** * @return the first array contains left table's columns, and the second array contains right table's columns. */ - public static Column[][] joinJoinKeyForEachTable(EvalNode joinQual, Schema leftSchema, Schema rightSchema) { - List<Column[]> joinKeys = getJoinKeyPairs(joinQual, leftSchema, rightSchema, true); + public static Column[][] joinJoinKeyForEachTable(EvalNode joinQual, Schema leftSchema, + Schema rightSchema, boolean includeThetaJoin) { + List<Column[]> joinKeys = getJoinKeyPairs(joinQual, leftSchema, rightSchema, includeThetaJoin); Column[] leftColumns = new Column[joinKeys.size()]; Column[] rightColumns = new Column[joinKeys.size()]; for (int i = 0; i < joinKeys.size(); i++) { http://git-wip-us.apache.org/repos/asf/tajo/blob/70567fcf/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index b411c6d..9002ac0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -168,8 +168,9 @@ public class GlobalPlanner { DataChannel channel = new 
DataChannel(childBlock, parent, HASH_SHUFFLE, 32); channel.setStoreType(storeType); if (join.getJoinType() != JoinType.CROSS) { + // Shuffle keys must not include theta-join conditions, because Tajo supports only equi-join keys for shuffling. Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(), - leftBlock.getPlan().getOutSchema(), rightBlock.getPlan().getOutSchema()); + leftBlock.getPlan().getOutSchema(), rightBlock.getPlan().getOutSchema(), false); if (leftTable) { channel.setShuffleKeys(joinColumns[0]); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/70567fcf/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java index 4d3b096..0ce7746 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java @@ -18,6 +18,8 @@ package org.apache.tajo.master; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.algebra.Expr; @@ -25,27 +27,37 @@ import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.engine.eval.BinaryEval; +import org.apache.tajo.engine.eval.EvalType; +import org.apache.tajo.engine.eval.FieldEval; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.planner.LogicalOptimizer; import org.apache.tajo.engine.planner.LogicalPlan; import org.apache.tajo.engine.planner.LogicalPlanner; import org.apache.tajo.engine.planner.PlanningException; +import 
org.apache.tajo.engine.planner.global.DataChannel; +import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.TUtil; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import java.io.File; import java.io.IOException; +import java.util.Map; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; public class TestGlobalPlanner { + private static Log LOG = LogFactory.getLog(TestGlobalPlanner.class); private static TajoTestingCluster util; private static CatalogService catalog; @@ -129,6 +141,82 @@ public class TestGlobalPlanner { } @Test + public void testThetaJoinKeyPairs() throws Exception { + StringBuilder sb = new StringBuilder(); + sb.append("select n_nationkey, n_name, n_regionkey, t.cnt"); + sb.append(" from nation n"); + sb.append(" join"); + sb.append(" ("); + sb.append(" select r_regionkey, count(*) as cnt"); + sb.append(" from nation n"); + sb.append(" join region r on (n.n_regionkey = r.r_regionkey)"); + sb.append(" group by r_regionkey"); + sb.append(" ) t on (n.n_regionkey = t.r_regionkey)"); + sb.append(" and n.n_nationkey > t.cnt "); + sb.append(" order by n_nationkey"); + + MasterPlan plan = buildPlan(sb.toString()); + ExecutionBlock root = plan.getRoot(); + + Map<BinaryEval, Boolean> evalMap = TUtil.newHashMap(); + BinaryEval eval1 = new BinaryEval(EvalType.EQUAL + , new FieldEval(new Column("default.n.n_regionkey", TajoDataTypes.Type.INT4)) + , new FieldEval(new Column("default.t.r_regionkey", TajoDataTypes.Type.INT4)) + ); + evalMap.put(eval1, Boolean.FALSE); + + BinaryEval eval2 = new BinaryEval(EvalType.EQUAL + , new FieldEval(new 
Column("default.n.n_nationkey", TajoDataTypes.Type.INT4)) + , new FieldEval(new Column("default.t.cnt", TajoDataTypes.Type.INT4)) + ); + evalMap.put(eval2, Boolean.FALSE); + + visitChildExecutionBLock(plan, root, evalMap); + + // The equi-join pair (n_regionkey = r_regionkey) must appear in the shuffle keys. + Assert.assertTrue(evalMap.get(eval1).booleanValue()); + + // The theta-join pair (n_nationkey > cnt) must not appear in the shuffle keys. + Assert.assertFalse(evalMap.get(eval2).booleanValue()); + } + + private void visitChildExecutionBLock(MasterPlan plan, ExecutionBlock parentBlock, + Map<BinaryEval, Boolean> qualMap) throws Exception { + boolean isExistLeftField, isExistRightField; + + for (Map.Entry<BinaryEval, Boolean> entry : qualMap.entrySet()) { + FieldEval leftField = (FieldEval)entry.getKey().getLeftExpr(); + FieldEval rightField = (FieldEval)entry.getKey().getRightExpr(); + + for (ExecutionBlock block : plan.getChilds(parentBlock)) { + isExistLeftField = false; + isExistRightField = false; + + if (plan.getIncomingChannels(block.getId()) != null) { + for (DataChannel channel :plan.getIncomingChannels(block.getId())) { + if (channel.getShuffleKeys() != null) { + for (Column column : channel.getShuffleKeys()) { + if (column.getQualifiedName().equals(leftField.getColumnRef().getQualifiedName())) { + isExistLeftField = true; + } else if (column.getQualifiedName(). + equals(rightField.getColumnRef().getQualifiedName())) { + isExistRightField = true; + } + } + } + } + + if(isExistLeftField && isExistRightField) { + qualMap.put(entry.getKey(), Boolean.TRUE); + } + } + + visitChildExecutionBLock(plan, block, qualMap); + } + } + } + + @Test public void testUnion() throws IOException, PlanningException { buildPlan("select o_custkey as num from orders union select c_custkey as num from customer union select p_partkey as num from part"); }
