Author: brock
Date: Wed Oct 15 17:18:56 2014
New Revision: 1632126
URL: http://svn.apache.org/r1632126
Log:
HIVE-8455 - Print Spark job progress format info on the console [Spark Branch]
(Chengxiang Li via Brock)
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
Removed:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1632126&r1=1632125&r2=1632126&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
Wed Oct 15 17:18:56 2014
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.exec.spark.status;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -25,7 +27,6 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
/**
@@ -39,7 +40,7 @@ public class SparkJobMonitor {
private transient LogHelper console;
private final int checkInterval = 200;
- private final int maxRetryInterval = 2500;
+ private final int maxRetryInterval = 2500;
private final int printInterval = 3000;
private long lastPrintTime;
private Set<String> completed;
@@ -59,19 +60,19 @@ public class SparkJobMonitor {
int failedCounter = 0;
int rc = 0;
SparkJobState lastState = null;
- String lastReport = null;
+ Map<String, SparkStageProgress> lastProgressMap = null;
long startTime = 0;
- while(true) {
+ while (true) {
try {
- Map<String, SparkProgress> progressMap =
sparkJobStatus.getSparkStageProgress();
+ Map<String, SparkStageProgress> progressMap =
sparkJobStatus.getSparkStageProgress();
SparkJobState state = sparkJobStatus.getState();
if (state != lastState || state == SparkJobState.RUNNING) {
lastState = state;
- switch(state) {
+ switch (state) {
case SUBMITTED:
console.printInfo("Status: Submitted");
break;
@@ -88,16 +89,22 @@ public class SparkJobMonitor {
}
console.printInfo("\nStatus: Running (Hive on Spark job[" +
- sparkJobStatus.getJobId() + "])\n");
+ sparkJobStatus.getJobId() + "])");
startTime = System.currentTimeMillis();
running = true;
+
+ console.printInfo("Job Progress Format\nCurrentTime
StageId_StageAttemptId: " +
+
"SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
[StageCost]");
}
- lastReport = printStatus(progressMap, lastReport, console);
+
+ printStatus(progressMap, lastProgressMap);
+ lastProgressMap = progressMap;
break;
case SUCCEEDED:
- lastReport = printStatus(progressMap, lastReport, console);
- double duration = (System.currentTimeMillis() - startTime)/1000.0;
+ printStatus(progressMap, lastProgressMap);
+ lastProgressMap = progressMap;
+ double duration = (System.currentTimeMillis() - startTime) /
1000.0;
console.printInfo("Status: Finished successfully in " +
String.format("%.2f seconds", duration));
running = false;
@@ -122,8 +129,8 @@ public class SparkJobMonitor {
Thread.sleep(checkInterval);
}
} catch (Exception e) {
- console.printInfo("Exception: "+e.getMessage());
- if (++failedCounter % maxRetryInterval/checkInterval == 0
+ console.printInfo("Exception: " + e.getMessage());
+ if (++failedCounter % (maxRetryInterval / checkInterval) == 0 // unparenthesized, '%' binds first and the 1st failure kills the job
|| e instanceof InterruptedException) {
console.printInfo("Killing Job...");
console.printError("Execution has failed.");
@@ -141,53 +148,97 @@ public class SparkJobMonitor {
return rc;
}
- private String printStatus(
- Map<String, SparkProgress> progressMap,
- String lastReport,
- LogHelper console) {
+ private void printStatus(Map<String, SparkStageProgress> progressMap,
Map<String, SparkStageProgress> lastProgressMap) {
+ // Print one timestamped line with the task counts of every stage in progressMap.
+ // Skip it when progress is unchanged and printInterval has not elapsed yet.
+ boolean isDuplicateState = isSameAsPreviousProgress(progressMap,
lastProgressMap);
+ boolean isWithinInterval = System.currentTimeMillis() <= lastPrintTime +
printInterval;
+ if (isDuplicateState && isWithinInterval) {
+ return;
+ }
StringBuffer reportBuffer = new StringBuffer();
+ SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); // MM = month, HH = 0-23 hour
+ String currentDate = dt.format(new Date());
+ reportBuffer.append(currentDate + "\t");
SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
- for (String s: keys) {
- SparkProgress progress = progressMap.get(s);
+ for (String s : keys) {
+ SparkStageProgress progress = progressMap.get(s);
final int complete = progress.getSucceededTaskCount();
final int total = progress.getTotalTaskCount();
final int running = progress.getRunningTaskCount();
final int failed = progress.getFailedTaskCount();
+ String stageName = "Stage-" + s;
if (total <= 0) {
- reportBuffer.append(String.format("%s: -/-\t", s, complete, total));
+ reportBuffer.append(String.format("%s: -/-\t", stageName));
} else {
if (complete == total && !completed.contains(s)) {
completed.add(s);
}
- if(complete < total && (complete > 0 || running > 0 || failed > 0)) {
+ if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
/* stage is started, but not complete */
if (failed > 0) {
reportBuffer.append(
- String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running,
failed, total));
+ String.format(
+ "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed,
total));
} else {
- reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete,
running, total));
+ reportBuffer.append(
+ String.format("%s: %d(+%d)/%d\t", stageName, complete, running,
total));
}
} else {
+ double cost = progress.getCumulativeTime() / 1000.0;
/* stage is waiting for input/slots or complete */
if (failed > 0) {
/* tasks finished but some failed */
- reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete,
failed, total));
+ reportBuffer.append(
+ String.format(
+ "%s: %d(-%d)/%d Finished in %,.2fs\t", stageName, complete,
failed, total, cost));
} else {
- reportBuffer.append(String.format("%s: %d/%d\t", s, complete,
total));
+ if (complete == total) {
+ reportBuffer.append(
+ String.format("%s: %d/%d Finished in %,.2fs\t", stageName,
complete, total, cost));
+ } else {
+ reportBuffer.append(String.format("%s: %d/%d\t", stageName,
complete, total));
+ }
}
}
}
}
- String report = reportBuffer.toString();
- if (!report.equals(lastReport)
- || System.currentTimeMillis() >= lastPrintTime + printInterval) {
- console.printInfo(report);
- lastPrintTime = System.currentTimeMillis();
+ lastPrintTime = System.currentTimeMillis();
+ console.printInfo(reportBuffer.toString());
+ }
+
+ private boolean isSameAsPreviousProgress(
+ Map<String, SparkStageProgress> progressMap,
+ Map<String, SparkStageProgress> lastProgressMap) {
+ // True only if both snapshots hold the same stage keys with equal SparkStageProgress values.
+ if (lastProgressMap == null) { // no previous snapshot yet: report as changed
+ return false;
}
- return report;
+ if (progressMap.isEmpty()) {
+ if (lastProgressMap.isEmpty()) {
+ return true; // both snapshots empty
+ } else {
+ return false;
+ }
+ } else {
+ if (lastProgressMap.isEmpty()) {
+ return false;
+ } else {
+ if (progressMap.size() != lastProgressMap.size()) { // different number of stages
+ return false;
+ }
+ for (String key : progressMap.keySet()) {
+ if (!lastProgressMap.containsKey(key) ||
+ !progressMap.get(key).equals(lastProgressMap.get(key))) { // SparkStageProgress.equals ignores cumulativeTime
+ return false;
+ }
+ }
+ }
+ }
+ return true; // NOTE(review): for non-null maps with non-null values this appears equivalent to progressMap.equals(lastProgressMap)
+ }
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1632126&r1=1632125&r2=1632126&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
Wed Oct 15 17:18:56 2014
@@ -28,10 +28,8 @@ public interface SparkJobStatus {
public SparkJobState getState();
- public SparkProgress getSparkJobProgress();
-
public int[] getStageIds();
- public Map<String, SparkProgress> getSparkStageProgress();
+ public Map<String, SparkStageProgress> getSparkStageProgress();
}
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java?rev=1632126&view=auto
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
(added)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
Wed Oct 15 17:18:56 2014
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+/**
+ * Snapshot of the task counts and elapsed time of a single Spark stage.
+ * Instances are immutable; every field is set once in the constructor.
+ */
+public class SparkStageProgress {
+
+ private final int totalTaskCount;
+ private final int succeededTaskCount;
+ private final int runningTaskCount;
+ private final int failedTaskCount;
+ private final int killedTaskCount;
+ // Elapsed stage time in milliseconds (toString() renders it with an "ms" suffix).
+ private final long cumulativeTime;
+
+ public SparkStageProgress(
+ int totalTaskCount,
+ int succeededTaskCount,
+ int runningTaskCount,
+ int failedTaskCount,
+ int killedTaskCount,
+ long cumulativeTime) {
+
+ this.totalTaskCount = totalTaskCount;
+ this.succeededTaskCount = succeededTaskCount;
+ this.runningTaskCount = runningTaskCount;
+ this.failedTaskCount = failedTaskCount;
+ this.killedTaskCount = killedTaskCount;
+ this.cumulativeTime = cumulativeTime;
+ }
+
+ public int getTotalTaskCount() {
+ return totalTaskCount;
+ }
+
+ public int getSucceededTaskCount() {
+ return succeededTaskCount;
+ }
+
+ public int getRunningTaskCount() {
+ return runningTaskCount;
+ }
+
+ public int getFailedTaskCount() {
+ return failedTaskCount;
+ }
+
+ public int getKilledTaskCount() {
+ return killedTaskCount;
+ }
+
+ public long getCumulativeTime() {
+ return cumulativeTime;
+ }
+
+ /**
+ * Compares task counts only. cumulativeTime is deliberately left out because it
+ * keeps growing while a stage runs, which would make otherwise-identical
+ * snapshots compare unequal.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof SparkStageProgress) {
+ SparkStageProgress other = (SparkStageProgress) obj;
+ return getTotalTaskCount() == other.getTotalTaskCount()
+ && getSucceededTaskCount() == other.getSucceededTaskCount()
+ && getRunningTaskCount() == other.getRunningTaskCount()
+ && getFailedTaskCount() == other.getFailedTaskCount()
+ && getKilledTaskCount() == other.getKilledTaskCount();
+ }
+ return false;
+ }
+
+ /**
+ * Hashes exactly the fields that {@link #equals(Object)} compares, so equal
+ * instances always have equal hash codes.
+ */
+ @Override
+ public int hashCode() {
+ int result = totalTaskCount;
+ result = 31 * result + succeededTaskCount;
+ result = 31 * result + runningTaskCount;
+ result = 31 * result + failedTaskCount;
+ result = 31 * result + killedTaskCount;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("TotalTasks: ");
+ sb.append(getTotalTaskCount());
+ sb.append(" Succeeded: ");
+ sb.append(getSucceededTaskCount());
+ sb.append(" Running: ");
+ sb.append(getRunningTaskCount());
+ sb.append(" Failed: ");
+ sb.append(getFailedTaskCount());
+ sb.append(" Killed: ");
+ sb.append(getKilledTaskCount());
+ sb.append(" CumulativeTime: ");
+ sb.append(getCumulativeTime() + "ms");
+ return sb.toString();
+ }
+}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java?rev=1632126&r1=1632125&r2=1632126&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
Wed Oct 15 17:18:56 2014
@@ -24,11 +24,12 @@ import java.util.Map;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkProgress;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.spark.ui.jobs.UIData;
+import scala.Option;
import scala.Tuple2;
import static scala.collection.JavaConversions.bufferAsJavaList;
@@ -61,35 +62,13 @@ public class SimpleSparkJobStatus implem
}
@Override
- public SparkProgress getSparkJobProgress() {
- Map<String, SparkProgress> stageProgresses = getSparkStageProgress();
-
- int totalTaskCount = 0;
- int runningTaskCount = 0;
- int completedTaskCount = 0;
- int failedTaskCount = 0;
- int killedTaskCount = 0;
-
- for (SparkProgress sparkProgress : stageProgresses.values()) {
- totalTaskCount += sparkProgress.getTotalTaskCount();
- runningTaskCount += sparkProgress.getRunningTaskCount();
- completedTaskCount += sparkProgress.getSucceededTaskCount();
- failedTaskCount += sparkProgress.getFailedTaskCount();
- killedTaskCount += sparkProgress.getKilledTaskCount();
- }
-
- return new SparkProgress(
- totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount,
killedTaskCount);
- }
-
- @Override
public int[] getStageIds() {
return jobStateListener.getStageIds(jobId);
}
@Override
- public Map<String, SparkProgress> getSparkStageProgress() {
- Map<String, SparkProgress> stageProgresses = new HashMap<String,
SparkProgress>();
+ public Map<String, SparkStageProgress> getSparkStageProgress() {
+ Map<String, SparkStageProgress> stageProgresses = new HashMap<String,
SparkStageProgress>();
int[] stageIds = jobStateListener.getStageIds(jobId);
if (stageIds != null) {
for (int stageId : stageIds) {
@@ -104,12 +83,26 @@ public class SimpleSparkJobStatus implem
int failedTaskCount = uiData.numFailedTasks();
int totalTaskCount = stageInfo.numTasks();
int killedTaskCount = 0;
- SparkProgress stageProgress = new SparkProgress(
+ long costTime;
+ Option<Object> startOption = stageInfo.submissionTime();
+ Option<Object> completeOption = stageInfo.completionTime();
+ if (startOption.isEmpty()) {
+ costTime = 0;
+ } else if (completeOption.isEmpty()) {
+ long startTime = (Long)startOption.get();
+ costTime = System.currentTimeMillis() - startTime;
+ } else {
+ long startTime = (Long)startOption.get();
+ long completeTime = (Long)completeOption.get();
+ costTime = completeTime - startTime;
+ }
+ SparkStageProgress stageProgress = new SparkStageProgress(
totalTaskCount,
completedTaskCount,
runningTaskCount,
failedTaskCount,
- killedTaskCount);
+ killedTaskCount,
+ costTime);
stageProgresses.put(stageInfo.stageId() + "_" +
stageInfo.attemptId(), stageProgress);
}
}