Repository: spark Updated Branches: refs/heads/master 2b574f52d -> b3ffac517
http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index bdb0f4d..651c7ea 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -24,6 +24,8 @@ import scala.util.Random import org.apache.spark._ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078) + class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = { val conf = new SparkConf(loadDefaults) @@ -38,6 +40,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { conf.set("spark.shuffle.sort.bypassMergeThreshold", "0") // Ensure that we actually have multiple batches per spill file conf.set("spark.shuffle.spill.batchSize", "10") + conf.set("spark.testing.memory", "2000000") conf } @@ -50,7 +53,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def emptyDataStream(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -91,7 +93,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def fewElementsPerPartition(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -140,7 +141,6 @@ class ExternalSorterSuite 
extends SparkFunSuite with LocalSparkContext { } def emptyPartitionsWithSpilling(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.spill.initialMemoryThreshold", "512") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -174,7 +174,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def testSpillingInLocalCluster(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) @@ -252,7 +251,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def spillingInLocalClusterWithManyReduceTasks(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) @@ -323,7 +321,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("cleanup of intermediate files in sorter") { val conf = createSparkConf(true, false) // Load defaults, otherwise SPARK_HOME is not found - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager @@ -348,7 +345,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("cleanup of intermediate files in sorter if there are errors") { val conf = createSparkConf(true, false) // Load defaults, otherwise SPARK_HOME is not found - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) val diskBlockManager = 
SparkEnv.get.blockManager.diskBlockManager @@ -372,7 +368,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("cleanup of intermediate files in shuffle") { val conf = createSparkConf(false, false) - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager @@ -387,7 +382,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("cleanup of intermediate files in shuffle with errors") { val conf = createSparkConf(false, false) - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager @@ -416,7 +410,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def noPartialAggregationOrSorting(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -438,7 +431,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def partialAggregationWithoutSpill(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -461,7 +453,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def partialAggregationWIthSpillNoOrdering(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -485,7 +476,6 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext { } def partialAggregationWithSpillWithOrdering(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -512,7 +502,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def sortingWithoutAggregationNoSpill(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -536,7 +525,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def sortingWithoutAggregationWithSpill(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -553,7 +541,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("spilling with hash collisions") { val conf = createSparkConf(true, false) - conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i) @@ -610,7 +597,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("spilling with many hash collisions") { val conf = createSparkConf(true, false) - conf.set("spark.shuffle.memoryFraction", "0.0001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _) @@ -633,7 +619,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("spilling with hash collisions using the Int.MaxValue key") { val conf = createSparkConf(true, false) - conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) def 
createCombiner(i: Int): ArrayBuffer[Int] = ArrayBuffer[Int](i) @@ -657,7 +642,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("spilling with null keys and values") { val conf = createSparkConf(true, false) - conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i) @@ -693,7 +677,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } private def sortWithoutBreakingSortingContracts(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.01") conf.set("spark.shuffle.manager", "sort") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index 154a3ae..771d93b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -446,17 +446,6 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> - <td><code>spark.shuffle.memoryFraction</code></td> - <td>0.2</td> - <td> - Fraction of Java heap to use for aggregation and cogroups during shuffles. - At any given time, the collective size of - all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will - begin to spill to disk. If spills are often, consider increasing this value at the expense of - <code>spark.storage.memoryFraction</code>. 
- </td> -</tr> -<tr> <td><code>spark.shuffle.service.enabled</code></td> <td>false</td> <td> @@ -712,6 +701,76 @@ Apart from these, the following properties are also available, and may be useful </tr> </table> +#### Memory Management +<table class="table"> +<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> +<tr> + <td><code>spark.memory.fraction</code></td> + <td>0.75</td> + <td> + Fraction of the heap space used for execution and storage. The lower this is, the more + frequently spills and cached data eviction occur. The purpose of this config is to set + aside memory for internal metadata, user data structures, and imprecise size estimation + in the case of sparse, unusually large records. + </td> +</tr> +<tr> + <td><code>spark.memory.storageFraction</code></td> + <td>0.5</td> + <td> + The size of the storage region within the space set aside by + <code>spark.memory.fraction</code>. This region is not statically reserved, but dynamically + allocated as cache requests come in. Cached data may be evicted only if total storage exceeds + this region. + </td> +</tr> +<tr> + <td><code>spark.memory.useLegacyMode</code></td> + <td>false</td> + <td> + Whether to enable the legacy memory management mode used in Spark 1.5 and before. + The legacy mode rigidly partitions the heap space into fixed-size regions, + potentially leading to excessive spilling if the application was not tuned. + The following deprecated memory fraction configurations are not read unless this is enabled: + <code>spark.shuffle.memoryFraction</code><br> + <code>spark.storage.memoryFraction</code><br> + <code>spark.storage.unrollFraction</code> + </td> +</tr> +<tr> + <td><code>spark.shuffle.memoryFraction</code></td> + <td>0.2</td> + <td> + (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled. + Fraction of Java heap to use for aggregation and cogroups during shuffles. 
+ At any given time, the collective size of + all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will + begin to spill to disk. If spills are often, consider increasing this value at the expense of + <code>spark.storage.memoryFraction</code>. + </td> +</tr> +<tr> + <td><code>spark.storage.memoryFraction</code></td> + <td>0.6</td> + <td> + (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled. + Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" + generation of objects in the JVM, which by default is given 0.6 of the heap, but you can + increase it if you configure your own old generation size. + </td> +</tr> +<tr> + <td><code>spark.storage.unrollFraction</code></td> + <td>0.2</td> + <td> + (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled. + Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory. + This is dynamically allocated by dropping existing blocks when there is not enough free + storage space to unroll the new block in its entirety. + </td> +</tr> +</table> + #### Execution Behavior <table class="table"> <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> @@ -825,15 +884,6 @@ Apart from these, the following properties are also available, and may be useful data may need to be rewritten to pre-existing output directories during checkpoint recovery.</td> </tr> <tr> - <td><code>spark.storage.memoryFraction</code></td> - <td>0.6</td> - <td> - Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" - generation of objects in the JVM, which by default is given 0.6 of the heap, but you can - increase it if you configure your own old generation size. 
- </td> -</tr> -<tr> <td><code>spark.storage.memoryMapThreshold</code></td> <td>2m</td> <td> @@ -843,15 +893,6 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> - <td><code>spark.storage.unrollFraction</code></td> - <td>0.2</td> - <td> - Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory. - This is dynamically allocated by dropping existing blocks when there is not enough free - storage space to unroll the new block in its entirety. - </td> -</tr> -<tr> <td><code>spark.externalBlockStore.blockManager</code></td> <td>org.apache.spark.storage.TachyonBlockManager</td> <td> http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala index ff65d7b..835f52f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala @@ -57,7 +57,9 @@ class TestShuffleMemoryManager } private class GrantEverythingMemoryManager extends MemoryManager { - override def acquireExecutionMemory(numBytes: Long): Long = numBytes + override def acquireExecutionMemory( + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes override def acquireStorageMemory( blockId: BlockId, numBytes: Long, @@ -66,12 +68,6 @@ private class GrantEverythingMemoryManager extends MemoryManager { blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true - override def releaseExecutionMemory(numBytes: Long): Unit = { } - override def releaseStorageMemory(numBytes: Long): Unit = { } - 
override def releaseStorageMemory(): Unit = { } - override def releaseUnrollMemory(numBytes: Long): Unit = { } override def maxExecutionMemory: Long = Long.MaxValue override def maxStorageMemory: Long = Long.MaxValue - override def executionMemoryUsed: Long = Long.MaxValue - override def storageMemoryUsed: Long = Long.MaxValue } http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index f7d48bc..75d1fce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ -103,7 +103,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext { val conf = new SparkConf() .set("spark.shuffle.spill.initialMemoryThreshold", "1024") .set("spark.shuffle.sort.bypassMergeThreshold", "0") - .set("spark.shuffle.memoryFraction", "0.0001") + .set("spark.testing.memory", "80000") sc = new SparkContext("local", "test", conf) outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
