Repository: hive

Updated Branches:
  refs/heads/master 3915980fe -> b14113be4
HIVE-18525: Add explain plan to Hive on Spark Web UI (Sahil Takiar, reviewed by Aihua Xu)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b14113be
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b14113be
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b14113be

Branch: refs/heads/master
Commit: b14113be4c67217e030ae5158ce584362a721483
Parents: 3915980
Author: Sahil Takiar <takiar.sa...@gmail.com>
Authored: Fri Apr 13 13:50:57 2018 -0700
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Fri Apr 13 13:50:57 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   6 +-
 .../apache/hadoop/hive/ql/log/PerfLogger.java   |   1 +
 .../apache/hadoop/hive/ql/exec/ExplainTask.java |  22 ++--
 .../hadoop/hive/ql/exec/spark/CacheTran.java    |  10 +-
 .../hadoop/hive/ql/exec/spark/MapInput.java     |  13 +-
 .../hadoop/hive/ql/exec/spark/MapTran.java      |   7 +-
 .../hadoop/hive/ql/exec/spark/ReduceTran.java   |   7 +-
 .../hadoop/hive/ql/exec/spark/ShuffleTran.java  |  12 +-
 .../hadoop/hive/ql/exec/spark/SparkPlan.java    |  57 ++++++++-
 .../hive/ql/exec/spark/SparkPlanGenerator.java  |  14 +--
 .../hadoop/hive/ql/exec/spark/SparkTran.java    |   3 +
 .../hive/ql/exec/spark/TestSparkPlan.java       | 122 +++++++++++++++++++
 12 files changed, 243 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e540d02..e533ee6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2830,8 +2830,12 @@ public class HiveConf extends Configuration {
     HIVE_SPARK_EXPLAIN_USER("hive.spark.explain.user", false,
         "Whether to show explain result at user level.\n" +
         "When enabled, will log EXPLAIN output for the query at user level. Spark only."),
+    HIVE_SPARK_LOG_EXPLAIN_WEBUI("hive.spark.log.explain.webui", true, "Whether to show the " +
+        "explain plan in the Spark Web UI. Only shows the regular EXPLAIN plan, and ignores " +
+        "any extra EXPLAIN configuration (e.g. hive.spark.explain.user, etc.). The explain " +
+        "plan for each stage is truncated at 100,000 characters."),
 
     // prefix used to auto generated column aliases (this should be started with '_')
     HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL("hive.autogen.columnalias.prefix.label", "_c",
         "String used as a prefix when auto generating column alias.\n" +
         "By default the prefix label will be appended with a column position number to form the column alias. 
\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java index 764a832..c1e1b7f 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java +++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java @@ -76,6 +76,7 @@ public class PerfLogger { public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning"; public static final String SPARK_BUILD_PLAN = "SparkBuildPlan"; public static final String SPARK_BUILD_RDD_GRAPH = "SparkBuildRDDGraph"; + public static final String SPARK_CREATE_EXPLAIN_PLAN = "SparkCreateExplainPlan."; public static final String SPARK_SUBMIT_JOB = "SparkSubmitJob"; public static final String SPARK_RUN_JOB = "SparkRunJob"; public static final String SPARK_CREATE_TRAN = "SparkCreateTran."; http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java index 14c6398..0b30721 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java @@ -610,7 +610,7 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable { } private JSONArray outputList(List<?> l, PrintStream out, boolean hasHeader, - boolean extended, boolean jsonOutput, int indent) throws Exception { + boolean extended, boolean jsonOutput, int indent, boolean inTest) throws Exception { boolean first_el = true; boolean nl = false; @@ -634,7 +634,7 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable { out.println(); } JSONObject jsonOut = outputPlan(o, out, extended, - jsonOutput, jsonOutput ? 0 : (hasHeader ? indent + 2 : indent)); + jsonOutput, jsonOutput ? 0 : (hasHeader ? indent + 2 : indent), "", inTest); if (jsonOutput) { outputArray.put(jsonOut); } @@ -672,10 +672,13 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable { @VisibleForTesting JSONObject outputPlan(Object work, PrintStream out, boolean extended, boolean jsonOutput, int indent, String appendToHeader) throws Exception { + return outputPlan(work, out, extended, jsonOutput, indent, appendToHeader, + queryState.getConf().getBoolVar(ConfVars.HIVE_IN_TEST)); + } - // Are we running tests? - final boolean inTest = queryState.getConf().getBoolVar(ConfVars.HIVE_IN_TEST); - + public JSONObject outputPlan(Object work, PrintStream out, + boolean extended, boolean jsonOutput, int indent, + String appendToHeader, boolean inTest) throws Exception { // Check if work has an explain annotation Annotation note = AnnotationUtils.getAnnotation(work.getClass(), Explain.class); @@ -773,7 +776,7 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable { if (operator.getConf() != null) { String appender = isLogical ? " (" + operator.getOperatorId() + ")" : ""; JSONObject jsonOut = outputPlan(operator.getConf(), out, extended, - jsonOutput, jsonOutput ? 0 : indent, appender); + jsonOutput, jsonOutput ? 
0 : indent, appender, inTest); if (this.work != null && (this.work.isUserLevelExplain() || this.work.isFormatted())) { if (jsonOut != null && jsonOut.length() > 0) { ((JSONObject) jsonOut.get(JSONObject.getNames(jsonOut)[0])).put("OperatorId:", @@ -795,7 +798,7 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable { if (operator.getChildOperators() != null) { int cindent = jsonOutput ? 0 : indent + 2; for (Operator<? extends OperatorDesc> op : operator.getChildOperators()) { - JSONObject jsonOut = outputPlan(op, out, extended, jsonOutput, cindent); + JSONObject jsonOut = outputPlan(op, out, extended, jsonOutput, cindent, "", inTest); if (jsonOutput) { ((JSONObject)json.get(JSONObject.getNames(json)[0])).accumulate("children", jsonOut); } @@ -971,7 +974,8 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable { out.print(header); } - JSONArray jsonOut = outputList(l, out, !skipHeader && !emptyHeader, extended, jsonOutput, ind); + JSONArray jsonOut = outputList(l, out, !skipHeader && !emptyHeader, extended, + jsonOutput, ind, inTest); if (jsonOutput && !l.isEmpty()) { json.put(header, jsonOut); @@ -985,7 +989,7 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable { if (!skipHeader && out != null) { out.println(header); } - JSONObject jsonOut = outputPlan(val, out, extended, jsonOutput, ind); + JSONObject jsonOut = outputPlan(val, out, extended, jsonOutput, ind, "", inTest); if (jsonOutput && jsonOut != null && jsonOut.length() != 0) { if (!skipHeader) { json.put(header, jsonOut); http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java index 4b77ac9..770a100 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.spark; +import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.io.WritableComparable; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.storage.StorageLevel; @@ -28,10 +29,12 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO extends Wr private boolean caching = false; private JavaPairRDD<KO, VO> cachedRDD; protected final String name; + private final BaseWork baseWork; - protected CacheTran(boolean cache, String name) { + protected CacheTran(boolean cache, String name, BaseWork baseWork) { this.caching = cache; this.name = name; + this.baseWork = baseWork; } @Override @@ -59,4 +62,9 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO extends Wr public String getName() { return name; } + + @Override + public BaseWork getBaseWork() { + return baseWork; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java index b1a0d55..b242f57 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.spark; 
import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableUtils; @@ -37,17 +38,20 @@ public class MapInput implements SparkTran<WritableComparable, Writable, private boolean toCache; private final SparkPlan sparkPlan; private final String name; + private final BaseWork baseWork; public MapInput(SparkPlan sparkPlan, JavaPairRDD<WritableComparable, Writable> hadoopRDD) { - this(sparkPlan, hadoopRDD, false, "MapInput"); + this(sparkPlan, hadoopRDD, false, "MapInput", null); } public MapInput(SparkPlan sparkPlan, - JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache, String name) { + JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache, String + name, BaseWork baseWork) { this.hadoopRDD = hadoopRDD; this.toCache = toCache; this.sparkPlan = sparkPlan; this.name = name; + this.baseWork = baseWork; } public void setToCache(boolean toCache) { @@ -98,4 +102,9 @@ public class MapInput implements SparkTran<WritableComparable, Writable, public Boolean isCacheEnable() { return new Boolean(toCache); } + + @Override + public BaseWork getBaseWork() { + return baseWork; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java index b102f51..7e95b12 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.spark; import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.io.BytesWritable; import org.apache.spark.api.java.JavaPairRDD; @@ -26,11 +27,11 @@ public class MapTran extends CacheTran<BytesWritable, BytesWritable, HiveKey, By private HiveMapFunction mapFunc; public MapTran() { - this(false, "MapTran"); + this(false, "MapTran", null); } - public MapTran(boolean cache, String name) { - super(cache, name); + public MapTran(boolean cache, String name, BaseWork baseWork) { + super(cache, name, baseWork); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java index 3b34c78..4bafcb9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.spark; import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.io.BytesWritable; import org.apache.spark.api.java.JavaPairRDD; @@ -26,11 +27,11 @@ public class ReduceTran<V> extends CacheTran<HiveKey, V, HiveKey, BytesWritable> private HiveReduceFunction<V> reduceFunc; public ReduceTran() { - this(false, "Reduce"); + this(false, "Reduce", null); } - public ReduceTran(boolean caching, String name) { - super(caching, name); + public ReduceTran(boolean caching, String name, BaseWork baseWork) { + super(caching, name, 
baseWork); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java index 40ff01a..f698079 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.spark; import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; import org.apache.hadoop.io.BytesWritable; import org.apache.spark.api.java.JavaPairRDD; @@ -31,19 +32,21 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, B private final SparkPlan sparkPlan; private final String name; private final SparkEdgeProperty edge; + private final BaseWork baseWork; public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) { - this(sparkPlan, sf, n, false, "Shuffle", null); + this(sparkPlan, sf, n, false, "Shuffle", null, null); } public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache, String name, - SparkEdgeProperty edge) { + SparkEdgeProperty edge, BaseWork baseWork) { shuffler = sf; numOfPartitions = n; this.toCache = toCache; this.sparkPlan = sparkPlan; this.name = name; this.edge = edge; + this.baseWork = baseWork; } @Override @@ -71,6 +74,11 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, B return new Boolean(toCache); } + @Override + public BaseWork getBaseWork() { + return baseWork; + } + public SparkShuffler getShuffler() { return shuffler; } http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java index b21e386..8244dcb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec.spark; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -26,6 +28,13 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ExplainTask; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.parse.ExplainConfiguration; +import org.apache.hadoop.hive.ql.plan.ExplainWork; +import org.apache.hadoop.mapred.JobConf; import org.apache.spark.SparkContext; import org.apache.spark.util.CallSite; import org.slf4j.Logger; @@ -50,9 +59,11 @@ public class SparkPlan { private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>(); private final Set<Integer> cachedRDDIds = new HashSet<Integer>(); + private final JobConf jobConf; private final SparkContext sc; - SparkPlan(SparkContext sc) { + SparkPlan(JobConf jobConf, SparkContext sc) { + this.jobConf = jobConf; this.sc = sc; } @@ -68,7 +79,7 @@ public class SparkPlan { // 
Root tran, it must be MapInput Preconditions.checkArgument(tran instanceof MapInput, "AssertionError: tran must be an instance of MapInput"); - sc.setCallSite(CallSite.apply(tran.getName(), "")); + sc.setCallSite(CallSite.apply(tran.getName(), getLongFormCallSite(tran))); rdd = tran.transform(null); } else { for (SparkTran parent : parents) { @@ -82,7 +93,7 @@ rdd.setName("UnionRDD (" + rdd.getNumPartitions() + ")"); } } - sc.setCallSite(CallSite.apply(tran.getName(), "")); + sc.setCallSite(CallSite.apply(tran.getName(), getLongFormCallSite(tran))); rdd = tran.transform(rdd); } @@ -109,6 +120,46 @@ return finalRDD; } + /** + * Takes a {@link SparkTran} object and creates the longForm for the RDD's {@link CallSite}. + * It does this by creating an {@link ExplainTask} and running it over the + * {@link SparkTran#getBaseWork()} object. The explain output is serialized to a string, + * which is logged and returned. If any errors are encountered while creating the explain plan, + * an error message is simply logged, but no {@link Exception} is thrown. + * + * @param tran the {@link SparkTran} to create the long call site for + * + * @return a {@link String} containing the explain plan for the given {@link SparkTran} + */ + private String getLongFormCallSite(SparkTran tran) { + if (this.jobConf.getBoolean(HiveConf.ConfVars.HIVE_SPARK_LOG_EXPLAIN_WEBUI.varname, HiveConf + .ConfVars.HIVE_SPARK_LOG_EXPLAIN_WEBUI.defaultBoolVal)) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_EXPLAIN_PLAN + tran.getName()); + + ExplainWork explainWork = new ExplainWork(); + explainWork.setConfig(new ExplainConfiguration()); + ExplainTask explainTask = new ExplainTask(); + explainTask.setWork(explainWork); + + String explainOutput = ""; + try { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + explainTask.outputPlan(tran.getBaseWork(), new PrintStream(outputStream), false, false, 0, + null, this.jobConf.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname, + HiveConf.ConfVars.HIVE_IN_TEST.defaultBoolVal)); + explainOutput = StringUtils.abbreviate(tran.getName() + " Explain Plan:\n\n" + outputStream + .toString(), 100000); + LOG.debug(explainOutput); + } catch (Exception e) { + LOG.error("Error while generating explain plan for " + tran.getName(), e); + } + + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_EXPLAIN_PLAN + tran.getName()); + return explainOutput; + } + return ""; + } + public void addTran(SparkTran tran) { rootTrans.add(tran); leafTrans.add(tran); http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index c9a3196..d71d705 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -102,7 +102,7 @@ public class SparkPlanGenerator { public SparkPlan generate(SparkWork sparkWork) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN); - SparkPlan sparkPlan = new SparkPlan(this.sc.sc()); + SparkPlan sparkPlan = new SparkPlan(this.jobConf, this.sc.sc()); cloneToWork = sparkWork.getCloneToWork(); workToTranMap.clear(); workToParentWorkTranMap.clear(); @@ -145,7 +145,7 @@ 
public class SparkPlanGenerator { boolean toCache = cloneToWork.containsKey(work); List<BaseWork> parentWorks = sparkWork.getParents(work); SparkEdgeProperty sparkEdgeProperty = sparkWork.getEdgeProperty(parentWorks.get(0), work); - result = generate(sparkPlan, sparkEdgeProperty, toCache, work.getName()); + result = generate(sparkPlan, sparkEdgeProperty, toCache, work.getName(), work); sparkPlan.addTran(result); for (BaseWork parentWork : parentWorks) { sparkPlan.connect(workToTranMap.get(parentWork), result); @@ -216,12 +216,12 @@ public class SparkPlanGenerator { (toCache ? ", cached)" : ")"); // Caching is disabled for MapInput due to HIVE-8920 - MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName); + MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName, mapWork); return result; } private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache, - String name) { + String name, BaseWork work) { Preconditions.checkArgument(!edge.isShuffleNone(), "AssertionError: SHUFFLE_NONE should only be used for UnionWork."); @@ -233,7 +233,7 @@ public class SparkPlanGenerator { } else { shuffler = new GroupByShuffler(); } - return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name, edge); + return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name, edge, work); } private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception { @@ -257,12 +257,12 @@ public class SparkPlanGenerator { "Can't make path " + outputPath + " : " + e.getMessage()); } } - MapTran mapTran = new MapTran(caching, work.getName()); + MapTran mapTran = new MapTran(caching, work.getName(), work); HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter); mapTran.setMapFunction(mapFunc); return mapTran; } else if (work instanceof ReduceWork) { - ReduceTran reduceTran = new ReduceTran(caching, work.getName()); + ReduceTran reduceTran = new ReduceTran(caching, work.getName(), work); HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, sparkReporter); reduceTran.setReduceFunction(reduceFunc); return reduceTran; http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java index f9057b9..29f8b3e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.spark; +import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.io.WritableComparable; import org.apache.spark.api.java.JavaPairRDD; @@ -28,5 +29,7 @@ public interface SparkTran<KI extends WritableComparable, VI, KO extends Writabl public String getName(); + public BaseWork getBaseWork(); + public Boolean isCacheEnable(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java new file mode 100644 index 0000000..3fe32a0 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java @@ -0,0 +1,122 @@ 
+package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.DriverFactory; +import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; + +import org.apache.spark.Dependency; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; + +import org.apache.spark.rdd.HadoopRDD; +import org.apache.spark.rdd.MapPartitionsRDD; +import org.apache.spark.rdd.RDD; +import org.apache.spark.rdd.ShuffledRDD; + +import org.junit.Assert; +import org.junit.Test; + +import scala.Tuple2; +import scala.collection.JavaConversions; + +import java.util.List; + + +public class TestSparkPlan { + + @Test + public void testSetRDDCallSite() throws Exception { + HiveConf conf = new HiveConf(); + conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + SQLStdHiveAuthorizerFactory.class.getName()); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark"); + + FileSystem fs = FileSystem.getLocal(conf); + Path tmpDir = new Path("TestSparkPlan-tmp"); + + SessionState.start(conf); + + IDriver driver = null; + JavaSparkContext sc = null; + + try { + driver = DriverFactory.newDriver(conf); + Assert.assertEquals(0, driver.run("create table test (col int)").getResponseCode()); + + driver.compile("select * from test order by col"); + List<SparkTask> sparkTasks = Utilities.getSparkTasks(driver.getPlan().getRootTasks()); + Assert.assertEquals(1, sparkTasks.size()); + + SparkTask sparkTask = sparkTasks.get(0); + + JobConf jobConf = new JobConf(conf); + + SparkConf sparkConf = new SparkConf(); + sparkConf.setMaster("local"); + sparkConf.setAppName("TestSparkPlan-app"); + sc = new JavaSparkContext(sparkConf); + + SparkPlanGenerator sparkPlanGenerator = new SparkPlanGenerator(sc, null, jobConf, tmpDir, + null); + SparkPlan sparkPlan = sparkPlanGenerator.generate(sparkTask.getWork()); + RDD<Tuple2<HiveKey, BytesWritable>> reducerRdd = sparkPlan.generateGraph().rdd(); + + Assert.assertTrue(reducerRdd.name().contains("Reducer 2")); + Assert.assertTrue(reducerRdd instanceof MapPartitionsRDD); + Assert.assertTrue(reducerRdd.creationSite().shortForm().contains("Reducer 2")); + Assert.assertTrue(reducerRdd.creationSite().longForm().contains("Explain Plan")); + Assert.assertTrue(reducerRdd.creationSite().longForm().contains("Reducer 2")); + + List<Dependency<?>> rdds = JavaConversions.seqAsJavaList(reducerRdd.dependencies()); + Assert.assertEquals(1, rdds.size()); + RDD shuffledRdd = rdds.get(0).rdd(); + + Assert.assertTrue(shuffledRdd.name().contains("Reducer 2")); + Assert.assertTrue(shuffledRdd.name().contains("SORT")); + Assert.assertTrue(shuffledRdd instanceof ShuffledRDD); + Assert.assertTrue(shuffledRdd.creationSite().shortForm().contains("Reducer 2")); + Assert.assertTrue(shuffledRdd.creationSite().longForm().contains("Explain Plan")); + Assert.assertTrue(shuffledRdd.creationSite().longForm().contains("Reducer 2")); + + rdds = JavaConversions.seqAsJavaList(shuffledRdd.dependencies()); + Assert.assertEquals(1, rdds.size()); + RDD mapRdd = rdds.get(0).rdd(); + + 
Assert.assertTrue(mapRdd.name().contains("Map 1")); + Assert.assertTrue(mapRdd instanceof MapPartitionsRDD); + Assert.assertTrue(mapRdd.creationSite().shortForm().contains("Map 1")); + Assert.assertTrue(mapRdd.creationSite().longForm().contains("Explain Plan")); + Assert.assertTrue(mapRdd.creationSite().longForm().contains("Map 1")); + + rdds = JavaConversions.seqAsJavaList(mapRdd.dependencies()); + Assert.assertEquals(1, rdds.size()); + RDD hadoopRdd = rdds.get(0).rdd(); + + Assert.assertTrue(hadoopRdd.name().contains("Map 1")); + Assert.assertTrue(hadoopRdd.name().contains("test")); + Assert.assertTrue(hadoopRdd instanceof HadoopRDD); + Assert.assertTrue(hadoopRdd.creationSite().shortForm().contains("Map 1")); + } finally { + if (driver != null) { + Assert.assertEquals(0, driver.run("drop table if exists test").getResponseCode()); + driver.destroy(); + } + if (sc != null) { + sc.close(); + } + if (fs.exists(tmpDir)) { + fs.delete(tmpDir, true); + } + } + } +}
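
For reference, the behavior added by this patch is controlled by hive.spark.log.explain.webui (default true); when it is disabled, SparkPlan.getLongFormCallSite() skips explain generation and the RDD call sites carry only the short form. Below is a minimal, illustrative sketch of turning the feature off before starting a session, modeled on the setup in TestSparkPlan above. The class name and main method are hypothetical; HiveConf, SessionState, and the HIVE_SPARK_LOG_EXPLAIN_WEBUI constant come from this patch and the existing Hive API.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class DisableSparkWebUiExplain {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // Run queries on the Spark execution engine, as in TestSparkPlan above.
        conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
        // Config introduced by HIVE-18525; defaults to true, so the explain plan is
        // attached to each RDD's long-form call site unless explicitly disabled.
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SPARK_LOG_EXPLAIN_WEBUI, false);
        SessionState.start(conf);
        // ... compile and run queries as usual; the Spark Web UI will then show only
        // the short-form call site (e.g. "Map 1", "Reducer 2") without the explain plan.
      }
    }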