TAJO-894: Left outer join with partitioned large table and small table returns 
empty result. (Hyoungjun Kim via hyunsik)

 Closes #50


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/88d04346
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/88d04346
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/88d04346

Branch: refs/heads/window_function
Commit: 88d043467a7da5971cc3e0f535eb158cfa1d1ac5
Parents: 38f1a57
Author: Hyunsik Choi <[email protected]>
Authored: Mon Jun 30 19:41:55 2014 +0900
Committer: Hyunsik Choi <[email protected]>
Committed: Mon Jun 30 19:41:55 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../engine/planner/global/GlobalPlanner.java    | 39 ++++++++++++++++---
 .../master/rm/TajoWorkerResourceManager.java    |  3 +-
 .../tajo/engine/query/TestJoinBroadcast.java    | 40 ++++++++++++++++++++
 4 files changed, 79 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/88d04346/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 61b5529..06f6ec1 100644
--- a/CHANGES
+++ b/CHANGES
@@ -74,6 +74,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-894: Left outer join with partitioned large table and small table 
+    returns empty result. (Hyoungjun Kim via hyunsik)
+
     TAJO-867: OUTER JOIN with empty result subquery produces a wrong result.
     (Hyoungjun Kim via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/88d04346/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index f12b7e2..edd5674 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -199,7 +199,6 @@ public class GlobalPlanner {
     if (node instanceof RelationNode) {
       switch (node.getType()) {
       case SCAN:
-      case PARTITIONS_SCAN:
         ScanNode scanNode = (ScanNode) node;
         if (scanNode.getTableDesc().getStats() == null) {
           // TODO - this case means that data is not located in HDFS. So, we need additional
@@ -208,6 +207,20 @@ public class GlobalPlanner {
         } else {
           return scanNode.getTableDesc().getStats().getNumBytes();
         }
+      case PARTITIONS_SCAN:
+        PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) node;
+        if (pScanNode.getTableDesc().getStats() == null) {
+          // TODO - this case means that data is not located in HDFS. So, we need additional
+          // broadcast method.
+          return Long.MAX_VALUE;
+        } else {
+          // if there is no selected partition
+          if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
+            return 0;
+          } else {
+            return pScanNode.getTableDesc().getStats().getNumBytes();
+          }
+        }
       case TABLE_SUBQUERY:
         return computeDescendentVolume(((TableSubQueryNode) node).getSubQuery());
       default:
@@ -227,6 +240,23 @@ public class GlobalPlanner {
     return node.getType() == NodeType.SCAN || node.getType() == NodeType.PARTITIONS_SCAN;
   }
 
+  /**
+   * Get a volume of a table of a partitioned table
+   * @param scanNode ScanNode corresponding to a table
+   * @return table volume (bytes)
+   */
+  private static long getTableVolume(ScanNode scanNode) {
+    long scanBytes = scanNode.getTableDesc().getStats().getNumBytes();
+    if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
+      PartitionedTableScanNode pScanNode = (PartitionedTableScanNode)scanNode;
+      if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
+        scanBytes = 0L;
+      }
+    }
+
+    return scanBytes;
+  }
+
   private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
                                         ExecutionBlock leftBlock, ExecutionBlock rightBlock)
       throws PlanningException {
@@ -242,8 +272,7 @@ public class GlobalPlanner {
       int numLargeTables = 0;
       for(LogicalNode eachNode: joinNode.getBroadcastTargets()) {
         ScanNode scanNode = (ScanNode)eachNode;
-        TableDesc tableDesc = scanNode.getTableDesc();
-        if (tableDesc.getStats().getNumBytes() < broadcastThreshold) {
+        if (getTableVolume(scanNode) < broadcastThreshold) {
           broadtargetTables.add(scanNode);
           LOG.info("The table " + scanNode.getCanonicalName() + " ("
               + scanNode.getTableDesc().getStats().getNumBytes() + ") is marked a broadcasted table");
@@ -283,10 +312,10 @@ public class GlobalPlanner {
       TableDesc rightDesc = rightScan.getTableDesc();
       long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD);
 
-      if (leftDesc.getStats().getNumBytes() < broadcastThreshold) {
+      if (getTableVolume(leftScan) < broadcastThreshold) {
         leftBroadcasted = true;
       }
-      if (rightDesc.getStats().getNumBytes() < broadcastThreshold) {
+      if (getTableVolume(rightScan) < broadcastThreshold) {
         rightBroadcasted = true;
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/88d04346/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 18f2d24..3915225 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -348,9 +348,10 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
               Thread.sleep(100);
             }
           }
-
         } catch(InterruptedException ie) {
           LOG.error(ie);
+        } catch (Throwable t) {
+          LOG.error(t);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/88d04346/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index 2e3b899..2a2b8c3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -446,6 +446,46 @@ public class TestJoinBroadcast extends QueryTestCaseBase {
     cleanupQuery(res);
   }
 
+  @Test
+  public final void testCasebyCase1() throws Exception {
+    // Left outer join with a small table and a large partition table which not matched any partition path.
+    String tableName = CatalogUtil.normalizeIdentifier("largePartitionedTable");
+    testBase.execute(
+        "create table " + tableName + " (l_partkey int4, l_suppkey int4, l_linenumber int4, \n" +
+            "l_quantity float8, l_extendedprice float8, l_discount float8, l_tax float8, \n" +
+            "l_returnflag text, l_linestatus text, l_shipdate text, l_commitdate text, \n" +
+            "l_receiptdate text, l_shipinstruct text, l_shipmode text, l_comment text) \n" +
+            "partition by column(l_orderkey int4) ").close();
+    TajoTestingCluster cluster = testBase.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    executeString("insert overwrite into " + tableName +
+        " select l_partkey, l_suppkey, l_linenumber, \n" +
+        " l_quantity, l_extendedprice, l_discount, l_tax, \n" +
+        " l_returnflag, l_linestatus, l_shipdate, l_commitdate, \n" +
+        " l_receiptdate, l_shipinstruct, l_shipmode, l_comment, l_orderkey from lineitem_large");
+
+    ResultSet res = executeString(
+        "select a.l_orderkey as key1, b.l_orderkey as key2 from lineitem as a " +
+            "left outer join " + tableName + " b " +
+            "on a.l_partkey = b.l_partkey and b.l_orderkey = 1000"
+    );
+
+    String expected = "key1,key2\n" +
+        "-------------------------------\n" +
+        "1,null\n" +
+        "1,null\n" +
+        "2,null\n" +
+        "3,null\n" +
+        "3,null\n";
+
+    try {
+      assertEquals(expected, resultSetToString(res));
+    } finally {
+      cleanupQuery(res);
+    }
+  }
 
   static interface TupleCreator {
     public Tuple createTuple(String[] columnDatas);

Reply via email to