Repository: spark
Updated Branches:
  refs/heads/master 2e069ca65 -> c9da466ed


[SPARK-3015] Block on cleaning tasks to prevent Akka timeouts

More detail on the issue is described in
[SPARK-3015](https://issues.apache.org/jira/browse/SPARK-3015), but the TL;DR is
that if we send too many interdependent blocking Akka messages in quick
succession, a few of these messages time out, which ultimately kills the
executors. As of #1498, we broadcast each RDD whether or not it is persisted.
This means that if we create many RDDs (each of which becomes a broadcast) and
the driver performs a GC that cleans up all of these broadcast blocks, we end up
sending many `RemoveBroadcast` messages in parallel and trigger the chain of
blocking messages at a high frequency.

We do not know of the Akka-level root cause yet, so this is intended to be a 
temporary solution until we identify the real issue. I have done some 
preliminary testing of enabling blocking and observed that the queue length 
remains quite low (< 1000) even under very intensive workloads.
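
As a rough illustration of what blocking buys us, the sketch below models the
cleaning thread with plain Scala futures rather than the real `ContextCleaner`
or Akka. All names in it (`BlockingCleanupSketch`, `removeBroadcast`, `clean`)
are hypothetical stand-ins, and the 5 ms sleep merely simulates waiting for a
remote reply. When `blocking` is true, each request is awaited before the next
one is issued, so at most one request is ever in flight.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical model of the cleaning thread; not the actual ContextCleaner code.
object BlockingCleanupSketch {
  private val lock = new Object
  private var inFlight = 0 // requests currently outstanding
  private var peak = 0     // highest number outstanding at once

  // Stand-in for a remote cleanup request such as RemoveBroadcast.
  private def removeBroadcast(id: Long): Future[Unit] = {
    // Count the request as outstanding from the moment it is issued.
    lock.synchronized { inFlight += 1; peak = math.max(peak, inFlight) }
    Future {
      try Thread.sleep(5) // simulate waiting for a remote reply
      finally lock.synchronized { inFlight -= 1 }
    }
  }

  // Issues one request per id and returns the peak number in flight.
  def clean(ids: Seq[Long], blocking: Boolean): Int = {
    lock.synchronized { inFlight = 0; peak = 0 }
    val pending = ids.map { id =>
      val f = removeBroadcast(id)
      if (blocking) Await.result(f, 10.seconds) // wait before issuing the next one
      f
    }
    pending.foreach(Await.result(_, 10.seconds)) // drain any stragglers
    lock.synchronized(peak)
  }
}
```

In this model, `BlockingCleanupSketch.clean(1L to 20L, blocking = true)` returns
1, whereas with `blocking = false` the peak depends on timing and can reach the
number of ids.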

In the long run, we should do something more sophisticated to allow a limited
degree of parallelism, either by batching cleanup tasks or by processing them in
a sliding window. In the longer run, we should clean up the whole
`BlockManager*` message passing interface to avoid unnecessarily awaiting
futures created from Akka asks.
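
One possible shape for the batching idea, again as a sketch with plain Scala
futures and not a proposal for the actual `ContextCleaner` API: issue cleanup
requests in fixed-size batches and wait for each batch to complete before
starting the next. `BatchedCleanupSketch`, `cleanInBatches`, and the `task`
parameter are hypothetical names introduced for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical sketch of batched cleanup; not part of Spark.
object BatchedCleanupSketch {
  // Runs `task` for every id, allowing at most `batchSize` requests in flight.
  def cleanInBatches(ids: Seq[Long], batchSize: Int)(task: Long => Future[Unit]): Unit = {
    require(batchSize > 0, "batchSize must be positive")
    ids.grouped(batchSize).foreach { batch =>
      val inFlight = batch.map(task)                      // issue the whole batch
      Await.result(Future.sequence(inFlight), 30.seconds) // wait before the next batch
    }
  }
}
```

A true sliding window would instead start a new request as soon as any
outstanding one completes. The batched version is simpler but idles while
waiting for the slowest request in each batch.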

tdas pwendell mengxr

Author: Andrew Or <[email protected]>

Closes #1931 from andrewor14/reference-blocking and squashes the following commits:

d0f7195 [Andrew Or] Merge branch 'master' of github.com:apache/spark into reference-blocking
ce9daf5 [Andrew Or] Remove logic for logging queue length
111192a [Andrew Or] Add missing space in log message (minor)
a183b83 [Andrew Or] Switch order of code blocks (minor)
9fd1fe6 [Andrew Or] Remove outdated log
104b366 [Andrew Or] Use the actual reference queue length
0b7e768 [Andrew Or] Block on cleaning tasks by default + log error on queue full


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c9da466e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c9da466e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c9da466e

Branch: refs/heads/master
Commit: c9da466edb83e45a159ccc17c68856a511b9e8b7
Parents: 2e069ca
Author: Andrew Or <[email protected]>
Authored: Fri Aug 15 22:55:32 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Fri Aug 15 22:55:32 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/ContextCleaner.scala    | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c9da466e/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index bf3c3a6..3848734 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -66,10 +66,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   /**
    * Whether the cleaning thread will block on cleanup tasks.
-   * This is set to true only for tests.
+   *
+   * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
+   * workaround for the issue, which is ultimately caused by the way the BlockManager actors
+   * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+   * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
+   * longer in scope.
    */
   private val blockOnCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking", false)
+    "spark.cleaner.referenceTracking.blocking", true)
 
   @volatile private var stopped = false
 
@@ -174,9 +179,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
  private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-
-  // Used for testing. These methods explicitly blocks until cleanup is completed
-  // to ensure that more reliable testing.
 }
 
 private object ContextCleaner {

