Repository: flink Updated Branches: refs/heads/release-1.1 020da2ce1 -> da09d418c
[FLINK-5246] Don't discard checkpoint messages if they are unknown. This is the case if the savepoint coordinator has triggered a checkpoint. The corresponding checkpoint messages are not known to the checkpoint coordinator and thus should not be discarded. Instead, the JobManager will now discard all messages which have been accepted by neither the CheckpointCoordinator nor the SavepointCoordinator. This closes #2930. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da09d418 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da09d418 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da09d418 Branch: refs/heads/release-1.1 Commit: da09d418c1add17169368a38aeb9d793f9a2324c Parents: 020da2c Author: Till Rohrmann <[email protected]> Authored: Sat Dec 3 20:15:35 2016 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sun Dec 4 12:38:32 2016 +0100 ---------------------------------------------------------------------- .../runtime/checkpoint/CheckpointCoordinator.java | 6 +++--- .../flink/runtime/jobmanager/JobManager.scala | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/da09d418/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 592bafc..a3e511f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -735,15 +735,15 @@ public class CheckpointCoordinator { if 
(recentPendingCheckpoints.contains(checkpointId)) { wasPendingCheckpoint = true; LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId); + + // try to discard the state so that we don't have lingering state lying around + discardState(message.getState()); } else { LOG.debug("Received message for an unknown checkpoint {}.", checkpointId); wasPendingCheckpoint = false; } - // try to discard the state so that we don't have lingering state lying around - discardState(message.getState()); - return wasPendingCheckpoint; } } http://git-wip-us.apache.org/repos/asf/flink/blob/da09d418/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 9f6e2db..cbf7b5d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1376,6 +1376,24 @@ class JobManager( // addressed to the periodic checkpoint coordinator. log.info("Received message for non-existing checkpoint " + ackMessage.getCheckpointId) + + val classLoader = Option(libraryCacheManager.getClassLoader(jid)) match { + case Some(userCodeClassLoader) => userCodeClassLoader + case None => getClass.getClassLoader + } + + future { + Option(ackMessage.getState()) match { + case Some(state) => + try { + state.deserializeValue(classLoader).discardState() + } catch { + case e: Exception => log.warn("Could not discard orphaned checkpoint " + + "state.", e) + } + case None => + } + }(ExecutionContext.fromExecutor(ioExecutor)) } } catch { case t: Throwable =>
