Repository: spark
Updated Branches:
  refs/heads/master 1bf55e396 -> 7dc3e697c


[SPARK-16251][SPARK-20200][CORE][TEST] Flaky test: org.apache.spark.rdd.LocalCheckpointSuite.missing checkpoint block fails with informative message

## What changes were proposed in this pull request?

Currently the test does not wait to confirm that the block has been removed 
from the slave's BlockManager. If the removal takes too long, the assertion in 
this test case fails.
The failure is easy to reproduce by sleeping for a while before removing the 
block in BlockManagerSlaveEndpoint.receiveAndReply().
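
The race described above can be illustrated without Spark. The following is only a minimal sketch, not the code from this patch: `AsyncRemovalSketch`, `removeAsync`, and `waitUntil` are hypothetical names, and the in-memory map merely stands in for a BlockManager whose removal completes on another thread. It shows why a check made immediately after requesting removal can still see the block, and how polling with a timeout and an interval avoids that.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for a block store whose removal is asynchronous.
object AsyncRemovalSketch {
  private val blocks = new ConcurrentHashMap[String, Array[Byte]]()

  def put(id: String): Unit = blocks.put(id, new Array[Byte](0))
  def contains(id: String): Boolean = blocks.containsKey(id)

  // Returns immediately; the block disappears only after `delay` has passed.
  def removeAsync(id: String, delay: FiniteDuration)
                 (implicit ec: ExecutionContext): Future[Unit] =
    Future {
      Thread.sleep(delay.toMillis)
      blocks.remove(id)
      ()
    }

  // Re-evaluate `condition` every `interval` until it holds or `timeout` elapses.
  def waitUntil(timeout: FiniteDuration, interval: FiniteDuration)
               (condition: => Boolean): Boolean = {
    val deadline = timeout.fromNow
    var ok = condition
    while (!ok && deadline.hasTimeLeft()) {
      Thread.sleep(interval.toMillis)
      ok = condition
    }
    ok
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    put("rdd_0_3")
    removeAsync("rdd_0_3", 200.millis)
    // Checking contains("rdd_0_3") right here would race with the removal,
    // so poll until the block is actually gone.
    val removed = waitUntil(1.second, 100.millis)(!contains("rdd_0_3"))
    println(s"removed = $removed")
  }
}
```

In a ScalaTest suite the same waiting is normally expressed with `eventually(timeout(...), interval(...))`, which retries an assertion block rather than a boolean condition.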

## How was this patch tested?
N/A

Author: Xingbo Jiang <[email protected]>

Closes #18314 from jiangxb1987/LocalCheckpointSuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7dc3e697
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7dc3e697
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7dc3e697

Branch: refs/heads/master
Commit: 7dc3e697c74864a4e3cca7342762f1427058b3c3
Parents: 1bf55e3
Author: Xingbo Jiang <[email protected]>
Authored: Fri Jun 16 00:06:54 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Fri Jun 16 00:06:54 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/LocalCheckpointSuite.scala    | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7dc3e697/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
index 2802cd9..9e204f5 100644
--- a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.rdd
 
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
+
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, SparkFunSuite}
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 
@@ -168,6 +172,10 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
     // Collecting the RDD should now fail with an informative exception
     val blockId = RDDBlockId(rdd.id, numPartitions - 1)
     bmm.removeBlock(blockId)
+    // Wait until the block has been removed successfully.
+    eventually(timeout(1 seconds), interval(100 milliseconds)) {
+      assert(bmm.getBlockStatus(blockId).isEmpty)
+    }
     try {
       rdd.collect()
       fail("Collect should have failed if local checkpoint block is removed...")


