Author: hashutosh
Date: Thu Dec 19 17:41:25 2013
New Revision: 1552375

URL: http://svn.apache.org/r1552375
Log:
HIVE-6041 : Incorrect task dependency graph for skewed join optimization (Navis via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/test/queries/clientpositive/skewjoin_noskew.q
    hive/trunk/ql/src/test/results/clientpositive/skewjoin_noskew.q.out
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
    hive/trunk/ql/src/test/results/clientpositive/skewjoin.q.out

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=1552375&r1=1552374&r2=1552375&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
 (original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
 Thu Dec 19 17:41:25 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
+import 
org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -114,6 +115,14 @@ public final class GenMRSkewJoinProcesso
       return;
     }
 
+    List<Task<? extends Serializable>> children = currTask.getChildTasks();
+    if (children != null && children.size() > 1) {
+      throw new SemanticException("Should not happened");
+    }
+
+    Task<? extends Serializable> child =
+        children != null && children.size() == 1 ? children.get(0) : null;
+
     String baseTmpDir = parseCtx.getContext().getMRTmpFileURI();
 
     JoinDesc joinDescriptor = joinOp.getConf();
@@ -333,25 +342,27 @@ public final class GenMRSkewJoinProcesso
       listWorks.add(skewJoinMapJoinTask.getWork());
       listTasks.add(skewJoinMapJoinTask);
     }
+    if (children != null) {
+      for (Task<? extends Serializable> tsk : listTasks) {
+        for (Task<? extends Serializable> oldChild : children) {
+          tsk.addDependentTask(oldChild);
+        }
+      }
+    }
+    if (child != null) {
+      listTasks.add(child);
+    }
+    ConditionalResolverSkewJoinCtx context =
+        new ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, child);
 
     ConditionalWork cndWork = new ConditionalWork(listWorks);
     ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, 
parseCtx.getConf());
     cndTsk.setListTasks(listTasks);
     cndTsk.setResolver(new ConditionalResolverSkewJoin());
-    cndTsk
-        .setResolverCtx(new 
ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx(
-        bigKeysDirToTaskMap));
-    List<Task<? extends Serializable>> oldChildTasks = 
currTask.getChildTasks();
+    cndTsk.setResolverCtx(context);
     currTask.setChildTasks(new ArrayList<Task<? extends Serializable>>());
     currTask.addDependentTask(cndTsk);
 
