Repository: spark Updated Branches: refs/heads/master 42d225f44 -> 7bb6d31cf
[SPARK-11232][CORE] Use 'offer' instead of 'put' to make sure calling send won't be interrupted The current `NettyRpcEndpointRef.send` can be interrupted because it uses `LinkedBlockingQueue.put`, which may hang the application. Imagine the following execution order: | thread 1: TaskRunner.kill | thread 2: TaskRunner.run ------------- | ------------- | ------------- 1 | killed = true | 2 | | if (killed) { 3 | | throw new TaskKilledException 4 | | case _: TaskKilledException _: InterruptedException if task.killed => 5 | task.kill(interruptThread): interruptThread is true | 6 | | execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) 7 | | localEndpoint.send(StatusUpdate(taskId, state, serializedData)): in LocalBackend Then `localEndpoint.send(StatusUpdate(taskId, state, serializedData))` will throw `InterruptedException`. This will prevent the executor from updating the task status and hang the application. A failure caused by the above issue can be seen here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44062/consoleFull Since `receivers` is an unbounded `LinkedBlockingQueue`, we can just use `LinkedBlockingQueue.offer` to resolve this issue. Author: zsxwing <zsxw...@gmail.com> Closes #9198 from zsxwing/dont-interrupt-send. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bb6d31c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bb6d31c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bb6d31c Branch: refs/heads/master Commit: 7bb6d31cff279776f90744407291682774cfe1c2 Parents: 42d225f Author: zsxwing <zsxw...@gmail.com> Authored: Thu Oct 22 11:31:47 2015 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Thu Oct 22 11:31:47 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/rpc/netty/Dispatcher.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7bb6d31c/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index f1a8273..7bf44a6 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -66,7 +66,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { } val data = endpoints.get(name) endpointRefs.put(data.endpoint, data.ref) - receivers.put(data) // for the OnStart message + receivers.offer(data) // for the OnStart message } endpointRef } @@ -80,7 +80,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { val data = endpoints.remove(name) if (data != null) { data.inbox.stop() - receivers.put(data) // for the OnStop message + receivers.offer(data) // for the OnStop message } // Don't clean `endpointRefs` here because it's possible that some messages are being processed // now and they can use `getRpcEndpointRef`. 
So `endpointRefs` will be cleaned in Inbox via @@ -163,7 +163,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { true } else { data.inbox.post(createMessageFn(data.ref)) - receivers.put(data) + receivers.offer(data) false } } @@ -183,7 +183,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { // Stop all endpoints. This will queue all endpoints for processing by the message loops. endpoints.keySet().asScala.foreach(unregisterRpcEndpoint) // Enqueue a message that tells the message loops to stop. - receivers.put(PoisonPill) + receivers.offer(PoisonPill) threadpool.shutdown() } @@ -218,7 +218,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { val data = receivers.take() if (data == PoisonPill) { // Put PoisonPill back so that other MessageLoops can see it. - receivers.put(PoisonPill) + receivers.offer(PoisonPill) return } data.inbox.process(Dispatcher.this) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org