Repository: tajo
Updated Branches:
  refs/heads/master 438010f92 -> 42f3b4dd8


TAJO-927: Broadcast Join with Large, Small, Large, Small tables makes a wrong 
plan. (Hyoungjun Kim via hyunsik)

Closes #65


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/42f3b4dd
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/42f3b4dd
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/42f3b4dd

Branch: refs/heads/master
Commit: 42f3b4dd8fbf1a5a46a4469c7ac919fb32089225
Parents: 438010f
Author: Hyunsik Choi <[email protected]>
Authored: Fri Jul 11 14:48:02 2014 +0900
Committer: Hyunsik Choi <[email protected]>
Committed: Fri Jul 11 14:48:02 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../engine/planner/global/GlobalPlanner.java    | 13 +--
 .../planner/global/TestBroadcastJoinPlan.java   | 84 ++++++++++++++++++++
 3 files changed, 95 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/42f3b4dd/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e9e512d..0694eba 100644
--- a/CHANGES
+++ b/CHANGES
@@ -82,6 +82,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
  
+    TAJO-927: Broadcast Join with Large, Small, Large, Small tables 
+    makes a wrong plan. (Hyoungjun Kim via hyunsik)
+
     TAJO-913: Add some missed tests for constant value group-by keys. 
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/42f3b4dd/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 2d0dd10..4e27574 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -317,14 +317,18 @@ public class GlobalPlanner {
       }
 
       JoinNode blockJoinNode = null;
-      int numCandidateLargeTable = 0;
+      if (!leftBroadcast && !rightBroadcast) {
+        // In the case of large, large, small, small,
+        // all small tables are broadcast to the right large table.
+        numLargeTables = 1;
+      }
       for(LogicalNode eachNode: joinNode.getBroadcastCandidateTargets()) {
-        if (numCandidateLargeTable >= 2 || numLargeTables > 2) {
-          break;
-        }
         if (eachNode.getPID() == joinNode.getPID()) {
           continue;
         }
+        if (numLargeTables >= 2) {
+          break;
+        }
         JoinNode broadcastJoinNode = (JoinNode)eachNode;
         ScanNode scanNode = broadcastJoinNode.getRightChild();
         if (getTableVolume(scanNode) < broadcastThreshold) {
@@ -334,7 +338,6 @@ public class GlobalPlanner {
               + getTableVolume(scanNode) + ") is marked a broadcasted table");
         } else {
           numLargeTables++;
-          numCandidateLargeTable++;
           if (numLargeTables < 2) {
             blockJoinNode = broadcastJoinNode;
           }

http://git-wip-us.apache.org/repos/asf/tajo/blob/42f3b4dd/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
index 7a9a1c7..fd07ae4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner.global;
 
+import junit.framework.TestCase;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
@@ -941,4 +942,87 @@ public class TestBroadcastJoinPlan {
 
     assertEquals(3, index);
   }
+
+  @Test
+  public final void testBroadcastCasebyCase1() throws IOException, PlanningException {
+    // large, small, large, small
+    String query = "select count(*) from large1 " +
+        "inner join small1 on large1_id = small1_id " +
+        "left outer join large2 on large1_id = large2_id " +
+        "left outer join small2 on large1_id = small2_id " ;
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr = analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    // (((default.large1 ⋈θ default.small1) ⟕ default.large2) ⟕ default.small2)
+    /*
+    |-eb_1404871198908_0000_000007
+      |-eb_1404871198908_0000_000006
+          |-eb_1404871198908_0000_000005   (join eb3, eb4, broadcast small2)
+          |-eb_1404871198908_0000_000004 (scan large2)
+          |-eb_1404871198908_0000_000003 (scan large1, broadcast small1)
+    */
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    int index = 0;
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      if(index == 0) {
+        LogicalNode node = eb.getPlan();
+        assertEquals(NodeType.JOIN, node.getType());
+        JoinNode joinNode = (JoinNode)node;
+
+        ScanNode scanNode1 = joinNode.getLeftChild();
+        ScanNode scanNode2 = joinNode.getRightChild();
+        assertEquals("default.large1", scanNode1.getCanonicalName());
+        assertEquals("default.small1", scanNode2.getCanonicalName());
+
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+
+        assertEquals(1, broadcastTables.size());
+        assertTrue(broadcastTables.contains("default.small1"));
+      } else if(index == 1) {
+        LogicalNode node = eb.getPlan();
+        assertEquals(NodeType.SCAN, node.getType());
+        ScanNode scanNode = (ScanNode)node;
+
+        assertEquals("default.large2", scanNode.getCanonicalName());
+
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        TestCase.assertEquals(0, broadcastTables.size());
+      } else if(index == 2) {
+        LogicalNode node = eb.getPlan();
+        assertEquals(NodeType.GROUP_BY, node.getType());
+        JoinNode joinNode = ((GroupbyNode)node).getChild();
+
+        JoinNode joinNode2 = joinNode.getLeftChild();
+        ScanNode scanNode = joinNode.getRightChild();
+        assertEquals("default.small2", scanNode.getCanonicalName());
+
+        ScanNode scanNode2 = joinNode2.getLeftChild();
+        ScanNode scanNode3 = joinNode2.getRightChild();
+
+        assertTrue(scanNode2.getCanonicalName().indexOf("000003") >= 0);
+        assertTrue(scanNode3.getCanonicalName().indexOf("000004") >= 0);
+
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+
+        TestCase.assertEquals(1, broadcastTables.size());
+        TestCase.assertTrue(broadcastTables.contains("default.small2"));
+      }
+      index++;
+    }
+
+    TestCase.assertEquals(5, index);
+  }
 }

Reply via email to