Repository: tajo Updated Branches: refs/heads/master 438010f92 -> 42f3b4dd8
TAJO-927: Broadcast Join with Large, Small, Large, Small tables makes a wrong plan. (Hyoungjun Kim via hyunsik) Closes #65 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/42f3b4dd Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/42f3b4dd Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/42f3b4dd Branch: refs/heads/master Commit: 42f3b4dd8fbf1a5a46a4469c7ac919fb32089225 Parents: 438010f Author: Hyunsik Choi <[email protected]> Authored: Fri Jul 11 14:48:02 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Fri Jul 11 14:48:02 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../engine/planner/global/GlobalPlanner.java | 13 +-- .../planner/global/TestBroadcastJoinPlan.java | 84 ++++++++++++++++++++ 3 files changed, 95 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/42f3b4dd/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index e9e512d..0694eba 100644 --- a/CHANGES +++ b/CHANGES @@ -82,6 +82,9 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-927: Broadcast Join with Large, Small, Large, Small tables + makes a wrong plan. (Hyoungjun Kim via hyunsik) + TAJO-913: Add some missed tests for constant value group-by keys. 
(hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/42f3b4dd/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 2d0dd10..4e27574 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -317,14 +317,18 @@ public class GlobalPlanner { } JoinNode blockJoinNode = null; - int numCandidateLargeTable = 0; + if (!leftBroadcast && !rightBroadcast) { + // In the case of large, large, small, small + // all small tables broadcast to right large table + numLargeTables = 1; + } for(LogicalNode eachNode: joinNode.getBroadcastCandidateTargets()) { - if (numCandidateLargeTable >= 2 || numLargeTables > 2) { - break; - } if (eachNode.getPID() == joinNode.getPID()) { continue; } + if (numLargeTables >= 2) { + break; + } JoinNode broadcastJoinNode = (JoinNode)eachNode; ScanNode scanNode = broadcastJoinNode.getRightChild(); if (getTableVolume(scanNode) < broadcastThreshold) { @@ -334,7 +338,6 @@ public class GlobalPlanner { + getTableVolume(scanNode) + ") is marked a broadcasted table"); } else { numLargeTables++; - numCandidateLargeTable++; if (numLargeTables < 2) { blockJoinNode = broadcastJoinNode; } http://git-wip-us.apache.org/repos/asf/tajo/blob/42f3b4dd/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java index 7a9a1c7..fd07ae4 100644 --- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java @@ -18,6 +18,7 @@ package org.apache.tajo.engine.planner.global; +import junit.framework.TestCase; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.LocalTajoTestingUtility; @@ -941,4 +942,87 @@ public class TestBroadcastJoinPlan { assertEquals(3, index); } + + @Test + public final void testBroadcastCasebyCase1() throws IOException, PlanningException { + // large, small, large, small + String query = "select count(*) from large1 " + + "inner join small1 on large1_id = small1_id " + + "left outer join large2 on large1_id = large2_id " + + "left outer join small2 on large1_id = small2_id " ; + + LogicalPlanner planner = new LogicalPlanner(catalog); + LogicalOptimizer optimizer = new LogicalOptimizer(conf); + Expr expr = analyzer.parse(query); + LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr); + + optimizer.optimize(plan); + + QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0); + QueryContext queryContext = new QueryContext(); + MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan); + GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog); + globalPlanner.build(masterPlan); + + // (((default.large1 ⋈θ default.small1) ⟕ default.large2) ⟕ default.small2) + /* + |-eb_1404871198908_0000_000007 + |-eb_1404871198908_0000_000006 + |-eb_1404871198908_0000_000005 (join eb3, eb3, broadcast small2) + |-eb_1404871198908_0000_000004 (scan large2) + |-eb_1404871198908_0000_000003 (scan large1, broadcast small1) + */ + + ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan); + int index = 0; + while (ebCursor.hasNext()) { + ExecutionBlock eb = ebCursor.nextBlock(); + if(index == 0) { + LogicalNode node = eb.getPlan(); + assertEquals(NodeType.JOIN, 
node.getType()); + JoinNode joinNode = (JoinNode)node; + + ScanNode scanNode1 = joinNode.getLeftChild(); + ScanNode scanNode2 = joinNode.getRightChild(); + assertEquals("default.large1", scanNode1.getCanonicalName()); + assertEquals("default.small1", scanNode2.getCanonicalName()); + + Collection<String> broadcastTables = eb.getBroadcastTables(); + + assertEquals(1, broadcastTables.size()); + assertTrue(broadcastTables.contains("default.small1")); + } else if(index == 1) { + LogicalNode node = eb.getPlan(); + assertEquals(NodeType.SCAN, node.getType()); + ScanNode scanNode = (ScanNode)node; + + assertEquals("default.large2", scanNode.getCanonicalName()); + + Collection<String> broadcastTables = eb.getBroadcastTables(); + TestCase.assertEquals(0, broadcastTables.size()); + } else if(index == 2) { + LogicalNode node = eb.getPlan(); + assertEquals(NodeType.GROUP_BY, node.getType()); + JoinNode joinNode = ((GroupbyNode)node).getChild(); + + JoinNode joinNode2 = joinNode.getLeftChild(); + ScanNode scanNode = joinNode.getRightChild(); + assertEquals("default.small2", scanNode.getCanonicalName()); + + ScanNode scanNode2 = joinNode2.getLeftChild(); + ScanNode scanNode3 = joinNode2.getRightChild(); + + assertTrue(scanNode2.getCanonicalName().indexOf("000003") >= 0); + assertTrue(scanNode3.getCanonicalName().indexOf("000004") >= 0); + + Collection<String> broadcastTables = eb.getBroadcastTables(); + + TestCase.assertEquals(1, broadcastTables.size()); + TestCase.assertTrue(broadcastTables.contains("default.small2")); + } + index++; + } + + TestCase.assertEquals(5, index); + } }
