furongbin created SPARK-24398:
---------------------------------
Summary: SQLAppStatusListener.aggregateMetrics() invocation is too slow
Key: SPARK-24398
URL: https://issues.apache.org/jira/browse/SPARK-24398
Project: Spark
Issue Type: Improvement
Components: Web UI
Affects Versions: 2.3.0
Reporter: furongbin
ERROR 18/05/24 14:08:25 AsyncEventQueue: Dropping event from queue appStatus. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler. [dag-scheduler-event-loop]
WARN 18/05/24 16:39:54 AsyncEventQueue: Dropped 208205 events from appStatus since Thu May 24 16:30:57 CST 2018. [dag-scheduler-event-loop]
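A minimal sketch of the behavior these messages describe, assuming a bounded in-memory queue that discards new events once it is full, so the producer (here, the scheduler) never blocks on a slow listener. DroppingQueue and DroppingQueueDemo are hypothetical names; this is not Spark's AsyncEventQueue implementation.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical bounded queue: drops events instead of blocking the producer
// when the consumer (a listener) falls behind.
final class DroppingQueue[E](capacity: Int) {
  private val queue = new LinkedBlockingQueue[E](capacity)
  private val dropped = new AtomicLong(0L)

  // offer() returns false immediately when the queue is full; count the drop.
  def post(event: E): Boolean = {
    val accepted = queue.offer(event)
    if (!accepted) dropped.incrementAndGet()
    accepted
  }

  // Consumer side: take one event if present, None when the queue is empty.
  def poll(): Option[E] = Option(queue.poll())

  def droppedCount: Long = dropped.get()
}

object DroppingQueueDemo {
  def main(args: Array[String]): Unit = {
    val q = new DroppingQueue[Int](capacity = 3)
    // The producer posts 5 events before the slow consumer has taken any.
    (1 to 5).foreach(q.post)
    println(s"dropped=${q.droppedCount}") // prints "dropped=2"
  }
}
```

Once the consumer cannot drain events as fast as they arrive, the drop counter grows steadily, which matches the pattern of the WARN line above.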
private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
  val metricIds = exec.metrics.map(_.accumulatorId).sorted
  val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
  val metrics = exec.stages.toSeq
    .flatMap { stageId => Option(stageMetrics.get(stageId)) }
    .flatMap(_.taskMetrics.values().asScala)
    .flatMap { metrics => metrics.ids.zip(metrics.values) }

  // Slow step: metricIds is a Seq, so metricIds.contains(id) is a linear
  // scan performed once for every (id, value) pair.
  val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq)
    .filter { case (id, _) => metricIds.contains(id) }
    .groupBy(_._1)
    .map { case (id, values) =>
      id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
    }

  // Return already-computed values if present; otherwise the fresh aggregation.
  if (exec.metricsValues != null) {
    exec.metricsValues
  } else {
    aggregatedMetrics
  }
}
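To see what the method computes in isolation, here is a self-contained sketch of the same pipeline. TaskMetricsStub, AggregateSketch and the plain sum used to combine each group are hypothetical stand-ins (the real method formats each group with SQLMetrics.stringValue according to the metric type); the filter deliberately keeps the linear metricIds.contains(id) check so its shape matches the excerpt.

```scala
// Hypothetical stand-in for Spark's per-task metric arrays.
final case class TaskMetricsStub(ids: Array[Long], values: Array[Long])

object AggregateSketch {
  // Flatten per-task (id, value) pairs, append driver-side updates, keep only
  // ids the execution knows about, group by id, and combine each group.
  // Summation is an assumed stand-in for SQLMetrics.stringValue.
  def aggregate(
      metricIds: Seq[Long],
      tasks: Seq[TaskMetricsStub],
      driverUpdates: Seq[(Long, Long)]): Map[Long, Long] = {
    val pairs = tasks.flatMap(t => t.ids.zip(t.values).toSeq)
    (pairs ++ driverUpdates)
      .filter { case (id, _) => metricIds.contains(id) } // linear scan per pair
      .groupBy(_._1)
      .map { case (id, vs) => id -> vs.map(_._2).sum }
  }

  def main(args: Array[String]): Unit = {
    val tasks = Seq(
      TaskMetricsStub(Array(1L, 2L), Array(10L, 5L)),
      TaskMetricsStub(Array(1L, 3L), Array(7L, 100L)))
    // Id 3 is not one of this execution's metrics, so it is filtered out.
    println(aggregate(Seq(1L, 2L), tasks, Seq(2L -> 1L)))
  }
}
```

The output map has at most one entry per known metric id, while the filter input grows with the number of tasks, which is why the filter dominates the cost.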
*Case:*
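When the filter input (metrics ++ exec.driverAccumUpdates.toSeq) holds about 5,000,000 (id, value) pairs and the execution defines 1,000 SQL metrics (reported as aggregatedMetrics.size=5000000, metrics.size=1000), evaluating
val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq).filter { case (id, _) => metricIds.contains(id) }
is very slow. metricIds is a Seq, so each contains(id) call is a linear scan (sorting it does not help), and the filter performs up to 5,000,000 × 1,000 = 5 × 10^9 comparisons.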
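One possible remedy, sketched here under the assumption that only membership matters in the filter, is a hash-based lookup. metricTypes is already a Map keyed by accumulator id, so metricTypes.contains(id) answers the same question as metricIds.contains(id) in effectively constant time, bringing the filter from O(N × M) to roughly O(N) for N pairs and M metrics. FilterSketch and its functions are hypothetical names, and the data in main is synthetic.

```scala
object FilterSketch {
  // Original approach: Seq.contains walks the sequence, O(M) per pair.
  // Sorting metricIds does not help, since Seq.contains does not binary-search.
  def filterLinear(pairs: Seq[(Long, Long)], metricIds: Seq[Long]): Seq[(Long, Long)] =
    pairs.filter { case (id, _) => metricIds.contains(id) }

  // Hash-based approach: Map.contains is an effectively constant-time lookup,
  // so the whole filter is linear in the number of pairs.
  def filterHashed(
      pairs: Seq[(Long, Long)],
      metricTypes: Map[Long, String]): Seq[(Long, Long)] =
    pairs.filter { case (id, _) => metricTypes.contains(id) }

  def main(args: Array[String]): Unit = {
    val metricIds = (0L until 1000L).toVector
    // Hypothetical type labels; only the keys matter for filtering.
    val metricTypes = metricIds.map(id => id -> "sum").toMap
    // Synthetic pairs: half of them carry ids outside the known set.
    val pairs = (0L until 20000L).map(i => (i % 2000L, i))
    val fast = filterHashed(pairs, metricTypes)
    val slow = filterLinear(pairs, metricIds)
    println(s"kept=${fast.size}, same=${fast == slow}") // prints "kept=10000, same=true"
  }
}
```

Converting metricIds to a Set once, before filtering, would be an equivalent alternative; both keep the filter's result unchanged.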
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)