Repository: spark Updated Branches: refs/heads/master bcc7373f6 -> 25782981c
[SPARK-12174] Speed up BlockManagerSuite getRemoteBytes() test This patch significantly speeds up the BlockManagerSuite's "SPARK-9591: getRemoteBytes from another location when Exception throw" test, reducing the test time from 45s to ~250ms. The key change was to set `spark.shuffle.io.maxRetries` to 0 (the code previously set `spark.network.timeout` to `2s`, but this didn't make a difference because the slowdown was not due to this timeout). Along the way, I also cleaned up the way that we handle SparkConf in BlockManagerSuite: previously, each test would mutate a shared SparkConf instance, while now each test gets a fresh SparkConf. Author: Josh Rosen <[email protected]> Closes #10759 from JoshRosen/SPARK-12174. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25782981 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25782981 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25782981 Branch: refs/heads/master Commit: 25782981cf58946dc7c186acadd2beec5d964461 Parents: bcc7373 Author: Josh Rosen <[email protected]> Authored: Thu Jan 14 17:37:27 2016 -0800 Committer: Andrew Or <[email protected]> Committed: Thu Jan 14 17:37:27 2016 -0800 ---------------------------------------------------------------------- .../spark/storage/BlockManagerSuite.scala | 71 +++++++++----------- 1 file changed, 30 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/25782981/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 67210e5..62e6c4f 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -45,20 +45,18 @@ import org.apache.spark.util._ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach with PrivateMethodTester with ResetSystemProperties { - private val conf = new SparkConf(false).set("spark.app.id", "test") + var conf: SparkConf = null var store: BlockManager = null var store2: BlockManager = null var store3: BlockManager = null var rpcEnv: RpcEnv = null var master: BlockManagerMaster = null - conf.set("spark.authenticate", "false") - val securityMgr = new SecurityManager(conf) - val mapOutputTracker = new MapOutputTrackerMaster(conf) - val shuffleManager = new HashShuffleManager(conf) + val securityMgr = new SecurityManager(new SparkConf(false)) + val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false)) + val shuffleManager = new HashShuffleManager(new SparkConf(false)) // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test - conf.set("spark.kryoserializer.buffer", "1m") - val serializer = new KryoSerializer(conf) + val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m")) // Implicitly convert strings to BlockIds for test clarity. implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) @@ -79,15 +77,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE override def beforeEach(): Unit = { super.beforeEach() - rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) - // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case System.setProperty("os.arch", "amd64") - conf.set("os.arch", "amd64") - conf.set("spark.test.useCompressedOops", "true") + conf = new SparkConf(false) + .set("spark.app.id", "test") + .set("spark.kryoserializer.buffer", "1m") + .set("spark.test.useCompressedOops", "true") + .set("spark.storage.unrollFraction", "0.4") + .set("spark.storage.unrollMemoryThreshold", "512") + + rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) conf.set("spark.driver.port", rpcEnv.address.port.toString) - conf.set("spark.storage.unrollFraction", "0.4") - conf.set("spark.storage.unrollMemoryThreshold", "512") master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) @@ -98,6 +98,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE override def afterEach(): Unit = { try { + conf = null if (store != null) { store.stop() store = null @@ -473,34 +474,22 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } test("SPARK-9591: getRemoteBytes from another location when Exception throw") { - val origTimeoutOpt = conf.getOption("spark.network.timeout") - try { - conf.set("spark.network.timeout", "2s") - store = makeBlockManager(8000, "executor1") - store2 = makeBlockManager(8000, "executor2") - store3 = makeBlockManager(8000, "executor3") - val list1 = List(new Array[Byte](4000)) - store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - var list1Get = store.getRemoteBytes("list1") - assert(list1Get.isDefined, "list1Get expected to be fetched") - // block manager exit - store2.stop() - store2 = null - list1Get = store.getRemoteBytes("list1") - // get `list1` block - assert(list1Get.isDefined, "list1Get expected to be fetched") - store3.stop() - store3 = null - // exception throw because there is no locations - intercept[BlockFetchException] { - list1Get = store.getRemoteBytes("list1") - } - } finally { - origTimeoutOpt match { - case Some(t) => conf.set("spark.network.timeout", t) - case None => conf.remove("spark.network.timeout") - } + conf.set("spark.shuffle.io.maxRetries", "0") + store = makeBlockManager(8000, "executor1") + store2 = makeBlockManager(8000, "executor2") + store3 = makeBlockManager(8000, "executor3") + val list1 = List(new Array[Byte](4000)) + store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched") + store2.stop() + store2 = null + assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched") + store3.stop() + store3 = null + // exception throw because there is no locations + intercept[BlockFetchException] { + store.getRemoteBytes("list1") } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
