Repository: spark
Updated Branches:
  refs/heads/branch-1.2 f21e550e3 -> 4a5c3d21b


[SPARK-4480] Avoid many small spills in external data structures

**Summary.** Currently, we may spill many small files in 
`ExternalAppendOnlyMap` and `ExternalSorter`. The underlying root cause of this 
is summarized in 
[SPARK-4452](https://issues.apache.org/jira/browse/SPARK-4452). This PR does 
not address this root cause, but simply provides the guarantee that we never 
spill the in-memory data structure if its size is less than a configurable 
threshold of 5MB. This config is not documented because we don't want users to 
set it themselves, and it is not hard-coded because we need to change it in 
tests.

**Symptom.** Each spill is orders of magnitude smaller than 1MB, and there are 
many spills. In environments where the ulimit is set, this frequently causes 
"too many open file" exceptions observed in 
[SPARK-3633](https://issues.apache.org/jira/browse/SPARK-3633).
```
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory 
batch of 4792 B to disk (292769 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory 
batch of 4760 B to disk (292770 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory 
batch of 4520 B to disk (292771 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory 
batch of 4560 B to disk (292772 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory 
batch of 4792 B to disk (292773 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory 
batch of 4784 B to disk (292774 spills so far)
```

**Reproduction.** I ran the following on a small 4-node cluster with 512MB 
executors. Note that the back-to-back shuffle here is necessary for reasons 
described in [SPARK-4522](https://issues.apache.org/jira/browse/SPARK-4452). 
The second shuffle is a `reduceByKey` because it performs a map-side combine.
```
sc.parallelize(1 to 100000000, 100)
  .map { i => (i, i) }
  .groupByKey()
  .reduceByKey(_ ++ _)
  .count()
```
Before the change, I notice that each thread may spill up to 1000 times, and 
the size of each spill is on the order of 10KB. After the change, each thread 
spills only up to 20 times in the worst case, and the size of each spill is on 
the order of 1MB.

Author: Andrew Or <and...@databricks.com>

Closes #3353 from andrewor14/avoid-small-spills and squashes the following 
commits:

49f380f [Andrew Or] Merge branch 'master' of 
https://git-wip-us.apache.org/repos/asf/spark into avoid-small-spills
27d6966 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
avoid-small-spills
f4736e3 [Andrew Or] Fix tests
a919776 [Andrew Or] Avoid many small spills

(cherry picked from commit 0eb4a7fb0fa1fa56677488cbd74eb39e65317621)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a5c3d21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a5c3d21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a5c3d21

Branch: refs/heads/branch-1.2
Commit: 4a5c3d21b4df8fa506fe0365a0718c94bbc1cd1b
Parents: f21e550
Author: Andrew Or <and...@databricks.com>
Authored: Wed Nov 19 18:07:27 2014 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Nov 19 18:07:39 2014 -0800

----------------------------------------------------------------------
 .../spark/util/collection/Spillable.scala       | 28 +++++++++++---------
 .../util/collection/ExternalSorterSuite.scala   |  2 ++
 2 files changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4a5c3d21/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala 
b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index cb73b37..9f54312 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -24,10 +24,7 @@ import org.apache.spark.SparkEnv
  * Spills contents of an in-memory collection to disk when the memory threshold
  * has been exceeded.
  */
-private[spark] trait Spillable[C] {
-
-  this: Logging =>
-
+private[spark] trait Spillable[C] extends Logging {
   /**
    * Spills the current in-memory collection to disk, and releases the memory.
    *
@@ -45,15 +42,21 @@ private[spark] trait Spillable[C] {
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
 
-  // What threshold of elementsRead we start estimating collection size at
+  // Threshold for `elementsRead` before we start tracking this collection's 
memory usage
   private[this] val trackMemoryThreshold = 1000
 
+  // Initial threshold for the size of a collection before we start tracking 
its memory usage
+  // Exposed for testing
+  private[this] val initialMemoryThreshold: Long =
+    SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 
* 1024 * 1024)
+
+  // Threshold for this collection's size in bytes before we start tracking 
its memory usage
+  // To avoid a large number of small spills, initialize this to a value 
orders of magnitude > 0
+  private[this] var myMemoryThreshold = initialMemoryThreshold
+
   // Number of elements read from input since last spill
   private[this] var _elementsRead = 0L
 
-  // How much of the shared memory pool this collection has claimed
-  private[this] var myMemoryThreshold = 0L
-
   // Number of bytes spilled in total
   private[this] var _memoryBytesSpilled = 0L
 
@@ -102,8 +105,9 @@ private[spark] trait Spillable[C] {
    * Release our memory back to the shuffle pool so that other threads can 
grab it.
    */
   private def releaseMemoryForThisThread(): Unit = {
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0L
+    // The amount we requested does not include the initial memory tracking 
threshold
+    shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+    myMemoryThreshold = initialMemoryThreshold
   }
 
   /**
@@ -114,7 +118,7 @@ private[spark] trait Spillable[C] {
   @inline private def logSpillage(size: Long) {
     val threadId = Thread.currentThread().getId
     logInfo("Thread %d spilling in-memory map of %s to disk (%d time%s so far)"
-        .format(threadId, org.apache.spark.util.Utils.bytesToString(size),
-            _spillCount, if (_spillCount > 1) "s" else ""))
+      .format(threadId, org.apache.spark.util.Utils.bytesToString(size),
+        _spillCount, if (_spillCount > 1) "s" else ""))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a5c3d21/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index f26e40f..3cb42d4 100644
--- 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -127,6 +127,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext with PrivateMe
   test("empty partitions with spilling") {
     val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
+    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -152,6 +153,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext with PrivateMe
   test("empty partitions with spilling, bypass merge-sort") {
     val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
+    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to