[
https://issues.apache.org/jira/browse/SPARK-24398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492237#comment-16492237
]
Apache Spark commented on SPARK-24398:
--------------------------------------
User 'frb502' has created a pull request for this issue:
https://github.com/apache/spark/pull/21438
> SQLAppStatusListener.aggregateMetrics() invocation is too slow
> ---------------------------------------------------------------
>
> Key: SPARK-24398
> URL: https://issues.apache.org/jira/browse/SPARK-24398
> Project: Spark
> Issue Type: Improvement
> Components: Web UI
> Affects Versions: 2.3.0
> Reporter: furongbin
> Priority: Major
>
> ERROR 18/05/24 14:08:25 AsyncEventQueue: Dropping event from queue appStatus.
> This likely means one of the listeners is too slow and cannot keep up with
> the rate at which tasks are being started by the scheduler.
> [dag-scheduler-event-loop]
> WARN 18/05/24 16:39:54 AsyncEventQueue: Dropped 208205 events from appStatus
> since Thu May 24 16:30:57 CST 2018. [dag-scheduler-event-loop]
> WARN 18/05/24 16:40:19 SQLAppStatusListener: agregateMetrics cost=18.6s,
> metricIds.size=775, metrics.size=5292940, aggregatedMetrics=615
> [pool-22-thread-1]
> private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
>   val metricIds = exec.metrics.map(_.accumulatorId).sorted
>   val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
>   val metrics = exec.stages.toSeq
>     .flatMap { stageId => Option(stageMetrics.get(stageId)) }
>     .flatMap(_.taskMetrics.values().asScala)
>     .flatMap { metrics => metrics.ids.zip(metrics.values) }
>   // Slow part (highlighted in red in the original issue): the concatenation and filter.
>   val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq)
>     .filter { case (id, _) => metricIds.contains(id) }
>     .groupBy(_._1)
>     .map { case (id, values) =>
>       id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
>     }
>   if (exec.metricsValues != null) {
>     exec.metricsValues
>   } else {
>     aggregatedMetrics
>   }
> }
> *Case:*
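> When metrics.size is about 5,000,000 and metricIds.size is about 1,000
> (compare metrics.size=5292940 and metricIds.size=775 in the log above), the
> expression
>   (metrics ++ exec.driverAccumUpdates.toSeq)
>     .filter { case (id, _) => metricIds.contains(id) }
> is very slow: metricIds is a sorted Seq, so each contains call is a linear
> scan, and that scan is repeated once per element of metrics.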
>
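To illustrate the cost described in the issue, here is a minimal, self-contained sketch that compares filtering against a Seq of ids with filtering against a Set built from the same ids. It is only one possible mitigation and is not necessarily what the linked pull request does. The object name MetricFilterSketch, both helper functions, and the sample sizes are hypothetical; the (Long, Long) pairs stand in for the (accumulatorId, value) pairs built in aggregateMetrics.

```scala
object MetricFilterSketch {
  // Mirrors the filter in the issue: Seq.contains scans the ids linearly,
  // so the total work grows with metrics.size * metricIds.size.
  def filterWithSeq(metrics: Seq[(Long, Long)], metricIds: Seq[Long]): Seq[(Long, Long)] =
    metrics.filter { case (id, _) => metricIds.contains(id) }

  // Hypothetical alternative: build a Set once, then use hash-based lookups,
  // so the total work grows roughly with metrics.size + metricIds.size.
  def filterWithSet(metrics: Seq[(Long, Long)], metricIds: Seq[Long]): Seq[(Long, Long)] = {
    val idSet = metricIds.toSet
    metrics.filter { case (id, _) => idSet.contains(id) }
  }

  def main(args: Array[String]): Unit = {
    // Assumed sample data: 775 ids (as in the log) and a scaled-down metrics list.
    val metricIds: Seq[Long] = Vector.tabulate(775)(_.toLong)
    val metrics: Seq[(Long, Long)] =
      Vector.tabulate(50000)(i => ((i % 2000).toLong, i.toLong))

    val viaSeq = filterWithSeq(metrics, metricIds)
    val viaSet = filterWithSet(metrics, metricIds)

    // Both variants keep exactly the same pairs, in the same order.
    println(s"same result: ${viaSeq == viaSet}, kept ${viaSet.size} of ${metrics.size}")
  }
}
```

Both functions return the same pairs; only the lookup structure differs, so the Set variant could replace the Seq variant without changing the later groupBy step.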