Repository: flink
Updated Branches:
  refs/heads/release-1.1 020da2ce1 -> da09d418c


[FLINK-5246] Don't discard checkpoint messages if they are unknown

A checkpoint message can be unknown to the CheckpointCoordinator when the checkpoint was
triggered by the SavepointCoordinator. Such messages are not known to the checkpoint
coordinator and thus should not be discarded by it. Instead, the JobManager will now discard
all messages which have been accepted by neither the CheckpointCoordinator nor the
SavepointCoordinator.

This closes #2930.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da09d418
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da09d418
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da09d418

Branch: refs/heads/release-1.1
Commit: da09d418c1add17169368a38aeb9d793f9a2324c
Parents: 020da2c
Author: Till Rohrmann <[email protected]>
Authored: Sat Dec 3 20:15:35 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sun Dec 4 12:38:32 2016 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointCoordinator.java |  6 +++---
 .../flink/runtime/jobmanager/JobManager.scala     | 18 ++++++++++++++++++
 2 files changed, 21 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da09d418/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 592bafc..a3e511f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -735,15 +735,15 @@ public class CheckpointCoordinator {
                                if 
(recentPendingCheckpoints.contains(checkpointId)) {
                                        wasPendingCheckpoint = true;
                                        LOG.warn("Received late message for now 
expired checkpoint attempt {}.", checkpointId);
+
+                                       // try to discard the state so that we 
don't have lingering state lying around
+                                       discardState(message.getState());
                                }
                                else {
                                        LOG.debug("Received message for an 
unknown checkpoint {}.", checkpointId);
                                        wasPendingCheckpoint = false;
                                }
 
-                               // try to discard the state so that we don't 
have lingering state lying around
-                               discardState(message.getState());
-
                                return wasPendingCheckpoint;
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/da09d418/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9f6e2db..cbf7b5d 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1376,6 +1376,24 @@ class JobManager(
                     // addressed to the periodic checkpoint coordinator.
                     log.info("Received message for non-existing checkpoint " +
                       ackMessage.getCheckpointId)
+
+                    val classLoader = 
Option(libraryCacheManager.getClassLoader(jid)) match {
+                      case Some(userCodeClassLoader) => userCodeClassLoader
+                      case None => getClass.getClassLoader
+                    }
+
+                    future {
+                      Option(ackMessage.getState()) match {
+                        case Some(state) =>
+                          try {
+                            state.deserializeValue(classLoader).discardState()
+                          } catch {
+                            case e: Exception => log.warn("Could not discard 
orphaned checkpoint " +
+                                             "state.", e)
+                          }
+                        case None =>
+                      }
+                    }(ExecutionContext.fromExecutor(ioExecutor))
                   }
                 } catch {
                   case t: Throwable =>

Reply via email to