Repository: spark
Updated Branches:
  refs/heads/master 2b574f52d -> b3ffac517


http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index bdb0f4d..651c7ea 100644
--- 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -24,6 +24,8 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 
+// TODO: some of these spilling tests probably aren't actually spilling 
(SPARK-11078)
+
 class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf 
= {
     val conf = new SparkConf(loadDefaults)
@@ -38,6 +40,7 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
     conf.set("spark.shuffle.sort.bypassMergeThreshold", "0")
     // Ensure that we actually have multiple batches per spill file
     conf.set("spark.shuffle.spill.batchSize", "10")
+    conf.set("spark.testing.memory", "2000000")
     conf
   }
 
@@ -50,7 +53,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   def emptyDataStream(conf: SparkConf) {
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -91,7 +93,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   def fewElementsPerPartition(conf: SparkConf) {
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -140,7 +141,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   def emptyPartitionsWithSpilling(conf: SparkConf) {
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -174,7 +174,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   def testSpillingInLocalCluster(conf: SparkConf) {
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 
@@ -252,7 +251,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   def spillingInLocalClusterWithManyReduceTasks(conf: SparkConf) {
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
@@ -323,7 +321,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
 
   test("cleanup of intermediate files in sorter") {
     val conf = createSparkConf(true, false)  // Load defaults, otherwise 
SPARK_HOME is not found
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
     val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -348,7 +345,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
 
   test("cleanup of intermediate files in sorter if there are errors") {
     val conf = createSparkConf(true, false)  // Load defaults, otherwise 
SPARK_HOME is not found
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
     val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -372,7 +368,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
 
   test("cleanup of intermediate files in shuffle") {
     val conf = createSparkConf(false, false)
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
     val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -387,7 +382,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
 
   test("cleanup of intermediate files in shuffle with errors") {
     val conf = createSparkConf(false, false)
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
     val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -416,7 +410,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   def noPartialAggregationOrSorting(conf: SparkConf) {
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -438,7 +431,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   def partialAggregationWithoutSpill(conf: SparkConf) {
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -461,7 +453,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   def partialAggregationWIthSpillNoOrdering(conf: SparkConf) {
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -485,7 +476,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   def partialAggregationWithSpillWithOrdering(conf: SparkConf) {
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -512,7 +502,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   def sortingWithoutAggregationNoSpill(conf: SparkConf) {
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -536,7 +525,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   def sortingWithoutAggregationWithSpill(conf: SparkConf) {
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -553,7 +541,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
 
   test("spilling with hash collisions") {
     val conf = createSparkConf(true, false)
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 
     def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
@@ -610,7 +597,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
 
   test("spilling with many hash collisions") {
     val conf = createSparkConf(true, false)
-    conf.set("spark.shuffle.memoryFraction", "0.0001")
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 
     val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
@@ -633,7 +619,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
 
   test("spilling with hash collisions using the Int.MaxValue key") {
     val conf = createSparkConf(true, false)
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 
     def createCombiner(i: Int): ArrayBuffer[Int] = ArrayBuffer[Int](i)
@@ -657,7 +642,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
 
   test("spilling with null keys and values") {
     val conf = createSparkConf(true, false)
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 
     def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
@@ -693,7 +677,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   private def sortWithoutBreakingSortingContracts(conf: SparkConf) {
-    conf.set("spark.shuffle.memoryFraction", "0.01")
     conf.set("spark.shuffle.manager", "sort")
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 154a3ae..771d93b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -446,17 +446,6 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.shuffle.memoryFraction</code></td>
-  <td>0.2</td>
-  <td>
-    Fraction of Java heap to use for aggregation and cogroups during shuffles.
-    At any given time, the collective size of
-    all in-memory maps used for shuffles is bounded by this limit, beyond 
which the contents will
-    begin to spill to disk. If spills are often, consider increasing this 
value at the expense of
-    <code>spark.storage.memoryFraction</code>.
-  </td>
-</tr>
-<tr>
   <td><code>spark.shuffle.service.enabled</code></td>
   <td>false</td>
   <td>
@@ -712,6 +701,76 @@ Apart from these, the following properties are also 
available, and may be useful
 </tr>
 </table>
 
+#### Memory Management
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.memory.fraction</code></td>
+  <td>0.75</td>
+  <td>
+    Fraction of the heap space used for execution and storage. The lower this 
is, the more
+    frequently spills and cached data eviction occur. The purpose of this 
config is to set
+    aside memory for internal metadata, user data structures, and imprecise 
size estimation
+    in the case of sparse, unusually large records.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.memory.storageFraction</code></td>
+  <td>0.5</td>
+  <td>
+    T​he size of the storage region within the space set aside by
+    <code>s​park.memory.fraction</code>. This region is not statically 
reserved, but dynamically
+    allocated as cache requests come in. ​Cached data may be evicted only if 
total storage exceeds
+    this region.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.memory.useLegacyMode</code></td>
+  <td>false</td>
+  <td>
+    ​Whether to enable the legacy memory management mode used in Spark 1.5 
and before.
+    The legacy mode rigidly partitions the heap space into fixed-size regions,
+    potentially leading to excessive spilling if the application was not tuned.
+    The following deprecated memory fraction configurations are not read 
unless this is enabled:
+    <code>spark.shuffle.memoryFraction</code><br>
+    <code>spark.storage.memoryFraction</code><br>
+    <code>spark.storage.unrollFraction</code>
+  </td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.memoryFraction</code></td>
+  <td>0.2</td>
+  <td>
+    (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> 
is enabled.
+    Fraction of Java heap to use for aggregation and cogroups during shuffles.
+    At any given time, the collective size of
+    all in-memory maps used for shuffles is bounded by this limit, beyond 
which the contents will
+    begin to spill to disk. If spills are often, consider increasing this 
value at the expense of
+    <code>spark.storage.memoryFraction</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.storage.memoryFraction</code></td>
+  <td>0.6</td>
+  <td>
+    (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> 
is enabled.
+    Fraction of Java heap to use for Spark's memory cache. This should not be 
larger than the "old"
+    generation of objects in the JVM, which by default is given 0.6 of the 
heap, but you can
+    increase it if you configure your own old generation size.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.storage.unrollFraction</code></td>
+  <td>0.2</td>
+  <td>
+    (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> 
is enabled.
+    Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling 
blocks in memory.
+    This is dynamically allocated by dropping existing blocks when there is 
not enough free
+    storage space to unroll the new block in its entirety.
+  </td>
+</tr>
+</table>
+
 #### Execution Behavior
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
@@ -825,15 +884,6 @@ Apart from these, the following properties are also 
available, and may be useful
     data may need to be rewritten to pre-existing output directories during 
checkpoint recovery.</td>
 </tr>
 <tr>
-  <td><code>spark.storage.memoryFraction</code></td>
-  <td>0.6</td>
-  <td>
-    Fraction of Java heap to use for Spark's memory cache. This should not be 
larger than the "old"
-    generation of objects in the JVM, which by default is given 0.6 of the 
heap, but you can
-    increase it if you configure your own old generation size.
-  </td>
-</tr>
-<tr>
   <td><code>spark.storage.memoryMapThreshold</code></td>
   <td>2m</td>
   <td>
@@ -843,15 +893,6 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.storage.unrollFraction</code></td>
-  <td>0.2</td>
-  <td>
-    Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling 
blocks in memory.
-    This is dynamically allocated by dropping existing blocks when there is 
not enough free
-    storage space to unroll the new block in its entirety.
-  </td>
-</tr>
-<tr>
   <td><code>spark.externalBlockStore.blockManager</code></td>
   <td>org.apache.spark.storage.TachyonBlockManager</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
index ff65d7b..835f52f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
@@ -57,7 +57,9 @@ class TestShuffleMemoryManager
 }
 
 private class GrantEverythingMemoryManager extends MemoryManager {
-  override def acquireExecutionMemory(numBytes: Long): Long = numBytes
+  override def acquireExecutionMemory(
+      numBytes: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes
   override def acquireStorageMemory(
       blockId: BlockId,
       numBytes: Long,
@@ -66,12 +68,6 @@ private class GrantEverythingMemoryManager extends 
MemoryManager {
       blockId: BlockId,
       numBytes: Long,
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
-  override def releaseExecutionMemory(numBytes: Long): Unit = { }
-  override def releaseStorageMemory(numBytes: Long): Unit = { }
-  override def releaseStorageMemory(): Unit = { }
-  override def releaseUnrollMemory(numBytes: Long): Unit = { }
   override def maxExecutionMemory: Long = Long.MaxValue
   override def maxStorageMemory: Long = Long.MaxValue
-  override def executionMemoryUsed: Long = Long.MaxValue
-  override def storageMemoryUsed: Long = Long.MaxValue
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index f7d48bc..75d1fce 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -103,7 +103,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with 
LocalSparkContext {
       val conf = new SparkConf()
         .set("spark.shuffle.spill.initialMemoryThreshold", "1024")
         .set("spark.shuffle.sort.bypassMergeThreshold", "0")
-        .set("spark.shuffle.memoryFraction", "0.0001")
+        .set("spark.testing.memory", "80000")
 
       sc = new SparkContext("local", "test", conf)
       outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to