This is an automated email from the ASF dual-hosted git repository.
zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new a8a2ba1 [SPARK-27394][WEBUI] Flush LiveEntity if necessary when
receiving SparkListenerExecutorMetricsUpdate (backport 2.4)
a8a2ba1 is described below
commit a8a2ba11ac10051423e58920062b50f328b06421
Author: Shixiong Zhu <[email protected]>
AuthorDate: Wed Apr 10 15:17:04 2019 -0700
[SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving
SparkListenerExecutorMetricsUpdate (backport 2.4)
## What changes were proposed in this pull request?
This PR backports #24303 to 2.4.
## How was this patch tested?
Jenkins
Closes #24328 from zsxwing/SPARK-27394-2.4.
Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
---
.../apache/spark/status/AppStatusListener.scala | 40 ++++++++++++++++------
.../scala/org/apache/spark/status/config.scala | 6 ++++
.../org/apache/spark/ui/UISeleniumSuite.scala | 35 +++++++++++++++++--
docs/configuration.md | 8 +++++
4 files changed, 75 insertions(+), 14 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index c4dd47d..cb7ab7f 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -58,6 +58,12 @@ private[spark] class AppStatusListener(
// operations that we can live without when rapidly processing incoming task
events.
private val liveUpdatePeriodNs = if (live)
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+ /**
+ * Minimum time elapsed before stale UI data is flushed. This avoids UI
staleness when incoming
+ * task events are not fired frequently.
+ */
+ private val liveUpdateMinFlushPeriod =
conf.get(LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD)
+
private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)
@@ -73,6 +79,9 @@ private[spark] class AppStatusListener(
// around liveExecutors.
@volatile private var activeExecutorCount = 0
+ /** The last time when flushing `LiveEntity`s. This is to avoid flushing too
frequently. */
+ private var lastFlushTimeNs = System.nanoTime()
+
kvstore.addTrigger(classOf[ExecutorSummaryWrapper],
conf.get(MAX_RETAINED_DEAD_EXECUTORS))
{ count => cleanupExecutors(count) }
@@ -86,7 +95,8 @@ private[spark] class AppStatusListener(
kvstore.onFlush {
if (!live) {
- flush()
+ val now = System.nanoTime()
+ flush(update(_, now))
}
}
@@ -744,6 +754,15 @@ private[spark] class AppStatusListener(
}
}
}
+
+ // Flush updates if necessary. Executor heartbeat is an event that happens
periodically. Flush
+ // here to ensure the staleness of Spark UI doesn't last more than
+ // `max(heartbeat interval, liveUpdateMinFlushPeriod)`.
+ if (now - lastFlushTimeNs > liveUpdateMinFlushPeriod) {
+ flush(maybeUpdate(_, now))
+ // Re-get the current system time because `flush` may be slow and `now`
is stale.
+ lastFlushTimeNs = System.nanoTime()
+ }
}
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
@@ -755,18 +774,17 @@ private[spark] class AppStatusListener(
}
}
- /** Flush all live entities' data to the underlying store. */
- private def flush(): Unit = {
- val now = System.nanoTime()
+ /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush
them. */
+ private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
liveStages.values.asScala.foreach { stage =>
- update(stage, now)
- stage.executorSummaries.values.foreach(update(_, now))
+ entityFlushFunc(stage)
+ stage.executorSummaries.values.foreach(entityFlushFunc)
}
- liveJobs.values.foreach(update(_, now))
- liveExecutors.values.foreach(update(_, now))
- liveTasks.values.foreach(update(_, now))
- liveRDDs.values.foreach(update(_, now))
- pools.values.foreach(update(_, now))
+ liveJobs.values.foreach(entityFlushFunc)
+ liveExecutors.values.foreach(entityFlushFunc)
+ liveTasks.values.foreach(entityFlushFunc)
+ liveRDDs.values.foreach(entityFlushFunc)
+ pools.values.foreach(entityFlushFunc)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/status/config.scala
b/core/src/main/scala/org/apache/spark/status/config.scala
index 67801b8..87204fd 100644
--- a/core/src/main/scala/org/apache/spark/status/config.scala
+++ b/core/src/main/scala/org/apache/spark/status/config.scala
@@ -31,6 +31,12 @@ private[spark] object config {
.timeConf(TimeUnit.NANOSECONDS)
.createWithDefaultString("100ms")
+ val LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD =
ConfigBuilder("spark.ui.liveUpdate.minFlushPeriod")
+ .doc("Minimum time elapsed before stale UI data is flushed. This avoids UI
staleness when " +
+ "incoming task events are not fired frequently.")
+ .timeConf(TimeUnit.NANOSECONDS)
+ .createWithDefaultString("1s")
+
val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs")
.intConf
.createWithDefault(1000)
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index e86cadf..8eef67e 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark._
import org.apache.spark.LocalSparkContext._
import org.apache.spark.api.java.StorageLevels
import org.apache.spark.deploy.history.HistoryServerSuite
-import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
+import org.apache.spark.internal.config.{EXECUTOR_HEARTBEAT_INTERVAL,
MEMORY_OFFHEAP_SIZE}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.status.api.v1.{JacksonMessageWriter,
RDDDataDistribution, StageStatus}
import org.apache.spark.status.config._
@@ -99,14 +99,18 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser
with Matchers with B
* Create a test SparkContext with the SparkUI enabled.
* It is safe to `get` the SparkUI directly from the SparkContext returned
here.
*/
- private def newSparkContext(killEnabled: Boolean = true): SparkContext = {
+ private def newSparkContext(
+ killEnabled: Boolean = true,
+ master: String = "local",
+ additionalConfs: Map[String, String] = Map.empty): SparkContext = {
val conf = new SparkConf()
- .setMaster("local")
+ .setMaster(master)
.setAppName("test")
.set("spark.ui.enabled", "true")
.set("spark.ui.port", "0")
.set("spark.ui.killEnabled", killEnabled.toString)
.set(MEMORY_OFFHEAP_SIZE.key, "64m")
+ additionalConfs.foreach { case (k, v) => conf.set(k, v) }
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
sc
@@ -724,6 +728,31 @@ class UISeleniumSuite extends SparkFunSuite with
WebBrowser with Matchers with B
}
}
+ test("Staleness of Spark UI should not last minutes or hours") {
+ withSpark(newSparkContext(
+ master = "local[2]",
+ // Set small heartbeat and flush intervals to make the test fast
+ additionalConfs = Map(
+ EXECUTOR_HEARTBEAT_INTERVAL.key -> "10ms",
+ LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD.key -> "10ms"))) { sc =>
+ sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+ val f = sc.parallelize(1 to 1000, 1000).foreachAsync { _ =>
+ // Make the task never finish so there won't be any task start/end
events after the first 2
+ // tasks start.
+ Thread.sleep(300000)
+ }
+ try {
+ eventually(timeout(10.seconds)) {
+ val jobsJson = getJson(sc.ui.get, "jobs")
+ jobsJson.children.length should be (1)
+ (jobsJson.children.head \ "numActiveTasks").extract[Int] should be
(2)
+ }
+ } finally {
+ f.cancel()
+ }
+ }
+ }
+
def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
}
diff --git a/docs/configuration.md b/docs/configuration.md
index f0b6216..6808ec7 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -826,6 +826,14 @@ Apart from these, the following properties are also
available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.ui.liveUpdate.minFlushPeriod</code></td>
+ <td>1s</td>
+ <td>
+ Minimum time elapsed before stale UI data is flushed. This avoids UI
staleness when incoming
+ task events are not fired frequently.
+ </td>
+</tr>
+<tr>
<td><code>spark.ui.port</code></td>
<td>4040</td>
<td>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]