Repository: hive Updated Branches: refs/heads/master 43e713746 -> 1e74aca8d
HIVE-18368: Improve Spark Debug RDD Graph (Sahil Takiar, reviewed by Rui Li) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1e74aca8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1e74aca8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1e74aca8 Branch: refs/heads/master Commit: 1e74aca8d09ea2ef636311d2168b4d34198f7194 Parents: 43e7137 Author: Sahil Takiar <takiar.sa...@gmail.com> Authored: Thu Feb 8 12:45:58 2018 -0800 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Thu Feb 8 12:45:58 2018 -0800 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/spark/CacheTran.java | 14 +- .../ql/exec/spark/LocalHiveSparkClient.java | 6 + .../hadoop/hive/ql/exec/spark/MapInput.java | 13 +- .../hadoop/hive/ql/exec/spark/MapTran.java | 17 +- .../hadoop/hive/ql/exec/spark/ReduceTran.java | 17 +- .../hadoop/hive/ql/exec/spark/ShuffleTran.java | 19 ++- .../hadoop/hive/ql/exec/spark/SparkPlan.java | 164 ++----------------- .../hive/ql/exec/spark/SparkPlanGenerator.java | 35 +++- .../hadoop/hive/ql/exec/spark/SparkTran.java | 2 - .../hive/ql/exec/spark/SparkUtilities.java | 36 +--- .../hive/ql/io/CombineHiveInputFormat.java | 2 +- 11 files changed, 85 insertions(+), 240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java index c5fec7d..4b77ac9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java @@ -27,9 +27,11 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO extends Wr // whether to cache current RDD. private boolean caching = false; private JavaPairRDD<KO, VO> cachedRDD; + protected final String name; - protected CacheTran(boolean cache) { + protected CacheTran(boolean cache, String name) { this.caching = cache; + this.name = name; } @Override @@ -40,9 +42,10 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO extends Wr cachedRDD = doTransform(input); cachedRDD.persist(StorageLevel.MEMORY_AND_DISK()); } - return cachedRDD; + return cachedRDD.setName(this.name + " (" + cachedRDD.getNumPartitions() + ", cached)"); } else { - return doTransform(input); + JavaPairRDD<KO, VO> rdd = doTransform(input); + return rdd.setName(this.name + " (" + rdd.getNumPartitions() + ")"); } } @@ -51,4 +54,9 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO extends Wr } protected abstract JavaPairRDD<KO, VO> doTransform(JavaPairRDD<KI, VI> input); + + @Override + public String getName() { + return name; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java index cab97a0..f43b449 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java @@ -25,7 +25,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.DagUtils; import org.apache.hive.spark.client.SparkClientUtilities; +import org.apache.spark.util.CallSite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; @@ -160,8 +162,12 @@ public class LocalHiveSparkClient implements HiveSparkClient { // Execute generated plan. JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph(); + + sc.setJobGroup("queryId = " + sparkWork.getQueryId(), DagUtils.getQueryName(jobConf)); + // We use Spark RDD async action to submit job as it's the only way to get jobId now. JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance()); + // As we always use foreach action to submit RDD graph, it would only trigger one job. int jobId = future.jobIds().get(0); LocalSparkJobStatus sparkJobStatus = new LocalSparkJobStatus( http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java index d240d18..b1a0d55 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java @@ -36,17 +36,18 @@ public class MapInput implements SparkTran<WritableComparable, Writable, private JavaPairRDD<WritableComparable, Writable> hadoopRDD; private boolean toCache; private final SparkPlan sparkPlan; - private String name = "MapInput"; + private final String name; public MapInput(SparkPlan sparkPlan, JavaPairRDD<WritableComparable, Writable> hadoopRDD) { - this(sparkPlan, hadoopRDD, false); + this(sparkPlan, hadoopRDD, false, "MapInput"); } public MapInput(SparkPlan sparkPlan, - JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) { + JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache, String name) { this.hadoopRDD = hadoopRDD; this.toCache = toCache; this.sparkPlan = sparkPlan; + this.name = name; } public void setToCache(boolean toCache) { @@ -66,6 +67,7 @@ public class MapInput implements SparkTran<WritableComparable, Writable, } else { result = hadoopRDD; } + result.setName(this.name); return result; } @@ -96,9 +98,4 @@ public class MapInput implements SparkTran<WritableComparable, Writable, public Boolean isCacheEnable() { return new Boolean(toCache); } - - @Override - public void setName(String name) { - this.name = name; - } } http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java index 2cc6845..b102f51 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java @@ -24,14 +24,13 @@ import org.apache.spark.api.java.JavaPairRDD; public class MapTran extends CacheTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> { private HiveMapFunction mapFunc; - private String name = "MapTran"; public MapTran() { - this(false); + this(false, "MapTran"); } - public MapTran(boolean cache) { - super(cache); + public MapTran(boolean cache, String name) { + super(cache, name); } @Override @@ -43,14 +42,4 @@ public class MapTran extends CacheTran<BytesWritable, BytesWritable, HiveKey, By public void setMapFunction(HiveMapFunction mapFunc) { this.mapFunc = mapFunc; } - - @Override - public String getName() { - return name; - } - - @Override - public void setName(String name) { - this.name = name; - } } http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java index 9045e05..3b34c78 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java @@ -24,14 +24,13 @@ import org.apache.spark.api.java.JavaPairRDD; public class ReduceTran<V> extends CacheTran<HiveKey, V, HiveKey, BytesWritable> { private HiveReduceFunction<V> reduceFunc; - private String name = "Reduce"; public ReduceTran() { - this(false); + this(false, "Reduce"); } - public ReduceTran(boolean caching) { - super(caching); + public ReduceTran(boolean caching, String name) { + super(caching, name); } @Override @@ -43,14 +42,4 @@ public class ReduceTran<V> extends CacheTran<HiveKey, V, HiveKey, BytesWritable> public void setReduceFunction(HiveReduceFunction<V> redFunc) { this.reduceFunc = redFunc; } - - @Override - public String getName() { - return name; - } - - @Override - public void setName(String name) { - this.name = name; - } } http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java index aec96bc..40ff01a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.spark; import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; import org.apache.hadoop.io.BytesWritable; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.storage.StorageLevel; @@ -28,17 +29,21 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, B private final int numOfPartitions; private final boolean toCache; private final SparkPlan sparkPlan; - private String name = "Shuffle"; + private final String name; + private final SparkEdgeProperty edge; public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) { - this(sparkPlan, sf, n, false); + this(sparkPlan, sf, n, false, "Shuffle", null); } - public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache) { + public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache, String name, + SparkEdgeProperty edge) { shuffler = sf; numOfPartitions = n; this.toCache = toCache; this.sparkPlan = sparkPlan; + this.name = name; + this.edge = edge; } @Override @@ -48,7 +53,8 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, B sparkPlan.addCachedRDDId(result.id()); result = result.persist(StorageLevel.MEMORY_AND_DISK()); } - return result; + return result.setName(this.name + " (" + edge.getShuffleType() + ", " + numOfPartitions + + (toCache ? ", cached)" : ")")); } public int getNoOfPartitions() { @@ -65,11 +71,6 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, B return new Boolean(toCache); } - @Override - public void setName(String name) { - this.name = name; - } - public SparkShuffler getShuffler() { return shuffler; } http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java index 5d27692..b21e386 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.spark.SparkContext; +import org.apache.spark.util.CallSite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.io.HiveKey; @@ -48,6 +50,12 @@ public class SparkPlan { private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>(); private final Set<Integer> cachedRDDIds = new HashSet<Integer>(); + private final SparkContext sc; + + SparkPlan(SparkContext sc) { + this.sc = sc; + } + @SuppressWarnings("unchecked") public JavaPairRDD<HiveKey, BytesWritable> generateGraph() { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH); @@ -60,6 +68,7 @@ public class SparkPlan { // Root tran, it must be MapInput Preconditions.checkArgument(tran instanceof MapInput, "AssertionError: tran must be an instance of MapInput"); + sc.setCallSite(CallSite.apply(tran.getName(), "")); rdd = tran.transform(null); } else { for (SparkTran parent : parents) { @@ -67,174 +76,37 @@ public class SparkPlan { if (rdd == null) { rdd = prevRDD; } else { + sc.setCallSite(CallSite.apply("UnionRDD (" + rdd.name() + ", " + + prevRDD.name() + ")", "")); rdd = rdd.union(prevRDD); + rdd.setName("UnionRDD (" + rdd.getNumPartitions() + ")"); } } + sc.setCallSite(CallSite.apply(tran.getName(), "")); rdd = tran.transform(rdd); } tranToOutputRDDMap.put(tran, rdd); } - logSparkPlan(); - JavaPairRDD<HiveKey, BytesWritable> finalRDD = null; for (SparkTran leafTran : leafTrans) { JavaPairRDD<HiveKey, BytesWritable> rdd = tranToOutputRDDMap.get(leafTran); if (finalRDD == null) { finalRDD = rdd; } else { + sc.setCallSite(CallSite.apply("UnionRDD (" + rdd.name() + ", " + finalRDD.name() + ")", + "")); finalRDD = finalRDD.union(rdd); + finalRDD.setName("UnionRDD (" + finalRDD.getNumPartitions() + ")"); } } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH); - if (LOG.isDebugEnabled()) { - LOG.info("print generated spark rdd graph:\n" + SparkUtilities.rddGraphToString(finalRDD)); - } - return finalRDD; - } - - private void addNumberToTrans() { - int i = 1; - String name = null; - - // Traverse leafTran & transGraph add numbers to trans - for (SparkTran leaf : leafTrans) { - name = leaf.getName() + " " + i++; - leaf.setName(name); - } - Set<SparkTran> sparkTrans = transGraph.keySet(); - for (SparkTran tran : sparkTrans) { - name = tran.getName() + " " + i++; - tran.setName(name); - } - } - - private void logSparkPlan() { - addNumberToTrans(); - ArrayList<SparkTran> leafTran = new ArrayList<SparkTran>(); - leafTran.addAll(leafTrans); - - for (SparkTran leaf : leafTrans) { - collectLeafTrans(leaf, leafTran); - } - - // Start Traverse from the leafTrans and get parents of each leafTrans till - // the end - StringBuilder sparkPlan = new StringBuilder( - "\n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! \n\n"); - for (SparkTran leaf : leafTran) { - sparkPlan.append(leaf.getName()); - getSparkPlan(leaf, sparkPlan); - sparkPlan.append("\n"); - } - sparkPlan - .append(" \n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! "); - LOG.info(sparkPlan.toString()); - } - - private void collectLeafTrans(SparkTran leaf, List<SparkTran> reduceTrans) { - List<SparkTran> parents = getParents(leaf); - if (parents.size() > 0) { - SparkTran nextLeaf = null; - for (SparkTran leafTran : parents) { - if (leafTran instanceof ReduceTran) { - reduceTrans.add(leafTran); - } else { - if (getParents(leafTran).size() > 0) - nextLeaf = leafTran; - } - } - if (nextLeaf != null) - collectLeafTrans(nextLeaf, reduceTrans); - } - } - - private void getSparkPlan(SparkTran tran, StringBuilder sparkPlan) { - List<SparkTran> parents = getParents(tran); - List<SparkTran> nextLeaf = new ArrayList<SparkTran>(); - if (parents.size() > 0) { - sparkPlan.append(" <-- "); - boolean isFirst = true; - for (SparkTran leaf : parents) { - if (isFirst) { - sparkPlan.append("( " + leaf.getName()); - if (leaf instanceof ShuffleTran) { - logShuffleTranStatus((ShuffleTran) leaf, sparkPlan); - } else { - logCacheStatus(leaf, sparkPlan); - } - isFirst = false; - } else { - sparkPlan.append("," + leaf.getName()); - if (leaf instanceof ShuffleTran) { - logShuffleTranStatus((ShuffleTran) leaf, sparkPlan); - } else { - logCacheStatus(leaf, sparkPlan); - } - } - // Leave reduceTran it will be expanded in the next line - if (getParents(leaf).size() > 0 && !(leaf instanceof ReduceTran)) { - nextLeaf.add(leaf); - } - } - sparkPlan.append(" ) "); - if (nextLeaf.size() > 1) { - logLeafTran(nextLeaf, sparkPlan); - } else { - if (nextLeaf.size() != 0) - getSparkPlan(nextLeaf.get(0), sparkPlan); - } - } - } - - private void logLeafTran(List<SparkTran> parent, StringBuilder sparkPlan) { - sparkPlan.append(" <-- "); - boolean isFirst = true; - for (SparkTran sparkTran : parent) { - List<SparkTran> parents = getParents(sparkTran); - SparkTran leaf = parents.get(0); - if (isFirst) { - sparkPlan.append("( " + leaf.getName()); - if (leaf instanceof ShuffleTran) { - logShuffleTranStatus((ShuffleTran) leaf, sparkPlan); - } else { - logCacheStatus(leaf, sparkPlan); - } - isFirst = false; - } else { - sparkPlan.append("," + leaf.getName()); - if (leaf instanceof ShuffleTran) { - logShuffleTranStatus((ShuffleTran) leaf, sparkPlan); - } else { - logCacheStatus(leaf, sparkPlan); - } - } - } - sparkPlan.append(" ) "); - } - private void logShuffleTranStatus(ShuffleTran leaf, StringBuilder sparkPlan) { - int noOfPartitions = leaf.getNoOfPartitions(); - sparkPlan.append(" ( Partitions " + noOfPartitions); - SparkShuffler shuffler = leaf.getShuffler(); - sparkPlan.append(", " + shuffler.getName()); - if (leaf.isCacheEnable()) { - sparkPlan.append(", Cache on"); - } else { - sparkPlan.append(", Cache off"); - } - } + LOG.info("\n\nSpark RDD Graph:\n\n" + finalRDD.toDebugString() + "\n"); - private void logCacheStatus(SparkTran sparkTran, StringBuilder sparkPlan) { - if (sparkTran.isCacheEnable() != null) { - if (sparkTran.isCacheEnable().booleanValue()) { - sparkPlan.append(" (cache on) "); - } else { - sparkPlan.append(" (cache off) "); - } - } + return finalRDD; } public void addTran(SparkTran tran) { http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index c52692d..c9a3196 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -23,7 +23,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.spark.util.CallSite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +42,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; @@ -51,6 +54,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; @@ -98,7 +102,7 @@ public class SparkPlanGenerator { public SparkPlan generate(SparkWork sparkWork) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN); - SparkPlan sparkPlan = new SparkPlan(); + SparkPlan sparkPlan = new SparkPlan(this.sc.sc()); cloneToWork = sparkWork.getCloneToWork(); workToTranMap.clear(); workToParentWorkTranMap.clear(); @@ -138,9 +142,10 @@ public class SparkPlanGenerator { result = generateMapInput(sparkPlan, (MapWork)work); sparkPlan.addTran(result); } else if (work instanceof ReduceWork) { + boolean toCache = cloneToWork.containsKey(work); List<BaseWork> parentWorks = sparkWork.getParents(work); - result = generate(sparkPlan, - sparkWork.getEdgeProperty(parentWorks.get(0), work), cloneToWork.containsKey(work)); + SparkEdgeProperty sparkEdgeProperty = sparkWork.getEdgeProperty(parentWorks.get(0), work); + result = generate(sparkPlan, sparkEdgeProperty, toCache, work.getName()); sparkPlan.addTran(result); for (BaseWork parentWork : parentWorks) { sparkPlan.connect(workToTranMap.get(parentWork), result); @@ -189,6 +194,8 @@ public class SparkPlanGenerator { JobConf jobConf = cloneJobConf(mapWork); Class ifClass = getInputFormat(jobConf, mapWork); + sc.sc().setCallSite(CallSite.apply(mapWork.getName(), "")); + JavaPairRDD<WritableComparable, Writable> hadoopRDD; if (mapWork.getNumMapTasks() != null) { jobConf.setNumMapTasks(mapWork.getNumMapTasks()); @@ -198,12 +205,24 @@ public class SparkPlanGenerator { hadoopRDD = sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class); } + boolean toCache = false/*cloneToWork.containsKey(mapWork)*/; + + String tables = mapWork.getAllRootOperators().stream() + .filter(op -> op instanceof TableScanOperator) + .map(ts -> ((TableScanDesc) ts.getConf()).getAlias()) + .collect(Collectors.joining(", ")); + + String rddName = mapWork.getName() + " (" + tables + ", " + hadoopRDD.getNumPartitions() + + (toCache ? ", cached)" : ")"); + // Caching is disabled for MapInput due to HIVE-8920 - MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/); + MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName); return result; } - private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache) { + private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache, + String name) { + Preconditions.checkArgument(!edge.isShuffleNone(), "AssertionError: SHUFFLE_NONE should only be used for UnionWork."); SparkShuffler shuffler; @@ -214,7 +233,7 @@ public class SparkPlanGenerator { } else { shuffler = new GroupByShuffler(); } - return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache); + return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name, edge); } private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception { @@ -238,12 +257,12 @@ public class SparkPlanGenerator { "Can't make path " + outputPath + " : " + e.getMessage()); } } - MapTran mapTran = new MapTran(caching); + MapTran mapTran = new MapTran(caching, work.getName()); HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter); mapTran.setMapFunction(mapFunc); return mapTran; } else if (work instanceof ReduceWork) { - ReduceTran reduceTran = new ReduceTran(caching); + ReduceTran reduceTran = new ReduceTran(caching, work.getName()); HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, sparkReporter); reduceTran.setReduceFunction(reduceFunc); return reduceTran; http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java index 037efe1..f9057b9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java @@ -28,7 +28,5 @@ public interface SparkTran<KI extends WritableComparable, VI, KO extends Writabl public String getName(); - public void setName(String name); - public Boolean isCacheEnable(); } http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index f332790..943a4ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -50,12 +50,8 @@ import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.io.BytesWritable; import org.apache.hive.spark.client.SparkClientUtilities; -import org.apache.spark.Dependency; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.rdd.RDD; -import org.apache.spark.rdd.UnionRDD; -import scala.collection.JavaConversions; + /** * Contains utilities methods used as part of Spark tasks. @@ -138,36 +134,6 @@ public class SparkUtilities { return sparkSession; } - - public static String rddGraphToString(JavaPairRDD rdd) { - StringBuilder sb = new StringBuilder(); - rddToString(rdd.rdd(), sb, ""); - return sb.toString(); - } - - private static void rddToString(RDD rdd, StringBuilder sb, String offset) { - sb.append(offset).append(rdd.getClass().getCanonicalName()).append("[").append(rdd.hashCode()).append("]"); - if (rdd.getStorageLevel().useMemory()) { - sb.append("(cached)"); - } - sb.append("\n"); - Collection<Dependency> dependencies = JavaConversions.asJavaCollection(rdd.dependencies()); - if (dependencies != null) { - offset += "\t"; - for (Dependency dependency : dependencies) { - RDD parentRdd = dependency.rdd(); - rddToString(parentRdd, sb, offset); - } - } else if (rdd instanceof UnionRDD) { - UnionRDD unionRDD = (UnionRDD) rdd; - offset += "\t"; - Collection<RDD> parentRdds = JavaConversions.asJavaCollection(unionRDD.rdds()); - for (RDD parentRdd : parentRdds) { - rddToString(parentRdd, sb, offset); - } - } - } - /** * Generate a temporary path for dynamic partition pruning in Spark branch * TODO: no longer need this if we use accumulator! http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index b698987..1622ae2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -415,7 +415,7 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ combine.createPool(job, f); poolMap.put(combinePathInputFormat, f); } else { - LOG.info("CombineHiveInputSplit: pool is already created for " + path + + LOG.debug("CombineHiveInputSplit: pool is already created for " + path + "; using filter path " + filterPath); f.addPath(filterPath); }