Updated Branches: refs/heads/branch-0.8 27212addd -> 47fce43cf
Merge pull request #228 from pwendell/master Document missing configs and set shuffle consolidation to false. (cherry picked from commit 5d460253d6080d871cb71efb112ea17be0873771) Signed-off-by: Patrick Wendell <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/47fce43c Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/47fce43c Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/47fce43c Branch: refs/heads/branch-0.8 Commit: 47fce43cfddb22b7173d9d12f815a9a05ffd1ca0 Parents: 27212ad Author: Patrick Wendell <[email protected]> Authored: Thu Dec 5 12:31:24 2013 -0800 Committer: Patrick Wendell <[email protected]> Committed: Thu Dec 5 12:33:02 2013 -0800 ---------------------------------------------------------------------- .../spark/storage/ShuffleBlockManager.scala | 2 +- .../spark/storage/DiskBlockManagerSuite.scala | 14 ++++++-- docs/configuration.md | 37 +++++++++++++++++++- 3 files changed, 49 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/47fce43c/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 2f1b049..e828e1d 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -62,7 +62,7 @@ class ShuffleBlockManager(blockManager: BlockManager) { // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. // TODO: Remove this once the shuffle file consolidation feature is stable. 
val consolidateShuffleFiles = - System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean + System.getProperty("spark.shuffle.consolidateFiles", "false").toBoolean private val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/47fce43c/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 0b90563..ef4c4c0 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -5,9 +5,9 @@ import java.io.{FileWriter, File} import scala.collection.mutable import com.google.common.io.Files -import org.scalatest.{BeforeAndAfterEach, FunSuite} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} -class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { +class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll { val rootDir0 = Files.createTempDir() rootDir0.deleteOnExit() @@ -16,6 +16,12 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { val rootDirs = rootDir0.getName + "," + rootDir1.getName println("Created root dirs: " + rootDirs) + // This suite focuses primarily on consolidation features, + // so we coerce consolidation if not already enabled. 
+ val consolidateProp = "spark.shuffle.consolidateFiles" + val oldConsolidate = Option(System.getProperty(consolidateProp)) + System.setProperty(consolidateProp, "true") + val shuffleBlockManager = new ShuffleBlockManager(null) { var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]() override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id) @@ -23,6 +29,10 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { var diskBlockManager: DiskBlockManager = _ + override def afterAll() { + oldConsolidate.map(c => System.setProperty(consolidateProp, c)) + } + override def beforeEach() { diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs) shuffleBlockManager.idToSegmentMap.clear() http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/47fce43c/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index 97183ba..22abe1c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -327,7 +327,41 @@ Apart from these, the following properties are also available, and may be useful Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit. </td> </tr> - +<tr> + <td>spark.shuffle.consolidateFiles</td> + <td>false</td> + <td> + If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance if you run shuffles with large numbers of reduce tasks. + </td> +</tr> +<tr> + <td>spark.speculation</td> + <td>false</td> + <td> + If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched. + </td> +</tr> +<tr> + <td>spark.speculation.interval</td> + <td>100</td> + <td> + How often Spark will check for tasks to speculate, in milliseconds.
+ </td> +</tr> +<tr> + <td>spark.speculation.quantile</td> + <td>0.75</td> + <td> + Fraction of tasks which must be complete before speculation is enabled for a particular stage. + </td> +</tr> +<tr> + <td>spark.speculation.multiplier</td> + <td>1.5</td> + <td> + How many times slower a task is than the median to be considered for speculation. + </td> +</tr> </table> # Environment Variables
