Repository: hive
Updated Branches:
  refs/heads/master de9aaf607 -> c30dcbb4b


HIVE-20489: Recursive calls to intern path strings causes parse to hang (Janaki 
Lahorani, reviewed by Yongzhi Chen and Naveen Gangam)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c30dcbb4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c30dcbb4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c30dcbb4

Branch: refs/heads/master
Commit: c30dcbb4bdaedb9c59b1078ece63e4a61c7a3dbe
Parents: de9aaf6
Author: Naveen Gangam <[email protected]>
Authored: Mon Sep 10 09:40:01 2018 -0400
Committer: Naveen Gangam <[email protected]>
Committed: Mon Sep 10 09:40:21 2018 -0400

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/util/DAGTraversal.java  |   5 +
 .../hive/ql/optimizer/GenMapRedUtils.java       | 105 ++++++++++---------
 .../hadoop/hive/ql/parse/TaskCompiler.java      |   9 +-
 3 files changed, 64 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c30dcbb4/ql/src/java/org/apache/hadoop/hive/ql/exec/util/DAGTraversal.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/util/DAGTraversal.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/DAGTraversal.java
index 6dce835..cb5dc2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/util/DAGTraversal.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/DAGTraversal.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.util;
 
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 
 import java.io.Serializable;
@@ -38,6 +39,10 @@ public class DAGTraversal {
         if (function.skipProcessing(task)) {
           continue;
         }
+        // Add list tasks from conditional tasks
+        if (task instanceof ConditionalTask) {
+          children.addAll(((ConditionalTask) task).getListTasks());
+        }
         if (task.getDependentTasks() != null) {
           children.addAll(task.getDependentTasks());
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/c30dcbb4/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index d887124..e3ae0bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
@@ -894,64 +895,66 @@ public final class GenMapRedUtils {
   }
 
   /**
-   * Called at the end of TaskCompiler::compile to derive final
-   * explain attributes based on previous compilation.
+   * Called at the end of TaskCompiler::compile
+   * This currently does the following for each map work
+   *   1.  Intern the table descriptors of the partitions
+   *   2.  derive final explain attributes based on previous compilation.
+   *
+   * The original implementation had 2 functions internTableDesc and 
deriveFinalExplainAttributes,
+   * respectively implementing 1 and 2 mentioned above.  This was done using 
recursion over the
+   * task graph.  The recursion was inefficient in a couple of ways.
+   *   - For large graphs the recursion was filling up the stack
+   *   - Instead of finding the mapworks, it was walking all possible paths 
from root
+   *     causing a huge performance problem.
+   *
+   * This implementation combines internTableDesc and 
deriveFinalExplainAttributes into 1 call.
+   * This can be done because each refers to information within Map Work and 
performs a specific
+   * action.
+   *
+   * The revised implementation generates all the map works from all MapReduce 
tasks (getMRTasks),
+   * Spark Tasks (getSparkTasks) and Tez tasks (getTezTasks).  Then for each 
of those map works
+   * invokes the respective call.  getMRTasks, getSparkTasks and getTezTasks 
iteratively walks
+   * the task graph to find the respective map works.
+   *
+   * The iterative implementation of these functions was done as part of 
HIVE-17195.  Before
+   * HIVE-17195, these functions were recursive and had the same issue.  So, 
picking this patch
+   * for an older release will also require picking HIVE-17195 at the least.
    */
-  public static void deriveFinalExplainAttributes(
-      Task<? extends Serializable> task, Configuration conf) {
-    // TODO: deriveExplainAttributes should be called here, code is too 
fragile to move it around.
-    if (task instanceof ConditionalTask) {
-      for (Task<? extends Serializable> tsk : ((ConditionalTask) 
task).getListTasks()) {
-        deriveFinalExplainAttributes(tsk, conf);
-      }
-    } else if (task instanceof ExecDriver) {
-      MapredWork work = (MapredWork) task.getWork();
-      work.getMapWork().deriveLlap(conf, true);
-    } else if (task != null && (task.getWork() instanceof TezWork)) {
-      TezWork work = (TezWork)task.getWork();
-      for (BaseWork w : work.getAllWorkUnsorted()) {
-        if (w instanceof MapWork) {
-          ((MapWork)w).deriveLlap(conf, false);
-        }
-      }
-    } else if (task instanceof SparkTask) {
-      SparkWork work = (SparkWork) task.getWork();
-      for (BaseWork w : work.getAllWorkUnsorted()) {
-        if (w instanceof MapWork) {
-          ((MapWork) w).deriveLlap(conf, false);
-        }
+  public static void finalMapWorkChores(
+      List<Task<? extends Serializable>> tasks, Configuration conf,
+      Interner<TableDesc> interner) {
+    List<ExecDriver> mrTasks = Utilities.getMRTasks(tasks);
+    if (!mrTasks.isEmpty()) {
+      for (ExecDriver execDriver : mrTasks) {
+        execDriver.getWork().getMapWork().internTable(interner);
+        execDriver.getWork().getMapWork().deriveLlap(conf, true);
       }
     }
 
-    if (task.getChildTasks() == null) {
-      return;
-    }
-
-    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
-      deriveFinalExplainAttributes(childTask, conf);
-    }
-  }
-
-  public static void internTableDesc(Task<?> task, Interner<TableDesc> 
interner) {
-
-    if (task instanceof ConditionalTask) {
-      for (Task tsk : ((ConditionalTask) task).getListTasks()) {
-        internTableDesc(tsk, interner);
-      }
-    } else if (task instanceof ExecDriver) {
-      MapredWork work = (MapredWork) task.getWork();
-      work.getMapWork().internTable(interner);
-    } else if (task != null && (task.getWork() instanceof TezWork)) {
-      TezWork work = (TezWork)task.getWork();
-      for (BaseWork w : work.getAllWorkUnsorted()) {
-        if (w instanceof MapWork) {
-          ((MapWork)w).internTable(interner);
+    List<TezTask> tezTasks = Utilities.getTezTasks(tasks);
+    if (!tezTasks.isEmpty()) {
+      for (TezTask tezTask : tezTasks) {
+        if (tezTask.getWork() instanceof TezWork) {
+          TezWork work = tezTask.getWork();
+          for (BaseWork w : work.getAllWorkUnsorted()) {
+            if (w instanceof MapWork) {
+              ((MapWork)w).internTable(interner);
+              ((MapWork)w).deriveLlap(conf, false);
+            }
+          }
         }
       }
     }
-    if (task.getNumChild() > 0) {
-      for (Task childTask : task.getChildTasks()) {
-        internTableDesc(childTask, interner);
+
+    List<SparkTask> sparkTasks = Utilities.getSparkTasks(tasks);
+    if (!sparkTasks.isEmpty()) {
+      for (SparkTask sparkTask : sparkTasks) {
+        SparkWork work = sparkTask.getWork();
+        for (BaseWork w : work.getAllWorkUnsorted()) {
+          if (w instanceof MapWork) {
+            ((MapWork) w).deriveLlap(conf, false);
+          }
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/c30dcbb4/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 005e7b6..baefe1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -359,10 +359,11 @@ public abstract class TaskCompiler {
     }
 
     Interner<TableDesc> interner = Interners.newStrongInterner();
-    for (Task<? extends Serializable> rootTask : rootTasks) {
-      GenMapRedUtils.internTableDesc(rootTask, interner);
-      GenMapRedUtils.deriveFinalExplainAttributes(rootTask, pCtx.getConf());
-    }
+
+    // Perform Final chores on generated Map works
+    //   1.  Intern the table descriptors
+    //   2.  Derive final explain attributes based on previous compilation.
+    GenMapRedUtils.finalMapWorkChores(rootTasks, pCtx.getConf(), interner);
   }
 
   private String extractTableFullName(StatsTask tsk) throws SemanticException {

Reply via email to