[FLINK-4754] [checkpoints] Small followups to the configuration of number of 
retained checkpoints.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24408e19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24408e19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24408e19

Branch: refs/heads/master
Commit: 24408e19037c8761924ca66a557dfdd8236a7be4
Parents: b46f5e0
Author: Stephan Ewen <[email protected]>
Authored: Thu Mar 16 11:17:16 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Thu Mar 16 14:43:27 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |  2 +-
 .../apache/flink/configuration/CoreOptions.java | 21 +++--
 .../executiongraph/ExecutionGraphBuilder.java   | 11 ++-
 .../ExecutionGraphDeploymentTest.java           | 80 +++++++++-----------
 4 files changed, 58 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 048e012..c835882 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -182,7 +182,7 @@ will be used under the directory specified by 
jobmanager.web.tmpdir.
 
 - `state.checkpoints.dir`: The target directory for meta data of [externalized 
checkpoints]({{ site.baseurl 
}}/setup/checkpoints.html#externalized-checkpoints).
 
-- `state.checkpoints.max-retained-checkpoints`: The maximum number of 
completed checkpoint instances to retain. This setting defines how many 
completed checkpoint instances can be stored in `CompletedCheckpointStore`. 
(Default: 1)
+- `state.checkpoints.num-retained`: The number of completed checkpoint 
instances to retain. Having more than one allows recovery to fall back to an 
earlier checkpoint if the latest checkpoint is corrupt. (Default: 1)
 
 - `high-availability.zookeeper.storageDir`: Required for HA. Directory for 
storing JobManager metadata; this is persisted in the state backend and only a 
pointer to this state is stored in ZooKeeper. Exactly like the checkpoint 
directory it must be accessible from the JobManager and a local filesystem 
should only be used for local deployments. Previously this key was named 
`recovery.zookeeper.storageDir`.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 1e40569..8cb4123 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -23,9 +23,10 @@ import org.apache.flink.annotation.PublicEvolving;
 @PublicEvolving
 public class CoreOptions {
 
-       /**
-        * 
-        */
+       // 
------------------------------------------------------------------------
+       //  process parameters
+       // 
------------------------------------------------------------------------
+
        public static final ConfigOption<String> FLINK_JVM_OPTIONS = 
ConfigOptions
                .key("env.java.opts")
                .defaultValue("");
@@ -38,16 +39,24 @@ public class CoreOptions {
                .key("env.java.opts.taskmanager")
                .defaultValue("");
 
+       // 
------------------------------------------------------------------------
+       //  program
+       // 
------------------------------------------------------------------------
+
        public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = 
ConfigOptions
                .key("parallelism.default")
                .defaultValue(-1);
-       
+
+       // 
------------------------------------------------------------------------
+       //  checkpoints / fault tolerance
+       // 
------------------------------------------------------------------------
+
        public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
                .key("state.backend")
                .noDefaultValue();
 
        /** The maximum number of completed checkpoint instances to retain.*/
-       public static final ConfigOption<Integer> 
STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS = ConfigOptions
-               .key("state.checkpoints.max-retained-checkpoints")
+       public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = 
ConfigOptions
+               .key("state.checkpoints.num-retained")
                .defaultValue(1);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 8a35773..8471178 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -178,12 +178,17 @@ public class ExecutionGraphBuilder {
                        CheckpointIDCounter checkpointIdCounter;
                        try {
                                int maxNumberOfCheckpointsToRetain = 
jobManagerConfig.getInteger(
-                                       
CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS);
+                                       CoreOptions.MAX_RETAINED_CHECKPOINTS);
+
                                if (maxNumberOfCheckpointsToRetain <= 0) {
                                        // warning and use 1 as the default 
value if the setting in
                                        // 
state.checkpoints.max-retained-checkpoints is not greater than 0.
-                                       log.warn("The setting for 
max-retained-checkpoints is not a positive number.");
-                                       maxNumberOfCheckpointsToRetain = 
CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue();
+                                       log.warn("The setting for '{} : {}' is 
invalid. Using default value of {}",
+                                                       
CoreOptions.MAX_RETAINED_CHECKPOINTS.key(),
+                                                       
maxNumberOfCheckpointsToRetain,
+                                                       
CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
+
+                                       maxNumberOfCheckpointsToRetain = 
CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
                                }
 
                                completedCheckpoints = 
recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, 
classLoader);

http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 57b549b..7f5811a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -33,7 +33,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -446,61 +445,50 @@ public class ExecutionGraphDeploymentTest {
                assertEquals(JobStatus.FAILED, eg.getState());
        }
 
+       // 
------------------------------------------------------------------------
+       //  retained checkpoints config test
+       // 
------------------------------------------------------------------------
+
        @Test
-       public void testSettingDefaultMaxNumberOfCheckpointsToRetain() {
-               try {
-                       final Configuration jobManagerConfig = new 
Configuration();
+       public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws 
Exception {
+               final Configuration jobManagerConfig = new Configuration();
 
-                       final ExecutionGraph eg = 
createExecutionGraph(jobManagerConfig);
+               final ExecutionGraph eg = 
createExecutionGraph(jobManagerConfig);
 
-                       assertEquals((int) 
CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
+               
assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
                                
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
        }
 
        @Test
-       public void testSettingMaxNumberOfCheckpointsToRetain() {
-               try {
-                       final int maxNumberOfCheckpointsToRetain = 10;
-                       final Configuration jobManagerConfig = new 
Configuration();
-                       
jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
-                               maxNumberOfCheckpointsToRetain);
+       public void testSettingMaxNumberOfCheckpointsToRetain() throws 
Exception {
 
-                       final ExecutionGraph eg = 
createExecutionGraph(jobManagerConfig);
+               final int maxNumberOfCheckpointsToRetain = 10;
+               final Configuration jobManagerConfig = new Configuration();
+               
jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+                       maxNumberOfCheckpointsToRetain);
 
-                       assertEquals(maxNumberOfCheckpointsToRetain,
-                               
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+               final ExecutionGraph eg = 
createExecutionGraph(jobManagerConfig);
+
+               assertEquals(maxNumberOfCheckpointsToRetain,
+                       
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
        }
 
        @Test
-       public void testSettingIllegalMaxNumberOfCheckpointsToRetain() {
-               try {
-                       final int negativeMaxNumberOfCheckpointsToRetain = -10;
+       public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws 
Exception {
 
-                       final Configuration jobManagerConfig = new 
Configuration();
-                       
jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
-                               negativeMaxNumberOfCheckpointsToRetain);
+               final int negativeMaxNumberOfCheckpointsToRetain = -10;
 
-                       final ExecutionGraph eg = 
createExecutionGraph(jobManagerConfig);
+               final Configuration jobManagerConfig = new Configuration();
+               
jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+                       negativeMaxNumberOfCheckpointsToRetain);
 
-                       assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
-                               
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-                       assertEquals((int) 
CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
-                               
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+               final ExecutionGraph eg = 
createExecutionGraph(jobManagerConfig);
+
+               assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
+                       
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+
+               
assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
+                       
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
        }
 
        private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> 
setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception 
{
@@ -567,14 +555,14 @@ public class ExecutionGraphDeploymentTest {
        }
 
        private ExecutionGraph createExecutionGraph(Configuration 
configuration) throws Exception {
-               final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+               final ScheduledExecutorService executor = 
TestingUtils.defaultExecutor();
 
                final JobID jobId = new JobID();
                final JobGraph jobGraph = new JobGraph(jobId, "test");
                jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
-                       new ArrayList<JobVertexID>(1),
-                       new ArrayList<JobVertexID>(1),
-                       new ArrayList<JobVertexID>(1),
+                       Collections.<JobVertexID>emptyList(),
+                       Collections.<JobVertexID>emptyList(),
+                       Collections.<JobVertexID>emptyList(),
                        100,
                        10 * 60 * 1000,
                        0,
@@ -592,7 +580,7 @@ public class ExecutionGraphDeploymentTest {
                        new ProgrammedSlotProvider(1),
                        getClass().getClassLoader(),
                        new StandaloneCheckpointRecoveryFactory(),
-                       Time.minutes(10),
+                       Time.seconds(10),
                        new NoRestartStrategy(),
                        new UnregisteredMetricsGroup(),
                        1,

Reply via email to