Repository: flink
Updated Branches:
  refs/heads/master 9bcbcf4a5 -> 884d3e2a4


[FLINK-4258] fix potential NPE in SavepointCoordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/884d3e2a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/884d3e2a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/884d3e2a

Branch: refs/heads/master
Commit: 884d3e2a4a894c79676a1b2cf0b1bd075550bbbd
Parents: 9bcbcf4
Author: Maximilian Michels <[email protected]>
Authored: Tue Jul 26 18:35:33 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Tue Jul 26 18:37:58 2016 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/savepoint/SavepointCoordinator.java      | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/884d3e2a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
index 5759e35..cd9eb0c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
@@ -200,8 +200,6 @@ public class SavepointCoordinator extends 
CheckpointCoordinator {
 
                        Savepoint savepoint = 
savepointStore.loadSavepoint(savepointPath);
 
-                       long recoveryTimestamp = System.currentTimeMillis();
-
                        for (TaskState taskState : savepoint.getTaskStates()) {
                                ExecutionJobVertex executionJobVertex = 
tasks.get(taskState.getJobVertexID());
 
@@ -292,8 +290,9 @@ public class SavepointCoordinator extends 
CheckpointCoordinator {
                Promise<String> promise = 
savepointPromises.remove(checkpoint.getCheckpointID());
 
                if (promise == null) {
-                       LOG.info("Pending savepoint with ID " + 
checkpoint.getCheckpointID() + "  has been " +
+                       LOG.warn("Pending savepoint with ID " + 
checkpoint.getCheckpointID() + "  has been " +
                                        "removed before receiving 
acknowledgment.");
+                       return;
                }
 
                // Sanity check

Reply via email to