[ 
https://issues.apache.org/jira/browse/SPARK-24398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

furongbin updated SPARK-24398:
------------------------------
    Description: 
ERROR 18/05/24 14:08:25 AsyncEventQueue: Dropping event from queue appStatus. 
This likely means one of the listeners is too slow and cannot keep up with the 
rate at which tasks are being started by the scheduler. 
[dag-scheduler-event-loop]

WARN 18/05/24 16:39:54 AsyncEventQueue: Dropped 208205 events from appStatus 
since Thu May 24 16:30:57 CST 2018. [dag-scheduler-event-loop]

private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {

val metricIds = exec.metrics.map(_.accumulatorId).sorted

val metricTypes = exec.metrics.map \{ m => (m.accumulatorId, m.metricType) 
}.toMap
 val metrics = exec.stages.toSeq
    .flatMap \{ stageId => Option(stageMetrics.get(stageId)) }
    .flatMap(_.taskMetrics.values().asScala)
    .flatMap \{ metrics => metrics.ids.zip(metrics.values) }

{color:#ff0000}val aggregatedMetrics = (metrics ++ 
exec.driverAccumUpdates.toSeq){color}
 {color:#ff0000} .filter \{ case (id, _) => metricIds.contains(id) }{color}
 .groupBy(_._1)
 .map{ case (id, values) =>

        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)

}

if (exec.metricsValues != null) {

      exec.metricsValues

} else {

    aggregatedMetrics

  }

}

*Case:*

When aggregatedMetrics.size=5000000 and metrics.size=1000, executing the 
statement ({color:#ff0000}val aggregatedMetrics = (metrics ++ 
exec.driverAccumUpdates.toSeq){color}
 {color:#ff0000}.filter \{ case (id, _) => metricIds.contains(id) }{color}) is 
very slow.

 

  was:
RROR 18/05/24 14:08:25 AsyncEventQueue: Dropping event from queue appStatus. 
This likely means one of the listeners is too slow and cannot keep up with the 
rate at which tasks are being started by the scheduler. 
[dag-scheduler-event-loop]

WARN 18/05/24 16:39:54 AsyncEventQueue: Dropped 208205 events from appStatus 
since Thu May 24 16:30:57 CST 2018. [dag-scheduler-event-loop]

private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
 val metricIds = exec.metrics.map(_.accumulatorId).sorted
 val metricTypes = exec.metrics.map \{ m => (m.accumulatorId, m.metricType) 
}.toMap
 val metrics = exec.stages.toSeq
 .flatMap \{ stageId => Option(stageMetrics.get(stageId)) }
 .flatMap(_.taskMetrics.values().asScala)
 .flatMap \{ metrics => metrics.ids.zip(metrics.values) }

 {color:#FF0000}val aggregatedMetrics = (metrics ++ 
exec.driverAccumUpdates.toSeq){color}
{color:#FF0000} .filter \{ case (id, _) => metricIds.contains(id) }{color}
 .groupBy(_._1)
 .map { case (id, values) =>
 id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
 }
 if (exec.metricsValues != null) {
 exec.metricsValues
 } else {
 aggregatedMetrics
 }
}

*Case:*

when aggregatedMetrics.size=5000000, metrics.size=1000, then the 
execution({color:#FF0000}val aggregatedMetrics = (metrics ++ 
exec.driverAccumUpdates.toSeq){color}
{color:#FF0000}.filter \{ case (id, _) => metricIds.contains(id) }{color})  is 
very slow

 


> SQLAppStatusListener.aggregateMetrics()  invocation is too slow
> ---------------------------------------------------------------
>
>                 Key: SPARK-24398
>                 URL: https://issues.apache.org/jira/browse/SPARK-24398
>             Project: Spark
>          Issue Type: Improvement
>          Components: Web UI
>    Affects Versions: 2.3.0
>            Reporter: furongbin
>            Priority: Major
>
> ERROR 18/05/24 14:08:25 AsyncEventQueue: Dropping event from queue appStatus. 
> This likely means one of the listeners is too slow and cannot keep up with 
> the rate at which tasks are being started by the scheduler. 
> [dag-scheduler-event-loop]
> WARN 18/05/24 16:39:54 AsyncEventQueue: Dropped 208205 events from appStatus 
> since Thu May 24 16:30:57 CST 2018. [dag-scheduler-event-loop]
> private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
> val metricIds = exec.metrics.map(_.accumulatorId).sorted
> val metricTypes = exec.metrics.map \{ m => (m.accumulatorId, m.metricType) 
> }.toMap
>  val metrics = exec.stages.toSeq
>     .flatMap \{ stageId => Option(stageMetrics.get(stageId)) }
>     .flatMap(_.taskMetrics.values().asScala)
>     .flatMap \{ metrics => metrics.ids.zip(metrics.values) }
> {color:#ff0000}val aggregatedMetrics = (metrics ++ 
> exec.driverAccumUpdates.toSeq){color}
>  {color:#ff0000} .filter \{ case (id, _) => metricIds.contains(id) }{color}
>  .groupBy(_._1)
>  .map{ case (id, values) =>
>         id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
> }
> if (exec.metricsValues != null) {
>       exec.metricsValues
> } else {
>     aggregatedMetrics
>   }
> }
> *Case:*
> When aggregatedMetrics.size=5000000 and metrics.size=1000, executing the 
> statement ({color:#ff0000}val aggregatedMetrics = (metrics ++ 
> exec.driverAccumUpdates.toSeq){color}
>  {color:#ff0000}.filter \{ case (id, _) => metricIds.contains(id) }{color}) 
> is very slow.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to