Repository: hive
Updated Branches:
  refs/heads/master f27c38ff5 -> 764b978fd


HIVE-19508: SparkJobMonitor getReport doesn't print stage progress in order 
(Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/764b978f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/764b978f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/764b978f

Branch: refs/heads/master
Commit: 764b978fd5802887cdad02ce8074ddf5f8d8e2e4
Parents: f27c38f
Author: Bharathkrishna Guruvayoor Murali <bhar...@cloudera.com>
Authored: Wed Jun 6 14:15:45 2018 -0500
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Wed Jun 6 14:15:45 2018 -0500

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/SparkTask.java    |  5 +-
 .../exec/spark/status/LocalSparkJobMonitor.java |  4 +-
 .../spark/status/RemoteSparkJobMonitor.java     |  6 +-
 .../ql/exec/spark/status/SparkJobMonitor.java   | 38 ++++-----
 .../ql/exec/spark/status/SparkJobStatus.java    |  2 +-
 .../hive/ql/exec/spark/status/SparkStage.java   | 72 ++++++++++++++++
 .../spark/status/impl/LocalSparkJobStatus.java  | 11 ++-
 .../spark/status/impl/RemoteSparkJobStatus.java |  9 +-
 .../exec/spark/status/TestSparkJobMonitor.java  | 88 ++++++++++++++++++++
 9 files changed, 198 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index ddbb6ba..02613f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -37,6 +37,7 @@ import 
org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.SparkMetricsUtils;
 
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -545,11 +546,11 @@ public class SparkTask extends Task<SparkWork> {
           stageIds.add(stageId);
         }
       }
-      Map<String, SparkStageProgress> progressMap = 
sparkJobStatus.getSparkStageProgress();
+      Map<SparkStage, SparkStageProgress> progressMap = 
sparkJobStatus.getSparkStageProgress();
       int sumTotal = 0;
       int sumComplete = 0;
       int sumFailed = 0;
