This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 79362c4 [SPARK-34898][CORE] We should log SparkListenerExecutorMetricsUpdateEvent of `driver` appropriately when `spark.eventLog.logStageExecutorMetrics` is true
79362c4 is described below
commit 79362c4efcb6bd4b575438330a14a6191cca5e4b
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jun 17 12:08:10 2021 -0500
[SPARK-34898][CORE] We should log SparkListenerExecutorMetricsUpdateEvent of `driver` appropriately when `spark.eventLog.logStageExecutorMetrics` is true
### What changes were proposed in this pull request?
In the current EventLoggingListener, we never write the SparkListenerExecutorMetricsUpdate message to the event log file at all:
```
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
  if (shouldLogStageExecutorMetrics) {
    event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
      liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
        // If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
        // so record those peaks for all active stages.
        // Otherwise, record the peaks for the matching stage.
        if (stageKey1 == DRIVER_STAGE_KEY || stageKey1 == stageKey2) {
          val metrics = metricsPerExecutor.getOrElseUpdate(
            event.execId, new ExecutorMetrics())
          metrics.compareAndUpdatePeakValues(newPeaks)
        }
      }
    }
  }
}
```
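For intuition, here is a minimal standalone sketch of that fan-out rule (plain Scala with no Spark types; the stage keys and peak values are made up for illustration): an update keyed by the dummy driver key (-1, -1) is applied to every live stage, while a stage-keyed update only touches its own stage.
```
import scala.collection.mutable

// Simplified model of the matching logic above: peaks are plain Longs
// instead of ExecutorMetrics; stage keys are (stageId, attemptId) pairs.
val DRIVER_STAGE_KEY = (-1, -1)
val liveStageExecutorPeaks = mutable.Map(
  (0, 0) -> mutable.Map.empty[String, Long],
  (1, 0) -> mutable.Map.empty[String, Long])

def recordPeaks(stageKey: (Int, Int), execId: String, newPeak: Long): Unit = {
  liveStageExecutorPeaks.foreach { case (liveKey, peaksPerExecutor) =>
    // Driver updates (dummy key) fan out to all live stages;
    // executor updates only hit the matching stage.
    if (stageKey == DRIVER_STAGE_KEY || stageKey == liveKey) {
      val current = peaksPerExecutor.getOrElse(execId, 0L)
      peaksPerExecutor(execId) = math.max(current, newPeak)
    }
  }
}

recordPeaks(DRIVER_STAGE_KEY, "driver", 512L) // recorded for both stages
recordPeaks((0, 0), "1", 256L)                // recorded for stage (0, 0) only
```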
In the History Server's REST API for executors, we can get each executor's metrics but not all of the driver's metrics; an executor's metrics are also updated by TaskEnd events, etc., whereas the driver's are not.
So in this PR, I add support for logging the SparkListenerExecutorMetricsUpdate event of the `driver` when `spark.eventLog.logStageExecutorMetrics` is true.
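For reference, a minimal way to exercise this path (a sketch only; the local master, app name, and event log directory are placeholder choices, not part of this PR):
```
import org.apache.spark.{SparkConf, SparkContext}

// With both flags on, the driver's SparkListenerExecutorMetricsUpdate
// events are now written to the event log as well.
val conf = new SparkConf()
  .setMaster("local[2]")                          // placeholder master
  .setAppName("driver-metrics-demo")              // placeholder app name
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events") // placeholder, must exist
  .set("spark.eventLog.logStageExecutorMetrics", "true")

val sc = new SparkContext(conf)
sc.parallelize(1 to 1000).map(_ * 2).count()      // any small job
sc.stop()                                         // flush the event log
```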
### Why are the changes needed?
Users can now get the driver's peakMemoryMetrics in the SHS (Spark History Server).
### Does this PR introduce _any_ user-facing change?
Yes: users can now get the driver's executor metrics from the SHS REST API.
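For example, the metrics can be read back like this (a sketch; the History Server address and application id are hypothetical):
```
import scala.io.Source

// Query the executor list for an application; after this change the
// "driver" entry carries peakMemoryMetrics as well.
val appId = "app-20210617120810-0000" // hypothetical application id
val url = s"http://localhost:18080/api/v1/applications/$appId/executors"
val json = Source.fromURL(url).mkString
println(json) // look for "id" : "driver" and its "peakMemoryMetrics"
```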
### How was this patch tested?
Manual test.
Closes #31992 from AngersZhuuuu/SPARK-34898.
Lead-authored-by: Angerszhuuuu <[email protected]>
Co-authored-by: AngersZhuuuu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../scala/org/apache/spark/scheduler/EventLoggingListener.scala | 5 ++++-
.../org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 9 +++++++--
2 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index c57894b..cfbaa46 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration
 import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
-import org.apache.spark.{SPARK_VERSION, SparkConf}
+import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.EventLogFileWriter
 import org.apache.spark.executor.ExecutorMetrics
@@ -250,6 +250,9 @@ private[spark] class EventLoggingListener(
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
     if (shouldLogStageExecutorMetrics) {
+      if (event.execId == SparkContext.DRIVER_IDENTIFIER) {
+        logEvent(event)
+      }
       event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
         liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
           // If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 240774d..09ad223 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -515,14 +515,15 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     try {
       val lines = readLines(logData)
       val logStart = SparkListenerLogStart(SPARK_VERSION)
-      assert(lines.size === 22)
+      assert(lines.size === 25)
       assert(lines(0).contains("SparkListenerLogStart"))
       assert(lines(1).contains("SparkListenerApplicationStart"))
       assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
       var logIdx = 1
       events.foreach { event =>
         event match {
-          case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
+          case metricsUpdate: SparkListenerExecutorMetricsUpdate
+            if metricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER =>
           case stageCompleted: SparkListenerStageCompleted =>
             val execIds = Set[String]()
             (1 to 3).foreach { _ =>
@@ -618,6 +619,10 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
         assert(expected.stageInfo.stageId === actual.stageInfo.stageId)
       case (expected: SparkListenerTaskEnd, actual: SparkListenerTaskEnd) =>
         assert(expected.stageId === actual.stageId)
+      case (expected: SparkListenerExecutorMetricsUpdate,
+          actual: SparkListenerExecutorMetricsUpdate) =>
+        assert(expected.execId == actual.execId)
+        assert(expected.execId == SparkContext.DRIVER_IDENTIFIER)
       case (expected: SparkListenerEvent, actual: SparkListenerEvent) =>
         assert(expected === actual)
     }