Author: brock
Date: Wed Oct 15 17:18:56 2014
New Revision: 1632126

URL: http://svn.apache.org/r1632126
Log:
HIVE-8455 - Print Spark job progress format info on the console [Spark Branch] 
(Chengxiang Li via Brock)

Added:
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
Removed:
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java
Modified:
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1632126&r1=1632125&r2=1632126&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
 Wed Oct 15 17:18:56 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -25,7 +27,6 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 
 /**
@@ -39,7 +40,7 @@ public class SparkJobMonitor {
 
   private transient LogHelper console;
   private final int checkInterval = 200;
-  private final int  maxRetryInterval = 2500;
+  private final int maxRetryInterval = 2500;
   private final int printInterval = 3000;
   private long lastPrintTime;
   private Set<String> completed;
@@ -59,19 +60,19 @@ public class SparkJobMonitor {
     int failedCounter = 0;
     int rc = 0;
     SparkJobState lastState = null;
-    String lastReport = null;
+    Map<String, SparkStageProgress> lastProgressMap = null;
     long startTime = 0;
 
-    while(true) {
+    while (true) {
 
       try {
-        Map<String, SparkProgress> progressMap = 
sparkJobStatus.getSparkStageProgress();
+        Map<String, SparkStageProgress> progressMap = 
sparkJobStatus.getSparkStageProgress();
         SparkJobState state = sparkJobStatus.getState();
 
         if (state != lastState || state == SparkJobState.RUNNING) {
           lastState = state;
 
-          switch(state) {
+          switch (state) {
           case SUBMITTED:
             console.printInfo("Status: Submitted");
             break;
@@ -88,16 +89,22 @@ public class SparkJobMonitor {
               }
 
               console.printInfo("\nStatus: Running (Hive on Spark job[" +
-                sparkJobStatus.getJobId() + "])\n");
+                sparkJobStatus.getJobId() + "])");
               startTime = System.currentTimeMillis();
               running = true;
+
+              console.printInfo("Job Progress Format\nCurrentTime 
StageId_StageAttemptId: " +
+                
"SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount 
[StageCost]");
             }
 
-            lastReport = printStatus(progressMap, lastReport, console);
+
+            printStatus(progressMap, lastProgressMap);
+            lastProgressMap = progressMap;
             break;
           case SUCCEEDED:
-            lastReport = printStatus(progressMap, lastReport, console);
-            double duration = (System.currentTimeMillis() - startTime)/1000.0;
+            printStatus(progressMap, lastProgressMap);
+            lastProgressMap = progressMap;
+            double duration = (System.currentTimeMillis() - startTime) / 
1000.0;
             console.printInfo("Status: Finished successfully in " +
               String.format("%.2f seconds", duration));
             running = false;
@@ -122,8 +129,8 @@ public class SparkJobMonitor {
           Thread.sleep(checkInterval);
         }
       } catch (Exception e) {
-        console.printInfo("Exception: "+e.getMessage());
-        if (++failedCounter % maxRetryInterval/checkInterval == 0
+        console.printInfo("Exception: " + e.getMessage());
+        if (++failedCounter % (maxRetryInterval / checkInterval) == 0
           || e instanceof InterruptedException) {
           console.printInfo("Killing Job...");
           console.printError("Execution has failed.");
@@ -141,53 +148,97 @@ public class SparkJobMonitor {
     return rc;
   }
 
-  private String printStatus(
-    Map<String, SparkProgress> progressMap,
-    String lastReport,
-    LogHelper console) {
+  private void printStatus(Map<String, SparkStageProgress> progressMap, 
Map<String, SparkStageProgress> lastProgressMap) {
+
+    // do not print duplicate status while still in middle of print interval.
+    boolean isDuplicateState = isSameAsPreviousProgress(progressMap, 
lastProgressMap);
+    boolean isWithinPrintInterval = System.currentTimeMillis() <= lastPrintTime + 
printInterval;
+    if (isDuplicateState && isWithinPrintInterval) {
+      return;
+    }
 
     StringBuffer reportBuffer = new StringBuffer();
+    SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+    String currentDate = dt.format(new Date());
+    reportBuffer.append(currentDate + "\t");
 
     SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
-    for (String s: keys) {
-      SparkProgress progress = progressMap.get(s);
+    for (String s : keys) {
+      SparkStageProgress progress = progressMap.get(s);
       final int complete = progress.getSucceededTaskCount();
       final int total = progress.getTotalTaskCount();
       final int running = progress.getRunningTaskCount();
       final int failed = progress.getFailedTaskCount();
+      String stageName = "Stage-" + s;
       if (total <= 0) {
-        reportBuffer.append(String.format("%s: -/-\t", s, complete, total));
+        reportBuffer.append(String.format("%s: -/-\t", stageName, complete, 
total));
       } else {
         if (complete == total && !completed.contains(s)) {
           completed.add(s);
         }
-        if(complete < total && (complete > 0 || running > 0 || failed > 0)) {
+        if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
           /* stage is started, but not complete */
           if (failed > 0) {
             reportBuffer.append(
-              String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, 
failed, total));
+              String.format(
+                "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed, 
total));
           } else {
-            reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, 
running, total));
+            reportBuffer.append(
+              String.format("%s: %d(+%d)/%d\t", stageName, complete, running, 
total));
           }
         } else {
+          double cost = progress.getCumulativeTime() / 1000.0;
           /* stage is waiting for input/slots or complete */
           if (failed > 0) {
             /* tasks finished but some failed */
-            reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, 
failed, total));
+            reportBuffer.append(
+              String.format(
+                "%s: %d(-%d)/%d Finished in %,.2fs\t", stageName, complete, 
failed, total, cost));
           } else {
-            reportBuffer.append(String.format("%s: %d/%d\t", s, complete, 
total));
+            if (complete == total) {
+              reportBuffer.append(
+                String.format("%s: %d/%d Finished in %,.2fs\t", stageName, 
complete, total, cost));
+            } else {
+              reportBuffer.append(String.format("%s: %d/%d\t", stageName, 
complete, total));
+            }
           }
         }
       }
     }
 
