HIVE-9774: Print yarn application id to console [Spark Branch] (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b9f99591 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b9f99591 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b9f99591 Branch: refs/heads/master Commit: b9f99591f6350d20902708c28bfbb708d7603c6e Parents: e1a7503 Author: Rui Li <[email protected]> Authored: Tue Jan 19 09:51:29 2016 +0800 Committer: Rui Li <[email protected]> Committed: Thu Jan 28 14:53:06 2016 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 22 +++++++++++++++----- .../spark/status/RemoteSparkJobMonitor.java | 15 +++++++++++++ .../ql/exec/spark/status/SparkJobStatus.java | 2 ++ .../spark/status/impl/LocalSparkJobStatus.java | 5 +++++ .../spark/status/impl/RemoteSparkJobStatus.java | 22 ++++++++++++++++++++ 5 files changed, 61 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index b9d4b5e..0ff0b39 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -387,6 +387,7 @@ public class HiveConf extends Configuration { // a symbolic name to reference in the Hive source code. Properties with non-null // values will override any values set in the underlying Hadoop configuration. HADOOPBIN("hadoop.bin.path", findHadoopBinary(), "", true), + YARNBIN("yarn.bin.path", findYarnBinary(), "", true), HIVE_FS_HAR_IMPL("fs.har.impl", "org.apache.hadoop.hive.shims.HiveHarFileSystem", "The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop versions less than 0.20"), MAPREDMAXSPLITSIZE(FileInputFormat.SPLIT_MAXSIZE, 256000000L, "", true), @@ -2796,16 +2797,27 @@ public class HiveConf extends Configuration { } private static String findHadoopBinary() { + String val = findHadoopHome(); + // if can't find hadoop home we can at least try /usr/bin/hadoop + val = (val == null ? File.separator + "usr" : val) + + File.separator + "bin" + File.separator + "hadoop"; + // Launch hadoop command file on windows. + return val + (Shell.WINDOWS ? ".cmd" : ""); + } + + private static String findYarnBinary() { + String val = findHadoopHome(); + val = (val == null ? "yarn" : val + File.separator + "bin" + File.separator + "yarn"); + return val + (Shell.WINDOWS ? ".cmd" : ""); + } + + private static String findHadoopHome() { String val = System.getenv("HADOOP_HOME"); // In Hadoop 1.X and Hadoop 2.X HADOOP_HOME is gone and replaced with HADOOP_PREFIX if (val == null) { val = System.getenv("HADOOP_PREFIX"); } - // and if all else fails we can at least try /usr/bin/hadoop - val = (val == null ? File.separator + "usr" : val) - + File.separator + "bin" + File.separator + "hadoop"; - // Launch hadoop command file on windows. - return val + (Shell.WINDOWS ? ".cmd" : ""); + return val; } public String getDefaultValue() { http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index fb0498a..6990e80 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -34,10 +34,12 @@ import org.apache.spark.JobExecutionStatus; public class RemoteSparkJobMonitor extends SparkJobMonitor { private RemoteSparkJobStatus sparkJobStatus; + private final HiveConf hiveConf; public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobStatus) { super(hiveConf); this.sparkJobStatus = sparkJobStatus; + this.hiveConf = hiveConf; } @Override @@ -77,6 +79,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor { Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress(); if (!running) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); + printAppInfo(); // print job stages. console.printInfo("\nQuery Hive on Spark job[" + sparkJobStatus.getJobId() + "] stages:"); @@ -137,4 +140,16 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); return rc; } + + private void printAppInfo() { + String sparkMaster = hiveConf.get("spark.master"); + if (sparkMaster != null && sparkMaster.startsWith("yarn")) { + String appID = sparkJobStatus.getAppID(); + if (appID != null) { + console.printInfo("Running with YARN Application = " + appID); + console.printInfo("Kill Command = " + + HiveConf.getVar(hiveConf, HiveConf.ConfVars.YARNBIN) + " application -kill " + appID); + } + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java index fa45ec8..7959089 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java @@ -29,6 +29,8 @@ import java.util.Map; */ public interface SparkJobStatus { + String getAppID(); + int getJobId(); JobExecutionStatus getState() throws HiveException; http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java index ebc5c16..3c15521 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -66,6 +66,11 @@ public class LocalSparkJobStatus implements SparkJobStatus { } @Override + public String getAppID() { + return sparkContext.sc().applicationId(); + } + + @Override public int getJobId() { return jobId; } http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index e8d581f..d84c026 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -62,6 +62,17 @@ public class RemoteSparkJobStatus implements SparkJobStatus { } @Override + public String getAppID() { + Future<String> getAppID = sparkClient.run(new GetAppIDJob()); + try { + return getAppID.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Failed to get APP ID.", e); + return null; + } + } + + @Override public int getJobId() { return jobHandle.getSparkJobIds().size() == 1 ? jobHandle.getSparkJobIds().get(0) : -1; } @@ -268,4 +279,15 @@ public class RemoteSparkJobStatus implements SparkJobStatus { } }; } + + private static class GetAppIDJob implements Job<String> { + + public GetAppIDJob() { + } + + @Override + public String call(JobContext jc) throws Exception { + return jc.sc().sc().applicationId(); + } + } }
