This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5ff39cd [SPARK-27394][WEBUI] Flush LiveEntity if necessary when
receiving SparkListenerExecutorMetricsUpdate
5ff39cd is described below
commit 5ff39cd5ee92da0d08380c7a680d350ff6f4b5db
Author: Shixiong Zhu <[email protected]>
AuthorDate: Tue Apr 9 08:26:00 2019 -0700
[SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving
SparkListenerExecutorMetricsUpdate
## What changes were proposed in this pull request?
This PR updates `AppStatusListener` to flush `LiveEntity` if necessary when
receiving `SparkListenerExecutorMetricsUpdate`. This will ensure the staleness
of Spark UI doesn't last more than the executor heartbeat interval.
## How was this patch tested?
The new unit test.
Closes #24303 from zsxwing/SPARK-27394.
Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
---
.../org/apache/spark/internal/config/Status.scala | 6 ++++
.../apache/spark/status/AppStatusListener.scala | 39 ++++++++++++++++------
.../org/apache/spark/ui/UISeleniumSuite.scala | 33 ++++++++++++++++--
docs/configuration.md | 8 +++++
4 files changed, 73 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala
b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
index c561572..3e6a4e9 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
@@ -29,6 +29,12 @@ private[spark] object Status {
.timeConf(TimeUnit.NANOSECONDS)
.createWithDefaultString("100ms")
+ val LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD =
ConfigBuilder("spark.ui.liveUpdate.minFlushPeriod")
+ .doc("Minimum time elapsed before stale UI data is flushed. This avoids UI
staleness when " +
+ "incoming task events are not fired frequently.")
+ .timeConf(TimeUnit.NANOSECONDS)
+ .createWithDefaultString("1s")
+
val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs")
.intConf
.createWithDefault(1000)
diff --git
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index a3e8242..b085f21 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -58,6 +58,12 @@ private[spark] class AppStatusListener(
// operations that we can live without when rapidly processing incoming task
events.
private val liveUpdatePeriodNs = if (live)
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+ /**
+ * Minimum time elapsed before stale UI data is flushed. This avoids UI
staleness when incoming
+ * task events are not fired frequently.
+ */
+ private val liveUpdateMinFlushPeriod =
conf.get(LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD)
+
private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)
@@ -76,6 +82,9 @@ private[spark] class AppStatusListener(
// around liveExecutors.
@volatile private var activeExecutorCount = 0
+ /** The last time when flushing `LiveEntity`s. This is to avoid flushing too
frequently. */
+ private var lastFlushTimeNs = System.nanoTime()
+
kvstore.addTrigger(classOf[ExecutorSummaryWrapper],
conf.get(MAX_RETAINED_DEAD_EXECUTORS))
{ count => cleanupExecutors(count) }
@@ -89,7 +98,8 @@ private[spark] class AppStatusListener(
kvstore.onFlush {
if (!live) {
- flush()
+ val now = System.nanoTime()
+ flush(update(_, now))
}
}
@@ -831,6 +841,14 @@ private[spark] class AppStatusListener(
}
}
}
+ // Flush updates if necessary. Executor heartbeat is an event that happens
periodically. Flush
+ // here to ensure the staleness of Spark UI doesn't last more than
+ // `max(heartbeat interval, liveUpdateMinFlushPeriod)`.
+ if (now - lastFlushTimeNs > liveUpdateMinFlushPeriod) {
+ flush(maybeUpdate(_, now))
+ // Re-get the current system time because `flush` may be slow and `now`
is stale.
+ lastFlushTimeNs = System.nanoTime()
+ }
}
override def onStageExecutorMetrics(executorMetrics:
SparkListenerStageExecutorMetrics): Unit = {
@@ -856,18 +874,17 @@ private[spark] class AppStatusListener(
}
}
- /** Flush all live entities' data to the underlying store. */
- private def flush(): Unit = {
- val now = System.nanoTime()
+ /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush
them. */
+ private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
liveStages.values.asScala.foreach { stage =>
- update(stage, now)
- stage.executorSummaries.values.foreach(update(_, now))
+ entityFlushFunc(stage)
+ stage.executorSummaries.values.foreach(entityFlushFunc)
}
- liveJobs.values.foreach(update(_, now))
- liveExecutors.values.foreach(update(_, now))
- liveTasks.values.foreach(update(_, now))
- liveRDDs.values.foreach(update(_, now))
- pools.values.foreach(update(_, now))
+ liveJobs.values.foreach(entityFlushFunc)
+ liveExecutors.values.foreach(entityFlushFunc)
+ liveTasks.values.foreach(entityFlushFunc)
+ liveRDDs.values.foreach(entityFlushFunc)
+ pools.values.foreach(entityFlushFunc)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index cdc7185..a9f03eb 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -100,14 +100,18 @@ class UISeleniumSuite extends SparkFunSuite with
WebBrowser with Matchers with B
* Create a test SparkContext with the SparkUI enabled.
* It is safe to `get` the SparkUI directly from the SparkContext returned
here.
*/
- private def newSparkContext(killEnabled: Boolean = true): SparkContext = {
+ private def newSparkContext(
+ killEnabled: Boolean = true,
+ master: String = "local",
+ additionalConfs: Map[String, String] = Map.empty): SparkContext = {
val conf = new SparkConf()
- .setMaster("local")
+ .setMaster(master)
.setAppName("test")
.set(UI_ENABLED, true)
.set(UI_PORT, 0)
.set(UI_KILL_ENABLED, killEnabled)
.set(MEMORY_OFFHEAP_SIZE.key, "64m")
+ additionalConfs.foreach { case (k, v) => conf.set(k, v) }
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
sc
@@ -725,6 +729,31 @@ class UISeleniumSuite extends SparkFunSuite with
WebBrowser with Matchers with B
}
}
+ test("Staleness of Spark UI should not last minutes or hours") {
+ withSpark(newSparkContext(
+ master = "local[2]",
+ // Set a small heartbeat interval to make the test fast
+ additionalConfs = Map(
+ EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms",
+ LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD.key -> "10ms"))) { sc =>
+ sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+ val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ =>
+ // Make the task never finish so there won't be any task start/end
events after the first 2
+ // tasks start.
+ Thread.sleep(300000)
+ }
+ try {
+ eventually(timeout(10.seconds)) {
+ val jobsJson = getJson(sc.ui.get, "jobs")
+ jobsJson.children.length should be (1)
+ (jobsJson.children.head \ "numActiveTasks").extract[Int] should be
(2)
+ }
+ } finally {
+ f.cancel()
+ }
+ }
+ }
+
def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 3187b77..5325f8a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -875,6 +875,14 @@ Apart from these, the following properties are also
available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.ui.liveUpdate.minFlushPeriod</code></td>
+ <td>1s</td>
+ <td>
+ Minimum time elapsed before stale UI data is flushed. This avoids UI
staleness when incoming
+ task events are not fired frequently.
+ </td>
+</tr>
+<tr>
<td><code>spark.ui.port</code></td>
<td>4040</td>
<td>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]