Repository: hive
Updated Branches:
  refs/heads/master 3915980fe -> b14113be4


HIVE-18525: Add explain plan to Hive on Spark Web UI (Sahil Takiar, reviewed by 
Aihua Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b14113be
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b14113be
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b14113be

Branch: refs/heads/master
Commit: b14113be4c67217e030ae5158ce584362a721483
Parents: 3915980
Author: Sahil Takiar <takiar.sa...@gmail.com>
Authored: Fri Apr 13 13:50:57 2018 -0700
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Fri Apr 13 13:50:57 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   6 +-
 .../apache/hadoop/hive/ql/log/PerfLogger.java   |   1 +
 .../apache/hadoop/hive/ql/exec/ExplainTask.java |  22 ++--
 .../hadoop/hive/ql/exec/spark/CacheTran.java    |  10 +-
 .../hadoop/hive/ql/exec/spark/MapInput.java     |  13 +-
 .../hadoop/hive/ql/exec/spark/MapTran.java      |   7 +-
 .../hadoop/hive/ql/exec/spark/ReduceTran.java   |   7 +-
 .../hadoop/hive/ql/exec/spark/ShuffleTran.java  |  12 +-
 .../hadoop/hive/ql/exec/spark/SparkPlan.java    |  57 ++++++++-
 .../hive/ql/exec/spark/SparkPlanGenerator.java  |  14 +--
 .../hadoop/hive/ql/exec/spark/SparkTran.java    |   3 +
 .../hive/ql/exec/spark/TestSparkPlan.java       | 122 +++++++++++++++++++
 12 files changed, 243 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e540d02..e533ee6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2830,8 +2830,12 @@ public class HiveConf extends Configuration {
     HIVE_SPARK_EXPLAIN_USER("hive.spark.explain.user", false,
         "Whether to show explain result at user level.\n" +
         "When enabled, will log EXPLAIN output for the query at user level. 
Spark only."),
+    HIVE_SPARK_LOG_EXPLAIN_WEBUI("hive.spark.log.explain.webui", true, 
"Whether to show the " +
+        "explain plan in the Spark Web UI. Only shows the regular EXPLAIN 
plan, and ignores " +
+        "any extra EXPLAIN configuration (e.g. hive.spark.explain.user, etc.). 
The explain " +
+        "plan for each stage is truncated at 100,000 characters."),
 
-    // prefix used to auto generated column aliases (this should be started 
with '_')
+    // prefix used to auto generated column aliases (this should be s,tarted 
with '_')
     
HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL("hive.autogen.columnalias.prefix.label", 
"_c",
         "String used as a prefix when auto generating column alias.\n" +
         "By default the prefix label will be appended with a column position 
number to form the column alias. \n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java 
b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 764a832..c1e1b7f 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -76,6 +76,7 @@ public class PerfLogger {
   public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning";
   public static final String SPARK_BUILD_PLAN = "SparkBuildPlan";
   public static final String SPARK_BUILD_RDD_GRAPH = "SparkBuildRDDGraph";
+  public static final String SPARK_CREATE_EXPLAIN_PLAN = 
"SparkCreateExplainPlan.";
   public static final String SPARK_SUBMIT_JOB = "SparkSubmitJob";
   public static final String SPARK_RUN_JOB = "SparkRunJob";
   public static final String SPARK_CREATE_TRAN = "SparkCreateTran.";

http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index 14c6398..0b30721 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -610,7 +610,7 @@ public class ExplainTask extends Task<ExplainWork> 
implements Serializable {
   }
 
   private JSONArray outputList(List<?> l, PrintStream out, boolean hasHeader,
-      boolean extended, boolean jsonOutput, int indent) throws Exception {
+      boolean extended, boolean jsonOutput, int indent, boolean inTest) throws 
Exception {
 
     boolean first_el = true;
     boolean nl = false;
@@ -634,7 +634,7 @@ public class ExplainTask extends Task<ExplainWork> 
implements Serializable {
           out.println();
         }
         JSONObject jsonOut = outputPlan(o, out, extended,
-            jsonOutput, jsonOutput ? 0 : (hasHeader ? indent + 2 : indent));
+            jsonOutput, jsonOutput ? 0 : (hasHeader ? indent + 2 : indent), 
"", inTest);
         if (jsonOutput) {
           outputArray.put(jsonOut);
         }
@@ -672,10 +672,13 @@ public class ExplainTask extends Task<ExplainWork> 
implements Serializable {
   @VisibleForTesting
   JSONObject outputPlan(Object work, PrintStream out,
       boolean extended, boolean jsonOutput, int indent, String appendToHeader) 
throws Exception {
+    return outputPlan(work, out, extended, jsonOutput, indent, appendToHeader,
+            queryState.getConf().getBoolVar(ConfVars.HIVE_IN_TEST));
+  }
 
-    // Are we running tests?
-    final boolean inTest = 
queryState.getConf().getBoolVar(ConfVars.HIVE_IN_TEST);
-
+  public JSONObject outputPlan(Object work, PrintStream out,
+                               boolean extended, boolean jsonOutput, int 
indent,
+                               String appendToHeader, boolean inTest) throws 
Exception {
     // Check if work has an explain annotation
     Annotation note = AnnotationUtils.getAnnotation(work.getClass(), 
Explain.class);
 
@@ -773,7 +776,7 @@ public class ExplainTask extends Task<ExplainWork> 
implements Serializable {
       if (operator.getConf() != null) {
         String appender = isLogical ? " (" + operator.getOperatorId() + ")" : 
"";
         JSONObject jsonOut = outputPlan(operator.getConf(), out, extended,
-            jsonOutput, jsonOutput ? 0 : indent, appender);
+            jsonOutput, jsonOutput ? 0 : indent, appender, inTest);
         if (this.work != null && (this.work.isUserLevelExplain() || 
this.work.isFormatted())) {
           if (jsonOut != null && jsonOut.length() > 0) {
             ((JSONObject) 
jsonOut.get(JSONObject.getNames(jsonOut)[0])).put("OperatorId:",
@@ -795,7 +798,7 @@ public class ExplainTask extends Task<ExplainWork> 
implements Serializable {
         if (operator.getChildOperators() != null) {
           int cindent = jsonOutput ? 0 : indent + 2;
           for (Operator<? extends OperatorDesc> op : 
operator.getChildOperators()) {
-            JSONObject jsonOut = outputPlan(op, out, extended, jsonOutput, 
cindent);
+            JSONObject jsonOut = outputPlan(op, out, extended, jsonOutput, 
cindent, "", inTest);
             if (jsonOutput) {
               
((JSONObject)json.get(JSONObject.getNames(json)[0])).accumulate("children", 
jsonOut);
             }
@@ -971,7 +974,8 @@ public class ExplainTask extends Task<ExplainWork> 
implements Serializable {
               out.print(header);
             }
 
-            JSONArray jsonOut = outputList(l, out, !skipHeader && 
!emptyHeader, extended, jsonOutput, ind);
+            JSONArray jsonOut = outputList(l, out, !skipHeader && 
!emptyHeader, extended,
+                    jsonOutput, ind, inTest);
 
             if (jsonOutput && !l.isEmpty()) {
               json.put(header, jsonOut);
@@ -985,7 +989,7 @@ public class ExplainTask extends Task<ExplainWork> 
implements Serializable {
             if (!skipHeader && out != null) {
               out.println(header);
             }
-            JSONObject jsonOut = outputPlan(val, out, extended, jsonOutput, 
ind);
+            JSONObject jsonOut = outputPlan(val, out, extended, jsonOutput, 
ind, "", inTest);
             if (jsonOutput && jsonOut != null && jsonOut.length() != 0) {
               if (!skipHeader) {
                 json.put(header, jsonOut);

http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
index 4b77ac9..770a100 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.storage.StorageLevel;
@@ -28,10 +29,12 @@ public abstract class CacheTran<KI extends 
WritableComparable, VI, KO extends Wr
   private boolean caching = false;
   private JavaPairRDD<KO, VO> cachedRDD;
   protected final String name;
+  private final BaseWork baseWork;
 
-  protected CacheTran(boolean cache, String name) {
+  protected CacheTran(boolean cache, String name, BaseWork baseWork) {
     this.caching = cache;
     this.name = name;
+    this.baseWork = baseWork;
   }
 
   @Override
@@ -59,4 +62,9 @@ public abstract class CacheTran<KI extends 
WritableComparable, VI, KO extends Wr
   public String getName() {
     return name;
   }
+
+  @Override
+  public BaseWork getBaseWork() {
+    return baseWork;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index b1a0d55..b242f57 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
@@ -37,17 +38,20 @@ public class MapInput implements 
SparkTran<WritableComparable, Writable,
   private boolean toCache;
   private final SparkPlan sparkPlan;
   private final String name;
+  private final BaseWork baseWork;
 
   public MapInput(SparkPlan sparkPlan, JavaPairRDD<WritableComparable, 
Writable> hadoopRDD) {
-    this(sparkPlan, hadoopRDD, false, "MapInput");
+    this(sparkPlan, hadoopRDD, false, "MapInput", null);
   }
 
   public MapInput(SparkPlan sparkPlan,
-      JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache, 
String name) {
+                  JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean 
toCache, String
+                          name, BaseWork baseWork) {
     this.hadoopRDD = hadoopRDD;
     this.toCache = toCache;
     this.sparkPlan = sparkPlan;
     this.name = name;
+    this.baseWork = baseWork;
   }
 
   public void setToCache(boolean toCache) {
@@ -98,4 +102,9 @@ public class MapInput implements 
SparkTran<WritableComparable, Writable,
   public Boolean isCacheEnable() {
     return new Boolean(toCache);
   }
+
+  @Override
+  public BaseWork getBaseWork() {
+    return baseWork;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index b102f51..7e95b12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
@@ -26,11 +27,11 @@ public class MapTran extends CacheTran<BytesWritable, 
BytesWritable, HiveKey, By
   private HiveMapFunction mapFunc;
 
   public MapTran() {
-    this(false, "MapTran");
+    this(false, "MapTran", null);
   }
 
-  public MapTran(boolean cache, String name) {
-    super(cache, name);
+  public MapTran(boolean cache, String name, BaseWork baseWork) {
+    super(cache, name, baseWork);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index 3b34c78..4bafcb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
@@ -26,11 +27,11 @@ public class ReduceTran<V> extends CacheTran<HiveKey, V, 
HiveKey, BytesWritable>
   private HiveReduceFunction<V> reduceFunc;
 
   public ReduceTran() {
-    this(false, "Reduce");
+    this(false, "Reduce", null);
   }
 
-  public ReduceTran(boolean caching, String name) {
-    super(caching, name);
+  public ReduceTran(boolean caching, String name, BaseWork baseWork) {
+    super(caching, name, baseWork);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index 40ff01a..f698079 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -31,19 +32,21 @@ public class ShuffleTran implements SparkTran<HiveKey, 
BytesWritable, HiveKey, B
   private final SparkPlan sparkPlan;
   private final String name;
   private final SparkEdgeProperty edge;
+  private final BaseWork baseWork;
 
   public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) {
-    this(sparkPlan, sf, n, false, "Shuffle", null);
+    this(sparkPlan, sf, n, false, "Shuffle", null, null);
   }
 
   public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean 
toCache, String name,
-                     SparkEdgeProperty edge) {
+                     SparkEdgeProperty edge, BaseWork baseWork) {
     shuffler = sf;
     numOfPartitions = n;
     this.toCache = toCache;
     this.sparkPlan = sparkPlan;
     this.name = name;
     this.edge = edge;
+    this.baseWork = baseWork;
   }
 
   @Override
@@ -71,6 +74,11 @@ public class ShuffleTran implements SparkTran<HiveKey, 
BytesWritable, HiveKey, B
     return new Boolean(toCache);
   }
 
+  @Override
+  public BaseWork getBaseWork() {
+    return baseWork;
+  }
+
   public SparkShuffler getShuffler() {
     return shuffler;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index b21e386..8244dcb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -26,6 +28,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
+import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.util.CallSite;
 import org.slf4j.Logger;
@@ -50,9 +59,11 @@ public class SparkPlan {
   private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new 
HashMap<SparkTran, List<SparkTran>>();
   private final Set<Integer> cachedRDDIds = new HashSet<Integer>();
 
+  private final JobConf jobConf;
   private final SparkContext sc;
 
-  SparkPlan(SparkContext sc) {
+  SparkPlan(JobConf jobConf, SparkContext sc) {
+    this.jobConf = jobConf;
     this.sc = sc;
   }
 
@@ -68,7 +79,7 @@ public class SparkPlan {
         // Root tran, it must be MapInput
         Preconditions.checkArgument(tran instanceof MapInput,
             "AssertionError: tran must be an instance of MapInput");
-        sc.setCallSite(CallSite.apply(tran.getName(), ""));
+        sc.setCallSite(CallSite.apply(tran.getName(), 
getLongFormCallSite(tran)));
         rdd = tran.transform(null);
       } else {
         for (SparkTran parent : parents) {
@@ -82,7 +93,7 @@ public class SparkPlan {
             rdd.setName("UnionRDD (" + rdd.getNumPartitions() + ")");
           }
         }
-        sc.setCallSite(CallSite.apply(tran.getName(), ""));
+        sc.setCallSite(CallSite.apply(tran.getName(), 
getLongFormCallSite(tran)));
         rdd = tran.transform(rdd);
       }
 
@@ -109,6 +120,46 @@ public class SparkPlan {
     return finalRDD;
   }
 
+  /**
+   * Takes a {@link SparkTran} object that creates the longForm for the RDD's 
{@link CallSite}.
+   * It does this my creating an {@link ExplainTask} and running it over the
+   * {@link SparkTran#getBaseWork()} object. The explain output is serialized 
to the string,
+   * which is logged and returned. If any errors are encountered while 
creating the explain plan,
+   * an error message is simply logged, but no {@link Exception} is thrown.
+   *
+   * @param tran the {@link SparkTran} to create the long call site for
+   *
+   * @return a {@link String} containing the explain plan for the given {@link 
SparkTran}
+   */
+  private String getLongFormCallSite(SparkTran tran) {
+    if 
(this.jobConf.getBoolean(HiveConf.ConfVars.HIVE_SPARK_LOG_EXPLAIN_WEBUI.varname,
 HiveConf
+            .ConfVars.HIVE_SPARK_LOG_EXPLAIN_WEBUI.defaultBoolVal)) {
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_EXPLAIN_PLAN 
+ tran.getName());
+
+      ExplainWork explainWork = new ExplainWork();
+      explainWork.setConfig(new ExplainConfiguration());
+      ExplainTask explainTask = new ExplainTask();
+      explainTask.setWork(explainWork);
+
+      String explainOutput = "";
+      try {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        explainTask.outputPlan(tran.getBaseWork(), new 
PrintStream(outputStream), false, false, 0,
+                null, 
this.jobConf.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname,
+                        HiveConf.ConfVars.HIVE_IN_TEST.defaultBoolVal));
+        explainOutput = StringUtils.abbreviate(tran.getName() + " Explain 
Plan:\n\n" + outputStream
+                .toString(), 100000);
+        LOG.debug(explainOutput);
+      } catch (Exception e) {
+        LOG.error("Error while generating explain plan for " + tran.getName(), 
e);
+      }
+
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_EXPLAIN_PLAN + 
tran.getName());
+      return explainOutput;
+    }
+    return "";
+  }
+
   public void addTran(SparkTran tran) {
     rootTrans.add(tran);
     leafTrans.add(tran);

http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index c9a3196..d71d705 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -102,7 +102,7 @@ public class SparkPlanGenerator {
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
-    SparkPlan sparkPlan = new SparkPlan(this.sc.sc());
+    SparkPlan sparkPlan = new SparkPlan(this.jobConf, this.sc.sc());
     cloneToWork = sparkWork.getCloneToWork();
     workToTranMap.clear();
     workToParentWorkTranMap.clear();
@@ -145,7 +145,7 @@ public class SparkPlanGenerator {
       boolean toCache = cloneToWork.containsKey(work);
       List<BaseWork> parentWorks = sparkWork.getParents(work);
       SparkEdgeProperty sparkEdgeProperty = 
sparkWork.getEdgeProperty(parentWorks.get(0), work);
-      result = generate(sparkPlan, sparkEdgeProperty, toCache, work.getName());
+      result = generate(sparkPlan, sparkEdgeProperty, toCache, work.getName(), 
work);
       sparkPlan.addTran(result);
       for (BaseWork parentWork : parentWorks) {
         sparkPlan.connect(workToTranMap.get(parentWork), result);
@@ -216,12 +216,12 @@ public class SparkPlanGenerator {
             (toCache ? ", cached)" : ")");
 
     // Caching is disabled for MapInput due to HIVE-8920
-    MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName);
+    MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName, 
mapWork);
     return result;
   }
 
   private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, 
boolean toCache,
-                               String name) {
+                               String name, BaseWork work) {
 
     Preconditions.checkArgument(!edge.isShuffleNone(),
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
@@ -233,7 +233,7 @@ public class SparkPlanGenerator {
     } else {
       shuffler = new GroupByShuffler();
     }
-    return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), 
toCache, name, edge);
+    return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), 
toCache, name, edge, work);
   }
 
   private SparkTran generate(BaseWork work, SparkWork sparkWork) throws 
Exception {
@@ -257,12 +257,12 @@ public class SparkPlanGenerator {
               "Can't make path " + outputPath + " : " + e.getMessage());
         }
       }
-      MapTran mapTran = new MapTran(caching, work.getName());
+      MapTran mapTran = new MapTran(caching, work.getName(), work);
       HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter);
       mapTran.setMapFunction(mapFunc);
       return mapTran;
     } else if (work instanceof ReduceWork) {
-      ReduceTran reduceTran = new ReduceTran(caching, work.getName());
+      ReduceTran reduceTran = new ReduceTran(caching, work.getName(), work);
       HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, 
sparkReporter);
       reduceTran.setReduceFunction(reduceFunc);
       return reduceTran;

http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index f9057b9..29f8b3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.spark.api.java.JavaPairRDD;
 
@@ -28,5 +29,7 @@ public interface SparkTran<KI extends WritableComparable, VI, 
KO extends Writabl
 
   public String getName();
 
+  public BaseWork getBaseWork();
+
   public Boolean isCacheEnable();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
new file mode 100644
index 0000000..3fe32a0
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
@@ -0,0 +1,122 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import 
org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.spark.Dependency;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.apache.spark.rdd.HadoopRDD;
+import org.apache.spark.rdd.MapPartitionsRDD;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.rdd.ShuffledRDD;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+
+import java.util.List;
+
+
+public class TestSparkPlan {
+
+  @Test
+  public void testSetRDDCallSite() throws Exception {
+    HiveConf conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+            SQLStdHiveAuthorizerFactory.class.getName());
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path tmpDir = new Path("TestSparkPlan-tmp");
+
+    SessionState.start(conf);
+
+    IDriver driver = null;
+    JavaSparkContext sc = null;
+
+    try {
+      driver = DriverFactory.newDriver(conf);
+      Assert.assertEquals(0, driver.run("create table test (col 
int)").getResponseCode());
+
+      driver.compile("select * from test order by col");
+      List<SparkTask> sparkTasks = 
Utilities.getSparkTasks(driver.getPlan().getRootTasks());
+      Assert.assertEquals(1, sparkTasks.size());
+
+      SparkTask sparkTask = sparkTasks.get(0);
+
+      JobConf jobConf = new JobConf(conf);
+
+      SparkConf sparkConf = new SparkConf();
+      sparkConf.setMaster("local");
+      sparkConf.setAppName("TestSparkPlan-app");
+      sc = new JavaSparkContext(sparkConf);
+
+      SparkPlanGenerator sparkPlanGenerator = new SparkPlanGenerator(sc, null, 
jobConf, tmpDir,
+              null);
+      SparkPlan sparkPlan = sparkPlanGenerator.generate(sparkTask.getWork());
+      RDD<Tuple2<HiveKey, BytesWritable>> reducerRdd = 
sparkPlan.generateGraph().rdd();
+
+      Assert.assertTrue(reducerRdd.name().contains("Reducer 2"));
+      Assert.assertTrue(reducerRdd instanceof MapPartitionsRDD);
+      
Assert.assertTrue(reducerRdd.creationSite().shortForm().contains("Reducer 2"));
+      Assert.assertTrue(reducerRdd.creationSite().longForm().contains("Explain 
Plan"));
+      Assert.assertTrue(reducerRdd.creationSite().longForm().contains("Reducer 
2"));
+
+      List<Dependency<?>> rdds = 
JavaConversions.seqAsJavaList(reducerRdd.dependencies());
+      Assert.assertEquals(1, rdds.size());
+      RDD shuffledRdd = rdds.get(0).rdd();
+
+      Assert.assertTrue(shuffledRdd.name().contains("Reducer 2"));
+      Assert.assertTrue(shuffledRdd.name().contains("SORT"));
+      Assert.assertTrue(shuffledRdd instanceof ShuffledRDD);
+      
Assert.assertTrue(shuffledRdd.creationSite().shortForm().contains("Reducer 2"));
+      
Assert.assertTrue(shuffledRdd.creationSite().longForm().contains("Explain 
Plan"));
+      
Assert.assertTrue(shuffledRdd.creationSite().longForm().contains("Reducer 2"));
+
+      rdds = JavaConversions.seqAsJavaList(shuffledRdd.dependencies());
+      Assert.assertEquals(1, rdds.size());
+      RDD mapRdd = rdds.get(0).rdd();
+
+      Assert.assertTrue(mapRdd.name().contains("Map 1"));
+      Assert.assertTrue(mapRdd instanceof MapPartitionsRDD);
+      Assert.assertTrue(mapRdd.creationSite().shortForm().contains("Map 1"));
+      Assert.assertTrue(mapRdd.creationSite().longForm().contains("Explain 
Plan"));
+      Assert.assertTrue(mapRdd.creationSite().longForm().contains("Map 1"));
+
+      rdds = JavaConversions.seqAsJavaList(mapRdd.dependencies());
+      Assert.assertEquals(1, rdds.size());
+      RDD hadoopRdd = rdds.get(0).rdd();
+
+      Assert.assertTrue(hadoopRdd.name().contains("Map 1"));
+      Assert.assertTrue(hadoopRdd.name().contains("test"));
+      Assert.assertTrue(hadoopRdd instanceof HadoopRDD);
+      Assert.assertTrue(hadoopRdd.creationSite().shortForm().contains("Map 
1"));
+    } finally {
+      if (driver != null) {
+        Assert.assertEquals(0, driver.run("drop table if exists 
test").getResponseCode());
+        driver.destroy();
+      }
+      if (sc != null) {
+        sc.close();
+      }
+      if (fs.exists(tmpDir)) {
+        fs.delete(tmpDir, true);
+      }
+    }
+  }
+}

Reply via email to