HIVE-11363: Prewarm Hive on Spark containers [Spark Branch] (reviewed by Chao)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d8e79a6c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d8e79a6c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d8e79a6c

Branch: refs/heads/branch-1
Commit: d8e79a6c02636bab40b1432479353780885fecf0
Parents: d7413e4
Author: xzhang <xzhang@xzdt>
Authored: Thu Jul 30 12:48:31 2015 -0700
Committer: xzhang <xzhang@xzdt>
Committed: Sat Aug 1 20:57:13 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  5 ++
 .../ql/exec/spark/HiveSparkClientFactory.java   |  5 +-
 .../ql/exec/spark/RemoteHiveSparkClient.java    | 51 ++++++++++++++++++--
 3 files changed, 54 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d8e79a6c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 54e9cbb..3544142 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2218,6 +2218,11 @@ public class HiveConf extends Configuration {
     SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE(
         "hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L,
         "Maximum total data size in dynamic pruning."),
+    SPARK_PREWARM_CONTAINERS("hive.spark.prewarm.containers", false, "Whether to prewarm containers for Spark. " +
+        "If enabled, Hive will wait no more than 60 seconds for the containers to come up " +
+        "before any query can be executed."),
+    SPARK_PREWARM_NUM_CONTAINERS("hive.spark.prewarm.num.containers", 10, "The minimum number of containers to be prewarmed for Spark. "
+ + "Applicable only if hive.spark.prewarm.containers is set to true."), NWAYJOINREORDER("hive.reorder.nway.joins", true, "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"), HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null), http://git-wip-us.apache.org/repos/asf/hive/blob/d8e79a6c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 007db75..e12a97d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -36,7 +36,6 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.apache.spark.SparkConf; -import org.apache.spark.SparkException; import com.google.common.base.Joiner; import com.google.common.base.Splitter; @@ -52,9 +51,7 @@ public class HiveSparkClientFactory { private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer"; private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false"; - public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) - throws IOException, SparkException { - + public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception { Map<String, String> sparkConf = initiateSparkConf(hiveconf); // Submit spark job through local spark context while spark master is local mode, otherwise submit // spark job through remote spark context. 
http://git-wip-us.apache.org/repos/asf/hive/blob/d8e79a6c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 4073d2b..92167e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -59,7 +60,6 @@ import org.apache.hive.spark.client.SparkClientFactory;
 import org.apache.hive.spark.client.SparkClientUtilities;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;

@@ -85,11 +85,56 @@ public class RemoteHiveSparkClient implements HiveSparkClient {

   private final transient long sparkClientTimtout;

-  RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws IOException, SparkException {
+  RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws Exception {
     this.hiveConf = hiveConf;
+    sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT,
+        TimeUnit.SECONDS);
     sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
     remoteClient = SparkClientFactory.createClient(conf, hiveConf);
-    sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
+
+    if (HiveConf.getBoolVar(hiveConf, ConfVars.SPARK_PREWARM_CONTAINERS) &&
+        hiveConf.get("spark.master").startsWith("yarn-")) {
+      int minExecutors = getExecutorsToWarm();
+      if (minExecutors <= 0) {
+        return;
+      }
+
+      LOG.info("Prewarm Spark executors. The minimum number of executors to warm is " + minExecutors);
+
+      // Wait at most 60s for the executors to come up.
+      int curExecutors = 0;
+      long ts = System.currentTimeMillis();
+      do {
+        curExecutors = getExecutorCount();
+        if (curExecutors >= minExecutors) {
+          LOG.info("Finished prewarming Spark executors. The current number of executors is " + curExecutors);
+          return;
+        }
+        Thread.sleep(1000); // sleep 1 second
+      } while (System.currentTimeMillis() - ts < 60000);
+
+      LOG.info("Timeout (60s) occurred while prewarming executors. The current number of executors is " + curExecutors);
+    }
+  }
+
+  /**
+   * Note that this method is closely tied to the Spark 1.4.1 documentation on
+   * dynamic allocation, in particular its default values.
+   * @return the minimum number of executors to wait for during prewarming
+   */
+  private int getExecutorsToWarm() {
+    int minExecutors =
+        HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.SPARK_PREWARM_NUM_CONTAINERS);
+    boolean dynamicAllocation = hiveConf.getBoolean("spark.dynamicAllocation.enabled", false);
+    if (dynamicAllocation) {
+      int min = sparkConf.getInt("spark.dynamicAllocation.minExecutors", 0);
+      int initExecutors = sparkConf.getInt("spark.dynamicAllocation.initialExecutors", min);
+      minExecutors = Math.min(minExecutors, initExecutors);
+    } else {
+      int execInstances = sparkConf.getInt("spark.executor.instances", 2);
+      minExecutors = Math.min(minExecutors, execInstances);
+    }
+    return minExecutors;
+  }

   @Override
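For reference, the bounded-wait pattern in the constructor above boils down to
the following self-contained sketch. The names waitForExecutors and the
IntSupplier parameter are illustrative and not part of the patch; the 60s
timeout and 1s poll interval mirror the hard-coded values above.

    import java.util.function.IntSupplier;

    public class PrewarmWait {
      // Mirrors the hard-coded bounds in RemoteHiveSparkClient: wait at most
      // 60 seconds, polling once per second.
      private static final long TIMEOUT_MS = 60 * 1000L;
      private static final long POLL_MS = 1000L;

      // Polls executorCount until it reaches minExecutors or the timeout
      // elapses; returns the last observed count either way.
      static int waitForExecutors(int minExecutors, IntSupplier executorCount)
          throws InterruptedException {
        long start = System.currentTimeMillis();
        int current;
        do {
          current = executorCount.getAsInt();
          if (current >= minExecutors) {
            return current;
          }
          Thread.sleep(POLL_MS);
        } while (System.currentTimeMillis() - start < TIMEOUT_MS);
        return current;
      }

      public static void main(String[] args) throws InterruptedException {
        // Fake counter that "registers" one executor per call, for demonstration.
        int[] n = {0};
        System.out.println(waitForExecutors(3, () -> ++n[0])); // prints 3
      }
    }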
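To make the getExecutorsToWarm() arithmetic concrete, here is a worked example
under assumed property values (the 5 and the unset cases are hypothetical; the
fallback defaults of 0 and 2 are the ones hard-coded above, matching the Spark
1.4.1 documentation the javadoc refers to):

    import org.apache.spark.SparkConf;

    public class ExecutorsToWarmDemo {
      public static void main(String[] args) {
        final int prewarmTarget = 10; // default of hive.spark.prewarm.num.containers

        // Dynamic allocation with initialExecutors explicitly set to 5:
        SparkConf dyn = new SparkConf(false)
            .set("spark.dynamicAllocation.initialExecutors", "5");
        int min = dyn.getInt("spark.dynamicAllocation.minExecutors", 0);        // -> 0
        int init = dyn.getInt("spark.dynamicAllocation.initialExecutors", min); // -> 5
        System.out.println(Math.min(prewarmTarget, init)); // 5: Hive waits for 5 executors

        // With dynamic allocation enabled but neither minExecutors nor
        // initialExecutors set, init falls back to min = 0, so Math.min(10, 0) = 0
        // and the constructor above returns before entering the wait loop
        // (no prewarming happens).

        // Static allocation, spark.executor.instances unset -> default of 2:
        SparkConf stat = new SparkConf(false);
        int instances = stat.getInt("spark.executor.instances", 2); // -> 2
        System.out.println(Math.min(prewarmTarget, instances)); // 2
      }
    }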