HIVE-10550: Dynamic RDD caching optimization for HoS [Spark Branch] (Chengxiang reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/441d4c08
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/441d4c08
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/441d4c08

Branch: refs/heads/hbase-metastore
Commit: 441d4c081b8bc4fa0b7d7f65ef34816d005d41b4
Parents: 37cccbc
Author: chengxiang <chengxi...@apache.com>
Authored: Thu May 28 13:31:26 2015 +0800
Committer: Xuefu Zhang <xzh...@cloudera.com>
Committed: Mon Jun 1 14:04:41 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/CacheTran.java    | 54 ++++++++++++++++++++
 .../hadoop/hive/ql/exec/spark/MapTran.java      | 17 +++---
 .../hadoop/hive/ql/exec/spark/ReduceTran.java   | 17 +++---
 .../hadoop/hive/ql/exec/spark/SparkPlan.java    |  3 ++
 .../hive/ql/exec/spark/SparkPlanGenerator.java  | 25 +++++++--
 .../hive/ql/exec/spark/SparkUtilities.java      | 36 +++++++++++++
 6 files changed, 134 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/441d4c08/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
new file mode 100644
index 0000000..5ec27ec
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.storage.StorageLevel;
+
+public abstract class CacheTran<KI extends WritableComparable, VI, KO extends WritableComparable, VO>
+    implements SparkTran<KI, VI, KO, VO> {
+  // whether to cache the current RDD.
+  private boolean caching = false;
+  private JavaPairRDD<KO, VO> cachedRDD;
+
+  protected CacheTran(boolean cache) {
+    this.caching = cache;
+  }
+
+  @Override
+  public JavaPairRDD<KO, VO> transform(
+      JavaPairRDD<KI, VI> input) {
+    if (caching) {
+      if (cachedRDD == null) {
+        cachedRDD = doTransform(input);
+        cachedRDD.persist(StorageLevel.MEMORY_AND_DISK());
+      }
+      return cachedRDD;
+    } else {
+      return doTransform(input);
+    }
+  }
+
+  public Boolean isCacheEnable() {
+    return caching;
+  }
+
+  protected abstract JavaPairRDD<KO, VO> doTransform(JavaPairRDD<KI, VI> input);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/441d4c08/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index 2170243..2a18991 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -22,12 +22,20 @@ import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class MapTran implements SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
+public class MapTran extends CacheTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
   private HiveMapFunction mapFunc;
   private String name = "MapTran";
 
+  public MapTran() {
+    this(false);
+  }
+
+  public MapTran(boolean cache) {
+    super(cache);
+  }
+
   @Override
-  public JavaPairRDD<HiveKey, BytesWritable> transform(
+  public JavaPairRDD<HiveKey, BytesWritable> doTransform(
       JavaPairRDD<BytesWritable, BytesWritable> input) {
     return input.mapPartitionsToPair(mapFunc);
   }
@@ -42,11 +50,6 @@ public class MapTran implements SparkTran<BytesWritable, BytesWritable, HiveKey,
   }
 
   @Override
-  public Boolean isCacheEnable() {
-    return null;
-  }
-
-  @Override
   public void setName(String name) {
     this.name = name;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/441d4c08/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index e60dfac..3d56876 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -22,12 +22,20 @@ import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class ReduceTran implements SparkTran<HiveKey, Iterable<BytesWritable>, HiveKey, BytesWritable> {
+public class ReduceTran extends CacheTran<HiveKey, Iterable<BytesWritable>, HiveKey, BytesWritable> {
   private HiveReduceFunction reduceFunc;
   private String name = "Reduce";
 
+  public ReduceTran() {
+    this(false);
+  }
+
+  public ReduceTran(boolean caching) {
+    super(caching);
+  }
+
   @Override
-  public JavaPairRDD<HiveKey, BytesWritable> transform(
+  public JavaPairRDD<HiveKey, BytesWritable> doTransform(
       JavaPairRDD<HiveKey, Iterable<BytesWritable>> input) {
     return input.mapPartitionsToPair(reduceFunc);
   }
@@ -42,11 +50,6 @@ public class ReduceTran implements SparkTran<HiveKey, Iterable<BytesWritable>, H
   }
 
   @Override
-  public Boolean isCacheEnable() {
-    return null;
-  }
-
-  @Override
   public void setName(String name) {
     this.name = name;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/441d4c08/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index ee5c78a..762f734 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -88,6 +88,9 @@ public class SparkPlan {
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
+    if (LOG.isDebugEnabled()) {
+      LOG.info("print generated spark rdd graph:\n" + SparkUtilities.rddGraphToString(finalRDD));
+    }
     return finalRDD;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/441d4c08/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 3f240f5..3a1eff8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -102,7 +102,7 @@ public class SparkPlanGenerator {
     try {
       for (BaseWork work : sparkWork.getAllWork()) {
         perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
-        SparkTran tran = generate(work);
+        SparkTran tran = generate(work, sparkWork);
         SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work);
         sparkPlan.addTran(tran);
         sparkPlan.connect(parentTran, tran);
@@ -206,18 +206,19 @@
     return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache);
   }
 
-  private SparkTran generate(BaseWork work) throws Exception {
+  private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception {
     initStatsPublisher(work);
     JobConf newJobConf = cloneJobConf(work);
     checkSpecs(work, newJobConf);
     byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+    boolean caching = isCachingWork(work, sparkWork);
     if (work instanceof MapWork) {
-      MapTran mapTran = new MapTran();
+      MapTran mapTran = new MapTran(caching);
       HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter);
       mapTran.setMapFunction(mapFunc);
       return mapTran;
     } else if (work instanceof ReduceWork) {
-      ReduceTran reduceTran = new ReduceTran();
+      ReduceTran reduceTran = new ReduceTran(caching);
       HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, sparkReporter);
       reduceTran.setReduceFunction(reduceFunc);
       return reduceTran;
@@ -227,6 +228,22 @@
     }
   }
 
+  private boolean isCachingWork(BaseWork work, SparkWork sparkWork) {
+    boolean caching = true;
+    List<BaseWork> children = sparkWork.getChildren(work);
+    if (children.size() < 2) {
+      caching = false;
+    } else {
+      // do not cache this if its child RDD is intended to be cached.
+      for (BaseWork child : children) {
+        if (cloneToWork.containsKey(child)) {
+          caching = false;
+        }
+      }
+    }
+    return caching;
+  }
+
   private void checkSpecs(BaseWork work, JobConf jc) throws Exception {
     Set<Operator<?>> opList = work.getAllOperators();
     for (Operator<?> op : opList) {

http://git-wip-us.apache.org/repos/asf/hive/blob/441d4c08/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index c012af8..ef612bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collection;
 import java.util.UUID;
 
 import org.apache.commons.io.FilenameUtils;
@@ -33,7 +34,12 @@ import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.Dependency;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.rdd.UnionRDD;
+import scala.collection.JavaConversions;
 
 /**
  * Contains utilities methods used as part of Spark tasks.
@@ -116,4 +122,34 @@ public class SparkUtilities {
     SessionState.get().setSparkSession(sparkSession);
     return sparkSession;
   }
+
+
+  public static String rddGraphToString(JavaPairRDD rdd) {
+    StringBuilder sb = new StringBuilder();
+    rddToString(rdd.rdd(), sb, "");
+    return sb.toString();
+  }
+
+  private static void rddToString(RDD rdd, StringBuilder sb, String offset) {
+    sb.append(offset).append(rdd.getClass().getCanonicalName()).append("[").append(rdd.hashCode()).append("]");
+    if (rdd.getStorageLevel().useMemory()) {
+      sb.append("(cached)");
+    }
+    sb.append("\n");
+    Collection<Dependency> dependencies = JavaConversions.asJavaCollection(rdd.dependencies());
+    if (dependencies != null) {
+      offset += "\t";
+      for (Dependency dependency : dependencies) {
+        RDD parentRdd = dependency.rdd();
+        rddToString(parentRdd, sb, offset);
+      }
+    } else if (rdd instanceof UnionRDD) {
+      UnionRDD unionRDD = (UnionRDD) rdd;
+      offset += "\t";
+      Collection<RDD> parentRdds = JavaConversions.asJavaCollection(unionRDD.rdds());
+      for (RDD parentRdd : parentRdds) {
+        rddToString(parentRdd, sb, offset);
+      }
+    }
+  }
 }
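The core of the change is the persist-once pattern in CacheTran.transform(): the first call materializes the output RDD and persists it at MEMORY_AND_DISK, and every later call returns that same persisted RDD, so a work with multiple child works is computed only once. Below is a minimal standalone sketch of that pattern, for illustration only; it uses a local SparkContext and plain String pairs instead of Hive's HiveKey/BytesWritable plumbing, and the CachedIdentityTran and CacheTranSketch classes are hypothetical, not part of this commit.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

// Hypothetical stand-in for CacheTran: persist the transformed RDD on the first
// transform() call, hand back the same persisted RDD on every later call.
class CachedIdentityTran {
  private final boolean caching;
  private JavaPairRDD<String, String> cachedRDD;

  CachedIdentityTran(boolean caching) {
    this.caching = caching;
  }

  JavaPairRDD<String, String> transform(JavaPairRDD<String, String> input) {
    if (!caching) {
      return doTransform(input);
    }
    if (cachedRDD == null) {
      cachedRDD = doTransform(input);
      cachedRDD.persist(StorageLevel.MEMORY_AND_DISK());
    }
    return cachedRDD;
  }

  // Identity here; MapTran/ReduceTran plug input.mapPartitionsToPair(...) into this slot.
  private JavaPairRDD<String, String> doTransform(JavaPairRDD<String, String> input) {
    return input;
  }
}

public class CacheTranSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setMaster("local").setAppName("cache-tran-sketch"));
    JavaPairRDD<String, String> input =
        sc.parallelizePairs(Arrays.asList(new Tuple2<String, String>("k", "v")));

    CachedIdentityTran tran = new CachedIdentityTran(true);
    JavaPairRDD<String, String> first = tran.transform(input);
    JavaPairRDD<String, String> second = tran.transform(input);
    // Both consumers of a multi-child plan get the same persisted RDD back.
    System.out.println(first == second);  // true
    sc.stop();
  }
}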