This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new a8a2ba1  [SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate (backport 2.4)
a8a2ba1 is described below

commit a8a2ba11ac10051423e58920062b50f328b06421
Author: Shixiong Zhu <zsxw...@gmail.com>
AuthorDate: Wed Apr 10 15:17:04 2019 -0700

    [SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate (backport 2.4)
    
    ## What changes were proposed in this pull request?
    
    This PR backports #24303 to 2.4.
    
    ## How was this patch tested?
    
    Jenkins
    
    Closes #24328 from zsxwing/SPARK-27394-2.4.
    
    Authored-by: Shixiong Zhu <zsxw...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../apache/spark/status/AppStatusListener.scala    | 40 ++++++++++++++++------
 .../scala/org/apache/spark/status/config.scala     |  6 ++++
 .../org/apache/spark/ui/UISeleniumSuite.scala      | 35 +++++++++++++++++--
 docs/configuration.md                              |  8 +++++
 4 files changed, 75 insertions(+), 14 deletions(-)
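
The change is essentially a throttled flush: executor heartbeats arrive
periodically, and the listener uses them to write stale live entities to the
status store, but no more often than a configurable minimum period. A minimal
sketch of the pattern, using hypothetical names (the real change is in
AppStatusListener below):

    // Hypothetical sketch of the throttled flush this commit introduces.
    class ThrottledFlusher(minFlushPeriodNs: Long) {
      // Last flush time; guards against flushing too frequently.
      private var lastFlushTimeNs = System.nanoTime()

      // Called from a periodic event, e.g. an executor heartbeat.
      def onPeriodicEvent(flushAll: () => Unit): Unit = {
        val now = System.nanoTime()
        if (now - lastFlushTimeNs > minFlushPeriodNs) {
          flushAll()
          // Re-read the clock: flushAll() may be slow, leaving `now` stale.
          lastFlushTimeNs = System.nanoTime()
        }
      }
    }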

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index c4dd47d..cb7ab7f 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -58,6 +58,12 @@ private[spark] class AppStatusListener(
   // operations that we can live without when rapidly processing incoming task events.
   private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
 
+  /**
+   * Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming
+   * task events are not fired frequently.
+   */
+  private val liveUpdateMinFlushPeriod = conf.get(LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD)
+
   private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
   private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)
 
@@ -73,6 +79,9 @@ private[spark] class AppStatusListener(
   // around liveExecutors.
   @volatile private var activeExecutorCount = 0
 
+  /** The last time `LiveEntity`s were flushed. This is to avoid flushing too frequently. */
+  private var lastFlushTimeNs = System.nanoTime()
+
   kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
     { count => cleanupExecutors(count) }
 
@@ -86,7 +95,8 @@ private[spark] class AppStatusListener(
 
   kvstore.onFlush {
     if (!live) {
-      flush()
+      val now = System.nanoTime()
+      flush(update(_, now))
     }
   }
 
@@ -744,6 +754,15 @@ private[spark] class AppStatusListener(
         }
       }
     }
+
+    // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
+    // here to ensure the staleness of Spark UI doesn't last more than
+    // `max(heartbeat interval, liveUpdateMinFlushPeriod)`.
+    if (now - lastFlushTimeNs > liveUpdateMinFlushPeriod) {
+      flush(maybeUpdate(_, now))
+      // Re-get the current system time because `flush` may be slow and `now` is stale.
+      lastFlushTimeNs = System.nanoTime()
+    }
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
@@ -755,18 +774,17 @@ private[spark] class AppStatusListener(
     }
   }
 
-  /** Flush all live entities' data to the underlying store. */
-  private def flush(): Unit = {
-    val now = System.nanoTime()
+  /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */
+  private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
     liveStages.values.asScala.foreach { stage =>
-      update(stage, now)
-      stage.executorSummaries.values.foreach(update(_, now))
+      entityFlushFunc(stage)
+      stage.executorSummaries.values.foreach(entityFlushFunc)
     }
-    liveJobs.values.foreach(update(_, now))
-    liveExecutors.values.foreach(update(_, now))
-    liveTasks.values.foreach(update(_, now))
-    liveRDDs.values.foreach(update(_, now))
-    pools.values.foreach(update(_, now))
+    liveJobs.values.foreach(entityFlushFunc)
+    liveExecutors.values.foreach(entityFlushFunc)
+    liveTasks.values.foreach(entityFlushFunc)
+    liveRDDs.values.foreach(entityFlushFunc)
+    pools.values.foreach(entityFlushFunc)
   }
 
   /**
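
Note the two call sites of the reworked flush(entityFlushFunc): on
kvstore.onFlush (application end) every entity is written unconditionally via
update(_, now), while the heartbeat path passes the cheaper maybeUpdate(_, now).
Roughly, the existing helpers (not shown in this hunk) behave like this sketch,
paraphrased from AppStatusListener:

    // Sketch only: update() always writes; maybeUpdate() skips entities
    // that were written within the last live-update period.
    private def update(entity: LiveEntity, now: Long): Unit = {
      entity.write(kvstore, now)
    }

    private def maybeUpdate(entity: LiveEntity, now: Long): Unit = {
      if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
        update(entity, now)
      }
    }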
diff --git a/core/src/main/scala/org/apache/spark/status/config.scala b/core/src/main/scala/org/apache/spark/status/config.scala
index 67801b8..87204fd 100644
--- a/core/src/main/scala/org/apache/spark/status/config.scala
+++ b/core/src/main/scala/org/apache/spark/status/config.scala
@@ -31,6 +31,12 @@ private[spark] object config {
     .timeConf(TimeUnit.NANOSECONDS)
     .createWithDefaultString("100ms")
 
+  val LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD = ConfigBuilder("spark.ui.liveUpdate.minFlushPeriod")
+    .doc("Minimum time elapsed before stale UI data is flushed. This avoids UI 
staleness when " +
+      "incoming task events are not fired frequently.")
+    .timeConf(TimeUnit.NANOSECONDS)
+    .createWithDefaultString("1s")
+
   val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs")
     .intConf
     .createWithDefault(1000)
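
Because the entry is declared with timeConf(TimeUnit.NANOSECONDS), string
values like the "1s" default resolve to nanoseconds, which is what lets the
listener compare the config directly against System.nanoTime() deltas without
unit conversion. An illustrative equivalent of the staleness check:

    // Illustrative: both sides of the comparison are in nanoseconds.
    import java.util.concurrent.TimeUnit
    val minFlushPeriodNs = TimeUnit.SECONDS.toNanos(1) // the "1s" default
    def shouldFlush(nowNs: Long, lastFlushNs: Long): Boolean =
      nowNs - lastFlushNs > minFlushPeriodNs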
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index e86cadf..8eef67e 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark._
 import org.apache.spark.LocalSparkContext._
 import org.apache.spark.api.java.StorageLevels
 import org.apache.spark.deploy.history.HistoryServerSuite
-import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
+import org.apache.spark.internal.config.{EXECUTOR_HEARTBEAT_INTERVAL, MEMORY_OFFHEAP_SIZE}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
 import org.apache.spark.status.config._
@@ -99,14 +99,18 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
    * Create a test SparkContext with the SparkUI enabled.
    * It is safe to `get` the SparkUI directly from the SparkContext returned here.
    */
-  private def newSparkContext(killEnabled: Boolean = true): SparkContext = {
+  private def newSparkContext(
+      killEnabled: Boolean = true,
+      master: String = "local",
+      additionalConfs: Map[String, String] = Map.empty): SparkContext = {
     val conf = new SparkConf()
-      .setMaster("local")
+      .setMaster(master)
       .setAppName("test")
       .set("spark.ui.enabled", "true")
       .set("spark.ui.port", "0")
       .set("spark.ui.killEnabled", killEnabled.toString)
       .set(MEMORY_OFFHEAP_SIZE.key, "64m")
+    additionalConfs.foreach { case (k, v) => conf.set(k, v) }
     val sc = new SparkContext(conf)
     assert(sc.ui.isDefined)
     sc
@@ -724,6 +728,31 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
     }
   }
 
+  test("Staleness of Spark UI should not last minutes or hours") {
+    withSpark(newSparkContext(
+      master = "local[2]",
+      // Set a small heartbeat interval to make the test fast
+      additionalConfs = Map(
+        EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms",
+        LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD.key -> "10ms"))) { sc =>
+      sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+      val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ =>
+        // Make the task never finish so there won't be any task start/end events after the first 2
+        // tasks start.
+        Thread.sleep(300000)
+      }
+      try {
+        eventually(timeout(10.seconds)) {
+          val jobsJson = getJson(sc.ui.get, "jobs")
+          jobsJson.children.length should be (1)
+          (jobsJson.children.head \ "numActiveTasks").extract[Int] should be (2)
+        }
+      } finally {
+        f.cancel()
+      }
+    }
+  }
+
   def goToUi(sc: SparkContext, path: String): Unit = {
     goToUi(sc.ui.get, path)
   }
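
The new test exercises exactly this flush path: with master = "local[2]", two
tasks start and then block in Thread.sleep, so no further task start/end events
arrive; only the 10ms executor heartbeats, combined with the 10ms minimum flush
period, can drive numActiveTasks to 2 in the REST API before the 10-second
eventually timeout.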
diff --git a/docs/configuration.md b/docs/configuration.md
index f0b6216..6808ec7 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -826,6 +826,14 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.ui.liveUpdate.minFlushPeriod</code></td>
+  <td>1s</td>
+  <td>
+    Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming
+    task events are not fired frequently.
+  </td>
+</tr>
+<tr>
   <td><code>spark.ui.port</code></td>
   <td>4040</td>
   <td>

