This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ff39cd  [SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate
5ff39cd is described below

commit 5ff39cd5ee92da0d08380c7a680d350ff6f4b5db
Author: Shixiong Zhu <[email protected]>
AuthorDate: Tue Apr 9 08:26:00 2019 -0700

    [SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate
    
    ## What changes were proposed in this pull request?
    
    This PR updates `AppStatusListener` to flush `LiveEntity` if necessary when
    receiving `SparkListenerExecutorMetricsUpdate`. This ensures the staleness of
    the Spark UI doesn't last longer than the executor heartbeat interval.
    
    ## How was this patch tested?
    
    The new unit test.
    
    Closes #24303 from zsxwing/SPARK-27394.
    
    Authored-by: Shixiong Zhu <[email protected]>
    Signed-off-by: Marcelo Vanzin <[email protected]>
---
 .../org/apache/spark/internal/config/Status.scala  |  6 ++++
 .../apache/spark/status/AppStatusListener.scala    | 39 ++++++++++++++++------
 .../org/apache/spark/ui/UISeleniumSuite.scala      | 33 ++++++++++++++++--
 docs/configuration.md                              |  8 +++++
 4 files changed, 73 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
index c561572..3e6a4e9 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
@@ -29,6 +29,12 @@ private[spark] object Status {
     .timeConf(TimeUnit.NANOSECONDS)
     .createWithDefaultString("100ms")
 
+  val LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD = ConfigBuilder("spark.ui.liveUpdate.minFlushPeriod")
+    .doc("Minimum time elapsed before stale UI data is flushed. This avoids UI 
staleness when " +
+      "incoming task events are not fired frequently.")
+    .timeConf(TimeUnit.NANOSECONDS)
+    .createWithDefaultString("1s")
+
   val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs")
     .intConf
     .createWithDefault(1000)
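
The new entry is read like any other Spark time config; a minimal usage sketch
(the app name and the 500ms value are illustrative, not taken from the patch):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("example")
      // Flush stale live-entity data to the UI store at most every 500ms.
      .set("spark.ui.liveUpdate.minFlushPeriod", "500ms")
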
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index a3e8242..b085f21 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -58,6 +58,12 @@ private[spark] class AppStatusListener(
   // operations that we can live without when rapidly processing incoming task events.
   private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
 
+  /**
+   * Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming
+   * task events are not fired frequently.
+   */
+  private val liveUpdateMinFlushPeriod = conf.get(LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD)
+
   private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
   private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)
 
@@ -76,6 +82,9 @@ private[spark] class AppStatusListener(
   // around liveExecutors.
   @volatile private var activeExecutorCount = 0
 
+  /** The last time when flushing `LiveEntity`s. This is to avoid flushing too frequently. */
+  private var lastFlushTimeNs = System.nanoTime()
+
   kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
     { count => cleanupExecutors(count) }
 
@@ -89,7 +98,8 @@ private[spark] class AppStatusListener(
 
   kvstore.onFlush {
     if (!live) {
-      flush()
+      val now = System.nanoTime()
+      flush(update(_, now))
     }
   }
 
@@ -831,6 +841,14 @@ private[spark] class AppStatusListener(
         }
       }
     }
+    // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
+    // here to ensure the staleness of Spark UI doesn't last more than
+    // `max(heartbeat interval, liveUpdateMinFlushPeriod)`.
+    if (now - lastFlushTimeNs > liveUpdateMinFlushPeriod) {
+      flush(maybeUpdate(_, now))
+      // Re-get the current system time because `flush` may be slow and `now` is stale.
+      lastFlushTimeNs = System.nanoTime()
+    }
   }
 
   override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
@@ -856,18 +874,17 @@ private[spark] class AppStatusListener(
     }
   }
 
-  /** Flush all live entities' data to the underlying store. */
-  private def flush(): Unit = {
-    val now = System.nanoTime()
+  /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */
+  private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
     liveStages.values.asScala.foreach { stage =>
-      update(stage, now)
-      stage.executorSummaries.values.foreach(update(_, now))
+      entityFlushFunc(stage)
+      stage.executorSummaries.values.foreach(entityFlushFunc)
     }
-    liveJobs.values.foreach(update(_, now))
-    liveExecutors.values.foreach(update(_, now))
-    liveTasks.values.foreach(update(_, now))
-    liveRDDs.values.foreach(update(_, now))
-    pools.values.foreach(update(_, now))
+    liveJobs.values.foreach(entityFlushFunc)
+    liveExecutors.values.foreach(entityFlushFunc)
+    liveTasks.values.foreach(entityFlushFunc)
+    liveRDDs.values.foreach(entityFlushFunc)
+    pools.values.foreach(entityFlushFunc)
   }
 
   /**
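
The periodic flush added to the heartbeat handler is a plain rate-limiting
pattern: remember the last flush time and flush again only once the minimum
period has elapsed. A self-contained sketch of that pattern (the
`ThrottledFlusher` name and API are hypothetical, not the listener's actual
interface):

    import java.util.concurrent.TimeUnit

    class ThrottledFlusher(minFlushPeriodNs: Long) {
      private var lastFlushTimeNs = System.nanoTime()

      /** Run `flushFunc` only if the minimum period has elapsed since the last flush. */
      def maybeFlush(flushFunc: () => Unit): Unit = {
        val now = System.nanoTime()
        if (now - lastFlushTimeNs > minFlushPeriodNs) {
          flushFunc()
          // Re-read the clock: the flush itself may be slow, so `now` is stale.
          lastFlushTimeNs = System.nanoTime()
        }
      }
    }

    // Usage: flush at most once per second, no matter how often events arrive.
    val flusher = new ThrottledFlusher(TimeUnit.SECONDS.toNanos(1))
    flusher.maybeFlush(() => println("flushed"))
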
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index cdc7185..a9f03eb 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -100,14 +100,18 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
    * Create a test SparkContext with the SparkUI enabled.
   * It is safe to `get` the SparkUI directly from the SparkContext returned here.
    */
-  private def newSparkContext(killEnabled: Boolean = true): SparkContext = {
+  private def newSparkContext(
+      killEnabled: Boolean = true,
+      master: String = "local",
+      additionalConfs: Map[String, String] = Map.empty): SparkContext = {
     val conf = new SparkConf()
-      .setMaster("local")
+      .setMaster(master)
       .setAppName("test")
       .set(UI_ENABLED, true)
       .set(UI_PORT, 0)
       .set(UI_KILL_ENABLED, killEnabled)
       .set(MEMORY_OFFHEAP_SIZE.key, "64m")
+    additionalConfs.foreach { case (k, v) => conf.set(k, v) }
     val sc = new SparkContext(conf)
     assert(sc.ui.isDefined)
     sc
@@ -725,6 +729,31 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
     }
   }
 
+  test("Staleness of Spark UI should not last minutes or hours") {
+    withSpark(newSparkContext(
+      master = "local[2]",
+      // Set a small heartbeat interval to make the test fast
+      additionalConfs = Map(
+        EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms",
+        LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD.key -> "10ms"))) { sc =>
+      sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+      val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ =>
+        // Make the task never finish so there won't be any task start/end events after the first 2
+        // tasks start.
+        Thread.sleep(300000)
+      }
+      try {
+        eventually(timeout(10.seconds)) {
+          val jobsJson = getJson(sc.ui.get, "jobs")
+          jobsJson.children.length should be (1)
+          (jobsJson.children.head \ "numActiveTasks").extract[Int] should be (2)
+        }
+      } finally {
+        f.cancel()
+      }
+    }
+  }
+
   def goToUi(sc: SparkContext, path: String): Unit = {
     goToUi(sc.ui.get, path)
   }
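
The assertion in the new test relies on scalatest's `eventually`, which retries
a block until it passes or the timeout expires; the UI's JSON API is polled
until the heartbeat-driven flush makes the active-task count visible. A
condensed sketch of that polling idiom (the `fetchActiveTaskCount` helper is
hypothetical):

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    // Hypothetical helper: query the UI's REST API for the active task count.
    def fetchActiveTaskCount(): Int = ???

    // Retry the block until the periodic flush makes the assertion pass.
    eventually(timeout(10.seconds)) {
      assert(fetchActiveTaskCount() == 2)
    }
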
diff --git a/docs/configuration.md b/docs/configuration.md
index 3187b77..5325f8a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -875,6 +875,14 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.ui.liveUpdate.minFlushPeriod</code></td>
+  <td>1s</td>
+  <td>
+    Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming
+    task events are not fired frequently.
+  </td>
+</tr>
+<tr>
   <td><code>spark.ui.port</code></td>
   <td>4040</td>
   <td>
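
As a worked example of the resulting staleness bound: assuming the stock 10s
default for `spark.executor.heartbeatInterval` and the 1s default above, live
UI data is refreshed at worst every max(10s, 1s) = 10s, since each heartbeat
triggers a flush and the 1s floor only matters when heartbeats arrive more
than once per second.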


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
