Repository: flink Updated Branches: refs/heads/master 57f7747bb -> 38ab7164a
[FLINK-5007] [checkpointing] Retain externalized checkpoint on suspension Handles graceful cluster shut down (non-HA) like cancellation. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38ab7164 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38ab7164 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38ab7164 Branch: refs/heads/master Commit: 38ab7164af6ec6e011aa489f7ebe2ed1611fee5e Parents: 57f7747 Author: Ufuk Celebi <[email protected]> Authored: Thu Nov 3 15:52:24 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Tue Dec 13 09:39:29 2016 +0100 ---------------------------------------------------------------------- .../apache/flink/runtime/checkpoint/CheckpointProperties.java | 4 +++- .../flink/runtime/checkpoint/CheckpointPropertiesTest.java | 4 ++-- .../flink/streaming/api/environment/CheckpointConfig.java | 5 +++-- 3 files changed, 8 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/38ab7164/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java index e4856cf..68a4998 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java @@ -264,6 +264,8 @@ public class CheckpointProperties implements Serializable { * @return Checkpoint properties for an external checkpoint. */ public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) { - return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, true); + // Handle suspension like cancellation as graceful cluster shut down + // suspends all jobs (non-HA). + return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, deleteOnCancellation); } } http://git-wip-us.apache.org/repos/asf/flink/blob/38ab7164/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java index c996886..11bddb9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java @@ -48,7 +48,7 @@ public class CheckpointPropertiesTest { * Tests the external checkpoints properties. */ @Test - public void testPersistentCheckpointProperties() { + public void testExternalizedCheckpointProperties() { CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true); assertFalse(props.forceCheckpoint()); @@ -67,7 +67,7 @@ public class CheckpointPropertiesTest { assertTrue(props.discardOnJobFinished()); assertFalse(props.discardOnJobCancelled()); assertFalse(props.discardOnJobFailed()); - assertTrue(props.discardOnJobSuspended()); + assertFalse(props.discardOnJobSuspended()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/38ab7164/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index 0a7f65e..eb7833a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -235,8 +235,9 @@ public class CheckpointConfig implements java.io.Serializable { * * <p>Externalized checkpoints write their meta data out to persistent * storage and are <strong>not</strong> automatically cleaned up when - * the owning job fails (terminating with job status {@link JobStatus#FAILED}). - * In this case, you have to manually clean up the checkpoint state, both + * the owning job fails or is suspended (terminating with job status + * {@link JobStatus#FAILED} or {@link JobStatus#SUSPENDED}). In this + * case, you have to manually clean up the checkpoint state, both * the meta data and actual program state. * * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an
