Repository: hive Updated Branches: refs/heads/master e19b861cf -> e7d1781ec
HIVE-19176: Add HoS support to progress bar on Beeline client (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e7d1781e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e7d1781e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e7d1781e Branch: refs/heads/master Commit: e7d1781ec4662e088dcd6ffbe3f866738792ad9b Parents: e19b861 Author: Bharathkrishna Guruvayoor Murali <bhar...@cloudera.com> Authored: Mon Jul 2 11:42:59 2018 -0700 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Mon Jul 2 11:42:59 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 +- .../org/apache/hive/jdbc/HiveStatement.java | 4 +- .../exec/spark/status/LocalSparkJobMonitor.java | 4 +- .../spark/status/RemoteSparkJobMonitor.java | 4 +- .../ql/exec/spark/status/RenderStrategy.java | 246 +++++++++++++++++++ .../ql/exec/spark/status/SparkJobMonitor.java | 157 +----------- .../hive/ql/exec/spark/TestSparkTask.java | 1 + .../exec/spark/status/TestSparkJobMonitor.java | 29 ++- .../org/apache/hive/service/ServiceUtils.java | 5 +- .../cli/SparkProgressMonitorStatusMapper.java | 52 ++++ .../service/cli/thrift/ThriftCLIService.java | 5 +- 11 files changed, 349 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a3dd53e..7ef22d6 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3780,7 +3780,7 @@ public class HiveConf extends 
Configuration { "hive.server2.in.place.progress", true, "Allows hive server 2 to send progress bar update information. This is currently available" - + " only if the execution engine is tez."), + + " only if the execution engine is tez or Spark."), TEZ_DAG_STATUS_CHECK_INTERVAL("hive.tez.dag.status.check.interval", "500ms", new TimeValidator(TimeUnit.MILLISECONDS), "Interval between subsequent DAG status invocation."), SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true, http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index ad8d1a7..0b38f9c 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -376,7 +376,9 @@ public class HiveStatement implements java.sql.Statement { * essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires */ statusResp = client.GetOperationStatus(statusReq); - inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse()); + if(!isOperationComplete) { + inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse()); + } Utils.verifySuccessWithInfo(statusResp.getStatus()); if (statusResp.isSetOperationState()) { switch (statusResp.getOperationState()) { http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java index 2a6c33b..aeef3c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java @@ -89,11 +89,11 @@ public class LocalSparkJobMonitor extends SparkJobMonitor { + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]"); } - printStatus(progressMap, lastProgressMap); + updateFunction.printStatus(progressMap, lastProgressMap); lastProgressMap = progressMap; break; case SUCCEEDED: - printStatus(progressMap, lastProgressMap); + updateFunction.printStatus(progressMap, lastProgressMap); lastProgressMap = progressMap; double duration = (System.currentTimeMillis() - startTime) / 1000.0; console.printInfo("Status: Finished successfully in " http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index 560fb58..87b69cb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -131,13 +131,13 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor { } } - printStatus(progressMap, lastProgressMap); + updateFunction.printStatus(progressMap, lastProgressMap); lastProgressMap = progressMap; } break; case SUCCEEDED: Map<SparkStage, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress(); - printStatus(progressMap, lastProgressMap); + updateFunction.printStatus(progressMap, lastProgressMap); lastProgressMap = progressMap; double duration = (System.currentTimeMillis() - startTime) / 1000.0; console.printInfo("Spark job[" + sparkJobStatus.getJobId() + "] finished successfully in " 
http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RenderStrategy.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RenderStrategy.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RenderStrategy.java new file mode 100644 index 0000000..67a3a9c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RenderStrategy.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.status; + +import org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.apache.hadoop.hive.common.log.ProgressMonitor; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.session.SessionState; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to render progress bar for Hive on Spark job status. 
+ * Based on the configuration, appropriate render strategy is selected + * to show the progress bar on beeline or Hive CLI, as well as for logging + * the report String. + */ +class RenderStrategy { + + interface UpdateFunction { + void printStatus(Map<SparkStage, SparkStageProgress> progressMap, + Map<SparkStage, SparkStageProgress> lastProgressMap); + } + + private abstract static class BaseUpdateFunction implements UpdateFunction { + protected final SparkJobMonitor monitor; + private final PerfLogger perfLogger; + private long lastPrintTime; + private static final int PRINT_INTERVAL = 3000; + private final Set<String> completed = new HashSet<String>(); + private String lastReport = null; + + BaseUpdateFunction(SparkJobMonitor monitor) { + this.monitor = monitor; + this.perfLogger = SessionState.getPerfLogger(); + } + + private String getReport(Map<SparkStage, SparkStageProgress> progressMap) { + StringBuilder reportBuffer = new StringBuilder(); + SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); + String currentDate = dt.format(new Date()); + reportBuffer.append(currentDate + "\t"); + + // Num of total and completed tasks + int sumTotal = 0; + int sumComplete = 0; + + SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet()); + for (SparkStage stage : keys) { + SparkStageProgress progress = progressMap.get(stage); + final int complete = progress.getSucceededTaskCount(); + final int total = progress.getTotalTaskCount(); + final int running = progress.getRunningTaskCount(); + final int failed = progress.getFailedTaskCount(); + sumTotal += total; + sumComplete += complete; + String s = stage.toString(); + String stageName = "Stage-" + s; + if (total <= 0) { + reportBuffer.append(String.format("%s: -/-\t", stageName)); + } else { + if (complete == total && !completed.contains(s)) { + completed.add(s); + + if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) { + perfLogger.PerfLogBegin(SparkJobMonitor.CLASS_NAME, 
PerfLogger.SPARK_RUN_STAGE + s); + } + perfLogger.PerfLogEnd(SparkJobMonitor.CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s); + } + if (complete < total && (complete > 0 || running > 0 || failed > 0)) { + /* stage is started, but not complete */ + if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) { + perfLogger.PerfLogBegin(SparkJobMonitor.CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s); + } + if (failed > 0) { + reportBuffer.append( + String.format( + "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed, total)); + } else { + reportBuffer.append( + String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total)); + } + } else { + /* stage is waiting for input/slots or complete */ + if (failed > 0) { + /* tasks finished but some failed */ + reportBuffer.append( + String.format( + "%s: %d(-%d)/%d Finished with failed tasks\t", + stageName, complete, failed, total)); + } else { + if (complete == total) { + reportBuffer.append( + String.format("%s: %d/%d Finished\t", stageName, complete, total)); + } else { + reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total)); + } + } + } + } + } + + if (SessionState.get() != null) { + final float progress = (sumTotal == 0) ? 
1.0f : (float) sumComplete / (float) sumTotal; + SessionState.get().updateProgressedPercentage(progress); + } + return reportBuffer.toString(); + } + + private boolean isSameAsPreviousProgress( + Map<SparkStage, SparkStageProgress> progressMap, + Map<SparkStage, SparkStageProgress> lastProgressMap) { + + if (lastProgressMap == null) { + return false; + } + + if (progressMap.isEmpty()) { + return lastProgressMap.isEmpty(); + } else { + if (lastProgressMap.isEmpty()) { + return false; + } else { + if (progressMap.size() != lastProgressMap.size()) { + return false; + } + for (Map.Entry<SparkStage, SparkStageProgress> entry : progressMap.entrySet()) { + if (!lastProgressMap.containsKey(entry.getKey()) + || !progressMap.get(entry.getKey()).equals(lastProgressMap.get(entry.getKey()))) { + return false; + } + } + } + } + return true; + } + + + private boolean showReport(String report) { + return !report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL; + } + + @Override + public void printStatus(Map<SparkStage, SparkStageProgress> progressMap, + Map<SparkStage, SparkStageProgress> lastProgressMap) { + // do not print duplicate status while still in middle of print interval. + boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap); + boolean withinInterval = System.currentTimeMillis() <= lastPrintTime + PRINT_INTERVAL; + if (isDuplicateState && withinInterval) { + return; + } + + String report = getReport(progressMap); + renderProgress(monitor.getProgressMonitor(progressMap)); + if (showReport(report)) { + renderReport(report); + lastReport = report; + lastPrintTime = System.currentTimeMillis(); + } + } + + abstract void renderProgress(ProgressMonitor monitor); + + abstract void renderReport(String report); + } + + /** + * This is used to show progress bar on Beeline while using HiveServer2. 
+ */ + static class LogToFileFunction extends BaseUpdateFunction { + private static final Logger LOGGER = LoggerFactory.getLogger(LogToFileFunction.class); + private boolean hiveServer2InPlaceProgressEnabled = + SessionState.get().getConf().getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_INPLACE_PROGRESS); + + LogToFileFunction(SparkJobMonitor monitor) { + super(monitor); + } + + @Override + void renderProgress(ProgressMonitor monitor) { + SessionState.get().updateProgressMonitor(monitor); + } + + @Override + void renderReport(String report) { + if (hiveServer2InPlaceProgressEnabled) { + LOGGER.info(report); + } else { + monitor.console.printInfo(report); + } + } + } + + /** + * This is used to show progress bar on Hive CLI. + */ + static class InPlaceUpdateFunction extends BaseUpdateFunction { + /** + * Have to use the same instance to render else the number lines printed earlier is lost and the + * screen will print the table again and again. + */ + private final InPlaceUpdate inPlaceUpdate; + + InPlaceUpdateFunction(SparkJobMonitor monitor) { + super(monitor); + inPlaceUpdate = new InPlaceUpdate(SessionState.LogHelper.getInfoStream()); + } + + @Override + void renderProgress(ProgressMonitor monitor) { + inPlaceUpdate.render(monitor); + } + + @Override + void renderReport(String report) { + monitor.console.logInfo(report); + } + } +} + + http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java index 3531ac2..5fd0c02 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.spark.status; 
+import org.apache.hadoop.hive.common.log.ProgressMonitor; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.common.log.InPlaceUpdate; import org.apache.hadoop.hive.ql.log.PerfLogger; @@ -25,13 +26,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashSet; import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.concurrent.TimeUnit; abstract class SparkJobMonitor { @@ -42,60 +37,27 @@ abstract class SparkJobMonitor { protected final PerfLogger perfLogger = SessionState.getPerfLogger(); protected final int checkInterval = 1000; protected final long monitorTimeoutInterval; - private final InPlaceUpdate inPlaceUpdateFn; - - private final Set<String> completed = new HashSet<String>(); - private final int printInterval = 3000; - private long lastPrintTime; - + final RenderStrategy.UpdateFunction updateFunction; protected long startTime; protected enum StageState { - PENDING, - RUNNING, - FINISHED + PENDING, RUNNING, FINISHED } protected final boolean inPlaceUpdate; protected SparkJobMonitor(HiveConf hiveConf) { - monitorTimeoutInterval = hiveConf.getTimeVar( - HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS); + monitorTimeoutInterval = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS); inPlaceUpdate = InPlaceUpdate.canRenderInPlace(hiveConf) && !SessionState.getConsole().getIsSilent(); console = new SessionState.LogHelper(LOG); - inPlaceUpdateFn = new InPlaceUpdate(SessionState.LogHelper.getInfoStream()); + updateFunction = updateFunction(); } public abstract int startMonitor(); - private void printStatusInPlace(Map<SparkStage, SparkStageProgress> progressMap) { - inPlaceUpdateFn.render(getProgressMonitor(progressMap)); - } - - protected void printStatus(Map<SparkStage, SparkStageProgress> 
progressMap, - Map<SparkStage, SparkStageProgress> lastProgressMap) { - - // do not print duplicate status while still in middle of print interval. - boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap); - boolean withinInterval = System.currentTimeMillis() <= lastPrintTime + printInterval; - if (isDuplicateState && withinInterval) { - return; - } - - String report = getReport(progressMap); - if (inPlaceUpdate) { - printStatusInPlace(progressMap); - console.logInfo(report); - } else { - console.printInfo(report); - } - - lastPrintTime = System.currentTimeMillis(); - } - protected int getTotalTaskCount(Map<SparkStage, SparkStageProgress> progressMap) { int totalTasks = 0; - for (SparkStageProgress progress: progressMap.values() ) { + for (SparkStageProgress progress : progressMap.values()) { totalTasks += progress.getTotalTaskCount(); } @@ -104,7 +66,7 @@ abstract class SparkJobMonitor { protected int getStageMaxTaskCount(Map<SparkStage, SparkStageProgress> progressMap) { int stageMaxTasks = 0; - for (SparkStageProgress progress: progressMap.values() ) { + for (SparkStageProgress progress : progressMap.values()) { int tasks = progress.getTotalTaskCount(); if (tasks > stageMaxTasks) { stageMaxTasks = tasks; @@ -114,107 +76,12 @@ abstract class SparkJobMonitor { return stageMaxTasks; } - private String getReport(Map<SparkStage, SparkStageProgress> progressMap) { - StringBuilder reportBuffer = new StringBuilder(); - SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); - String currentDate = dt.format(new Date()); - reportBuffer.append(currentDate + "\t"); - - // Num of total and completed tasks - int sumTotal = 0; - int sumComplete = 0; - - SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet()); - for (SparkStage stage : keys) { - SparkStageProgress progress = progressMap.get(stage); - final int complete = progress.getSucceededTaskCount(); - final int total = progress.getTotalTaskCount(); - final int 
running = progress.getRunningTaskCount(); - final int failed = progress.getFailedTaskCount(); - sumTotal += total; - sumComplete += complete; - String s = stage.toString(); - String stageName = "Stage-" + s; - if (total <= 0) { - reportBuffer.append(String.format("%s: -/-\t", stageName)); - } else { - if (complete == total && !completed.contains(s)) { - completed.add(s); - - if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s); - } - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s); - } - if (complete < total && (complete > 0 || running > 0 || failed > 0)) { - /* stage is started, but not complete */ - if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s); - } - if (failed > 0) { - reportBuffer.append( - String.format( - "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed, total)); - } else { - reportBuffer.append( - String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total)); - } - } else { - /* stage is waiting for input/slots or complete */ - if (failed > 0) { - /* tasks finished but some failed */ - reportBuffer.append( - String.format( - "%s: %d(-%d)/%d Finished with failed tasks\t", - stageName, complete, failed, total)); - } else { - if (complete == total) { - reportBuffer.append( - String.format("%s: %d/%d Finished\t", stageName, complete, total)); - } else { - reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total)); - } - } - } - } - } - - if (SessionState.get() != null) { - final float progress = (sumTotal == 0) ? 
1.0f : (float) sumComplete / (float) sumTotal; - SessionState.get().updateProgressedPercentage(progress); - } - return reportBuffer.toString(); - } - - private boolean isSameAsPreviousProgress( - Map<SparkStage, SparkStageProgress> progressMap, - Map<SparkStage, SparkStageProgress> lastProgressMap) { - - if (lastProgressMap == null) { - return false; - } - - if (progressMap.isEmpty()) { - return lastProgressMap.isEmpty(); - } else { - if (lastProgressMap.isEmpty()) { - return false; - } else { - if (progressMap.size() != lastProgressMap.size()) { - return false; - } - for (SparkStage key : progressMap.keySet()) { - if (!lastProgressMap.containsKey(key) - || !progressMap.get(key).equals(lastProgressMap.get(key))) { - return false; - } - } - } - } - return true; + ProgressMonitor getProgressMonitor(Map<SparkStage, SparkStageProgress> progressMap) { + return new SparkProgressMonitor(progressMap, startTime); } - private SparkProgressMonitor getProgressMonitor(Map<SparkStage, SparkStageProgress> progressMap) { - return new SparkProgressMonitor(progressMap, startTime); + private RenderStrategy.UpdateFunction updateFunction() { + return inPlaceUpdate && !SessionState.get().isHiveServerQuery() ? 
new RenderStrategy.InPlaceUpdateFunction( + this) : new RenderStrategy.LogToFileFunction(this); } } http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java index 368fa9f..2017fc1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java @@ -105,6 +105,7 @@ public class TestSparkTask { when(jobSts.getRemoteJobState()).thenReturn(State.CANCELLED); when(jobSts.isRemoteActive()).thenReturn(true); HiveConf hiveConf = new HiveConf(); + SessionState.start(hiveConf); RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, jobSts); Assert.assertEquals(remoteSparkJobMonitor.startMonitor(), 3); } http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java index e66354f..7257b32 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.spark.status; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.session.SessionState; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -40,22 +41,23 @@ public class TestSparkJobMonitor { private SparkJobMonitor monitor; private PrintStream curOut; private PrintStream curErr; + private 
RenderStrategy.InPlaceUpdateFunction updateFunction; @Before public void setUp() { - testConf = new HiveConf(); curOut = System.out; curErr = System.err; System.setOut(new PrintStream(outContent)); System.setErr(new PrintStream(errContent)); - + testConf = new HiveConf(); + SessionState.start(testConf); monitor = new SparkJobMonitor(testConf) { @Override public int startMonitor() { return 0; } }; - + updateFunction = new RenderStrategy.InPlaceUpdateFunction(monitor); } private Map<SparkStage, SparkStageProgress> progressMap() { @@ -72,12 +74,27 @@ public class TestSparkJobMonitor { } @Test - public void testGetReport() { + public void testProgress() { Map<SparkStage, SparkStageProgress> progressMap = progressMap(); - monitor.printStatus(progressMap, null); - assertTrue(errContent.toString().contains( + updateFunction.printStatus(progressMap, null); + String testOutput = errContent.toString(); + assertTrue(testOutput.contains( "Stage-1_0: 3(+1)/4\tStage-3_1: 4(+1,-1)/6\tStage-9_0: 5/5 Finished\tStage-10_2: 3(+2)/5\t" + "Stage-15_1: 3(+1)/4\tStage-15_2: 4/4 Finished\tStage-20_3: 1(+1,-1)/3\tStage-21_1: 2/2 Finished")); + String[] testStrings = new String[]{ + "STAGES ATTEMPT STATUS TOTAL COMPLETED RUNNING PENDING FAILED", + "Stage-1 ...... 0 RUNNING 4 3 1 0 0", + "Stage-3 ..... 1 RUNNING 6 4 1 1 1", + "Stage-9 ........ 0 FINISHED 5 5 0 0 0", + "Stage-10 .... 2 RUNNING 5 3 2 0 0", + "Stage-15 ..... 1 RUNNING 4 3 1 0 0", + "Stage-15 ....... 2 FINISHED 4 4 0 0 0", + "Stage-20 .. 3 RUNNING 3 1 1 1 1", + "Stage-21 ....... 
1 FINISHED 2 2 0 0 0", + "STAGES: 03/08 [===================>>-------] 75% ELAPSED TIME:"}; + for(String testString : testStrings) { + assertTrue(testOutput.contains(testString)); + } } @After http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/service/src/java/org/apache/hive/service/ServiceUtils.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/ServiceUtils.java b/service/src/java/org/apache/hive/service/ServiceUtils.java index 226e432..49fb5d5 100644 --- a/service/src/java/org/apache/hive/service/ServiceUtils.java +++ b/service/src/java/org/apache/hive/service/ServiceUtils.java @@ -69,8 +69,9 @@ public class ServiceUtils { } public static boolean canProvideProgressLog(HiveConf hiveConf) { - return "tez".equals(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) - && hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_INPLACE_PROGRESS); + return ("tez".equals(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) || "spark" + .equals(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) && hiveConf + .getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_INPLACE_PROGRESS); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/service/src/java/org/apache/hive/service/cli/SparkProgressMonitorStatusMapper.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/SparkProgressMonitorStatusMapper.java b/service/src/java/org/apache/hive/service/cli/SparkProgressMonitorStatusMapper.java new file mode 100644 index 0000000..c2a222e --- /dev/null +++ b/service/src/java/org/apache/hive/service/cli/SparkProgressMonitorStatusMapper.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli; + +import org.apache.hive.service.rpc.thrift.TJobExecutionStatus; +import org.apache.commons.lang3.StringUtils; + +/** + * Maps status of spark stages to job execution status. + */ +public class SparkProgressMonitorStatusMapper implements ProgressMonitorStatusMapper { + /** + * These states mirror SparkJobMonitor.StageState (PENDING, RUNNING, FINISHED); that enum could not + * be used here directly because it is not accessible outside the Spark status package.
+ */ + enum SparkStatus { + PENDING, RUNNING, FINISHED + + } + + @Override + public TJobExecutionStatus forStatus(String status) { + if (StringUtils.isEmpty(status)) { + return TJobExecutionStatus.NOT_AVAILABLE; + } + SparkProgressMonitorStatusMapper.SparkStatus sparkStatus = + SparkProgressMonitorStatusMapper.SparkStatus.valueOf(status); + switch (sparkStatus) { + case PENDING: + case RUNNING: + return TJobExecutionStatus.IN_PROGRESS; + default: + return TJobExecutionStatus.COMPLETE; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 68fe8d8..259ca63 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -21,6 +21,7 @@ package org.apache.hive.service.cli.thrift; import static com.google.common.base.Preconditions.checkArgument; import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.SparkProgressMonitorStatusMapper; import org.apache.hive.service.rpc.thrift.TSetClientInfoReq; import org.apache.hive.service.rpc.thrift.TSetClientInfoResp; @@ -707,7 +708,9 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe if ("tez".equals(hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) { mapper = new TezProgressMonitorStatusMapper(); } - + if ("spark".equals(hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) { + mapper = new SparkProgressMonitorStatusMapper(); + } TJobExecutionStatus executionStatus = mapper.forStatus(progressUpdate.status); resp.setProgressUpdateResponse(new TProgressUpdateResp(