[FLINK-3863] Yarn Cluster shutdown may fail if leader changed recently
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e984241
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e984241
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e984241

Branch: refs/heads/master
Commit: 9e9842410a635d183a002d1f25a6f489ce9d6a2f
Parents: f9b52a3
Author: Maximilian Michels <[email protected]>
Authored: Wed Jun 1 12:45:52 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Fri Jun 17 10:37:58 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/yarn/ApplicationClient.scala | 31 ++++++++------------
 1 file changed, 12 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9e984241/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index aea1aac..e701269 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -176,29 +176,22 @@ class ApplicationClient(
         }
       }
 
-    case LocalStopYarnSession(status, diagnostics) =>
+    case msg @ LocalStopYarnSession(status, diagnostics) =>
       log.info("Sending StopCluster request to JobManager.")
 
-      val clusterStatus =
-        status match {
-          case FinalApplicationStatus.SUCCEEDED => ApplicationStatus.SUCCEEDED
-          case FinalApplicationStatus.KILLED => ApplicationStatus.CANCELED
-          case FinalApplicationStatus.FAILED => ApplicationStatus.FAILED
-          case _ => ApplicationStatus.UNKNOWN
-        }
-
-      yarnJobManager foreach {
-        // forward to preserve the sender's address
-        _ forward decorateMessage(new StopCluster(clusterStatus, diagnostics))
+      // preserve the original sender so we can reply
+      val originalSender = sender()
+
+      yarnJobManager match {
+        case Some(jm) =>
+          jm.tell(decorateMessage(new StopCluster(status, diagnostics)), originalSender)
+        case None =>
+          context.system.scheduler.scheduleOnce(1 second) {
+            // try once more; we might have been connected in the meantime
+            self.tell(msg, originalSender)
+          }(context.dispatcher)
       }
 
-    case msg: StopClusterSuccessful =>
-      log.info("Remote JobManager has been stopped successfully. " +
-        "Stopping local application client")
-
-      // poison ourselves
-      self ! decorateMessage(PoisonPill)
-
     // handle the responses from the PollYarnClusterStatus messages to the yarn job mgr
     case status: GetClusterStatusResponse =>
       latestClusterStatus = Some(status)
