Repository: spark Updated Branches: refs/heads/branch-1.6 94524cef4 -> 7aded55e7
[SPARK-17649][CORE] Log how many Spark events got dropped in AsynchronousListenerBus (branch 1.6) ## What changes were proposed in this pull request? Backport #15220 to 1.6. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes #15226 from zsxwing/SPARK-17649-branch-1.6. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7aded55e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7aded55e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7aded55e Branch: refs/heads/branch-1.6 Commit: 7aded55e7329d331728ce2ec24d8ec915204d775 Parents: 94524ce Author: Shixiong Zhu <[email protected]> Authored: Mon Sep 26 11:03:05 2016 -0700 Committer: Shixiong Zhu <[email protected]> Committed: Mon Sep 26 11:03:05 2016 -0700 ---------------------------------------------------------------------- .../spark/util/AsynchronousListenerBus.scala | 27 +++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7aded55e/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index 6c1fca7..b5455ff 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -18,7 +18,8 @@ package org.apache.spark.util import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} + import scala.util.DynamicVariable import org.apache.spark.SparkContext @@ -51,6 +52,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri // Indicate if `stop()` is called private val stopped = new AtomicBoolean(false) + /** A counter for dropped events. It will be reset every time we log it. */ + private val droppedEventsCounter = new AtomicLong(0L) + + /** When `droppedEventsCounter` was logged last time in milliseconds. */ + @volatile private var lastReportTimestamp = 0L + // Indicate if we are processing some event // Guarded by `self` private var processingEvent = false @@ -117,6 +124,24 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri eventLock.release() } else { onDropEvent(event) + droppedEventsCounter.incrementAndGet() + } + + val droppedEvents = droppedEventsCounter.get + if (droppedEvents > 0) { + // Don't log too frequently + if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { + // There may be multiple threads trying to decrease droppedEventsCounter. + // Use "compareAndSet" to make sure only one thread can win. + // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and + // then that thread will update it. + if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { + val prevLastReportTimestamp = lastReportTimestamp + lastReportTimestamp = System.currentTimeMillis() + logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + + new java.util.Date(prevLastReportTimestamp)) + } + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