-      for (String s : progressMap.keySet()) {
+      for (SparkStage s : progressMap.keySet()) {
         SparkStageProgress progress = progressMap.get(s);
         final int complete = progress.getSucceededTaskCount();
         final int total = progress.getTotalTaskCount();

http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index 4ce9f53..2a6c33b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -42,7 +42,7 @@ public class LocalSparkJobMonitor extends SparkJobMonitor {
     boolean done = false;
     int rc = 0;
     JobExecutionStatus lastState = null;
-    Map<String, SparkStageProgress> lastProgressMap = null;
+    Map<SparkStage, SparkStageProgress> lastProgressMap = null;
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
@@ -68,7 +68,7 @@ public class LocalSparkJobMonitor extends SparkJobMonitor {
           }
         } else if (state != lastState || state == JobExecutionStatus.RUNNING) {
           lastState = state;
-          Map<String, SparkStageProgress> progressMap = 
sparkJobStatus.getSparkStageProgress();
+          Map<SparkStage, SparkStageProgress> progressMap = 
sparkJobStatus.getSparkStageProgress();
 
           switch (state) {
           case RUNNING:

http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 98c228b..004b50b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -56,7 +56,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
     boolean running = false;
     boolean done = false;
     int rc = 0;
-    Map<String, SparkStageProgress> lastProgressMap = null;
+    Map<SparkStage, SparkStageProgress> lastProgressMap = null;
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
@@ -89,7 +89,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
         case STARTED:
           JobExecutionStatus sparkJobState = sparkJobStatus.getState();
           if (sparkJobState == JobExecutionStatus.RUNNING) {
-            Map<String, SparkStageProgress> progressMap = 
sparkJobStatus.getSparkStageProgress();
+            Map<SparkStage, SparkStageProgress> progressMap = 
sparkJobStatus.getSparkStageProgress();
             if (!running) {
               perfLogger.PerfLogEnd(CLASS_NAME, 
PerfLogger.SPARK_SUBMIT_TO_RUNNING);
               printAppInfo();
@@ -137,7 +137,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
           }
           break;
         case SUCCEEDED:
-          Map<String, SparkStageProgress> progressMap = 
sparkJobStatus.getSparkStageProgress();
+          Map<SparkStage, SparkStageProgress> progressMap = 
sparkJobStatus.getSparkStageProgress();
           printStatus(progressMap, lastProgressMap);
           lastProgressMap = progressMap;
           double duration = (System.currentTimeMillis() - startTime) / 1000.0;

http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 7afd886..e78b1cd 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -88,7 +88,7 @@ abstract class SparkJobMonitor {
 
   public abstract int startMonitor();
 
-  private void printStatusInPlace(Map<String, SparkStageProgress> progressMap) 
{
+  private void printStatusInPlace(Map<SparkStage, SparkStageProgress> 
progressMap) {
 
     StringBuilder reportBuffer = new StringBuilder();
 
@@ -104,11 +104,11 @@ abstract class SparkJobMonitor {
     reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN);
     reprintLine(SEPARATOR);
 
-    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+    SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
     int idx = 0;
     final int numKey = keys.size();
-    for (String s : keys) {
-      SparkStageProgress progress = progressMap.get(s);
+    for (SparkStage stage : keys) {
+      SparkStageProgress progress = progressMap.get(stage);
       final int complete = progress.getSucceededTaskCount();
       final int total = progress.getTotalTaskCount();
       final int running = progress.getRunningTaskCount();
@@ -116,6 +116,7 @@ abstract class SparkJobMonitor {
       sumTotal += total;
       sumComplete += complete;
 
+      String s = stage.toString();
       StageState state = total > 0 ? StageState.PENDING : StageState.FINISHED;
       if (complete > 0 || running > 0 || failed > 0) {
         if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
@@ -130,9 +131,8 @@ abstract class SparkJobMonitor {
         }
       }
 
-      int div = s.indexOf('_');
-      String attempt = div > 0 ? s.substring(div + 1) : "-";
-      String stageName = "Stage-" + (div > 0 ? s.substring(0, div) : s);
+      String attempt = String.valueOf(stage.getAttemptId());
+      String stageName = "Stage-" + String.valueOf(stage.getStageId());
       String nameWithProgress = getNameWithProgress(stageName, complete, 
total);
 
       final int pending = total - complete - running;
@@ -151,8 +151,8 @@ abstract class SparkJobMonitor {
     reprintLine(SEPARATOR);
   }
 
-  protected void printStatus(Map<String, SparkStageProgress> progressMap,
-      Map<String, SparkStageProgress> lastProgressMap) {
+  protected void printStatus(Map<SparkStage, SparkStageProgress> progressMap,
+      Map<SparkStage, SparkStageProgress> lastProgressMap) {
 
     // do not print duplicate status while still in middle of print interval.
     boolean isDuplicateState = isSameAsPreviousProgress(progressMap, 
lastProgressMap);
@@ -172,7 +172,7 @@ abstract class SparkJobMonitor {
     lastPrintTime = System.currentTimeMillis();
   }
 
-  protected int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) 
{
+  protected int getTotalTaskCount(Map<SparkStage, SparkStageProgress> 
progressMap) {
     int totalTasks = 0;
     for (SparkStageProgress progress: progressMap.values() ) {
       totalTasks += progress.getTotalTaskCount();
@@ -181,7 +181,7 @@ abstract class SparkJobMonitor {
     return totalTasks;
   }
 
-  protected int getStageMaxTaskCount(Map<String, SparkStageProgress> 
progressMap) {
+  protected int getStageMaxTaskCount(Map<SparkStage, SparkStageProgress> 
progressMap) {
     int stageMaxTasks = 0;
     for (SparkStageProgress progress: progressMap.values() ) {
       int tasks = progress.getTotalTaskCount();
@@ -193,7 +193,7 @@ abstract class SparkJobMonitor {
     return stageMaxTasks;
   }
 
-  private String getReport(Map<String, SparkStageProgress> progressMap) {
+  private String getReport(Map<SparkStage, SparkStageProgress> progressMap) {
     StringBuilder reportBuffer = new StringBuilder();
     SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
     String currentDate = dt.format(new Date());
@@ -203,16 +203,16 @@ abstract class SparkJobMonitor {
     int sumTotal = 0;
     int sumComplete = 0;
 
-    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
-    for (String s : keys) {
-      SparkStageProgress progress = progressMap.get(s);
+    SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
+    for (SparkStage stage : keys) {
+      SparkStageProgress progress = progressMap.get(stage);
       final int complete = progress.getSucceededTaskCount();
       final int total = progress.getTotalTaskCount();
       final int running = progress.getRunningTaskCount();
       final int failed = progress.getFailedTaskCount();
       sumTotal += total;
       sumComplete += complete;
-
+      String s = stage.toString();
       String stageName = "Stage-" + s;
       if (total <= 0) {
         reportBuffer.append(String.format("%s: -/-\t", stageName));
@@ -266,8 +266,8 @@ abstract class SparkJobMonitor {
   }
 
   private boolean isSameAsPreviousProgress(
-      Map<String, SparkStageProgress> progressMap,
-      Map<String, SparkStageProgress> lastProgressMap) {
+      Map<SparkStage, SparkStageProgress> progressMap,
+      Map<SparkStage, SparkStageProgress> lastProgressMap) {
 
     if (lastProgressMap == null) {
       return false;
@@ -282,7 +282,7 @@ abstract class SparkJobMonitor {
         if (progressMap.size() != lastProgressMap.size()) {
           return false;
         }
-        for (String key : progressMap.keySet()) {
+        for (SparkStage key : progressMap.keySet()) {
           if (!lastProgressMap.containsKey(key)
               || !progressMap.get(key).equals(lastProgressMap.get(key))) {
             return false;

http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index 1e584f4..e8596c6 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -37,7 +37,7 @@ public interface SparkJobStatus {
 
   int[] getStageIds() throws HiveException;
 
-  Map<String, SparkStageProgress> getSparkStageProgress() throws HiveException;
+  Map<SparkStage, SparkStageProgress> getSparkStageProgress() throws 
HiveException;
 
   SparkCounters getCounter();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStage.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStage.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStage.java
new file mode 100644
index 0000000..7e4346a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import java.util.Objects;
+
/**
 * Identifies one attempt of a Spark stage by its stage id and attempt id.
 *
 * <p>Instances are immutable and serve as map keys, for example in the stage-progress maps
 * returned by {@code SparkJobStatus#getSparkStageProgress()}. The natural ordering sorts by
 * stage id first and then by attempt id. Sorted views therefore list stages in numeric order
 * ({@code 1_0, 3_1, 10_2}) instead of the lexicographic order a {@code "stageId_attemptId"}
 * string key would produce ({@code 10_2, 1_0, 3_1}).
 */
public class SparkStage implements Comparable<SparkStage> {

  // Final because instances are used as HashMap/TreeSet keys; a key whose identity could
  // change after insertion would corrupt those collections.
  private final int stageId;
  private final int attemptId;

  /**
   * Creates an identifier for one stage attempt.
   *
   * @param stageId the Spark stage id
   * @param attemptId the attempt id of that stage
   */
  public SparkStage(int stageId, int attemptId) {
    this.stageId = stageId;
    this.attemptId = attemptId;
  }

  /** @return the Spark stage id */
  public int getStageId() {
    return stageId;
  }

  /** @return the attempt id of the stage */
  public int getAttemptId() {
    return attemptId;
  }

  /**
   * Orders numerically by stage id, then by attempt id. Consistent with
   * {@link #equals(Object)}: the result is zero exactly when both ids match.
   */
  @Override
  public int compareTo(SparkStage stage) {
    int byStage = Integer.compare(this.stageId, stage.stageId);
    return byStage != 0 ? byStage : Integer.compare(this.attemptId, stage.attemptId);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    SparkStage that = (SparkStage) o;
    return this.stageId == that.stageId && this.attemptId == that.attemptId;
  }

  @Override
  public int hashCode() {
    return Objects.hash(stageId, attemptId);
  }

  /** @return the stage in {@code "stageId_attemptId"} form, e.g. {@code "10_2"} */
  @Override
  public String toString() {
    return stageId + "_" + attemptId;
  }
}

http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index 4368eb0..0b74ffe 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -22,8 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.base.Throwables;
-
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,8 +104,8 @@ public class LocalSparkJobStatus implements SparkJobStatus {
   }
 
   @Override
-  public Map<String, SparkStageProgress> getSparkStageProgress() {
-    Map<String, SparkStageProgress> stageProgresses = new HashMap<String, 
SparkStageProgress>();
+  public Map<SparkStage, SparkStageProgress> getSparkStageProgress() {
+    Map<SparkStage, SparkStageProgress> stageProgresses = new 
HashMap<SparkStage, SparkStageProgress>();
     for (int stageId : getStageIds()) {
       SparkStageInfo sparkStageInfo = getStageInfo(stageId);
       if (sparkStageInfo != null) {
@@ -116,8 +115,8 @@ public class LocalSparkJobStatus implements SparkJobStatus {
         int totalTaskCount = sparkStageInfo.numTasks();
         SparkStageProgress sparkStageProgress = new SparkStageProgress(
             totalTaskCount, completedTaskCount, runningTaskCount, 
failedTaskCount);
-        stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_"
-          + sparkStageInfo.currentAttemptId(), sparkStageProgress);
+        SparkStage stage = new SparkStage(sparkStageInfo.stageId(), 
sparkStageInfo.currentAttemptId());
+        stageProgresses.put(stage, sparkStageProgress);
       }
     }
     return stageProgresses;

http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index e4a53fb..d2e28b0 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
 
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,8 +99,8 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
   }
 
   @Override
-  public Map<String, SparkStageProgress> getSparkStageProgress() throws 
HiveException {
-    Map<String, SparkStageProgress> stageProgresses = new HashMap<String, 
SparkStageProgress>();
+  public Map<SparkStage, SparkStageProgress> getSparkStageProgress() throws 
HiveException {
+    Map<SparkStage, SparkStageProgress> stageProgresses = new 
HashMap<SparkStage, SparkStageProgress>();
     for (int stageId : getStageIds()) {
       SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId);
       if (sparkStageInfo != null && sparkStageInfo.name() != null) {
@@ -109,8 +110,8 @@ public class RemoteSparkJobStatus implements SparkJobStatus 
{
         int totalTaskCount = sparkStageInfo.numTasks();
         SparkStageProgress sparkStageProgress = new SparkStageProgress(
             totalTaskCount, completedTaskCount, runningTaskCount, 
failedTaskCount);
-        stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_"
-          + sparkStageInfo.currentAttemptId(), sparkStageProgress);
+        SparkStage stage = new SparkStage(sparkStageInfo.stageId(), 
sparkStageInfo.currentAttemptId());
+        stageProgresses.put(stage, sparkStageProgress);
       }
     }
     return stageProgresses;

http://git-wip-us.apache.org/repos/asf/hive/blob/764b978f/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java
new file mode 100644
index 0000000..e66354f
--- /dev/null
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test spark progress monitoring information.
+ */
+public class TestSparkJobMonitor {
+
+  private HiveConf testConf;
+  private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+  private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
+  private SparkJobMonitor monitor;
+  private PrintStream curOut;
+  private PrintStream curErr;
+
+  @Before
+  public void setUp() {
+    testConf = new HiveConf();
+    curOut = System.out;
+    curErr = System.err;
+    System.setOut(new PrintStream(outContent));
+    System.setErr(new PrintStream(errContent));
+
+    monitor = new SparkJobMonitor(testConf) {
+      @Override
+      public int startMonitor() {
+        return 0;
+      }
+    };
+
+  }
+
+  private Map<SparkStage, SparkStageProgress> progressMap() {
+    return new HashMap<SparkStage, SparkStageProgress>() {{
+        put(new SparkStage(1, 0), new SparkStageProgress(4, 3, 1, 0));
+        put(new SparkStage(3, 1), new SparkStageProgress(6, 4, 1, 1));
+        put(new SparkStage(9, 0), new SparkStageProgress(5, 5, 0, 0));
+        put(new SparkStage(10, 2), new SparkStageProgress(5, 3, 2, 0));
+        put(new SparkStage(15, 1), new SparkStageProgress(4, 3, 1, 0));
+        put(new SparkStage(15, 2), new SparkStageProgress(4, 4, 0, 0));
+        put(new SparkStage(20, 3), new SparkStageProgress(3, 1, 1, 1));
+        put(new SparkStage(21, 1), new SparkStageProgress(2, 2, 0, 0));
+      }};
+  }
+
+  @Test
+  public void testGetReport() {
+    Map<SparkStage, SparkStageProgress> progressMap = progressMap();
+    monitor.printStatus(progressMap, null);
+    assertTrue(errContent.toString().contains(
+        "Stage-1_0: 3(+1)/4\tStage-3_1: 4(+1,-1)/6\tStage-9_0: 5/5 
Finished\tStage-10_2: 3(+2)/5\t"
+            + "Stage-15_1: 3(+1)/4\tStage-15_2: 4/4 Finished\tStage-20_3: 
1(+1,-1)/3\tStage-21_1: 2/2 Finished"));
+  }
+
+  @After
+  public void tearDown() {
+    System.setOut(curOut);
+    System.setErr(curErr);
+  }
+}

Reply via email to