-    String report = reportBuffer.toString();
-    if (!report.equals(lastReport)
-      || System.currentTimeMillis() >= lastPrintTime + printInterval) {
-      console.printInfo(report);
-      lastPrintTime = System.currentTimeMillis();
+    lastPrintTime = System.currentTimeMillis();
+    console.printInfo(reportBuffer.toString());
+  }
+
+  private boolean isSameAsPreviousProgress(
+    Map<String, SparkStageProgress> progressMap,
+    Map<String, SparkStageProgress> lastProgressMap) {
+
+    if (lastProgressMap == null) {
+      return false;
     }
 
-    return report;
+    if (progressMap.isEmpty()) {
+      if (lastProgressMap.isEmpty()) {
+        return true;
+      } else {
+        return false;
+      }
+    } else {
+      if (lastProgressMap.isEmpty()) {
+        return false;
+      } else {
+        if (progressMap.size() != lastProgressMap.size()) {
+          return false;
+        }
+        for (String key : progressMap.keySet()) {
+          if (!lastProgressMap.containsKey(key) ||
+            !progressMap.get(key).equals(lastProgressMap.get(key))) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
   }
 }

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1632126&r1=1632125&r2=1632126&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
 Wed Oct 15 17:18:56 2014
@@ -28,10 +28,8 @@ public interface SparkJobStatus {
 
   public SparkJobState getState();
 
-  public SparkProgress getSparkJobProgress();
-
   public int[] getStageIds();
 
-  public Map<String, SparkProgress> getSparkStageProgress();
+  public Map<String, SparkStageProgress> getSparkStageProgress();
 
 }

Added: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java?rev=1632126&view=auto
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
 (added)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
 Wed Oct 15 17:18:56 2014
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+public class SparkStageProgress {
+
+  private int totalTaskCount;
+  private int succeededTaskCount;
+  private int runningTaskCount;
+  private int failedTaskCount;
+  private int killedTaskCount;
+  private long cumulativeTime;
+
+  public SparkStageProgress(
+    int totalTaskCount,
+    int succeededTaskCount,
+    int runningTaskCount,
+    int failedTaskCount,
+    int killedTaskCount,
+    long cumulativeTime) {
+
+    this.totalTaskCount = totalTaskCount;
+    this.succeededTaskCount = succeededTaskCount;
+    this.runningTaskCount = runningTaskCount;
+    this.failedTaskCount = failedTaskCount;
+    this.killedTaskCount = killedTaskCount;
+    this.cumulativeTime = cumulativeTime;
+  }
+
+  public int getTotalTaskCount() {
+    return totalTaskCount;
+  }
+
+  public int getSucceededTaskCount() {
+    return succeededTaskCount;
+  }
+
+  public int getRunningTaskCount() {
+    return runningTaskCount;
+  }
+
+  public int getFailedTaskCount() {
+    return failedTaskCount;
+  }
+
+  public int getKilledTaskCount() {
+    return killedTaskCount;
+  }
+
+  public long getCumulativeTime() {
+    return cumulativeTime;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof SparkStageProgress) {
+      SparkStageProgress other = (SparkStageProgress) obj;
+      return getTotalTaskCount() == other.getTotalTaskCount()
+        && getSucceededTaskCount() == other.getSucceededTaskCount()
+        && getRunningTaskCount() == other.getRunningTaskCount()
+        && getFailedTaskCount() == other.getFailedTaskCount()
+        && getKilledTaskCount() == other.getKilledTaskCount();
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("TotalTasks: ");
+    sb.append(getTotalTaskCount());
+    sb.append(" Succeeded: ");
+    sb.append(getSucceededTaskCount());
+    sb.append(" Running: ");
+    sb.append(getRunningTaskCount());
+    sb.append(" Failed: ");
+    sb.append(getFailedTaskCount());
+    sb.append(" Killed: ");
+    sb.append(getKilledTaskCount());
+    sb.append(" CumulativeTime: ");
+    sb.append(getCumulativeTime() + "ms");
+    return sb.toString();
+  }
+}

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java?rev=1632126&r1=1632125&r2=1632126&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
 Wed Oct 15 17:18:56 2014
@@ -24,11 +24,12 @@ import java.util.Map;
 
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkProgress;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
 import org.apache.spark.scheduler.StageInfo;
 import org.apache.spark.ui.jobs.JobProgressListener;
 import org.apache.spark.ui.jobs.UIData;
 
+import scala.Option;
 import scala.Tuple2;
 
 import static scala.collection.JavaConversions.bufferAsJavaList;
@@ -61,35 +62,13 @@ public class SimpleSparkJobStatus implem
   }
 
   @Override
-  public SparkProgress getSparkJobProgress() {
-    Map<String, SparkProgress> stageProgresses = getSparkStageProgress();
-
-    int totalTaskCount = 0;
-    int runningTaskCount = 0;
-    int completedTaskCount = 0;
-    int failedTaskCount = 0;
-    int killedTaskCount = 0;
-
-    for (SparkProgress sparkProgress : stageProgresses.values()) {
-      totalTaskCount += sparkProgress.getTotalTaskCount();
-      runningTaskCount += sparkProgress.getRunningTaskCount();
-      completedTaskCount += sparkProgress.getSucceededTaskCount();
-      failedTaskCount += sparkProgress.getFailedTaskCount();
-      killedTaskCount += sparkProgress.getKilledTaskCount();
-    }
-
-    return new SparkProgress(
-      totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount, 
killedTaskCount);
-  }
-
-  @Override
   public int[] getStageIds() {
     return jobStateListener.getStageIds(jobId);
   }
 
   @Override
-  public Map<String, SparkProgress> getSparkStageProgress() {
-    Map<String, SparkProgress> stageProgresses = new HashMap<String, 
SparkProgress>();
+  public Map<String, SparkStageProgress> getSparkStageProgress() {
+    Map<String, SparkStageProgress> stageProgresses = new HashMap<String, 
SparkStageProgress>();
     int[] stageIds = jobStateListener.getStageIds(jobId);
     if (stageIds != null) {
       for (int stageId : stageIds) {
@@ -104,12 +83,26 @@ public class SimpleSparkJobStatus implem
             int failedTaskCount = uiData.numFailedTasks();
             int totalTaskCount = stageInfo.numTasks();
             int killedTaskCount = 0;
-            SparkProgress stageProgress = new SparkProgress(
+            long costTime;
+            Option<Object> startOption = stageInfo.submissionTime();
+            Option<Object> completeOption = stageInfo.completionTime();
+            if (startOption.isEmpty()) {
+              costTime = 0;
+            } else if (completeOption.isEmpty()) {
+              long startTime = (Long)startOption.get();
+              costTime = System.currentTimeMillis() - startTime;
+            } else {
+              long startTime = (Long)startOption.get();
+              long completeTime = (Long)completeOption.get();
+              costTime = completeTime - startTime;
+            }
+            SparkStageProgress stageProgress = new SparkStageProgress(
               totalTaskCount,
               completedTaskCount,
               runningTaskCount,
               failedTaskCount,
-              killedTaskCount);
+              killedTaskCount,
+              costTime);
             stageProgresses.put(stageInfo.stageId() + "_" + 
stageInfo.attemptId(), stageProgress);
           }
         }


Reply via email to