Repository: flink Updated Branches: refs/heads/master aedc0fd48 -> bce068c49
[hotfix] [runtime] Disable restart suppression on cancelAndClearEverything This temporary disables 28c57c3 (\cc @tillrohrmann) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bce068c4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bce068c4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bce068c4 Branch: refs/heads/master Commit: bce068c49f7dc38955dcd6a6c4e26fb483332aaa Parents: aedc0fd Author: Ufuk Celebi <[email protected]> Authored: Tue May 31 10:50:08 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Wed Jun 1 13:15:54 2016 +0200 ---------------------------------------------------------------------- .../scala/org/apache/flink/runtime/jobmanager/JobManager.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bce068c4/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 540957d..8ab887a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -210,7 +210,7 @@ class JobManager( log.info(s"Stopping JobManager $getAddress.") val newFuturesToComplete = cancelAndClearEverything( - new SuppressRestartsException(new Exception("The JobManager is shutting down.")), + new Exception("The JobManager is shutting down."), removeJobFromStateBackend = true) implicit val executionContext = context.dispatcher @@ -307,7 +307,7 @@ class JobManager( log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.") val newFuturesToComplete = cancelAndClearEverything( - new SuppressRestartsException(new Exception("JobManager is no longer the leader.")), + new Exception("JobManager is no longer the leader."), removeJobFromStateBackend = false) futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete) @@ -1633,7 +1633,7 @@ class JobManager( * @param cause Cause for the cancelling. */ private def cancelAndClearEverything( - cause: SuppressRestartsException, + cause: Throwable, removeJobFromStateBackend: Boolean) : Seq[Future[Unit]] = { val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
