Repository: spark Updated Branches: refs/heads/master cd47e2337 -> 938434dc7
[SPARK-15913][CORE] Dispatcher.stopped should be enclosed by synchronized block. ## What changes were proposed in this pull request? `Dispatcher.stopped` is guarded by `this`, but it is used without synchronization in the `postMessage` function. This PR fixes this and also makes the exception message more accurate. ## How was this patch tested? Pass the existing Jenkins tests. Author: Dongjoon Hyun <dongj...@apache.org> Closes #13634 from dongjoon-hyun/SPARK-15913. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/938434dc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/938434dc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/938434dc Branch: refs/heads/master Commit: 938434dc78f35f77cdebd15dcce8d5e7871b396b Parents: cd47e23 Author: Dongjoon Hyun <dongj...@apache.org> Authored: Mon Jun 13 10:30:17 2016 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Mon Jun 13 10:30:17 2016 -0700 ---------------------------------------------------------------------- .../org/apache/spark/rpc/netty/Dispatcher.scala | 21 ++++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/938434dc/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index 4f8fe01..d305de2 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -144,25 +144,20 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { endpointName: String, message: InboxMessage, callbackIfStopped: (Exception) => Unit): Unit = { - val shouldCallOnStop = synchronized { + val 
error = synchronized { val data = endpoints.get(endpointName) - if (stopped || data == null) { - true + if (stopped) { + Some(new RpcEnvStoppedException()) + } else if (data == null) { + Some(new SparkException(s"Could not find $endpointName.")) } else { data.inbox.post(message) receivers.offer(data) - false + None } } - if (shouldCallOnStop) { - // We don't need to call `onStop` in the `synchronized` block - val error = if (stopped) { - new RpcEnvStoppedException() - } else { - new SparkException(s"Could not find $endpointName or it has been stopped.") - } - callbackIfStopped(error) - } + // We don't need to call `onStop` in the `synchronized` block + error.foreach(callbackIfStopped) } def stop(): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org