This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new a8a2ba1 [SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate (backport 2.4) a8a2ba1 is described below commit a8a2ba11ac10051423e58920062b50f328b06421 Author: Shixiong Zhu <zsxw...@gmail.com> AuthorDate: Wed Apr 10 15:17:04 2019 -0700 [SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate (backport 2.4) ## What changes were proposed in this pull request? This PR backports #24303 to 2.4. ## How was this patch tested? Jenkins Closes #24328 from zsxwing/SPARK-27394-2.4. Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> --- .../apache/spark/status/AppStatusListener.scala | 40 ++++++++++++++++------ .../scala/org/apache/spark/status/config.scala | 6 ++++ .../org/apache/spark/ui/UISeleniumSuite.scala | 35 +++++++++++++++++-- docs/configuration.md | 8 +++++ 4 files changed, 75 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index c4dd47d..cb7ab7f 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -58,6 +58,12 @@ private[spark] class AppStatusListener( // operations that we can live without when rapidly processing incoming task events. private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + /** + * Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming + * task events are not fired frequently. + */ + private val liveUpdateMinFlushPeriod = conf.get(LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD) + private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE) private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES) @@ -73,6 +79,9 @@ private[spark] class AppStatusListener( // around liveExecutors. @volatile private var activeExecutorCount = 0 + /** The last time when flushing `LiveEntity`s. This is to avoid flushing too frequently. */ + private var lastFlushTimeNs = System.nanoTime() + kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS)) { count => cleanupExecutors(count) } @@ -86,7 +95,8 @@ private[spark] class AppStatusListener( kvstore.onFlush { if (!live) { - flush() + val now = System.nanoTime() + flush(update(_, now)) } } @@ -744,6 +754,15 @@ private[spark] class AppStatusListener( } } } + + // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush + // here to ensure the staleness of Spark UI doesn't last more than + // `max(heartbeat interval, liveUpdateMinFlushPeriod)`. + if (now - lastFlushTimeNs > liveUpdateMinFlushPeriod) { + flush(maybeUpdate(_, now)) + // Re-get the current system time because `flush` may be slow and `now` is stale. + lastFlushTimeNs = System.nanoTime() + } } override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { @@ -755,18 +774,17 @@ private[spark] class AppStatusListener( } } - /** Flush all live entities' data to the underlying store. */ - private def flush(): Unit = { - val now = System.nanoTime() + /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */ + private def flush(entityFlushFunc: LiveEntity => Unit): Unit = { liveStages.values.asScala.foreach { stage => - update(stage, now) - stage.executorSummaries.values.foreach(update(_, now)) + entityFlushFunc(stage) + stage.executorSummaries.values.foreach(entityFlushFunc) } - liveJobs.values.foreach(update(_, now)) - liveExecutors.values.foreach(update(_, now)) - liveTasks.values.foreach(update(_, now)) - liveRDDs.values.foreach(update(_, now)) - pools.values.foreach(update(_, now)) + liveJobs.values.foreach(entityFlushFunc) + liveExecutors.values.foreach(entityFlushFunc) + liveTasks.values.foreach(entityFlushFunc) + liveRDDs.values.foreach(entityFlushFunc) + pools.values.foreach(entityFlushFunc) } /** diff --git a/core/src/main/scala/org/apache/spark/status/config.scala b/core/src/main/scala/org/apache/spark/status/config.scala index 67801b8..87204fd 100644 --- a/core/src/main/scala/org/apache/spark/status/config.scala +++ b/core/src/main/scala/org/apache/spark/status/config.scala @@ -31,6 +31,12 @@ private[spark] object config { .timeConf(TimeUnit.NANOSECONDS) .createWithDefaultString("100ms") + val LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD = ConfigBuilder("spark.ui.liveUpdate.minFlushPeriod") + .doc("Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when " + + "incoming task events are not fired frequently.") + .timeConf(TimeUnit.NANOSECONDS) + .createWithDefaultString("1s") + val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs") .intConf .createWithDefault(1000) diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index e86cadf..8eef67e 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -39,7 +39,7 @@ import org.apache.spark._ import org.apache.spark.LocalSparkContext._ import org.apache.spark.api.java.StorageLevels import org.apache.spark.deploy.history.HistoryServerSuite -import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE +import org.apache.spark.internal.config.{EXECUTOR_HEARTBEAT_INTERVAL, MEMORY_OFFHEAP_SIZE} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus} import org.apache.spark.status.config._ @@ -99,14 +99,18 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B * Create a test SparkContext with the SparkUI enabled. * It is safe to `get` the SparkUI directly from the SparkContext returned here. */ - private def newSparkContext(killEnabled: Boolean = true): SparkContext = { + private def newSparkContext( + killEnabled: Boolean = true, + master: String = "local", + additionalConfs: Map[String, String] = Map.empty): SparkContext = { val conf = new SparkConf() - .setMaster("local") + .setMaster(master) .setAppName("test") .set("spark.ui.enabled", "true") .set("spark.ui.port", "0") .set("spark.ui.killEnabled", killEnabled.toString) .set(MEMORY_OFFHEAP_SIZE.key, "64m") + additionalConfs.foreach { case (k, v) => conf.set(k, v) } val sc = new SparkContext(conf) assert(sc.ui.isDefined) sc @@ -724,6 +728,31 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B } } + test("Staleness of Spark UI should not last minutes or hours") { + withSpark(newSparkContext( + master = "local[2]", + // Set a small heart beat interval to make the test fast + additionalConfs = Map( + EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms", + LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD.key -> "10ms"))) { sc => + sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true") + val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ => + // Make the task never finish so there won't be any task start/end events after the first 2 + // tasks start. + Thread.sleep(300000) + } + try { + eventually(timeout(10.seconds)) { + val jobsJson = getJson(sc.ui.get, "jobs") + jobsJson.children.length should be (1) + (jobsJson.children.head \ "numActiveTasks").extract[Int] should be (2) + } + } finally { + f.cancel() + } + } + } + def goToUi(sc: SparkContext, path: String): Unit = { goToUi(sc.ui.get, path) } diff --git a/docs/configuration.md b/docs/configuration.md index f0b6216..6808ec7 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -826,6 +826,14 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> + <td><code>spark.ui.liveUpdate.minFlushPeriod</code></td> + <td>1s</td> + <td> + Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming + task events are not fired frequently. + </td> +</tr> +<tr> <td><code>spark.ui.port</code></td> <td>4040</td> <td> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org