Repository: spark Updated Branches: refs/heads/master 6c726a3fb -> 27e7f5a72
[SPARK-5083][Core] Fix a flaky test in TaskResultGetterSuite Because `sparkEnv.blockManager.master.removeBlock` is asynchronous, we need to make sure the block has already been removed before calling `super.enqueueSuccessfulTask`. Author: zsxwing <[email protected]> Closes #3894 from zsxwing/SPARK-5083 and squashes the following commits: d97c03d [zsxwing] Fix a flaky test in TaskResultGetterSuite Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27e7f5a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27e7f5a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27e7f5a7 Branch: refs/heads/master Commit: 27e7f5a7237d9d64a3b2c8a030ba3e3a9a96b26c Parents: 6c726a3 Author: zsxwing <[email protected]> Authored: Sun Jan 4 21:09:21 2015 -0800 Committer: Reynold Xin <[email protected]> Committed: Sun Jan 4 21:09:21 2015 -0800 ---------------------------------------------------------------------- .../spark/scheduler/TaskResultGetterSuite.scala | 22 ++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/27e7f5a7/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index 3aab5a1..e3a3803 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -19,7 +19,12 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.util.control.NonFatal + +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually._ import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv} import org.apache.spark.storage.TaskResultBlockId @@ -34,6 +39,8 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule extends TaskResultGetter(sparkEnv, scheduler) { var removedResult = false + @volatile var removeBlockSuccessfully = false + override def enqueueSuccessfulTask( taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) { if (!removedResult) { @@ -42,6 +49,15 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule serializer.get().deserialize[TaskResult[_]](serializedData) match { case IndirectTaskResult(blockId, size) => sparkEnv.blockManager.master.removeBlock(blockId) + // removeBlock is asynchronous. Need to wait it's removed successfully + try { + eventually(timeout(3 seconds), interval(200 milliseconds)) { + assert(!sparkEnv.blockManager.master.contains(blockId)) + } + removeBlockSuccessfully = true + } catch { + case NonFatal(e) => removeBlockSuccessfully = false + } case directResult: DirectTaskResult[_] => taskSetManager.abort("Internal error: expect only indirect results") } @@ -92,10 +108,12 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with LocalSpark assert(false, "Expect local cluster to use TaskSchedulerImpl") throw new ClassCastException } - scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler) + val resultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler) + scheduler.taskResultGetter = resultGetter val akkaFrameSize = sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) + assert(resultGetter.removeBlockSuccessfully) assert(result === 1.to(akkaFrameSize).toArray) // Make sure two tasks were run (one failed one, and a second retried one). --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
