Repository: hive Updated Branches: refs/heads/master f27c38ff5 -> 764b978fd
HIVE-19508: SparkJobMonitor getReport doesn't print stage progress in order (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/764b978f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/764b978f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/764b978f Branch: refs/heads/master Commit: 764b978fd5802887cdad02ce8074ddf5f8d8e2e4 Parents: f27c38f Author: Bharathkrishna Guruvayoor Murali <bhar...@cloudera.com> Authored: Wed Jun 6 14:15:45 2018 -0500 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Wed Jun 6 14:15:45 2018 -0500 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/spark/SparkTask.java | 5 +- .../exec/spark/status/LocalSparkJobMonitor.java | 4 +- .../spark/status/RemoteSparkJobMonitor.java | 6 +- .../ql/exec/spark/status/SparkJobMonitor.java | 38 ++++----- .../ql/exec/spark/status/SparkJobStatus.java | 2 +- .../hive/ql/exec/spark/status/SparkStage.java | 72 ++++++++++++++++ .../spark/status/impl/LocalSparkJobStatus.java | 11 ++- .../spark/status/impl/RemoteSparkJobStatus.java | 9 +- .../exec/spark/status/TestSparkJobMonitor.java | 88 ++++++++++++++++++++ 9 files changed, 198 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index ddbb6ba..02613f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; import org.apache.hadoop.hive.ql.exec.spark.status.impl.SparkMetricsUtils; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -545,11 +546,11 @@ public class SparkTask extends Task<SparkWork> { stageIds.add(stageId); } } - Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress(); + Map<SparkStage, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress(); int sumTotal = 0; int sumComplete = 0; int sumFailed = 0; - for (String s : progressMap.keySet()) { + for (SparkStage s : progressMap.keySet()) { SparkStageProgress progress = progressMap.get(s); final int complete = progress.getSucceededTaskCount(); final int total = progress.getTotalTaskCount(); http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java index 4ce9f53..2a6c33b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java @@ -42,7 +42,7 @@ public class LocalSparkJobMonitor extends SparkJobMonitor { boolean done = false; int rc = 0; JobExecutionStatus lastState = null; - Map<String, SparkStageProgress> lastProgressMap = null; + Map<SparkStage, SparkStageProgress> lastProgressMap = null; perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); @@ -68,7 +68,7 @@ public class LocalSparkJobMonitor extends SparkJobMonitor { } } else if (state != lastState || state == JobExecutionStatus.RUNNING) { lastState = state; - Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress(); + Map<SparkStage, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress(); switch (state) { case RUNNING: http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index 98c228b..004b50b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -56,7 +56,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor { boolean running = false; boolean done = false; int rc = 0; - Map<String, SparkStageProgress> lastProgressMap = null; + Map<SparkStage, SparkStageProgress> lastProgressMap = null; perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); @@ -89,7 +89,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor { case STARTED: JobExecutionStatus sparkJobState = sparkJobStatus.getState(); if (sparkJobState == JobExecutionStatus.RUNNING) { - Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress(); + Map<SparkStage, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress(); if (!running) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); printAppInfo(); @@ -137,7 +137,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor { } break; case SUCCEEDED: - Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress(); + Map<SparkStage, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress(); printStatus(progressMap, lastProgressMap); lastProgressMap = progressMap; double duration = (System.currentTimeMillis() - startTime) / 1000.0; http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java index 7afd886..e78b1cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java @@ -88,7 +88,7 @@ abstract class SparkJobMonitor { public abstract int startMonitor(); - private void printStatusInPlace(Map<String, SparkStageProgress> progressMap) { + private void printStatusInPlace(Map<SparkStage, SparkStageProgress> progressMap) { StringBuilder reportBuffer = new StringBuilder(); @@ -104,11 +104,11 @@ abstract class SparkJobMonitor { reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN); reprintLine(SEPARATOR); - SortedSet<String> keys = new TreeSet<String>(progressMap.keySet()); + SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet()); int idx = 0; final int numKey = keys.size(); - for (String s : keys) { - SparkStageProgress progress = progressMap.get(s); + for (SparkStage stage : keys) { + SparkStageProgress progress = progressMap.get(stage); final int complete = progress.getSucceededTaskCount(); final int total = progress.getTotalTaskCount(); final int running = progress.getRunningTaskCount(); @@ -116,6 +116,7 @@ abstract class SparkJobMonitor { sumTotal += total; sumComplete += complete; + String s = stage.toString(); StageState state = total > 0 ? StageState.PENDING : StageState.FINISHED; if (complete > 0 || running > 0 || failed > 0) { if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) { @@ -130,9 +131,8 @@ abstract class SparkJobMonitor { } } - int div = s.indexOf('_'); - String attempt = div > 0 ? s.substring(div + 1) : "-"; - String stageName = "Stage-" + (div > 0 ? s.substring(0, div) : s); + String attempt = String.valueOf(stage.getAttemptId()); + String stageName = "Stage-" + String.valueOf(stage.getStageId()); String nameWithProgress = getNameWithProgress(stageName, complete, total); final int pending = total - complete - running; @@ -151,8 +151,8 @@ abstract class SparkJobMonitor { reprintLine(SEPARATOR); } - protected void printStatus(Map<String, SparkStageProgress> progressMap, - Map<String, SparkStageProgress> lastProgressMap) { + protected void printStatus(Map<SparkStage, SparkStageProgress> progressMap, + Map<SparkStage, SparkStageProgress> lastProgressMap) { // do not print duplicate status while still in middle of print interval. boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap); @@ -172,7 +172,7 @@ abstract class SparkJobMonitor { lastPrintTime = System.currentTimeMillis(); } - protected int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) { + protected int getTotalTaskCount(Map<SparkStage, SparkStageProgress> progressMap) { int totalTasks = 0; for (SparkStageProgress progress: progressMap.values() ) { totalTasks += progress.getTotalTaskCount(); @@ -181,7 +181,7 @@ abstract class SparkJobMonitor { return totalTasks; } - protected int getStageMaxTaskCount(Map<String, SparkStageProgress> progressMap) { + protected int getStageMaxTaskCount(Map<SparkStage, SparkStageProgress> progressMap) { int stageMaxTasks = 0; for (SparkStageProgress progress: progressMap.values() ) { int tasks = progress.getTotalTaskCount(); @@ -193,7 +193,7 @@ abstract class SparkJobMonitor { return stageMaxTasks; } - private String getReport(Map<String, SparkStageProgress> progressMap) { + private String getReport(Map<SparkStage, SparkStageProgress> progressMap) { StringBuilder reportBuffer = new StringBuilder(); SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); String currentDate = dt.format(new Date()); @@ -203,16 +203,16 @@ abstract class SparkJobMonitor { int sumTotal = 0; int sumComplete = 0; - SortedSet<String> keys = new TreeSet<String>(progressMap.keySet()); - for (String s : keys) { - SparkStageProgress progress = progressMap.get(s); + SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet()); + for (SparkStage stage : keys) { + SparkStageProgress progress = progressMap.get(stage); final int complete = progress.getSucceededTaskCount(); final int total = progress.getTotalTaskCount(); final int running = progress.getRunningTaskCount(); final int failed = progress.getFailedTaskCount(); sumTotal += total; sumComplete += complete; - + String s = stage.toString(); String stageName = "Stage-" + s; if (total <= 0) { reportBuffer.append(String.format("%s: -/-\t", stageName)); @@ -266,8 +266,8 @@ abstract class SparkJobMonitor { } private boolean isSameAsPreviousProgress( - Map<String, SparkStageProgress> progressMap, - Map<String, SparkStageProgress> lastProgressMap) { + Map<SparkStage, SparkStageProgress> progressMap, + Map<SparkStage, SparkStageProgress> lastProgressMap) { if (lastProgressMap == null) { return false; @@ -282,7 +282,7 @@ abstract class SparkJobMonitor { if (progressMap.size() != lastProgressMap.size()) { return false; } - for (String key : progressMap.keySet()) { + for (SparkStage key : progressMap.keySet()) { if (!lastProgressMap.containsKey(key) || !progressMap.get(key).equals(lastProgressMap.get(key))) { return false; http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java index 1e584f4..e8596c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java @@ -37,7 +37,7 @@ public interface SparkJobStatus { int[] getStageIds() throws HiveException; - Map<String, SparkStageProgress> getSparkStageProgress() throws HiveException; + Map<SparkStage, SparkStageProgress> getSparkStageProgress() throws HiveException; SparkCounters getCounter(); http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStage.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStage.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStage.java new file mode 100644 index 0000000..7e4346a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStage.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.status; + +import java.util.Objects; + +/** + * Class to hold information that can be used to identify a Spark stage. + */ +public class SparkStage implements Comparable<SparkStage> { + + private int stageId; + private int attemptId; + + public SparkStage(int stageId, int attemptId) { + this.stageId = stageId; + this.attemptId = attemptId; + } + + public int getStageId() { + return stageId; + } + + public int getAttemptId() { + return attemptId; + } + + @Override + public int compareTo(SparkStage stage) { + if (this.stageId == stage.stageId) { + return Integer.compare(this.attemptId, stage.attemptId); + } + return Integer.compare(this.stageId, stage.stageId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SparkStage that = (SparkStage) o; + return getStageId() == that.getStageId() && getAttemptId() == that.getAttemptId(); + } + + @Override + public int hashCode() { + return Objects.hash(stageId, attemptId); + } + + @Override + public String toString() { + return String.valueOf(this.stageId) + "_" + String.valueOf(this.attemptId); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java index 4368eb0..0b74ffe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -22,8 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import com.google.common.base.Throwables; - +import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,8 +104,8 @@ public class LocalSparkJobStatus implements SparkJobStatus { } @Override - public Map<String, SparkStageProgress> getSparkStageProgress() { - Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>(); + public Map<SparkStage, SparkStageProgress> getSparkStageProgress() { + Map<SparkStage, SparkStageProgress> stageProgresses = new HashMap<SparkStage, SparkStageProgress>(); for (int stageId : getStageIds()) { SparkStageInfo sparkStageInfo = getStageInfo(stageId); if (sparkStageInfo != null) { @@ -116,8 +115,8 @@ public class LocalSparkJobStatus implements SparkJobStatus { int totalTaskCount = sparkStageInfo.numTasks(); SparkStageProgress sparkStageProgress = new SparkStageProgress( totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); - stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" - + sparkStageInfo.currentAttemptId(), sparkStageProgress); + SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId()); + stageProgresses.put(stage, sparkStageProgress); } } return stageProgresses; http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index e4a53fb..d2e28b0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl; import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,8 +99,8 @@ public class RemoteSparkJobStatus implements SparkJobStatus { } @Override - public Map<String, SparkStageProgress> getSparkStageProgress() throws HiveException { - Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>(); + public Map<SparkStage, SparkStageProgress> getSparkStageProgress() throws HiveException { + Map<SparkStage, SparkStageProgress> stageProgresses = new HashMap<SparkStage, SparkStageProgress>(); for (int stageId : getStageIds()) { SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId); if (sparkStageInfo != null && sparkStageInfo.name() != null) { @@ -109,8 +110,8 @@ public class RemoteSparkJobStatus implements SparkJobStatus { int totalTaskCount = sparkStageInfo.numTasks(); SparkStageProgress sparkStageProgress = new SparkStageProgress( totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); - stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" - + sparkStageInfo.currentAttemptId(), sparkStageProgress); + SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId()); + stageProgresses.put(stage, sparkStageProgress); } } return stageProgresses; http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java new file mode 100644 index 0000000..e66354f --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.status; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertTrue; + +/** + * Test spark progress monitoring information. + */ +public class TestSparkJobMonitor { + + private HiveConf testConf; + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private final ByteArrayOutputStream errContent = new ByteArrayOutputStream(); + private SparkJobMonitor monitor; + private PrintStream curOut; + private PrintStream curErr; + + @Before + public void setUp() { + testConf = new HiveConf(); + curOut = System.out; + curErr = System.err; + System.setOut(new PrintStream(outContent)); + System.setErr(new PrintStream(errContent)); + + monitor = new SparkJobMonitor(testConf) { + @Override + public int startMonitor() { + return 0; + } + }; + + } + + private Map<SparkStage, SparkStageProgress> progressMap() { + return new HashMap<SparkStage, SparkStageProgress>() {{ + put(new SparkStage(1, 0), new SparkStageProgress(4, 3, 1, 0)); + put(new SparkStage(3, 1), new SparkStageProgress(6, 4, 1, 1)); + put(new SparkStage(9, 0), new SparkStageProgress(5, 5, 0, 0)); + put(new SparkStage(10, 2), new SparkStageProgress(5, 3, 2, 0)); + put(new SparkStage(15, 1), new SparkStageProgress(4, 3, 1, 0)); + put(new SparkStage(15, 2), new SparkStageProgress(4, 4, 0, 0)); + put(new SparkStage(20, 3), new SparkStageProgress(3, 1, 1, 1)); + put(new SparkStage(21, 1), new SparkStageProgress(2, 2, 0, 0)); + }}; + } + + @Test + public void testGetReport() { + Map<SparkStage, SparkStageProgress> progressMap = progressMap(); + monitor.printStatus(progressMap, null); + assertTrue(errContent.toString().contains( + "Stage-1_0: 3(+1)/4\tStage-3_1: 4(+1,-1)/6\tStage-9_0: 5/5 Finished\tStage-10_2: 3(+2)/5\t" + + "Stage-15_1: 3(+1)/4\tStage-15_2: 4/4 Finished\tStage-20_3: 1(+1,-1)/3\tStage-21_1: 2/2 Finished")); + } + + @After + public void tearDown() { + System.setOut(curOut); + System.setErr(curErr); + } +}