Repository: spark
Updated Branches:
  refs/heads/master 840dea64a -> 0e178e152


[SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite "Shuffle registration 
timeout and maxAttempts conf"

## What changes were proposed in this pull request?

[Ticket](https://issues.apache.org/jira/browse/SPARK-22297)
- one of the tests seems to produce unreliable results due to execution speed 
variability

Since the original test was trying to connect to the test server with a `40 ms` 
timeout, and the test server replied after `50 ms`, the test might fail 
under the following conditions:
- it might occur that the test server replies correctly after `50 ms`
- but the client only receives the timeout after `51 ms`
- this might happen if the executor has to schedule a large number of threads 
and, because of high CPU load, delays the thread/actor that is responsible for 
watching the timeout, so the server's reply arrives before the timeout fires
- running an entire test suite usually produces high loads on the CPU executing 
the tests

## How was this patch tested?

The test's check cases remain the same and the set-up emulates the previous 
version's.

Author: Mark Petruska <petruska.m...@gmail.com>

Closes #19671 from mpetruska/SPARK-22297.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e178e15
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e178e15
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e178e15

Branch: refs/heads/master
Commit: 0e178e1523175a0be9437920045e80deb0a2712b
Parents: 840dea6
Author: Mark Petruska <petruska.m...@gmail.com>
Authored: Wed Jan 24 10:25:14 2018 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Jan 24 10:25:14 2018 -0800

----------------------------------------------------------------------
 .../spark/storage/BlockManagerSuite.scala       | 55 ++++++++++++++------
 1 file changed, 38 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e178e15/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 629eed4..b19d8eb 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.storage
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Future
 import scala.concurrent.duration._
@@ -44,8 +43,9 @@ import org.apache.spark.network.buffer.{ManagedBuffer, 
NioManagedBuffer}
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.netty.{NettyBlockTransferService, 
SparkTransportConf}
 import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, 
TransportServerBootstrap}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, 
TempFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, 
TempFileManager}
 import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, 
RegisterExecutor}
+import org.apache.spark.network.util.TransportConf
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
@@ -1325,9 +1325,18 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 
   test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
     val tryAgainMsg = "test_spark_20640_try_again"
+    val timingoutExecutor = "timingoutExecutor"
+    val tryAgainExecutor = "tryAgainExecutor"
+    val succeedingExecutor = "succeedingExecutor"
+
     // a server which delays response 50ms and must try twice for success.
     def newShuffleServer(port: Int): (TransportServer, Int) = {
-      val attempts = new mutable.HashMap[String, Int]()
+      val failure = new Exception(tryAgainMsg)
+      val success = ByteBuffer.wrap(new Array[Byte](0))
+
+      var secondExecutorFailedOnce = false
+      var thirdExecutorFailedOnce = false
+
       val handler = new NoOpRpcHandler {
         override def receive(
             client: TransportClient,
@@ -1335,15 +1344,26 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
             callback: RpcResponseCallback): Unit = {
           val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
           msgObj match {
-            case exec: RegisterExecutor =>
-              Thread.sleep(50)
-              val attempt = attempts.getOrElse(exec.execId, 0) + 1
-              attempts(exec.execId) = attempt
-              if (attempt < 2) {
-                callback.onFailure(new Exception(tryAgainMsg))
-                return
-              }
-              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+
+            case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
+              () // No reply to generate client-side timeout
+
+            case exec: RegisterExecutor
+              if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce 
=>
+              secondExecutorFailedOnce = true
+              callback.onFailure(failure)
+
+            case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
+              callback.onSuccess(success)
+
+            case exec: RegisterExecutor
+              if exec.execId == succeedingExecutor && !thirdExecutorFailedOnce 
=>
+              thirdExecutorFailedOnce = true
+              callback.onFailure(failure)
+
+            case exec: RegisterExecutor if exec.execId == succeedingExecutor =>
+              callback.onSuccess(success)
+
           }
         }
       }
@@ -1352,6 +1372,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
       val transCtx = new TransportContext(transConf, handler, true)
       (transCtx.createServer(port, 
Seq.empty[TransportServerBootstrap].asJava), port)
     }
+
     val candidatePort = RandomUtils.nextInt(1024, 65536)
     val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
       newShuffleServer, conf, "ShuffleServer")
@@ -1360,21 +1381,21 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     conf.set("spark.shuffle.service.port", shufflePort.toString)
     conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
     conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
-    var e = intercept[SparkException]{
-      makeBlockManager(8000, "executor1")
+    var e = intercept[SparkException] {
+      makeBlockManager(8000, timingoutExecutor)
     }.getMessage
     assert(e.contains("TimeoutException"))
 
     conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
     conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
-    e = intercept[SparkException]{
-      makeBlockManager(8000, "executor2")
+    e = intercept[SparkException] {
+      makeBlockManager(8000, tryAgainExecutor)
     }.getMessage
     assert(e.contains(tryAgainMsg))
 
     conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
     conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "2")
-    makeBlockManager(8000, "executor3")
+    makeBlockManager(8000, succeedingExecutor)
     server.close()
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to