Repository: flink
Updated Branches:
  refs/heads/master 9bcbcf4a5 -> 884d3e2a4
[FLINK-4258] fix potential NPE in SavepointCoordinator

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/884d3e2a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/884d3e2a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/884d3e2a

Branch: refs/heads/master
Commit: 884d3e2a4a894c79676a1b2cf0b1bd075550bbbd
Parents: 9bcbcf4
Author: Maximilian Michels <[email protected]>
Authored: Tue Jul 26 18:35:33 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Tue Jul 26 18:37:58 2016 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/savepoint/SavepointCoordinator.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/884d3e2a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
index 5759e35..cd9eb0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
@@ -200,8 +200,6 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 
 			Savepoint savepoint = savepointStore.loadSavepoint(savepointPath);
 
-			long recoveryTimestamp = System.currentTimeMillis();
-
 			for (TaskState taskState : savepoint.getTaskStates()) {
 				ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
 
@@ -292,8 +290,9 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 		Promise<String> promise = savepointPromises.remove(checkpoint.getCheckpointID());
 
 		if (promise == null) {
-			LOG.info("Pending savepoint with ID " + checkpoint.getCheckpointID() + " has been " +
+			LOG.warn("Pending savepoint with ID " + checkpoint.getCheckpointID() + " has been " +
 					"removed before receiving acknowledgment.");
+			return;
 		}
 
 		// Sanity check
