Repository: hive Updated Branches: refs/heads/master 4e9562f1e -> ed4fa73ba
HIVE-19733: RemoteSparkJobStatus#getSparkStageProgress inefficient implementation (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed4fa73b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed4fa73b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed4fa73b Branch: refs/heads/master Commit: ed4fa73ba740026ac0d4297d6a45432dc60d1073 Parents: 4e9562f Author: Bharathkrishna Guruvayoor Murali <bhar...@cloudera.com> Authored: Mon Jul 23 18:35:04 2018 -0500 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Mon Jul 23 18:35:41 2018 -0500 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/ql/ErrorMsg.java | 3 + .../spark/status/impl/RemoteSparkJobStatus.java | 108 +++++++++++++------ 2 files changed, 78 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/ed4fa73b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 06d0ed3..37bc153 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -599,6 +599,9 @@ public enum ErrorMsg { SPARK_GET_JOB_INFO_EXECUTIONERROR(30046, "Spark job failed in execution while getting job info due to exception {0}"), REPL_FILE_SYSTEM_OPERATION_RETRY(30047, "Replication file system operation retry expired."), + SPARK_GET_STAGES_INFO_TIMEOUT(30048, "Spark job GetSparkStagesInfoJob timed out after {0} seconds.", true), + SPARK_GET_STAGES_INFO_INTERRUPTED(30049, "Spark job GetSparkStagesInfoJob was interrupted."), + SPARK_GET_STAGES_INFO_EXECUTIONERROR(30050, "Spark job GetSparkStagesInfoJob failed in execution while getting job info due to exception {0}", true), //========================== 40000 range starts here ========================// http://git-wip-us.apache.org/repos/asf/hive/blob/ed4fa73b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index 832832b..3d41443 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -45,6 +45,7 @@ import org.apache.spark.SparkStageInfo; import org.apache.spark.api.java.JavaFutureAction; import java.io.Serializable; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -103,18 +104,20 @@ public class RemoteSparkJobStatus implements SparkJobStatus { @Override public Map<SparkStage, SparkStageProgress> getSparkStageProgress() throws HiveException { + List<SparkStageInfo> sparkStagesInfo = getSparkStagesInfo(); Map<SparkStage, SparkStageProgress> stageProgresses = new HashMap<SparkStage, SparkStageProgress>(); - for (int stageId : getStageIds()) { - SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId); - if (sparkStageInfo != null && sparkStageInfo.name() != null) { - int runningTaskCount = sparkStageInfo.numActiveTasks(); - int completedTaskCount = sparkStageInfo.numCompletedTasks(); - int failedTaskCount = sparkStageInfo.numFailedTasks(); - int totalTaskCount = sparkStageInfo.numTasks(); - SparkStageProgress sparkStageProgress = new SparkStageProgress( - totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); - SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId()); - stageProgresses.put(stage, sparkStageProgress); + if (sparkStagesInfo != null) { + for (SparkStageInfo sparkStageInfo : sparkStagesInfo) { + if (sparkStageInfo != null && sparkStageInfo.name() != null) { + int runningTaskCount = sparkStageInfo.numActiveTasks(); + int completedTaskCount = sparkStageInfo.numCompletedTasks(); + int failedTaskCount = sparkStageInfo.numFailedTasks(); + int totalTaskCount = sparkStageInfo.numTasks(); + SparkStageProgress sparkStageProgress = + new SparkStageProgress(totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); + SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId()); + stageProgresses.put(stage, sparkStageProgress); + } } } return stageProgresses; @@ -212,14 +215,26 @@ public class RemoteSparkJobStatus implements SparkJobStatus { } } - private SparkStageInfo getSparkStageInfo(int stageId) { - Future<SparkStageInfo> getStageInfo = sparkClient.run(new GetStageInfoJob(stageId)); - try { - return getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS); - } catch (Throwable t) { - LOG.warn("Error getting stage info", t); + private List<SparkStageInfo> getSparkStagesInfo()throws HiveException { + + Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 + ? jobHandle.getSparkJobIds().get(0) : null; + if (sparkJobId == null) { return null; } + Future<ArrayList<SparkStageInfo>> getStagesInfo = sparkClient.run( + new GetSparkStagesInfoJob(jobHandle.getClientJobId(), sparkJobId)); + try { + return getStagesInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS); + } catch (TimeoutException e) { + throw new HiveException(e, ErrorMsg.SPARK_GET_STAGES_INFO_TIMEOUT, + Long.toString(sparkClientTimeoutInSeconds)); + } catch (InterruptedException e) { + throw new HiveException(e, ErrorMsg.SPARK_GET_STAGES_INFO_INTERRUPTED); + } catch (ExecutionException e) { + throw new HiveException(e, ErrorMsg.SPARK_GET_STAGES_INFO_EXECUTIONERROR, + Throwables.getRootCause(e).getMessage()); + } } public JobHandle.State getRemoteJobState() { @@ -229,25 +244,24 @@ public class RemoteSparkJobStatus implements SparkJobStatus { return jobHandle.getState(); } - private static class GetJobInfoJob implements Job<SparkJobInfo> { + private static class GetSparkStagesInfoJob implements Job<ArrayList<SparkStageInfo>> { private final String clientJobId; private final int sparkJobId; - private GetJobInfoJob() { + private GetSparkStagesInfoJob() { // For serialization. this(null, -1); } - GetJobInfoJob(String clientJobId, int sparkJobId) { + GetSparkStagesInfoJob(String clientJobId, int sparkJobId) { this.clientJobId = clientJobId; this.sparkJobId = sparkJobId; } - @Override - public SparkJobInfo call(JobContext jc) throws Exception { + public ArrayList<SparkStageInfo> call(JobContext jc) throws Exception { SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId); if (jobInfo == null) { - List<JavaFutureAction<?>> list = jc.getMonitoredJobs().get(clientJobId); + ArrayList<JavaFutureAction<?>> list = new ArrayList<>(jc.getMonitoredJobs().get(clientJobId)); if (list != null && list.size() == 1) { JavaFutureAction<?> futureAction = list.get(0); if (futureAction.isDone()) { @@ -266,25 +280,53 @@ public class RemoteSparkJobStatus implements SparkJobStatus { if (jobInfo == null) { jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN); } - return jobInfo; + ArrayList<SparkStageInfo> sparkStageInfos = new ArrayList<>(); + int[] stageIds = jobInfo.stageIds(); + for(Integer stageid : stageIds) { + SparkStageInfo stageInfo = jc.sc().statusTracker().getStageInfo(stageid); + sparkStageInfos.add(stageInfo); + } + return sparkStageInfos; } } + private static class GetJobInfoJob implements Job<SparkJobInfo> { + private final String clientJobId; + private final int sparkJobId; - private static class GetStageInfoJob implements Job<SparkStageInfo> { - private final int stageId; - - private GetStageInfoJob() { + private GetJobInfoJob() { // For serialization. - this(-1); + this(null, -1); } - GetStageInfoJob(int stageId) { - this.stageId = stageId; + GetJobInfoJob(String clientJobId, int sparkJobId) { + this.clientJobId = clientJobId; + this.sparkJobId = sparkJobId; } @Override - public SparkStageInfo call(JobContext jc) throws Exception { - return jc.sc().statusTracker().getStageInfo(stageId); + public SparkJobInfo call(JobContext jc) throws Exception { + SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId); + if (jobInfo == null) { + List<JavaFutureAction<?>> list = jc.getMonitoredJobs().get(clientJobId); + if (list != null && list.size() == 1) { + JavaFutureAction<?> futureAction = list.get(0); + if (futureAction.isDone()) { + boolean futureSucceed = true; + try { + futureAction.get(); + } catch (Exception e) { + LOG.error("Failed to run job " + sparkJobId, e); + futureSucceed = false; + } + jobInfo = getDefaultJobInfo(sparkJobId, + futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED); + } + } + } + if (jobInfo == null) { + jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN); + } + return jobInfo; } }