[
https://issues.apache.org/jira/browse/SPARK-34898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
angerszhu updated SPARK-34898:
------------------------------
Description:
In the current EventLoggingListener, we don't write the
SparkListenerExecutorMetricsUpdate message at all:
{code:java}
override def onExecutorMetricsUpdate(event:
SparkListenerExecutorMetricsUpdate): Unit = {
if (shouldLogStageExecutorMetrics) {
event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
// If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
// so record those peaks for all active stages.
// Otherwise, record the peaks for the matching stage.
if (stageKey1 == DRIVER_STAGE_KEY || stageKey1 == stageKey2) {
val metrics = metricsPerExecutor.getOrElseUpdate(
event.execId, new ExecutorMetrics())
metrics.compareAndUpdatePeakValues(newPeaks)
}
}
}
}
}
{code}
> Send ExecutorMetricsUpdate EventLog appropriately
> -------------------------------------------------
>
> Key: SPARK-34898
> URL: https://issues.apache.org/jira/browse/SPARK-34898
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 3.2.0
> Reporter: angerszhu
> Priority: Major
>
> In the current EventLoggingListener, we don't write the
> SparkListenerExecutorMetricsUpdate message at all:
> {code:java}
> override def onExecutorMetricsUpdate(event:
> SparkListenerExecutorMetricsUpdate): Unit = {
> if (shouldLogStageExecutorMetrics) {
> event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
> liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor)
> =>
> // If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
> // so record those peaks for all active stages.
> // Otherwise, record the peaks for the matching stage.
> if (stageKey1 == DRIVER_STAGE_KEY || stageKey1 == stageKey2) {
> val metrics = metricsPerExecutor.getOrElseUpdate(
> event.execId, new ExecutorMetrics())
> metrics.compareAndUpdatePeakValues(newPeaks)
> }
> }
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]