[ 
https://issues.apache.org/jira/browse/SPARK-34898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

angerszhu updated SPARK-34898:
------------------------------
    Description: 
The current EventLoggingListener does not write
SparkListenerExecutorMetricsUpdate messages at all:
{code:java}
override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
  if (shouldLogStageExecutorMetrics) {
    event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
      liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
        // If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
        // so record those peaks for all active stages.
        // Otherwise, record the peaks for the matching stage.
        if (stageKey1 == DRIVER_STAGE_KEY || stageKey1 == stageKey2) {
          val metrics = metricsPerExecutor.getOrElseUpdate(
            event.execId, new ExecutorMetrics())
          metrics.compareAndUpdatePeakValues(newPeaks)
        }
      }
    }
  }
}
{code}

> Send ExecutorMetricsUpdate EventLog appropriately
> -------------------------------------------------
>
>                 Key: SPARK-34898
>                 URL: https://issues.apache.org/jira/browse/SPARK-34898
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>    Affects Versions: 3.2.0
>            Reporter: angerszhu
>            Priority: Major
>
> The current EventLoggingListener does not write
> SparkListenerExecutorMetricsUpdate messages at all:
> {code:java}
> override def onExecutorMetricsUpdate(event: 
> SparkListenerExecutorMetricsUpdate): Unit = {
>   if (shouldLogStageExecutorMetrics) {
>     event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
>       liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
>         // If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
>         // so record those peaks for all active stages.
>         // Otherwise, record the peaks for the matching stage.
>         if (stageKey1 == DRIVER_STAGE_KEY || stageKey1 == stageKey2) {
>           val metrics = metricsPerExecutor.getOrElseUpdate(
>             event.execId, new ExecutorMetrics())
>           metrics.compareAndUpdatePeakValues(newPeaks)
>         }
>       }
>     }
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to