Repository: flink
Updated Branches:
  refs/heads/master 57f7747bb -> 38ab7164a


[FLINK-5007] [checkpointing] Retain externalized checkpoint on suspension

Handles graceful cluster shut down (non-HA) like cancellation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38ab7164
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38ab7164
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38ab7164

Branch: refs/heads/master
Commit: 38ab7164af6ec6e011aa489f7ebe2ed1611fee5e
Parents: 57f7747
Author: Ufuk Celebi <[email protected]>
Authored: Thu Nov 3 15:52:24 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Tue Dec 13 09:39:29 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/checkpoint/CheckpointProperties.java   | 4 +++-
 .../flink/runtime/checkpoint/CheckpointPropertiesTest.java      | 4 ++--
 .../flink/streaming/api/environment/CheckpointConfig.java       | 5 +++--
 3 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/38ab7164/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index e4856cf..68a4998 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -264,6 +264,8 @@ public class CheckpointProperties implements Serializable {
         * @return Checkpoint properties for an external checkpoint.
         */
        public static CheckpointProperties forExternalizedCheckpoint(boolean 
deleteOnCancellation) {
-               return new CheckpointProperties(false, true, true, true, 
deleteOnCancellation, false, true);
+               // Handle suspension like cancellation as graceful cluster shut 
down
+               // suspends all jobs (non-HA).
+               return new CheckpointProperties(false, true, true, true, 
deleteOnCancellation, false, deleteOnCancellation);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/38ab7164/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index c996886..11bddb9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -48,7 +48,7 @@ public class CheckpointPropertiesTest {
         * Tests the external checkpoints properties.
         */
        @Test
-       public void testPersistentCheckpointProperties() {
+       public void testExternalizedCheckpointProperties() {
                CheckpointProperties props = 
CheckpointProperties.forExternalizedCheckpoint(true);
 
                assertFalse(props.forceCheckpoint());
@@ -67,7 +67,7 @@ public class CheckpointPropertiesTest {
                assertTrue(props.discardOnJobFinished());
                assertFalse(props.discardOnJobCancelled());
                assertFalse(props.discardOnJobFailed());
-               assertTrue(props.discardOnJobSuspended());
+               assertFalse(props.discardOnJobSuspended());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/38ab7164/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 0a7f65e..eb7833a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -235,8 +235,9 @@ public class CheckpointConfig implements 
java.io.Serializable {
         *
         * <p>Externalized checkpoints write their meta data out to persistent
         * storage and are <strong>not</strong> automatically cleaned up when
-        * the owning job fails (terminating with job status {@link 
JobStatus#FAILED}).
-        * In this case, you have to manually clean up the checkpoint state, 
both
+        * the owning job fails or is suspended (terminating with job status
+        * {@link JobStatus#FAILED} or {@link JobStatus#SUSPENDED}). In this
+        * case, you have to manually clean up the checkpoint state, both
         * the meta data and actual program state.
         *
         * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an

Reply via email to