-    if (oldChildTasks != null) {
-      for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
-        for (Task<? extends Serializable> oldChild : oldChildTasks) {
-          tsk.addDependentTask(oldChild);
-        }
-      }
-    }
     return;
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java?rev=1552375&r1=1552374&r2=1552375&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
 (original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
 Thu Dec 19 17:41:25 2013
@@ -50,7 +50,8 @@ public class ConditionalResolverSkewJoin
     // tables into corresponding different dirs (one dir per table).
     // this map stores mapping from "big key dir" to its corresponding mapjoin
     // task.
-    HashMap<String, Task<? extends Serializable>> dirToTaskMap;
+    private HashMap<String, Task<? extends Serializable>> dirToTaskMap;
+    private Task<? extends Serializable> noSkewTask;
 
     /**
      * For serialization use only.
@@ -59,9 +60,11 @@ public class ConditionalResolverSkewJoin
     }
 
     public ConditionalResolverSkewJoinCtx(
-        HashMap<String, Task<? extends Serializable>> dirToTaskMap) {
+        HashMap<String, Task<? extends Serializable>> dirToTaskMap,
+        Task<? extends Serializable> noSkewTask) {
       super();
       this.dirToTaskMap = dirToTaskMap;
+      this.noSkewTask = noSkewTask;
     }
 
     public HashMap<String, Task<? extends Serializable>> getDirToTaskMap() {
@@ -72,6 +75,14 @@ public class ConditionalResolverSkewJoin
         HashMap<String, Task<? extends Serializable>> dirToTaskMap) {
       this.dirToTaskMap = dirToTaskMap;
     }
+
+    public Task<? extends Serializable> getNoSkewTask() {
+      return noSkewTask;
+    }
+
+    public void setNoSkewTask(Task<? extends Serializable> noSkewTask) {
+      this.noSkewTask = noSkewTask;
+    }
   }
 
   public ConditionalResolverSkewJoin() {
@@ -111,6 +122,9 @@ public class ConditionalResolverSkewJoin
     } catch (IOException e) {
       e.printStackTrace();
     }
+    if (resTsks.isEmpty() && ctx.getNoSkewTask() != null) {
+      resTsks.add(ctx.getNoSkewTask());
+    }
     return resTsks;
   }
 

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoin_noskew.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoin_noskew.q?rev=1552375&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoin_noskew.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoin_noskew.q Thu Dec 19 
17:41:25 2013
@@ -0,0 +1,9 @@
+set hive.auto.convert.join=false;
+set hive.optimize.skewjoin=true;
+
+explain
+create table noskew as select a.* from src a join src b on a.key=b.key order 
by a.key limit 30;
+
+create table noskew as select a.* from src a join src b on a.key=b.key order 
by a.key limit 30;
+
+select * from noskew;

Modified: hive/trunk/ql/src/test/results/clientpositive/skewjoin.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/skewjoin.q.out?rev=1552375&r1=1552374&r2=1552375&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/skewjoin.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/skewjoin.q.out Thu Dec 19 
17:41:25 2013
@@ -60,7 +60,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-5 depends on stages: Stage-1 , consists of Stage-6
+  Stage-5 depends on stages: Stage-1 , consists of Stage-6, Stage-0
   Stage-6
   Stage-4 depends on stages: Stage-6
   Stage-0 depends on stages: Stage-1, Stage-4
@@ -701,7 +701,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-5 depends on stages: Stage-1 , consists of Stage-6
+  Stage-5 depends on stages: Stage-1 , consists of Stage-6, Stage-2
   Stage-6
   Stage-4 depends on stages: Stage-6
   Stage-2 depends on stages: Stage-1, Stage-4
@@ -928,7 +928,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-5 depends on stages: Stage-1 , consists of Stage-6
+  Stage-5 depends on stages: Stage-1 , consists of Stage-6, Stage-2
   Stage-6
   Stage-4 depends on stages: Stage-6
   Stage-2 depends on stages: Stage-1, Stage-4
@@ -1173,7 +1173,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-7 depends on stages: Stage-1 , consists of Stage-8, Stage-9
+  Stage-7 depends on stages: Stage-1 , consists of Stage-8, Stage-9, Stage-2
   Stage-8
   Stage-5 depends on stages: Stage-8
   Stage-2 depends on stages: Stage-1, Stage-5, Stage-6

Added: hive/trunk/ql/src/test/results/clientpositive/skewjoin_noskew.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/skewjoin_noskew.q.out?rev=1552375&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/skewjoin_noskew.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/skewjoin_noskew.q.out Thu Dec 
19 17:41:25 2013
@@ -0,0 +1,225 @@
+PREHOOK: query: explain
+create table noskew as select a.* from src a join src b on a.key=b.key order 
by a.key limit 30
+PREHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: query: explain
+create table noskew as select a.* from src a join src b on a.key=b.key order 
by a.key limit 30
+POSTHOOK: type: CREATETABLE_AS_SELECT
+ABSTRACT SYNTAX TREE:
+  (TOK_CREATETABLE (TOK_TABNAME noskew) TOK_LIKETABLE (TOK_QUERY (TOK_FROM 
(TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= 
(. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT 
(TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR 
(TOK_ALLCOLREF (TOK_TABNAME a)))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (. 
(TOK_TABLE_OR_COL a) key))) (TOK_LIMIT 30))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-6 depends on stages: Stage-1 , consists of Stage-7, Stage-2
+  Stage-7
+  Stage-5 depends on stages: Stage-7
+  Stage-2 depends on stages: Stage-1, Stage-5
+  Stage-0 depends on stages: Stage-2
+  Stage-8 depends on stages: Stage-0
+  Stage-3 depends on stages: Stage-8
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        a 
+          TableScan
+            alias: a
+            Reduce Output Operator
+              key expressions:
+                    expr: key
+                    type: string
+              sort order: +
+              Map-reduce partition columns:
+                    expr: key
+                    type: string
+              tag: 0
+              value expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+        b 
+          TableScan
+            alias: b
+            Reduce Output Operator
+              key expressions:
+                    expr: key
+                    type: string
+              sort order: +
+              Map-reduce partition columns:
+                    expr: key
+                    type: string
+              tag: 1
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          condition expressions:
+            0 {VALUE._col0} {VALUE._col1}
+            1 
+          handleSkewJoin: true
+          outputColumnNames: _col0, _col1
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: string
+                  expr: _col1
+                  type: string
+            outputColumnNames: _col0, _col1
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-6
+    Conditional Operator
+
+  Stage: Stage-7
+    Map Reduce Local Work
+      Alias -> Map Local Tables:
+        1 
+          Fetch Operator
+            limit: -1
+      Alias -> Map Local Operator Tree:
+        1 
+          TableScan
+            HashTable Sink Operator
+              condition expressions:
+                0 {0_VALUE_0} {0_VALUE_1}
+                1 
+              handleSkewJoin: false
+              keys:
+                0 [Column[joinkey0]]
+                1 [Column[joinkey0]]
+              Position of Big Table: 0
+
+  Stage: Stage-5
+    Map Reduce
+      Alias -> Map Operator Tree:
+        0 
+          TableScan
+            Map Join Operator
+              condition map:
+                   Inner Join 0 to 1
+              condition expressions:
+                0 {0_VALUE_0} {0_VALUE_1}
+                1 
+              handleSkewJoin: false
+              keys:
+                0 [Column[joinkey0]]
+                1 [Column[joinkey0]]
+              outputColumnNames: _col0, _col1
+              Position of Big Table: 0
+              Select Operator
+                expressions:
+                      expr: _col0
+                      type: string
+                      expr: _col1
+                      type: string
+                outputColumnNames: _col0, _col1
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 0
+                  table:
+                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+      Local Work:
+        Map Reduce Local Work
+
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+          TableScan
+            Reduce Output Operator
+              key expressions:
+                    expr: _col0
+                    type: string
+              sort order: +
+              tag: -1
+              value expressions:
+                    expr: _col0
+                    type: string
+                    expr: _col1
+                    type: string
+      Reduce Operator Tree:
+        Extract
+          Limit
+            File Output Operator
+              compressed: false
+              GlobalTableId: 1
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: default.noskew
+
+  Stage: Stage-0
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+  Stage: Stage-8
+      Create Table Operator:
+        Create Table
+          columns: key string, value string
+          if not exists: false
+          input format: org.apache.hadoop.mapred.TextInputFormat
+          # buckets: -1
+          output format: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
+          name: noskew
+          isExternal: false
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+PREHOOK: query: create table noskew as select a.* from src a join src b on 
a.key=b.key order by a.key limit 30
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+POSTHOOK: query: create table noskew as select a.* from src a join src b on 
a.key=b.key order by a.key limit 30
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@noskew
+PREHOOK: query: select * from noskew
+PREHOOK: type: QUERY
+PREHOOK: Input: default@noskew
+#### A masked pattern was here ####
+POSTHOOK: query: select * from noskew
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@noskew
+#### A masked pattern was here ####
+0      val_0
+0      val_0
+0      val_0
+0      val_0
+0      val_0
+0      val_0
+0      val_0
+0      val_0
+0      val_0
+10     val_10
+100    val_100
+100    val_100
+100    val_100
+100    val_100
+103    val_103
+103    val_103
+103    val_103
+103    val_103
+104    val_104
+104    val_104
+104    val_104
+104    val_104
+105    val_105
+11     val_11
+111    val_111
+113    val_113
+113    val_113
+113    val_113
+113    val_113
+114    val_114


Reply via email to