[ https://issues.apache.org/jira/browse/SPARK-24398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492237#comment-16492237 ]

Apache Spark commented on SPARK-24398:
--------------------------------------

User 'frb502' has created a pull request for this issue:
https://github.com/apache/spark/pull/21438

> SQLAppStatusListener.aggregateMetrics() invocation is too slow
> ---------------------------------------------------------------
>
>                 Key: SPARK-24398
>                 URL: https://issues.apache.org/jira/browse/SPARK-24398
>             Project: Spark
>          Issue Type: Improvement
>          Components: Web UI
>    Affects Versions: 2.3.0
>            Reporter: furongbin
>            Priority: Major
>
> ERROR 18/05/24 14:08:25 AsyncEventQueue: Dropping event from queue appStatus. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler. [dag-scheduler-event-loop]
> WARN 18/05/24 16:39:54 AsyncEventQueue: Dropped 208205 events from appStatus since Thu May 24 16:30:57 CST 2018. [dag-scheduler-event-loop]
> WARN 18/05/24 16:40:19 SQLAppStatusListener: agregateMetrics  cost=18.6s, metricIds.size=775, metrics.size=5292940, aggregatedMetrics=615 [pool-22-thread-1]
>
> private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
>   val metricIds = exec.metrics.map(_.accumulatorId).sorted
>   val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
>   val metrics = exec.stages.toSeq
>     .flatMap { stageId => Option(stageMetrics.get(stageId)) }
>     .flatMap(_.taskMetrics.values().asScala)
>     .flatMap { metrics => metrics.ids.zip(metrics.values) }
>
>   // Slow step (highlighted in red in the report): metricIds is a sorted Seq,
>   // so every (id, value) pair pays for a linear scan in metricIds.contains.
>   val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq)
>     .filter { case (id, _) => metricIds.contains(id) }
>     .groupBy(_._1)
>     .map { case (id, values) =>
>       id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
>     }
>
>   if (exec.metricsValues != null) {
>     exec.metricsValues
>   } else {
>     aggregatedMetrics
>   }
> }
>
> *Case:*
> When metrics.size is about 5,000,000 and metricIds.size is about 1,000 (compare the
> log above: metrics.size=5292940, metricIds.size=775), evaluating
> (metrics ++ exec.driverAccumUpdates.toSeq).filter { case (id, _) => metricIds.contains(id) }
> is very slow: each of the roughly 5,000,000 pairs triggers a linear scan of metricIds,
> which amounts to up to about 5 billion comparisons.
>  
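The cost described in the Case section can be reproduced outside Spark. The sketch below is a simplified, hypothetical model: the object `AggregateSketch` and its methods are invented for illustration, and `SQLMetrics.stringValue` is replaced by a plain sum. Filtering against a `Seq` makes every membership test a linear scan, so the filter costs on the order of metrics.size × metricIds.size comparisons; converting metricIds to a `Set` first turns each test into a hash lookup. Using a `Set` is one common remedy and is not necessarily what the linked pull request does.

```scala
// Hypothetical, simplified model of the aggregation step in aggregateMetrics():
// (accumulatorId, value) pairs are filtered to the execution's metric ids,
// grouped by id, and summed (a stand-in for SQLMetrics.stringValue).
object AggregateSketch {

  // Same shape as the reported code: Seq.contains is a linear scan,
  // so the filter is O(pairs.size * metricIds.size).
  def aggregateWithSeq(metricIds: Seq[Long], pairs: Seq[(Long, Long)]): Map[Long, Long] =
    pairs
      .filter { case (id, _) => metricIds.contains(id) }
      .groupBy(_._1)
      .map { case (id, values) => id -> values.map(_._2).sum }

  // Set-based variant: the ids are hashed once, then each membership test
  // is an effectively constant-time lookup, so the filter is O(pairs.size).
  def aggregateWithSet(metricIds: Seq[Long], pairs: Seq[(Long, Long)]): Map[Long, Long] = {
    val idSet = metricIds.toSet
    pairs
      .filter { case (id, _) => idSet.contains(id) }
      .groupBy(_._1)
      .map { case (id, values) => id -> values.map(_._2).sum }
  }

  def main(args: Array[String]): Unit = {
    val metricIds = (1L to 1000L).toList
    // Accumulator id 2000 is not one of the execution's metrics and is dropped.
    val pairs = Seq((1L, 10L), (2L, 5L), (1L, 7L), (2000L, 99L))
    println(aggregateWithSeq(metricIds, pairs))
    println(aggregateWithSet(metricIds, pairs))
  }
}
```

Both variants return the same map; only the cost of the filter step differs. The sort applied to metricIds in the reported code does not help here, because `Seq.contains` does not take advantage of ordering.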



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
