[FLINK-4754] [checkpoints] Small followups to the configuration of number of retained checkpoints.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24408e19 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24408e19 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24408e19 Branch: refs/heads/master Commit: 24408e19037c8761924ca66a557dfdd8236a7be4 Parents: b46f5e0 Author: Stephan Ewen <[email protected]> Authored: Thu Mar 16 11:17:16 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Thu Mar 16 14:43:27 2017 +0100 ---------------------------------------------------------------------- docs/setup/config.md | 2 +- .../apache/flink/configuration/CoreOptions.java | 21 +++-- .../executiongraph/ExecutionGraphBuilder.java | 11 ++- .../ExecutionGraphDeploymentTest.java | 80 +++++++++----------- 4 files changed, 58 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/docs/setup/config.md ---------------------------------------------------------------------- diff --git a/docs/setup/config.md b/docs/setup/config.md index 048e012..c835882 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -182,7 +182,7 @@ will be used under the directory specified by jobmanager.web.tmpdir. - `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/setup/checkpoints.html#externalized-checkpoints). -- `state.checkpoints.max-retained-checkpoints`: The maximum number of completed checkpoint instances to retain. This setting defines how many completed checkpoint instances can be stored in `CompletedCheckpointStore`. (Default: 1) +- `state.checkpoints.num-retained`: The number of completed checkpoint instances to retain. Having more than one allows recovery fallback to an earlier checkpoint if the latest checkpoint is corrupt. (Default: 1) - `high-availability.zookeeper.storageDir`: Required for HA. 
Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`. http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index 1e40569..8cb4123 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -23,9 +23,10 @@ import org.apache.flink.annotation.PublicEvolving; @PublicEvolving public class CoreOptions { - /** - * - */ + // ------------------------------------------------------------------------ + // process parameters + // ------------------------------------------------------------------------ + public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions .key("env.java.opts") .defaultValue(""); @@ -38,16 +39,24 @@ public class CoreOptions { .key("env.java.opts.taskmanager") .defaultValue(""); + // ------------------------------------------------------------------------ + // program + // ------------------------------------------------------------------------ + public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = ConfigOptions .key("parallelism.default") .defaultValue(-1); - + + // ------------------------------------------------------------------------ + // checkpoints / fault tolerance + // ------------------------------------------------------------------------ + public static final ConfigOption<String> STATE_BACKEND = ConfigOptions 
.key("state.backend") .noDefaultValue(); /** The maximum number of completed checkpoint instances to retain.*/ - public static final ConfigOption<Integer> STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS = ConfigOptions - .key("state.checkpoints.max-retained-checkpoints") + public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") .defaultValue(1); } http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 8a35773..8471178 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -178,12 +178,17 @@ public class ExecutionGraphBuilder { CheckpointIDCounter checkpointIdCounter; try { int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger( - CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS); + CoreOptions.MAX_RETAINED_CHECKPOINTS); + if (maxNumberOfCheckpointsToRetain <= 0) { // warning and use 1 as the default value if the setting in // state.checkpoints.max-retained-checkpoints is not greater than 0. - log.warn("The setting for max-retained-checkpoints is not a positive number."); - maxNumberOfCheckpointsToRetain = CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(); + log.warn("The setting for '{} : {}' is invalid. 
Using default value of {}", + CoreOptions.MAX_RETAINED_CHECKPOINTS.key(), + maxNumberOfCheckpointsToRetain, + CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()); + + maxNumberOfCheckpointsToRetain = CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue(); } completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader); http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 57b549b..7f5811a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -33,7 +33,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import org.apache.flink.api.common.ExecutionConfig; @@ -446,61 +445,50 @@ public class ExecutionGraphDeploymentTest { assertEquals(JobStatus.FAILED, eg.getState()); } + // ------------------------------------------------------------------------ + // retained checkpoints config test + // ------------------------------------------------------------------------ + @Test - public void testSettingDefaultMaxNumberOfCheckpointsToRetain() { - try { - final Configuration jobManagerConfig = new Configuration(); + public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception { + final Configuration jobManagerConfig = new Configuration(); - final ExecutionGraph eg = createExecutionGraph(jobManagerConfig); + 
final ExecutionGraph eg = createExecutionGraph(jobManagerConfig); - assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(), + assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(), eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } } @Test - public void testSettingMaxNumberOfCheckpointsToRetain() { - try { - final int maxNumberOfCheckpointsToRetain = 10; - final Configuration jobManagerConfig = new Configuration(); - jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS, - maxNumberOfCheckpointsToRetain); + public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception { - final ExecutionGraph eg = createExecutionGraph(jobManagerConfig); + final int maxNumberOfCheckpointsToRetain = 10; + final Configuration jobManagerConfig = new Configuration(); + jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, + maxNumberOfCheckpointsToRetain); - assertEquals(maxNumberOfCheckpointsToRetain, - eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + final ExecutionGraph eg = createExecutionGraph(jobManagerConfig); + + assertEquals(maxNumberOfCheckpointsToRetain, + eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()); } @Test - public void testSettingIllegalMaxNumberOfCheckpointsToRetain() { - try { - final int negativeMaxNumberOfCheckpointsToRetain = -10; + public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception { - final Configuration jobManagerConfig = new Configuration(); - jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS, - negativeMaxNumberOfCheckpointsToRetain); + final int negativeMaxNumberOfCheckpointsToRetain = -10; - final 
ExecutionGraph eg = createExecutionGraph(jobManagerConfig); + final Configuration jobManagerConfig = new Configuration(); + jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, + negativeMaxNumberOfCheckpointsToRetain); - assertNotEquals(negativeMaxNumberOfCheckpointsToRetain, - eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()); - assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(), - eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + final ExecutionGraph eg = createExecutionGraph(jobManagerConfig); + + assertNotEquals(negativeMaxNumberOfCheckpointsToRetain, + eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()); + + assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(), + eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()); } private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception { @@ -567,14 +555,14 @@ public class ExecutionGraphDeploymentTest { } private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception { - final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + final ScheduledExecutorService executor = TestingUtils.defaultExecutor(); final JobID jobId = new JobID(); final JobGraph jobGraph = new JobGraph(jobId, "test"); jobGraph.setSnapshotSettings(new JobSnapshottingSettings( - new ArrayList<JobVertexID>(1), - new ArrayList<JobVertexID>(1), - new ArrayList<JobVertexID>(1), + Collections.<JobVertexID>emptyList(), + Collections.<JobVertexID>emptyList(), + Collections.<JobVertexID>emptyList(), 100, 10 * 60 * 1000, 0, @@ -592,7 +580,7 @@ public class ExecutionGraphDeploymentTest { new ProgrammedSlotProvider(1), 
getClass().getClassLoader(), new StandaloneCheckpointRecoveryFactory(), - Time.minutes(10), + Time.seconds(10), new NoRestartStrategy(), new UnregisteredMetricsGroup(), 1,
