Repository: spark
Updated Branches:
  refs/heads/branch-1.2 2d26c6248 -> 04b1bdbae


[SPARK-4017] show progress bar in console

The progress bar will look like this:

![1___spark_job__85_250_finished__4_are_running___java_](https://cloud.githubusercontent.com/assets/40902/4854813/a02f44ac-6099-11e4-9060-7c73a73151d6.png)

At the right end of the bar, the numbers are: finished tasks, running tasks, and
total tasks.

After the stage has finished, it will disappear.

The progress bar is only shown if the logging level is WARN or higher (but
progress in the title is still shown). It can be turned off with
spark.ui.showConsoleProgress.

Author: Davies Liu <[email protected]>

Closes #3029 from davies/progress and squashes the following commits:

95336d5 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
progress
fc49ac8 [Davies Liu] address commentse
2e90f75 [Davies Liu] show multiple stages in same time
0081bcc [Davies Liu] address comments
38c42f1 [Davies Liu] fix tests
ab87958 [Davies Liu] disable progress bar during tests
30ac852 [Davies Liu] re-implement progress bar
b3f34e5 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
progress
6fd30ff [Davies Liu] show progress bar if no task finished in 500ms
e4e7344 [Davies Liu] refactor
e1f524d [Davies Liu] revert unnecessary change
a60477c [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
progress
5cae3f2 [Davies Liu] fix style
ea49fe0 [Davies Liu] address comments
bc53d99 [Davies Liu] refactor
e6bb189 [Davies Liu] fix logging in sparkshell
7e7d4e7 [Davies Liu] address commments
5df26bb [Davies Liu] fix style
9e42208 [Davies Liu] show progress bar in console and title

(cherry picked from commit e34f38ff1a0dfbb0ffa4bd11071e03b1a58de998)
Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04b1bdba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04b1bdba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04b1bdba

Branch: refs/heads/branch-1.2
Commit: 04b1bdbae31c3039125100e703121daf7d9dabf5
Parents: 2d26c62
Author: Davies Liu <[email protected]>
Authored: Tue Nov 18 13:37:21 2014 -0800
Committer: Patrick Wendell <[email protected]>
Committed: Tue Nov 18 13:37:37 2014 -0800

----------------------------------------------------------------------
 bin/spark-submit                                |   3 +
 .../java/org/apache/spark/SparkStageInfo.java   |   1 +
 .../scala/org/apache/spark/SparkContext.scala   |  10 +-
 .../org/apache/spark/SparkStatusTracker.scala   |   1 +
 .../scala/org/apache/spark/StatusAPIImpl.scala  |   1 +
 .../apache/spark/ui/ConsoleProgressBar.scala    | 124 +++++++++++++++++++
 pom.xml                                         |   1 +
 project/SparkBuild.scala                        |   1 +
 8 files changed, 141 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/04b1bdba/bin/spark-submit
----------------------------------------------------------------------
diff --git a/bin/spark-submit b/bin/spark-submit
index c557311..f92d90c 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -22,6 +22,9 @@
 export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
 ORIG_ARGS=("$@")
 
+# Set COLUMNS for progress bar
+export COLUMNS=`tput cols`
+
 while (($#)); do
   if [ "$1" = "--deploy-mode" ]; then
     SPARK_SUBMIT_DEPLOY_MODE=$2

http://git-wip-us.apache.org/repos/asf/spark/blob/04b1bdba/core/src/main/java/org/apache/spark/SparkStageInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java 
b/core/src/main/java/org/apache/spark/SparkStageInfo.java
index 04e2247..fd74321 100644
--- a/core/src/main/java/org/apache/spark/SparkStageInfo.java
+++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java
@@ -26,6 +26,7 @@ package org.apache.spark;
 public interface SparkStageInfo {
   int stageId();
   int currentAttemptId();
+  long submissionTime();
   String name();
   int numTasks();
   int numActiveTasks();

http://git-wip-us.apache.org/repos/asf/spark/blob/04b1bdba/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7cccf74..3701312 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -50,7 +50,7 @@ import 
org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkD
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, 
MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.{SparkUI, ConsoleProgressBar}
 import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.util._
 
@@ -245,6 +245,13 @@ class SparkContext(config: SparkConf) extends Logging {
 
   val statusTracker = new SparkStatusTracker(this)
 
+  // Console progress bar. It is created only when "spark.ui.showConsoleProgress" is true
+  // (the default) and INFO logging is disabled on `log` (i.e. the log level is WARN or
+  // higher). Creating a ConsoleProgressBar starts its background refresh timer.
+  private[spark] val progressBar: Option[ConsoleProgressBar] =
+    if (conf.getBoolean("spark.ui.showConsoleProgress", true) && 
!log.isInfoEnabled) {
+      Some(new ConsoleProgressBar(this))
+    } else {
+      None
+    }
+
   // Initialize the Spark UI
   private[spark] val ui: Option[SparkUI] =
     if (conf.getBoolean("spark.ui.enabled", true)) {
@@ -1274,6 +1281,7 @@ class SparkContext(config: SparkConf) extends Logging {
     logInfo("Starting job: " + callSite.shortForm)
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
       resultHandler, localProperties.get)
+    progressBar.foreach(_.finishAll())
     rdd.doCheckpoint()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/04b1bdba/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala 
b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index c18d763..edbdda8 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -96,6 +96,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
         new SparkStageInfoImpl(
           stageId,
           info.attemptId,
+          info.submissionTime.getOrElse(0),
           info.name,
           info.numTasks,
           data.numActiveTasks,

http://git-wip-us.apache.org/repos/asf/spark/blob/04b1bdba/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala 
b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
index 90b47c8..e5c7c8d 100644
--- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
+++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -26,6 +26,7 @@ private class SparkJobInfoImpl (
 private class SparkStageInfoImpl(
   val stageId: Int,
   val currentAttemptId: Int,
+  val submissionTime: Long,
   val name: String,
   val numTasks: Int,
   val numActiveTasks: Int,

http://git-wip-us.apache.org/repos/asf/spark/blob/04b1bdba/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala 
b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
new file mode 100644
index 0000000..27ba9e1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import java.util.{Timer, TimerTask}
+
+import org.apache.spark._
+
+/**
+ * ConsoleProgressBar shows the progress of stages in the next line of the console. It polls
+ * the status of active stages from `sc.statusTracker` periodically; a stage's progress is only
+ * shown after the stage has been running for at least 500ms. If multiple stages run at the
+ * same time, the statuses of up to 3 of them are combined and shown in one line.
+ *
+ * Constructing an instance starts a daemon timer thread that refreshes the bar every
+ * `UPDATE_PERIOD` ms; call `stop()` to cancel it.
+ */
+private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
+
+  // Carriage return: moves the cursor back to the start of the current line
+  val CR = '\r'
+  // Update period of progress bar, in milliseconds
+  val UPDATE_PERIOD = 200L
+  // Delay before a progress bar shows up, in milliseconds
+  val FIRST_DELAY = 500L
+  // An unchanged bar is re-printed at least this often (ms), because an ssh connection
+  // may be closed after being idle for some time
+  private val KEEPALIVE_PERIOD = 60 * 1000L
+
+  // The width of the terminal, taken from the COLUMNS environment variable (exported by
+  // bin/spark-submit). Falls back to 80 when COLUMNS is unset, empty, not an integer or not
+  // positive, so a malformed value cannot make this constructor (and SparkContext) throw.
+  val TerminalWidth: Int = sys.env.get("COLUMNS")
+    .flatMap(cols => scala.util.Try(cols.trim.toInt).toOption)
+    .filter(_ > 0)
+    .getOrElse(80)
+
+  // Time (ms) the last job finished; no bar is drawn until FIRST_DELAY ms after it
+  var lastFinishTime = 0L
+  // Time (ms) the bar was last printed
+  var lastUpdateTime = 0L
+  // Text of the bar currently on screen, or "" when none is displayed
+  var lastProgressBar = ""
+
+  // Schedule a refresh on a daemon timer thread, so it never keeps the JVM alive
+  private val timer = new Timer("refresh progress", true)
+  timer.schedule(new TimerTask {
+    override def run(): Unit = {
+      refresh()
+    }
+  }, FIRST_DELAY, UPDATE_PERIOD)
+
+  /**
+   * Try to refresh the progress bar in every cycle. Stages with at most one task, or that were
+   * submitted less than FIRST_DELAY ms ago, are skipped, and nothing is drawn within
+   * FIRST_DELAY ms of the last `finishAll()`.
+   */
+  private def refresh(): Unit = synchronized {
+    val now = System.currentTimeMillis()
+    if (now - lastFinishTime >= FIRST_DELAY) {
+      val stages = sc.statusTracker.getActiveStageIds()
+        .flatMap(id => sc.statusTracker.getStageInfo(id))
+        .filter(s => s.numTasks() > 1 && now - s.submissionTime() > FIRST_DELAY)
+        .sortBy(_.stageId())
+      if (stages.nonEmpty) {
+        show(now, stages.take(3))  // display at most 3 stages at the same time
+      }
+    }
+  }
+
+  /**
+   * Show progress bar in console. The progress bar is displayed in the next line after your
+   * last output and keeps overwriting itself so that it stays on one line. Output that follows
+   * the bar is printed after it, and the bar is then redrawn without overwriting that output.
+   *
+   * @param now current time in milliseconds
+   * @param stages the stages to render; expected to be non-empty
+   */
+  private def show(now: Long, stages: Seq[SparkStageInfo]): Unit = {
+    val width = TerminalWidth / stages.size
+    val bar = stages.map { s =>
+      val total = s.numTasks()
+      val completed = s.numCompletedTasks()
+      val header = s"[Stage ${s.stageId()}:"
+      val tailer = s"($completed + ${s.numActiveTasks()}) / $total]"
+      val w = width - header.length - tailer.length
+      val bar = if (w > 0 && total > 0) {
+        // Long arithmetic so that `w * completed` cannot overflow an Int
+        val percent = (w.toLong * completed / total).toInt
+        (0 until w).map { i =>
+          if (i < percent) "=" else if (i == percent) ">" else " "
+        }.mkString("")
+      } else {
+        ""
+      }
+      header + bar + tailer
+    }.mkString("")
+
+    // Only print if the bar changed, or if it has not been printed for KEEPALIVE_PERIOD
+    // (otherwise the ssh connection may be closed after being idle for some time)
+    if (bar != lastProgressBar || now - lastUpdateTime > KEEPALIVE_PERIOD) {
+      System.err.print(CR + bar)
+      lastUpdateTime = now
+    }
+    lastProgressBar = bar
+  }
+
+  /**
+   * Clear the progress bar if it is shown, by overwriting the line with spaces.
+   */
+  private def clear(): Unit = {
+    if (lastProgressBar.nonEmpty) {
+      // print, not printf: the text is not a format string
+      System.err.print(CR + " " * TerminalWidth + CR)
+      lastProgressBar = ""
+    }
+  }
+
+  /**
+   * Mark all the stages as finished and clear the progress bar if shown, so that the progress
+   * bar does not interweave with the output of jobs.
+   */
+  def finishAll(): Unit = synchronized {
+    clear()
+    lastFinishTime = System.currentTimeMillis()
+  }
+
+  /**
+   * Cancel the refresh timer; no further refreshes are scheduled after this call.
+   */
+  def stop(): Unit = timer.cancel()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/04b1bdba/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 41f4ec1..418c4af 100644
--- a/pom.xml
+++ b/pom.xml
@@ -977,6 +977,7 @@
               
<spark.test.home>${session.executionRootDirectory}</spark.test.home>
               <spark.testing>1</spark.testing>
               <spark.ui.enabled>false</spark.ui.enabled>
+              
<spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
               
<spark.executor.extraClassPath>${test_classpath}</spark.executor.extraClassPath>
               
<spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
             </systemProperties>

http://git-wip-us.apache.org/repos/asf/spark/blob/04b1bdba/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 1697b6d..c1879ce 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -377,6 +377,7 @@ object TestSettings {
     javaOptions in Test += "-Dspark.testing=1",
     javaOptions in Test += "-Dspark.port.maxRetries=100",
     javaOptions in Test += "-Dspark.ui.enabled=false",
+    javaOptions in Test += "-Dspark.ui.showConsoleProgress=false",
     javaOptions in Test += "-Dspark.driver.allowMultipleContexts=true",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith 
"spark")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to