Repository: spark
Updated Branches:
  refs/heads/master df4a27cc5 -> c5fcb7f68


[SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple 
times because of askWithRetry.

## What changes were proposed in this pull request?

`ReceiverSupervisorImpl` on the executor side reports each block's metadata 
back to `ReceiverTracker` on the driver side. The current code uses 
`askWithRetry` for this. However, `ReceiverTracker` does not handle `AddBlock` 
idempotently, so a retried message may be processed multiple times.
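
The failure mode can be illustrated with a small, self-contained sketch. 
Everything here (`ToyTracker`, `RetryDemo`, the `Option`-based timeout model) 
is hypothetical and only mimics the shape of the problem; it is not Spark's 
actual RPC code.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical message type standing in for the streaming AddBlock message.
final case class AddBlock(blockId: String)

// Hypothetical stand-in for the driver-side tracker. It records every
// AddBlock it handles, so handling is NOT idempotent.
class ToyTracker {
  val blocks: ArrayBuffer[String] = ArrayBuffer.empty[String]
  private var firstCall = true

  // Returns None to model a reply that arrives after the caller's timeout.
  def handle(msg: AddBlock): Option[Boolean] = {
    blocks += msg.blockId // the side effect happens even if the reply is late
    if (firstCall) { firstCall = false; None } else Some(true)
  }
}

object RetryDemo {
  // Simplified retry loop: resend until a reply arrives or attempts run out.
  @scala.annotation.tailrec
  def askWithRetry(tracker: ToyTracker, msg: AddBlock, attemptsLeft: Int): Boolean =
    tracker.handle(msg) match {
      case Some(reply)              => reply
      case None if attemptsLeft > 1 => askWithRetry(tracker, msg, attemptsLeft - 1)
      case None                     => throw new RuntimeException("no reply after retries")
    }

  // Single attempt: a missing reply surfaces as an error instead of a resend.
  def askOnce(tracker: ToyTracker, msg: AddBlock): Boolean =
    tracker.handle(msg).getOrElse(throw new RuntimeException("timed out"))
}
```

In this model the retrying call succeeds on the second attempt but leaves the 
block recorded twice, while the single-attempt call records it once and 
reports the timeout to the caller.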

**To reproduce**:

1. In `ReceiverTracker`, check whether this is the first `AddBlock` received; 
if so, sleep long enough (say 200 seconds) that the first RPC call times out 
in `askWithRetry` and `AddBlock` is resent.
2. Rebuild Spark and run the following job:
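```
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // masterUrl is assumed to be defined by the caller (the master URL to run against).
  def streamProcessing(): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingTest")
      .setMaster(masterUrl)
    val ssc = new StreamingContext(conf, Seconds(200))
    // Read lines from a local socket and print each batch.
    val stream = ssc.socketTextStream("localhost", 1234)
    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
```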
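
The delay described in step 1 is not part of this patch. One way it might be 
injected for a local experiment is sketched below; `FirstCallDelay` and 
`delayIfFirst` are hypothetical names, and where exactly to call the helper 
inside `ReceiverTracker` is left to the reader.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical test-only helper: delays only the first caller.
object FirstCallDelay {
  private val first = new AtomicBoolean(true)

  // Sleeps for sleepMs on the first invocation only.
  // Returns true if this call was the delayed one, false otherwise.
  def delayIfFirst(sleepMs: Long): Boolean =
    if (first.compareAndSet(true, false)) {
      Thread.sleep(sleepMs)
      true
    } else {
      false
    }
}
```

Calling `FirstCallDelay.delayIfFirst(200000L)` at the start of the `AddBlock` 
handling would stall only the first message, which matches the 200-second 
figure used above.
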
**To fix**:

It makes sense to provide a blocking version of `ask` in `RpcEndpointRef`, as 
mentioned in SPARK-18113 
(https://github.com/apache/spark/pull/16503#event-927953218), because the Netty 
RPC layer will not drop messages. `askWithRetry` is a leftover from the Akka 
days. It imposes restrictions on the caller (e.g. idempotency), along with 
other things that people generally don't pay much attention to when using it.
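
The idea of a blocking `ask` can be sketched with plain Scala futures. 
`ToyEndpointRef`, `echo` and `silent` are hypothetical, and the sketch uses 
`scala.concurrent.Await` directly rather than Spark's `RpcTimeout`; it only 
shows the await-once, no-resend behaviour.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.FiniteDuration

object AskSyncSketch {
  // Hypothetical minimal endpoint reference.
  trait ToyEndpointRef {
    // Asynchronous ask: the reply arrives through a Future.
    def ask[T](message: Any): Future[T]

    // Blocking ask: wait once for the reply; a timeout is thrown to the
    // caller and the message is never resent.
    def askSync[T](message: Any, timeout: FiniteDuration): T =
      Await.result(ask[T](message), timeout)
  }

  // Endpoint that replies immediately with the message it was sent.
  val echo: ToyEndpointRef = new ToyEndpointRef {
    def ask[T](message: Any): Future[T] =
      Future.successful(message.asInstanceOf[T])
  }

  // Endpoint that never replies, to exercise the timeout path.
  val silent: ToyEndpointRef = new ToyEndpointRef {
    def ask[T](message: Any): Future[T] = Promise[T]().future
  }
}
```

With this shape, `echo.askSync[String]("ping", timeout)` returns the reply, 
while `silent.askSync` fails with a `java.util.concurrent.TimeoutException` 
after the timeout, leaving any retry decision to the caller.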

## How was this patch tested?
Tested manually. The scenario described above does not occur with this patch.

Author: jinxing <[email protected]>

Closes #16690 from jinxing64/SPARK-19347.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5fcb7f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5fcb7f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5fcb7f6

Branch: refs/heads/master
Commit: c5fcb7f68bff055cc56e487bd48994945e7935cd
Parents: df4a27c
Author: jinxing <[email protected]>
Authored: Wed Feb 1 13:54:37 2017 -0800
Committer: Marcelo Vanzin <[email protected]>
Committed: Wed Feb 1 13:54:37 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/rpc/RpcEndpointRef.scala   | 38 ++++++++++++++++++--
 .../receiver/ReceiverSupervisorImpl.scala       |  2 +-
 2 files changed, 36 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c5fcb7f6/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 994e186..a577887 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -63,8 +63,38 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
   def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
 
   /**
-   * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
-   * timeout, or throw a SparkException if this fails even after the default number of retries.
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * default timeout, throw an exception if this fails.
+   *
+   * Note: this is a blocking action which may cost a lot of time,  so don't call it in a message
+   * loop of [[RpcEndpoint]].
+
+   * @param message the message to send
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * specified timeout, throw an exception if this fails.
+   *
+   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+   * loop of [[RpcEndpoint]].
+   *
+   * @param message the message to send
+   * @param timeout the timeout duration
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
+    val future = ask[T](message, timeout)
+    timeout.awaitResult(future)
+  }
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * default timeout, throw a SparkException if this fails even after the default number of retries.
    * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
    * method retries, the message handling in the receiver side should be idempotent.
    *
@@ -75,10 +105,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
    * @tparam T type of the reply message
    * @return the reply message from the corresponding [[RpcEndpoint]]
    */
+  @deprecated("use 'askSync' instead.", "2.2.0")
   def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)
 
   /**
-   * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
    * specified timeout, throw a SparkException if this fails even after the specified number of
    * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
    * retries, the message handling in the receiver side should be idempotent.
@@ -91,6 +122,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
    * @tparam T type of the reply message
    * @return the reply message from the corresponding [[RpcEndpoint]]
    */
+  @deprecated("use 'askSync' instead.", "2.2.0")
   def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
     // TODO: Consider removing multiple attempts
     var attempts = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/c5fcb7f6/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index eca7c79..722024b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -159,7 +159,7 @@ private[streaming] class ReceiverSupervisorImpl(
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
     val numRecords = blockStoreResult.numRecords
     val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
-    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
+    trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))
     logDebug(s"Reported block $blockId")
   }
 

