Repository: hive
Updated Branches:
  refs/heads/master e73891415 -> 3f316cb5a
HIVE-13293: Cache RDD to improve parallel order by performance for HoS (Rui reviewed by Xuefu) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3f316cb5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3f316cb5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3f316cb5 Branch: refs/heads/master Commit: 3f316cb5aeeefecf2c432c77442164a7dd6514d3 Parents: e738914 Author: Rui Li <[email protected]> Authored: Tue May 17 15:43:43 2016 +0800 Committer: Rui Li <[email protected]> Committed: Tue May 17 15:44:16 2016 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hive/ql/exec/spark/SortByShuffler.java | 9 ++++++++- .../hadoop/hive/ql/exec/spark/SparkPlanGenerator.java | 4 ++-- 2 files changed, 10 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/3f316cb5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java index 766813c..a6350d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java @@ -24,6 +24,7 @@ import org.apache.spark.HashPartitioner; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.storage.StorageLevel; import scala.Tuple2; import java.util.*; @@ -31,12 +32,14 @@ import java.util.*; public class SortByShuffler implements SparkShuffler { private final boolean totalOrder; + private final SparkPlan sparkPlan; /** * @param totalOrder whether this shuffler provides total order shuffle. 
*/ - public SortByShuffler(boolean totalOrder) { + public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) { this.totalOrder = totalOrder; + this.sparkPlan = sparkPlan; } @Override @@ -45,6 +48,10 @@ public class SortByShuffler implements SparkShuffler { JavaPairRDD<HiveKey, BytesWritable> rdd; if (totalOrder) { if (numPartitions > 0) { + if (numPartitions > 1 && input.getStorageLevel() == StorageLevel.NONE()) { + input.persist(StorageLevel.DISK_ONLY()); + sparkPlan.addCachedRDDId(input.id()); + } rdd = input.sortByKey(true, numPartitions); } else { rdd = input.sortByKey(true); http://git-wip-us.apache.org/repos/asf/hive/blob/3f316cb5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 6abef4e..66ffe5d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -215,9 +215,9 @@ public class SparkPlanGenerator { "AssertionError: SHUFFLE_NONE should only be used for UnionWork."); SparkShuffler shuffler; if (edge.isMRShuffle()) { - shuffler = new SortByShuffler(false); + shuffler = new SortByShuffler(false, sparkPlan); } else if (edge.isShuffleSort()) { - shuffler = new SortByShuffler(true); + shuffler = new SortByShuffler(true, sparkPlan); } else { shuffler = new GroupByShuffler(); }
