HIVE-10550: Dynamic RDD caching optimization for HoS [Spark Branch] (Chengxiang, reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/441d4c08
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/441d4c08
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/441d4c08

Branch: refs/heads/hbase-metastore
Commit: 441d4c081b8bc4fa0b7d7f65ef34816d005d41b4
Parents: 37cccbc
Author: chengxiang <chengxi...@apache.com>
Authored: Thu May 28 13:31:26 2015 +0800
Committer: Xuefu Zhang <xzh...@cloudera.com>
Committed: Mon Jun 1 14:04:41 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/CacheTran.java    | 54 ++++++++++++++++++++
 .../hadoop/hive/ql/exec/spark/MapTran.java      | 17 +++---
 .../hadoop/hive/ql/exec/spark/ReduceTran.java   | 17 +++---
 .../hadoop/hive/ql/exec/spark/SparkPlan.java    |  3 ++
 .../hive/ql/exec/spark/SparkPlanGenerator.java  | 25 +++++++--
 .../hive/ql/exec/spark/SparkUtilities.java      | 36 +++++++++++++
 6 files changed, 134 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/441d4c08/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
new file mode 100644
index 0000000..5ec27ec
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.storage.StorageLevel;
+
+public abstract class CacheTran<KI extends WritableComparable, VI, KO extends WritableComparable, VO>
+  implements SparkTran<KI, VI, KO, VO> {
+  // Whether to cache the RDD produced by this tran.
+  private boolean caching = false;
+  private JavaPairRDD<KO, VO> cachedRDD;
+
+  protected CacheTran(boolean cache) {
+    this.caching = cache;
+  }
+
+  @Override
+  public JavaPairRDD<KO, VO> transform(
+    JavaPairRDD<KI, VI> input) {
+    if (caching) {
+      if (cachedRDD == null) {
+        cachedRDD = doTransform(input);
+        cachedRDD.persist(StorageLevel.MEMORY_AND_DISK());
+      }
+      return cachedRDD;
+    } else {
+      return doTransform(input);
+    }
+  }
+
+  public Boolean isCacheEnable() {
+    return caching;
+  }
+
+  protected abstract JavaPairRDD<KO, VO> doTransform(JavaPairRDD<KI, VI> input);
+}
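
This makes the caching contract concrete: with cache=true, the first transform() call runs doTransform() once, persists the result at MEMORY_AND_DISK, and every later call returns that same RDD, so a tran whose output feeds several children pays for the computation only once. Below is a minimal sketch of a subclass (a hypothetical PassThroughTran, not part of this patch; it assumes SparkTran also declares getName/setName):

  import org.apache.hadoop.io.BytesWritable;
  import org.apache.spark.api.java.JavaPairRDD;

  // Hypothetical identity tran, used only to illustrate the memoization behavior.
  public class PassThroughTran extends CacheTran<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
    private String name = "PassThrough";

    public PassThroughTran(boolean cache) {
      super(cache);
    }

    @Override
    protected JavaPairRDD<BytesWritable, BytesWritable> doTransform(
        JavaPairRDD<BytesWritable, BytesWritable> input) {
      return input; // identity; a real tran runs its map or reduce work here
    }

    @Override
    public String getName() { return name; }

    @Override
    public void setName(String name) { this.name = name; }
  }

Calling transform(input) twice on a cache-enabled instance returns the identical persisted RDD, which is exactly what lets a plan node with multiple children reuse one computation.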

http://git-wip-us.apache.org/repos/asf/hive/blob/441d4c08/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index 2170243..2a18991 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -22,12 +22,20 @@ import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class MapTran implements SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
+public class MapTran extends CacheTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
   private HiveMapFunction mapFunc;
   private String name = "MapTran";
 
+  public MapTran() {
+    this(false);
+  }
+
+  public MapTran(boolean cache) {
+    super(cache);
+  }
+
   @Override
-  public JavaPairRDD<HiveKey, BytesWritable> transform(
+  public JavaPairRDD<HiveKey, BytesWritable> doTransform(
       JavaPairRDD<BytesWritable, BytesWritable> input) {
     return input.mapPartitionsToPair(mapFunc);
   }
@@ -42,11 +50,6 @@ public class MapTran implements SparkTran<BytesWritable, BytesWritable, HiveKey,
   }
 
   @Override
-  public Boolean isCacheEnable() {
-    return null;
-  }
-
-  @Override
   public void setName(String name) {
     this.name = name;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/441d4c08/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index e60dfac..3d56876 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -22,12 +22,20 @@ import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class ReduceTran implements SparkTran<HiveKey, Iterable<BytesWritable>, HiveKey, BytesWritable> {
+public class ReduceTran extends CacheTran<HiveKey, Iterable<BytesWritable>, HiveKey, BytesWritable> {
   private HiveReduceFunction reduceFunc;
   private String name = "Reduce";
 
+  public ReduceTran() {
+    this(false);
+  }
+
+  public ReduceTran(boolean caching) {
+    super(caching);
+  }
+
   @Override
-  public JavaPairRDD<HiveKey, BytesWritable> transform(
+  public JavaPairRDD<HiveKey, BytesWritable> doTransform(
       JavaPairRDD<HiveKey, Iterable<BytesWritable>> input) {
     return input.mapPartitionsToPair(reduceFunc);
   }
@@ -42,11 +50,6 @@ public class ReduceTran implements SparkTran<HiveKey, Iterable<BytesWritable>, H
   }
 
   @Override
-  public Boolean isCacheEnable() {
-    return null;
-  }
-
-  @Override
   public void setName(String name) {
     this.name = name;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/441d4c08/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index ee5c78a..762f734 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -88,6 +88,9 @@ public class SparkPlan {
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated Spark RDD graph:\n" + SparkUtilities.rddGraphToString(finalRDD));
+    }
     return finalRDD;
   }
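
The dump above only fires when DEBUG logging is enabled for SparkPlan. A hedged sketch of how to turn it on, assuming the log4j 1.x properties style Hive shipped at the time (e.g. in conf/hive-log4j.properties):

  # Enable the RDD-graph dump emitted after SPARK_BUILD_RDD_GRAPH completes.
  log4j.logger.org.apache.hadoop.hive.ql.exec.spark.SparkPlan=DEBUG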
 

http://git-wip-us.apache.org/repos/asf/hive/blob/441d4c08/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 3f240f5..3a1eff8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -102,7 +102,7 @@ public class SparkPlanGenerator {
     try {
       for (BaseWork work : sparkWork.getAllWork()) {
        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
-        SparkTran tran = generate(work);
+        SparkTran tran = generate(work, sparkWork);
         SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work);
         sparkPlan.addTran(tran);
         sparkPlan.connect(parentTran, tran);
@@ -206,18 +206,19 @@ public class SparkPlanGenerator {
    return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache);
   }
 
-  private SparkTran generate(BaseWork work) throws Exception {
+  private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception {
     initStatsPublisher(work);
     JobConf newJobConf = cloneJobConf(work);
     checkSpecs(work, newJobConf);
     byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+    boolean caching = isCachingWork(work, sparkWork);
     if (work instanceof MapWork) {
-      MapTran mapTran = new MapTran();
+      MapTran mapTran = new MapTran(caching);
       HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter);
       mapTran.setMapFunction(mapFunc);
       return mapTran;
     } else if (work instanceof ReduceWork) {
-      ReduceTran reduceTran = new ReduceTran();
+      ReduceTran reduceTran = new ReduceTran(caching);
      HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, sparkReporter);
       reduceTran.setReduceFunction(reduceFunc);
       return reduceTran;
@@ -227,6 +228,22 @@ public class SparkPlanGenerator {
     }
   }
 
+  private boolean isCachingWork(BaseWork work, SparkWork sparkWork) {
+    boolean caching = true;
+    List<BaseWork> children = sparkWork.getChildren(work);
+    if (children.size() < 2) {
+      caching = false;
+    } else {
+      // Do not cache this RDD if any of its children is itself intended to be cached.
+      for (BaseWork child : children) {
+        if (cloneToWork.containsKey(child)) {
+          caching = false;
+        }
+      }
+    }
+    return caching;
+  }
+
   private void checkSpecs(BaseWork work, JobConf jc) throws Exception {
     Set<Operator<?>> opList = work.getAllOperators();
     for (Operator<?> op : opList) {

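The rule in isCachingWork() is simple: cache a work's output only when two or more children consume it, and none of those children is itself scheduled for caching through cloneToWork. A self-contained toy model of that rule, using plain Java collections rather than Hive's SparkWork API (all names here are illustrative):

  import java.util.*;

  public class CachingRuleDemo {
    // Mirrors the decision logic: multiple consumers, none already marked for caching.
    static boolean isCachingWork(String work, Map<String, List<String>> children,
        Set<String> cloneToWork) {
      List<String> kids = children.getOrDefault(work, Collections.<String>emptyList());
      if (kids.size() < 2) {
        return false; // a single consumer never triggers recomputation
      }
      for (String child : kids) {
        if (cloneToWork.contains(child)) {
          return false; // the child's RDD will be cached instead; avoid caching twice
        }
      }
      return true;
    }

    public static void main(String[] args) {
      Map<String, List<String>> children = new HashMap<>();
      children.put("Map 1", Arrays.asList("Reduce 2", "Reduce 3")); // shared by two branches
      children.put("Reduce 2", Collections.<String>emptyList());
      System.out.println(isCachingWork("Map 1", children, Collections.<String>emptySet()));    // true
      System.out.println(isCachingWork("Reduce 2", children, Collections.<String>emptySet())); // false
    }
  }
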
http://git-wip-us.apache.org/repos/asf/hive/blob/441d4c08/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index c012af8..ef612bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collection;
 import java.util.UUID;
 
 import org.apache.commons.io.FilenameUtils;
@@ -33,7 +34,12 @@ import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.Dependency;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.rdd.UnionRDD;
+import scala.collection.JavaConversions;
 
 /**
 * Contains utility methods used as part of Spark tasks.
@@ -116,4 +122,34 @@ public class SparkUtilities {
     SessionState.get().setSparkSession(sparkSession);
     return sparkSession;
   }
+
+  public static String rddGraphToString(JavaPairRDD rdd) {
+    StringBuilder sb = new StringBuilder();
+    rddToString(rdd.rdd(), sb, "");
+    return sb.toString();
+  }
+
+  private static void rddToString(RDD rdd, StringBuilder sb, String offset) {
+    sb.append(offset).append(rdd.getClass().getCanonicalName()).append("[").append(rdd.hashCode()).append("]");
+    if (rdd.getStorageLevel().useMemory()) {
+      sb.append("(cached)");
+    }
+    sb.append("\n");
+    offset += "\t";
+    if (rdd instanceof UnionRDD) {
+      // UnionRDD exposes its parents directly through rdds().
+      UnionRDD unionRDD = (UnionRDD) rdd;
+      Collection<RDD> parentRdds = JavaConversions.asJavaCollection(unionRDD.rdds());
+      for (RDD parentRdd : parentRdds) {
+        rddToString(parentRdd, sb, offset);
+      }
+    } else {
+      // asJavaCollection never returns null; a root RDD simply has no dependencies.
+      Collection<Dependency> dependencies = JavaConversions.asJavaCollection(rdd.dependencies());
+      for (Dependency dependency : dependencies) {
+        rddToString(dependency.rdd(), sb, offset);
+      }
+    }
+  }
 }
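
rddGraphToString() walks the lineage from the final RDD back through its parents, printing one RDD per line as CanonicalClassName[hashCode], tagging RDDs whose storage level uses memory with "(cached)", and indenting each parent level by one tab. A standalone usage sketch (not part of the patch; the RDD class names in the expected-output comment are illustrative):

  import java.util.Arrays;
  import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.storage.StorageLevel;
  import scala.Tuple2;

  public class RddGraphDemo {
    public static void main(String[] args) {
      JavaSparkContext sc = new JavaSparkContext(
          new SparkConf().setMaster("local").setAppName("rdd-graph-demo"));
      // Build a tiny two-stage pair RDD and persist it so useMemory() is true.
      JavaPairRDD<Integer, String> pairs = sc.parallelize(Arrays.asList(1, 2, 3))
          .mapToPair(i -> new Tuple2<>(i, "v" + i));
      pairs.persist(StorageLevel.MEMORY_AND_DISK());
      // Expected shape (hash codes vary):
      //   org.apache.spark.rdd.MapPartitionsRDD[...](cached)
      //       org.apache.spark.rdd.ParallelCollectionRDD[...]
      System.out.println(SparkUtilities.rddGraphToString(pairs));
      sc.stop();
    }
  }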
