Repository: spark Updated Branches: refs/heads/master f234b7cd7 -> bde85f8b7
[SPARK-17649][CORE] Log how many Spark events got dropped in LiveListenerBus ## What changes were proposed in this pull request? Log how many Spark events got dropped in LiveListenerBus so that the user can get insights on how to set a correct event queue size. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes #15220 from zsxwing/SPARK-17649. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bde85f8b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bde85f8b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bde85f8b Branch: refs/heads/master Commit: bde85f8b70138a51052b613664facbc981378c38 Parents: f234b7c Author: Shixiong Zhu <[email protected]> Authored: Mon Sep 26 10:44:35 2016 -0700 Committer: Shixiong Zhu <[email protected]> Committed: Mon Sep 26 10:44:35 2016 -0700 ---------------------------------------------------------------------- .../spark/scheduler/LiveListenerBus.scala | 26 +++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bde85f8b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index bfa3c40..5533f7b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import scala.util.DynamicVariable @@ -57,6 +57,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa // Indicate if `stop()` is called private val stopped = new AtomicBoolean(false) + /** A counter for dropped events. It will be reset every time we log it. */ + private val droppedEventsCounter = new AtomicLong(0L) + + /** When `droppedEventsCounter` was logged last time in milliseconds. */ + @volatile private var lastReportTimestamp = 0L + // Indicate if we are processing some event // Guarded by `self` private var processingEvent = false @@ -123,6 +129,24 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa eventLock.release() } else { onDropEvent(event) + droppedEventsCounter.incrementAndGet() + } + + val droppedEvents = droppedEventsCounter.get + if (droppedEvents > 0) { + // Don't log too frequently + if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { + // There may be multiple threads trying to decrease droppedEventsCounter. + // Use "compareAndSet" to make sure only one thread can win. + // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and + // then that thread will update it. + if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { + val prevLastReportTimestamp = lastReportTimestamp + lastReportTimestamp = System.currentTimeMillis() + logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + + new java.util.Date(prevLastReportTimestamp)) + } + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
