Repository: spark
Updated Branches:
  refs/heads/master df3cbe3a3 -> ba8912e5f


[SPARK-19450] Replace askWithRetry with askSync.

## What changes were proposed in this pull request?

`askSync` is already added in `RpcEndpointRef` (see SPARK-19347 and 
https://github.com/apache/spark/pull/16690#issuecomment-276850068) and 
`askWithRetry` is marked as deprecated.
As mentioned in 
SPARK-18113 (https://github.com/apache/spark/pull/16503#event-927953218):

>askWithRetry is basically an unneeded API, and a leftover from the akka days 
>that doesn't make sense anymore. It's prone to cause deadlocks (exactly 
>because it's blocking), it imposes restrictions on the caller (e.g. 
>idempotency) and other things that people generally don't pay that much 
>attention to when using it.

Since `askWithRetry` is only used inside Spark and not in user logic, it might 
make sense to replace all of its uses with `askSync`.

## How was this patch tested?
This PR doesn't change code logic; existing unit tests cover it.

Author: jinxing <[email protected]>

Closes #16790 from jinxing64/SPARK-19450.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba8912e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba8912e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba8912e5

Branch: refs/heads/master
Commit: ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2
Parents: df3cbe3
Author: jinxing <[email protected]>
Authored: Sun Feb 19 04:34:07 2017 -0800
Committer: Sean Owen <[email protected]>
Committed: Sun Feb 19 04:34:07 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     |  2 +-
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../scala/org/apache/spark/deploy/Client.scala  |  2 +-
 .../org/apache/spark/deploy/master/Master.scala |  2 +-
 .../deploy/master/ui/ApplicationPage.scala      |  2 +-
 .../spark/deploy/master/ui/MasterPage.scala     |  2 +-
 .../deploy/rest/StandaloneRestServer.scala      |  6 +-
 .../spark/deploy/worker/ui/WorkerPage.scala     |  4 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  2 +-
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../org/apache/spark/rpc/RpcEndpointRef.scala   | 60 --------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |  4 +-
 .../spark/storage/BlockManagerMaster.scala      | 32 +++++------
 .../StandaloneDynamicAllocationSuite.scala      |  4 +-
 .../spark/deploy/client/AppClientSuite.scala    |  2 +-
 .../spark/deploy/master/MasterSuite.scala       |  2 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala      | 21 ++++---
 .../spark/storage/BlockManagerSuite.scala       |  2 +-
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  2 +-
 .../spark/deploy/yarn/YarnAllocator.scala       |  2 +-
 .../streaming/state/StateStoreCoordinator.scala |  8 +--
 .../receiver/ReceiverSupervisorImpl.scala       |  4 +-
 .../streaming/scheduler/ReceiverTracker.scala   |  6 +-
 24 files changed, 58 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 4ca442b..4ef6656 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -99,7 +99,7 @@ private[spark] abstract class MapOutputTracker(conf: 
SparkConf) extends Logging
    */
   protected def askTracker[T: ClassTag](message: Any): T = {
     try {
-      trackerEndpoint.askWithRetry[T](message)
+      trackerEndpoint.askSync[T](message)
     } catch {
       case e: Exception =>
         logError("Error communicating with MapOutputTracker", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7e56406..e4d8389 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -605,7 +605,7 @@ class SparkContext(config: SparkConf) extends Logging {
         Some(Utils.getThreadDump())
       } else {
         val endpointRef = 
env.blockManager.master.getExecutorEndpointRef(executorId).get
-        
Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump))
+        Some(endpointRef.askSync[Array[ThreadStackTrace]](TriggerThreadDump))
       }
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index a4de3d7..bf60932 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -123,7 +123,7 @@ private class ClientEndpoint(
     Thread.sleep(5000)
     logInfo("... polling master for driver state")
     val statusResponse =
-      
activeMasterEndpoint.askWithRetry[DriverStatusResponse](RequestDriverStatus(driverId))
+      
activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
     if (statusResponse.found) {
       logInfo(s"State of $driverId is ${statusResponse.state.get}")
       // Worker node, if present

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c5f7c07..816bf37 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -1045,7 +1045,7 @@ private[deploy] object Master extends Logging {
     val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
     val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
       new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
-    val portsResponse = 
masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
+    val portsResponse = 
masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
     (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 18cff31..946a928 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -34,7 +34,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) 
extends WebUIPage("app")
   /** Executor details for a particular application */
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
-    val state = master.askWithRetry[MasterStateResponse](RequestMasterState)
+    val state = master.askSync[MasterStateResponse](RequestMasterState)
     val app = state.activeApps.find(_.id == appId)
       .getOrElse(state.completedApps.find(_.id == appId).orNull)
     if (app == null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index ebbbbd3..7dbe329 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -33,7 +33,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends 
WebUIPage("") {
   private val master = parent.masterEndpointRef
 
   def getMasterState: MasterStateResponse = {
-    master.askWithRetry[MasterStateResponse](RequestMasterState)
+    master.askSync[MasterStateResponse](RequestMasterState)
   }
 
   override def renderJson(request: HttpServletRequest): JValue = {

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index c19296c..5662006 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -71,7 +71,7 @@ private[rest] class 
StandaloneKillRequestServlet(masterEndpoint: RpcEndpointRef,
   extends KillRequestServlet {
 
   protected def handleKill(submissionId: String): KillSubmissionResponse = {
-    val response = 
masterEndpoint.askWithRetry[DeployMessages.KillDriverResponse](
+    val response = masterEndpoint.askSync[DeployMessages.KillDriverResponse](
       DeployMessages.RequestKillDriver(submissionId))
     val k = new KillSubmissionResponse
     k.serverSparkVersion = sparkVersion
@@ -89,7 +89,7 @@ private[rest] class 
StandaloneStatusRequestServlet(masterEndpoint: RpcEndpointRe
   extends StatusRequestServlet {
 
   protected def handleStatus(submissionId: String): SubmissionStatusResponse = 
{
-    val response = 
masterEndpoint.askWithRetry[DeployMessages.DriverStatusResponse](
+    val response = masterEndpoint.askSync[DeployMessages.DriverStatusResponse](
       DeployMessages.RequestDriverStatus(submissionId))
     val message = response.exception.map { s"Exception from the cluster:\n" + 
formatException(_) }
     val d = new SubmissionStatusResponse
@@ -174,7 +174,7 @@ private[rest] class StandaloneSubmitRequestServlet(
     requestMessage match {
       case submitRequest: CreateSubmissionRequest =>
         val driverDescription = buildDriverDescription(submitRequest)
-        val response = 
masterEndpoint.askWithRetry[DeployMessages.SubmitDriverResponse](
+        val response = 
masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](
           DeployMessages.RequestSubmitDriver(driverDescription))
         val submitResponse = new CreateSubmissionResponse
         submitResponse.serverSparkVersion = sparkVersion

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
index 8ebcbcb..1ad9731 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
@@ -34,12 +34,12 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends 
WebUIPage("") {
   private val workerEndpoint = parent.worker.self
 
   override def renderJson(request: HttpServletRequest): JValue = {
-    val workerState = 
workerEndpoint.askWithRetry[WorkerStateResponse](RequestWorkerState)
+    val workerState = 
workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
     JsonProtocol.writeWorkerState(workerState)
   }
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    val workerState = 
workerEndpoint.askWithRetry[WorkerStateResponse](RequestWorkerState)
+    val workerState = 
workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
 
     val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job 
Details", "Logs")
     val runningExecutors = workerState.executors

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4a38560..b376ecd 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -199,7 +199,7 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
         new SecurityManager(executorConf),
         clientMode = true)
       val driver = fetcher.setupEndpointRefByURI(driverUrl)
-      val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
+      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
       val props = cfg.sparkProperties ++ Seq[(String, 
String)](("spark.app.id", appId))
       fetcher.shutdown()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index db5d0d8..d762f11 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -677,7 +677,7 @@ private[spark] class Executor(
 
     val message = Heartbeat(executorId, accumUpdates.toArray, 
env.blockManager.blockManagerId)
     try {
-      val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
+      val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
           message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala 
b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index a577887..4d39f14 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -92,64 +92,4 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
     timeout.awaitResult(future)
   }
 
-  /**
-   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and 
get its result within a
-   * default timeout, throw a SparkException if this fails even after the 
default number of retries.
-   * The default `timeout` will be used in every trial of calling 
`sendWithReply`. Because this
-   * method retries, the message handling in the receiver side should be 
idempotent.
-   *
-   * Note: this is a blocking action which may cost a lot of time,  so don't 
call it in a message
-   * loop of [[RpcEndpoint]].
-   *
-   * @param message the message to send
-   * @tparam T type of the reply message
-   * @return the reply message from the corresponding [[RpcEndpoint]]
-   */
-  @deprecated("use 'askSync' instead.", "2.2.0")
-  def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, 
defaultAskTimeout)
-
-  /**
-   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and 
get its result within a
-   * specified timeout, throw a SparkException if this fails even after the 
specified number of
-   * retries. `timeout` will be used in every trial of calling 
`sendWithReply`. Because this method
-   * retries, the message handling in the receiver side should be idempotent.
-   *
-   * Note: this is a blocking action which may cost a lot of time, so don't 
call it in a message
-   * loop of [[RpcEndpoint]].
-   *
-   * @param message the message to send
-   * @param timeout the timeout duration
-   * @tparam T type of the reply message
-   * @return the reply message from the corresponding [[RpcEndpoint]]
-   */
-  @deprecated("use 'askSync' instead.", "2.2.0")
-  def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
-    // TODO: Consider removing multiple attempts
-    var attempts = 0
-    var lastException: Exception = null
-    while (attempts < maxRetries) {
-      attempts += 1
-      try {
-        val future = ask[T](message, timeout)
-        val result = timeout.awaitResult(future)
-        if (result == null) {
-          throw new SparkException("RpcEndpoint returned null")
-        }
-        return result
-      } catch {
-        case ie: InterruptedException => throw ie
-        case e: Exception =>
-          lastException = e
-          logWarning(s"Error sending message [message = $message] in $attempts 
attempts", e)
-      }
-
-      if (attempts < maxRetries) {
-        Thread.sleep(retryWaitMs)
-      }
-    }
-
-    throw new SparkException(
-      s"Error sending message [message = $message]", lastException)
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0b7d371..692ed80 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -232,7 +232,7 @@ class DAGScheduler(
       accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
       blockManagerId: BlockManagerId): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
-    blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
+    blockManagerMaster.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, 
"BlockManagerHeartbeat"))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e006cc9..94abe30 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -372,7 +372,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     try {
       if (driverEndpoint != null) {
         logInfo("Shutting down all executors")
-        driverEndpoint.askWithRetry[Boolean](StopExecutors)
+        driverEndpoint.askSync[Boolean](StopExecutors)
       }
     } catch {
       case e: Exception =>
@@ -384,7 +384,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     stopExecutors()
     try {
       if (driverEndpoint != null) {
-        driverEndpoint.askWithRetry[Boolean](StopDriver)
+        driverEndpoint.askSync[Boolean](StopDriver)
       }
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 7a60006..3ca690d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -60,7 +60,7 @@ class BlockManagerMaster(
       maxMemSize: Long,
       slaveEndpoint: RpcEndpointRef): BlockManagerId = {
     logInfo(s"Registering BlockManager $blockManagerId")
-    val updatedId = driverEndpoint.askWithRetry[BlockManagerId](
+    val updatedId = driverEndpoint.askSync[BlockManagerId](
       RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
     logInfo(s"Registered BlockManager $updatedId")
     updatedId
@@ -72,7 +72,7 @@ class BlockManagerMaster(
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long): Boolean = {
-    val res = driverEndpoint.askWithRetry[Boolean](
+    val res = driverEndpoint.askSync[Boolean](
       UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, 
diskSize))
     logDebug(s"Updated info of block $blockId")
     res
@@ -80,12 +80,12 @@ class BlockManagerMaster(
 
   /** Get locations of the blockId from the driver */
   def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
-    driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
+    driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
   }
 
   /** Get locations of multiple blockIds from the driver */
   def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] 
= {
-    driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]](
+    driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
       GetLocationsMultipleBlockIds(blockIds))
   }
 
@@ -99,11 +99,11 @@ class BlockManagerMaster(
 
   /** Get ids of other nodes in the cluster from the driver */
   def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
-    driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId))
+    driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }
 
   def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
-    
driverEndpoint.askWithRetry[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
+    
driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
   }
 
   /**
@@ -111,12 +111,12 @@ class BlockManagerMaster(
    * blocks that the driver knows about.
    */
   def removeBlock(blockId: BlockId) {
-    driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId))
+    driverEndpoint.askSync[Boolean](RemoveBlock(blockId))
   }
 
   /** Remove all blocks belonging to the given RDD. */
   def removeRdd(rddId: Int, blocking: Boolean) {
-    val future = 
driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
+    val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId))
     future.onFailure {
       case e: Exception =>
         logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
@@ -128,7 +128,7 @@ class BlockManagerMaster(
 
   /** Remove all blocks belonging to the given shuffle. */
   def removeShuffle(shuffleId: Int, blocking: Boolean) {
-    val future = 
driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
+    val future = 
driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
     future.onFailure {
       case e: Exception =>
         logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
@@ -140,7 +140,7 @@ class BlockManagerMaster(
 
   /** Remove all blocks belonging to the given broadcast. */
   def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: 
Boolean) {
-    val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](
+    val future = driverEndpoint.askSync[Future[Seq[Int]]](
       RemoveBroadcast(broadcastId, removeFromMaster))
     future.onFailure {
       case e: Exception =>
@@ -159,11 +159,11 @@ class BlockManagerMaster(
    * amount of remaining memory.
    */
   def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    driverEndpoint.askWithRetry[Map[BlockManagerId, (Long, 
Long)]](GetMemoryStatus)
+    driverEndpoint.askSync[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
   }
 
   def getStorageStatus: Array[StorageStatus] = {
-    driverEndpoint.askWithRetry[Array[StorageStatus]](GetStorageStatus)
+    driverEndpoint.askSync[Array[StorageStatus]](GetStorageStatus)
   }
 
   /**
@@ -184,7 +184,7 @@ class BlockManagerMaster(
      * master endpoint for a response to a prior message.
      */
     val response = driverEndpoint.
-      askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
+      askSync[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
     val (blockManagerIds, futures) = response.unzip
     implicit val sameThread = ThreadUtils.sameThread
     val cbf =
@@ -214,7 +214,7 @@ class BlockManagerMaster(
       filter: BlockId => Boolean,
       askSlaves: Boolean): Seq[BlockId] = {
     val msg = GetMatchingBlockIds(filter, askSlaves)
-    val future = driverEndpoint.askWithRetry[Future[Seq[BlockId]]](msg)
+    val future = driverEndpoint.askSync[Future[Seq[BlockId]]](msg)
     timeout.awaitResult(future)
   }
 
@@ -223,7 +223,7 @@ class BlockManagerMaster(
    * since they are not reported the master.
    */
   def hasCachedBlocks(executorId: String): Boolean = {
-    driverEndpoint.askWithRetry[Boolean](HasCachedBlocks(executorId))
+    driverEndpoint.askSync[Boolean](HasCachedBlocks(executorId))
   }
 
   /** Stop the driver endpoint, called only on the Spark driver node */
@@ -237,7 +237,7 @@ class BlockManagerMaster(
 
   /** Send a one-way message to the master endpoint, to which we expect it to 
reply with true. */
   private def tell(message: Any) {
-    if (!driverEndpoint.askWithRetry[Boolean](message)) {
+    if (!driverEndpoint.askSync[Boolean](message)) {
       throw new SparkException("BlockManagerMasterEndpoint returned false, 
expected true.")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 54ea727..9839dcf 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -547,7 +547,7 @@ class StandaloneDynamicAllocationSuite
 
   /** Get the Master state */
   private def getMasterState: MasterStateResponse = {
-    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+    master.self.askSync[MasterStateResponse](RequestMasterState)
   }
 
   /** Get the applications that are active from Master */
@@ -620,7 +620,7 @@ class StandaloneDynamicAllocationSuite
       when(endpointRef.address).thenReturn(mockAddress)
       val message = RegisterExecutor(id, endpointRef, "localhost", 10, 
Map.empty)
       val backend = 
sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
-      backend.driverEndpoint.askWithRetry[Boolean](message)
+      backend.driverEndpoint.askSync[Boolean](message)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index bc58fb2..936639b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -171,7 +171,7 @@ class AppClientSuite
 
   /** Get the Master state */
   private def getMasterState: MasterStateResponse = {
-    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+    master.self.askSync[MasterStateResponse](RequestMasterState)
   }
 
   /** Get the applications that are active from Master */

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index da7253b..2127da4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -432,7 +432,7 @@ class MasterSuite extends SparkFunSuite
     val master = makeMaster()
     master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
     eventually(timeout(10.seconds)) {
-      val masterState = 
master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+      val masterState = 
master.self.askSync[MasterStateResponse](RequestMasterState)
       assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala 
b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index b4037d7..31d9dd3 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -118,8 +118,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with 
BeforeAndAfterAll {
       }
     }
     val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
-    val newRpcEndpointRef = 
rpcEndpointRef.askWithRetry[RpcEndpointRef]("Hello")
-    val reply = newRpcEndpointRef.askWithRetry[String]("Echo")
+    val newRpcEndpointRef = rpcEndpointRef.askSync[RpcEndpointRef]("Hello")
+    val reply = newRpcEndpointRef.askSync[String]("Echo")
     assert("Echo" === reply)
   }
 
@@ -132,7 +132,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with 
BeforeAndAfterAll {
           context.reply(msg)
       }
     })
-    val reply = rpcEndpointRef.askWithRetry[String]("hello")
+    val reply = rpcEndpointRef.askSync[String]("hello")
     assert("hello" === reply)
   }
 
@@ -150,7 +150,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with 
BeforeAndAfterAll {
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, 
"ask-remotely")
     try {
-      val reply = rpcEndpointRef.askWithRetry[String]("hello")
+      val reply = rpcEndpointRef.askSync[String]("hello")
       assert("hello" === reply)
     } finally {
       anotherEnv.shutdown()
@@ -177,14 +177,13 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-timeout")
     try {
-      // Any exception thrown in askWithRetry is wrapped with a SparkException and set as the cause
-      val e = intercept[SparkException] {
-        rpcEndpointRef.askWithRetry[String]("hello", new RpcTimeout(1 millis, shortProp))
+      val e = intercept[RpcTimeoutException] {
+        rpcEndpointRef.askSync[String]("hello", new RpcTimeout(1 millis, shortProp))
       }
       // The SparkException cause should be a RpcTimeoutException with message indicating the
       // controlling timeout property
-      assert(e.getCause.isInstanceOf[RpcTimeoutException])
-      assert(e.getCause.getMessage.contains(shortProp))
+      assert(e.isInstanceOf[RpcTimeoutException])
+      assert(e.getMessage.contains(shortProp))
     } finally {
       anotherEnv.shutdown()
       anotherEnv.awaitTermination()
@@ -677,7 +676,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
         }
       })
       val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "ask-authentication")
-      val reply = rpcEndpointRef.askWithRetry[String]("hello")
+      val reply = rpcEndpointRef.askSync[String]("hello")
       assert("hello" === reply)
     } finally {
       localEnv.shutdown()
@@ -894,7 +893,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     val ref = anotherEnv.setupEndpointRef(env.address, "SPARK-14699")
     // Make sure the connect is set up
-    assert(ref.askWithRetry[String]("hello") === "hello")
+    assert(ref.askSync[String]("hello") === "hello")
     anotherEnv.shutdown()
     anotherEnv.awaitTermination()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 705c355..64a67b4 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -394,7 +394,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    val reregister = !master.driverEndpoint.askWithRetry[Boolean](
+    val reregister = !master.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(store.blockManagerId))
     assert(reregister == true)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index a674da4..cdb3b68 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -442,7 +442,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.stop()
     // Any method of the backend involving sending messages to the driver endpoint should not
     // be called after the backend is stopped.
-    verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
+    verify(driverEndpoint, never()).askSync(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
   }
 
   test("mesos supports spark.executor.uri") {

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 8a76dbd..abd2de7 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -101,7 +101,7 @@ private[yarn] class YarnAllocator(
    * @see SPARK-12864
    */
   private var executorIdCounter: Int =
-    driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
+    driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
 
   // Queue to store the timestamp of failed executors
   private val failedExecutorsTimeStamps = new Queue[Long]()

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index 267d176..d0f8188 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -88,21 +88,21 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
 
   /** Verify whether the given executor has the active instance of a state store */
   private[state] def verifyIfInstanceActive(storeId: StateStoreId, executorId: String): Boolean = {
-    rpcEndpointRef.askWithRetry[Boolean](VerifyIfInstanceActive(storeId, executorId))
+    rpcEndpointRef.askSync[Boolean](VerifyIfInstanceActive(storeId, executorId))
   }
 
   /** Get the location of the state store */
   private[state] def getLocation(storeId: StateStoreId): Option[String] = {
-    rpcEndpointRef.askWithRetry[Option[String]](GetLocation(storeId))
+    rpcEndpointRef.askSync[Option[String]](GetLocation(storeId))
   }
 
   /** Deactivate instances related to a set of operator */
   private[state] def deactivateInstances(storeRootLocation: String): Unit = {
-    rpcEndpointRef.askWithRetry[Boolean](DeactivateInstances(storeRootLocation))
+    rpcEndpointRef.askSync[Boolean](DeactivateInstances(storeRootLocation))
   }
 
   private[state] def stop(): Unit = {
-    rpcEndpointRef.askWithRetry[Boolean](StopCoordinator)
+    rpcEndpointRef.askSync[Boolean](StopCoordinator)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 722024b..f5c8a88 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -188,13 +188,13 @@ private[streaming] class ReceiverSupervisorImpl(
   override protected def onReceiverStart(): Boolean = {
     val msg = RegisterReceiver(
       streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
-    trackerEndpoint.askWithRetry[Boolean](msg)
+    trackerEndpoint.askSync[Boolean](msg)
   }
 
   override protected def onReceiverStop(message: String, error: Option[Throwable]) {
     logInfo("Deregistering receiver " + streamId)
     val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
-    trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString))
+    trackerEndpoint.askSync[Boolean](DeregisterReceiver(streamId, message, errorString))
     logInfo("Stopped receiver " + streamId)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 8f55d98..bd7ab0b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -170,7 +170,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       trackerState = Stopping
       if (!skipReceiverLaunch) {
         // Send the stop signal to all the receivers
-        endpoint.askWithRetry[Boolean](StopAllReceivers)
+        endpoint.askSync[Boolean](StopAllReceivers)
 
         // Wait for the Spark job that runs the receivers to be over
         // That is, for the receivers to quit gracefully.
@@ -183,7 +183,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         }
 
         // Check if all the receivers have been deregistered or not
-        val receivers = endpoint.askWithRetry[Seq[Int]](AllReceiverIds)
+        val receivers = endpoint.askSync[Seq[Int]](AllReceiverIds)
         if (receivers.nonEmpty) {
           logWarning("Not all of the receivers have deregistered, " + receivers)
         } else {
@@ -249,7 +249,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
    */
   def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
     if (isTrackerStarted) {
-      endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+      endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
         _.runningExecutor.map {
           _.executorId
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to