Repository: flink Updated Branches: refs/heads/master c607b9a22 -> 42328bd9b
[hotfix] [checkpoints] Rename JobSnapshottingSettings to JobCheckpointingSettings Cleanup to consistently use - checkpoint for the overall fault tolerance mechanism and procedure - snapshot for an operators state snapshot that is part of a checkpoint Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f0d6769 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f0d6769 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f0d6769 Branch: refs/heads/master Commit: 5f0d6769051be8db8ee12659e13d22e5c5fd2f2d Parents: 4d8627c Author: Stephan Ewen <[email protected]> Authored: Mon Mar 27 18:23:13 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Apr 20 10:59:50 2017 +0200 ---------------------------------------------------------------------- .../jobmanager/JMXJobManagerMetricTest.java | 4 +- .../checkpoints/CheckpointConfigHandler.java | 4 +- .../CheckpointConfigHandlerTest.java | 14 +- .../checkpoint/CheckpointStatsTracker.java | 14 +- .../executiongraph/AccessExecutionGraph.java | 6 +- .../executiongraph/ArchivedExecutionGraph.java | 13 +- .../runtime/executiongraph/ExecutionGraph.java | 7 +- .../executiongraph/ExecutionGraphBuilder.java | 4 +- .../apache/flink/runtime/jobgraph/JobGraph.java | 12 +- .../tasks/JobCheckpointingSettings.java | 151 +++++++++++++++++++ .../jobgraph/tasks/JobSnapshottingSettings.java | 151 ------------------- .../checkpoint/CheckpointStatsTrackerTest.java | 16 +- .../checkpoint/CoordinatorShutdownTest.java | 6 +- .../ArchivedExecutionGraphTest.java | 4 +- .../ExecutionGraphDeploymentTest.java | 4 +- .../tasks/JobCheckpointingSettingsTest.java | 65 ++++++++ .../tasks/JobSnapshottingSettingsTest.java | 65 -------- .../jobmanager/JobManagerHARecoveryTest.java | 4 +- .../runtime/jobmanager/JobManagerTest.java | 12 +- .../flink/runtime/jobmanager/JobSubmitTest.java | 4 +- .../runtime/jobmanager/JobManagerITCase.scala | 8 +- .../api/graph/StreamingJobGraphGenerator.java | 4 +- .../graph/StreamingJobGraphGeneratorTest.java | 4 +- 23 files changed, 287 insertions(+), 289 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index 1fdac65..934a621 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -25,7 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.junit.Assert; @@ -70,7 +70,7 @@ public class JMXJobManagerMetricTest { sourceJobVertex.setInvokableClass(BlockingInvokable.class); JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex); - jobGraph.setSnapshotSettings(new JobSnapshottingSettings( + jobGraph.setSnapshotSettings(new JobCheckpointingSettings( Collections.<JobVertexID>emptyList(), Collections.<JobVertexID>emptyList(), Collections.<JobVertexID>emptyList(), http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java index 947b7c3..7914c29 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; @@ -69,7 +69,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); - JobSnapshottingSettings settings = graph.getJobSnapshottingSettings(); + JobCheckpointingSettings settings = graph.getJobCheckpointingSettings(); if (settings == null) { return "{}"; http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java index 9d339f5..6e48973 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -50,7 +50,7 @@ public class CheckpointConfigHandlerTest { AccessExecutionGraph graph = graphAndSettings.graph; when(graph.getJobID()).thenReturn(new JobID()); - JobSnapshottingSettings settings = graphAndSettings.snapshottingSettings; + JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings; ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings; Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph); @@ -90,7 +90,7 @@ public class CheckpointConfigHandlerTest { GraphAndSettings graphAndSettings = createGraphAndSettings(false, true); AccessExecutionGraph graph = graphAndSettings.graph; - JobSnapshottingSettings settings = graphAndSettings.snapshottingSettings; + JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings; CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class)); String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()); @@ -156,7 +156,7 @@ public class CheckpointConfigHandlerTest { ? ExternalizedCheckpointSettings.externalizeCheckpoints(true) : ExternalizedCheckpointSettings.none(); - JobSnapshottingSettings settings = new JobSnapshottingSettings( + JobCheckpointingSettings settings = new JobCheckpointingSettings( Collections.<JobVertexID>emptyList(), Collections.<JobVertexID>emptyList(), Collections.<JobVertexID>emptyList(), @@ -169,19 +169,19 @@ public class CheckpointConfigHandlerTest { exactlyOnce); AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - when(graph.getJobSnapshottingSettings()).thenReturn(settings); + when(graph.getJobCheckpointingSettings()).thenReturn(settings); return new GraphAndSettings(graph, settings, externalizedSetting); } private static class GraphAndSettings { public final AccessExecutionGraph graph; - public final JobSnapshottingSettings snapshottingSettings; + public final JobCheckpointingSettings snapshottingSettings; public final ExternalizedCheckpointSettings externalizedSettings; public GraphAndSettings( AccessExecutionGraph graph, - JobSnapshottingSettings snapshottingSettings, + JobCheckpointingSettings snapshottingSettings, ExternalizedCheckpointSettings externalizedSettings) { this.graph = graph; this.snapshottingSettings = snapshottingSettings; http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index c7efb7b..313fe13 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -31,7 +31,7 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; /** * Tracker for checkpoint statistics. @@ -67,7 +67,7 @@ public class CheckpointStatsTracker { private final int totalSubtaskCount; /** Snapshotting settings created from the CheckpointConfig. */ - private final JobSnapshottingSettings jobSnapshottingSettings; + private final JobCheckpointingSettings jobCheckpointingSettings; /** Checkpoint counts. */ private final CheckpointStatsCounts counts = new CheckpointStatsCounts(); @@ -104,19 +104,19 @@ public class CheckpointStatsTracker { * * @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in progress ones. * @param jobVertices Job vertices involved in the checkpoints. - * @param jobSnapshottingSettings Snapshotting settings created from the CheckpointConfig. + * @param jobCheckpointingSettings Snapshotting settings created from the CheckpointConfig. * @param metricGroup Metric group for exposed metrics */ public CheckpointStatsTracker( int numRememberedCheckpoints, List<ExecutionJobVertex> jobVertices, - JobSnapshottingSettings jobSnapshottingSettings, + JobCheckpointingSettings jobCheckpointingSettings, MetricGroup metricGroup) { checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints"); this.history = new CheckpointStatsHistory(numRememberedCheckpoints); this.jobVertices = checkNotNull(jobVertices, "JobVertices"); - this.jobSnapshottingSettings = checkNotNull(jobSnapshottingSettings); + this.jobCheckpointingSettings = checkNotNull(jobCheckpointingSettings); // Compute the total subtask count. We do this here in order to only // do it once. @@ -143,8 +143,8 @@ public class CheckpointStatsTracker { * * @return The job's snapshotting settings. */ - public JobSnapshottingSettings getSnapshottingSettings() { - return jobSnapshottingSettings; + public JobCheckpointingSettings getSnapshottingSettings() { + return jobCheckpointingSettings; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java index 18c2ec2..3b064c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.util.SerializedValue; import java.io.IOException; @@ -116,12 +116,12 @@ public interface AccessExecutionGraph { CheckpointCoordinator getCheckpointCoordinator(); /** - * Returns the {@link JobSnapshottingSettings} or <code>null</code> if + * Returns the {@link JobCheckpointingSettings} or <code>null</code> if * checkpointing is disabled. * * @return JobSnapshottingSettings for this execution graph */ - JobSnapshottingSettings getJobSnapshottingSettings(); + JobCheckpointingSettings getJobCheckpointingSettings(); /** * Returns a snapshot of the checkpoint statistics or <code>null</code> if http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java index 334b0d0..b9db1e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.util.SerializedValue; import javax.annotation.Nullable; @@ -81,7 +81,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl private final Map<String, SerializedValue<Object>> serializedUserAccumulators; @Nullable - private final JobSnapshottingSettings jobSnapshottingSettings; + private final JobCheckpointingSettings jobCheckpointingSettings; @Nullable private final CheckpointStatsSnapshot checkpointStatsSnapshot; @@ -99,7 +99,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl Map<String, SerializedValue<Object>> serializedUserAccumulators, ArchivedExecutionConfig executionConfig, boolean isStoppable, - @Nullable JobSnapshottingSettings jobSnapshottingSettings, + @Nullable JobCheckpointingSettings jobCheckpointingSettings, @Nullable CheckpointStatsSnapshot checkpointStatsSnapshot) { this.jobID = jobID; @@ -114,7 +114,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl this.serializedUserAccumulators = serializedUserAccumulators; this.archivedExecutionConfig = executionConfig; this.isStoppable = isStoppable; - this.jobSnapshottingSettings = jobSnapshottingSettings; + this.jobCheckpointingSettings = jobCheckpointingSettings; this.checkpointStatsSnapshot = checkpointStatsSnapshot; } @@ -210,9 +210,8 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl return null; } - @Override - public JobSnapshottingSettings getJobSnapshottingSettings() { - return jobSnapshottingSettings; + public JobCheckpointingSettings getJobCheckpointingSettings() { + return jobCheckpointingSettings; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index b21b72b..29b9806 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -55,7 +55,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.query.KvStateLocationRegistry; @@ -419,8 +419,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive return restartStrategy; } - @Override - public JobSnapshottingSettings getJobSnapshottingSettings() { + public JobCheckpointingSettings getJobCheckpointingSettings() { if (checkpointStatsTracker != null) { return checkpointStatsTracker.getSnapshottingSettings(); } else { @@ -1477,7 +1476,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive serializedUserAccumulators, getArchivedExecutionConfig(), isStoppable(), - getJobSnapshottingSettings(), + getJobCheckpointingSettings(), getCheckpointStatsSnapshot()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index a6455f5..a10c62e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.util.DynamicCodeLoadingException; @@ -159,7 +159,7 @@ public class ExecutionGraphBuilder { } // configure the state checkpointing - JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings(); + JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings(); if (snapshotSettings != null) { List<ExecutionJobVertex> triggerVertices = idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph); http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index f6377e5..2a8af37 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -28,7 +28,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.util.SerializedValue; import scala.concurrent.duration.FiniteDuration; @@ -92,7 +92,7 @@ public class JobGraph implements Serializable { private SerializedValue<ExecutionConfig> serializedExecutionConfig; /** The settings for the job checkpoints */ - private JobSnapshottingSettings snapshotSettings; + private JobCheckpointingSettings snapshotSettings; /** Savepoint restore settings. */ private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none(); @@ -331,17 +331,17 @@ public class JobGraph implements Serializable { * * @param settings The snapshot settings, or null, to disable snapshotting. */ - public void setSnapshotSettings(JobSnapshottingSettings settings) { + public void setSnapshotSettings(JobCheckpointingSettings settings) { this.snapshotSettings = settings; } /** * Gets the settings for asynchronous snapshots. This method returns null, when - * snapshotting is not enabled. + * checkpointing is not enabled. * - * @return The snapshot settings, or null, if snapshotting is not enabled. + * @return The snapshot settings, or null, if checkpointing is not enabled. */ - public JobSnapshottingSettings getSnapshotSettings() { + public JobCheckpointingSettings getCheckpointingSettings() { return snapshotSettings; } http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java new file mode 100644 index 0000000..38130d4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.tasks; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.StateBackend; + +import javax.annotation.Nullable; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * The JobCheckpointingSettings are attached to a JobGraph and describe the settings + * for the asynchronous checkpoints of the JobGraph, such as interval, and which vertices + * need to participate. + */ +public class JobCheckpointingSettings implements java.io.Serializable { + + private static final long serialVersionUID = -2593319571078198180L; + + private final List<JobVertexID> verticesToTrigger; + + private final List<JobVertexID> verticesToAcknowledge; + + private final List<JobVertexID> verticesToConfirm; + + private final long checkpointInterval; + + private final long checkpointTimeout; + + private final long minPauseBetweenCheckpoints; + + private final int maxConcurrentCheckpoints; + + /** Settings for externalized checkpoints. */ + private final ExternalizedCheckpointSettings externalizedCheckpointSettings; + + /** The default state backend, if configured by the user in the job */ + @Nullable + private final StateBackend defaultStateBackend; + + /** + * Flag indicating whether exactly once checkpoint mode has been configured. + * If <code>false</code>, at least once mode has been configured. This is + * not a necessary attribute, because the checkpointing mode is only relevant + * for the stream tasks, but we expose it here to forward it to the web runtime + * UI. + */ + private final boolean isExactlyOnce; + + public JobCheckpointingSettings( + List<JobVertexID> verticesToTrigger, + List<JobVertexID> verticesToAcknowledge, + List<JobVertexID> verticesToConfirm, + long checkpointInterval, + long checkpointTimeout, + long minPauseBetweenCheckpoints, + int maxConcurrentCheckpoints, + ExternalizedCheckpointSettings externalizedCheckpointSettings, + @Nullable StateBackend defaultStateBackend, + boolean isExactlyOnce) { + + // sanity checks + if (checkpointInterval < 1 || checkpointTimeout < 1 || + minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) { + throw new IllegalArgumentException(); + } + + this.verticesToTrigger = requireNonNull(verticesToTrigger); + this.verticesToAcknowledge = requireNonNull(verticesToAcknowledge); + this.verticesToConfirm = requireNonNull(verticesToConfirm); + this.checkpointInterval = checkpointInterval; + this.checkpointTimeout = checkpointTimeout; + this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; + this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; + this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings); + this.defaultStateBackend = defaultStateBackend; + this.isExactlyOnce = isExactlyOnce; + } + + // -------------------------------------------------------------------------------------------- + + public List<JobVertexID> getVerticesToTrigger() { + return verticesToTrigger; + } + + public List<JobVertexID> getVerticesToAcknowledge() { + return verticesToAcknowledge; + } + + public List<JobVertexID> getVerticesToConfirm() { + return verticesToConfirm; + } + + public long getCheckpointInterval() { + return checkpointInterval; + } + + public long getCheckpointTimeout() { + return checkpointTimeout; + } + + public long getMinPauseBetweenCheckpoints() { + return minPauseBetweenCheckpoints; + } + + public int getMaxConcurrentCheckpoints() { + return maxConcurrentCheckpoints; + } + + public ExternalizedCheckpointSettings getExternalizedCheckpointSettings() { + return externalizedCheckpointSettings; + } + + @Nullable + public StateBackend getDefaultStateBackend() { + return defaultStateBackend; + } + + public boolean isExactlyOnce() { + return isExactlyOnce; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public String toString() { + return String.format("SnapshotSettings: interval=%d, timeout=%d, pause-between=%d, " + + "maxConcurrent=%d, trigger=%s, ack=%s, commit=%s", + checkpointInterval, checkpointTimeout, + minPauseBetweenCheckpoints, maxConcurrentCheckpoints, + verticesToTrigger, verticesToAcknowledge, verticesToConfirm); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java deleted file mode 100644 index 233aa88..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobgraph.tasks; - -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.state.StateBackend; - -import javax.annotation.Nullable; -import java.util.List; - -import static java.util.Objects.requireNonNull; - -/** - * The JobCheckpointingSettings are attached to a JobGraph and describe the settings - * for the asynchronous checkpoints of the JobGraph, such as interval, and which vertices - * need to participate. - */ -public class JobSnapshottingSettings implements java.io.Serializable { - - private static final long serialVersionUID = -2593319571078198180L; - - private final List<JobVertexID> verticesToTrigger; - - private final List<JobVertexID> verticesToAcknowledge; - - private final List<JobVertexID> verticesToConfirm; - - private final long checkpointInterval; - - private final long checkpointTimeout; - - private final long minPauseBetweenCheckpoints; - - private final int maxConcurrentCheckpoints; - - /** Settings for externalized checkpoints. */ - private final ExternalizedCheckpointSettings externalizedCheckpointSettings; - - /** The default state backend, if configured by the user in the job */ - @Nullable - private final StateBackend defaultStateBackend; - - /** - * Flag indicating whether exactly once checkpoint mode has been configured. - * If <code>false</code>, at least once mode has been configured. This is - * not a necessary attribute, because the checkpointing mode is only relevant - * for the stream tasks, but we expose it here to forward it to the web runtime - * UI. - */ - private final boolean isExactlyOnce; - - public JobSnapshottingSettings( - List<JobVertexID> verticesToTrigger, - List<JobVertexID> verticesToAcknowledge, - List<JobVertexID> verticesToConfirm, - long checkpointInterval, - long checkpointTimeout, - long minPauseBetweenCheckpoints, - int maxConcurrentCheckpoints, - ExternalizedCheckpointSettings externalizedCheckpointSettings, - @Nullable StateBackend defaultStateBackend, - boolean isExactlyOnce) { - - // sanity checks - if (checkpointInterval < 1 || checkpointTimeout < 1 || - minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) { - throw new IllegalArgumentException(); - } - - this.verticesToTrigger = requireNonNull(verticesToTrigger); - this.verticesToAcknowledge = requireNonNull(verticesToAcknowledge); - this.verticesToConfirm = requireNonNull(verticesToConfirm); - this.checkpointInterval = checkpointInterval; - this.checkpointTimeout = checkpointTimeout; - this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; - this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; - this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings); - this.defaultStateBackend = defaultStateBackend; - this.isExactlyOnce = isExactlyOnce; - } - - // -------------------------------------------------------------------------------------------- - - public List<JobVertexID> getVerticesToTrigger() { - return verticesToTrigger; - } - - public List<JobVertexID> getVerticesToAcknowledge() { - return verticesToAcknowledge; - } - - public List<JobVertexID> getVerticesToConfirm() { - return verticesToConfirm; - } - - public long getCheckpointInterval() { - return checkpointInterval; - } - - public long getCheckpointTimeout() { - return checkpointTimeout; - } - - public long getMinPauseBetweenCheckpoints() { - return minPauseBetweenCheckpoints; - } - - public int getMaxConcurrentCheckpoints() { - return maxConcurrentCheckpoints; - } - - public ExternalizedCheckpointSettings getExternalizedCheckpointSettings() { - return externalizedCheckpointSettings; - } - - @Nullable - public StateBackend getDefaultStateBackend() { - return defaultStateBackend; - } - - public boolean isExactlyOnce() { - return isExactlyOnce; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public String toString() { - return String.format("SnapshotSettings: interval=%d, timeout=%d, pause-between=%d, " + - "maxConcurrent=%d, trigger=%s, ack=%s, commit=%s", - checkpointInterval, checkpointTimeout, - minPauseBetweenCheckpoints, maxConcurrentCheckpoints, - verticesToTrigger, verticesToAcknowledge, verticesToConfirm); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index aaf1774..d66d0be 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -44,7 +44,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.junit.Test; public class CheckpointStatsTrackerTest { @@ -58,7 +58,7 @@ public class CheckpointStatsTrackerTest { when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); when(jobVertex.getParallelism()).thenReturn(1); - JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings( + JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings( Collections.singletonList(new JobVertexID()), Collections.singletonList(new JobVertexID()), Collections.singletonList(new JobVertexID()), @@ -94,7 +94,7 @@ public class CheckpointStatsTrackerTest { CheckpointStatsTracker tracker = new CheckpointStatsTracker( 0, Collections.singletonList(jobVertex), - mock(JobSnapshottingSettings.class), + mock(JobCheckpointingSettings.class), new UnregisteredMetricsGroup()); PendingCheckpointStats pending = tracker.reportPendingCheckpoint( @@ -142,7 +142,7 @@ public class CheckpointStatsTrackerTest { CheckpointStatsTracker tracker = new CheckpointStatsTracker( 10, Collections.singletonList(jobVertex), - mock(JobSnapshottingSettings.class), + mock(JobCheckpointingSettings.class), new UnregisteredMetricsGroup()); // Completed checkpoint @@ -247,7 +247,7 @@ public class CheckpointStatsTrackerTest { CheckpointStatsTracker tracker = new CheckpointStatsTracker( 10, Collections.singletonList(jobVertex), - mock(JobSnapshottingSettings.class), + mock(JobCheckpointingSettings.class), new UnregisteredMetricsGroup()); CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot(); @@ -293,7 +293,7 @@ public class CheckpointStatsTrackerTest { new CheckpointStatsTracker( 0, Collections.singletonList(jobVertex), - mock(JobSnapshottingSettings.class), + mock(JobCheckpointingSettings.class), metricGroup); verify(metricGroup, times(1)).gauge(eq(CheckpointStatsTracker.NUMBER_OF_CHECKPOINTS_METRIC), any(Gauge.class)); @@ -409,7 +409,7 @@ public class CheckpointStatsTrackerTest { CheckpointStatsTracker stats = new CheckpointStatsTracker( 0, Collections.singletonList(jobVertex), - mock(JobSnapshottingSettings.class), + mock(JobCheckpointingSettings.class), metricGroup); // Make sure to adjust this test if metrics are added/removed @@ -522,7 +522,7 @@ public class CheckpointStatsTrackerTest { return new CheckpointStatsTracker( 0, Collections.singletonList(jobVertex), - mock(JobSnapshottingSettings.class), + mock(JobCheckpointingSettings.class), new UnregisteredMetricsGroup()); } http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index 976da48..2e0bd76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -65,7 +65,7 @@ public class CoordinatorShutdownTest { List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID()); JobGraph testGraph = new JobGraph("test job", vertex); - testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, + testGraph.setSnapshotSettings(new JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList, 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true)); ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); @@ -124,7 +124,7 @@ public class CoordinatorShutdownTest { List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID()); JobGraph testGraph = new JobGraph("test job", vertex); - testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, + testGraph.setSnapshotSettings(new JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList, 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true)); ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 077ab53..f96b624 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -41,7 +41,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; import org.junit.BeforeClass; @@ -116,7 +116,7 @@ public class ArchivedExecutionGraphTest { CheckpointStatsTracker statsTracker = new CheckpointStatsTracker( 0, jobVertices, - mock(JobSnapshottingSettings.class), + mock(JobCheckpointingSettings.class), new UnregisteredMetricsGroup()); runtimeGraph.enableCheckpointing( http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 8d91b84..866f55c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -61,7 +61,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.operators.BatchTask; @@ -559,7 +559,7 @@ public class ExecutionGraphDeploymentTest { final JobID jobId = new JobID(); final JobGraph jobGraph = new JobGraph(jobId, "test"); - jobGraph.setSnapshotSettings(new JobSnapshottingSettings( + jobGraph.setSnapshotSettings(new JobCheckpointingSettings( Collections.<JobVertexID>emptyList(), Collections.<JobVertexID>emptyList(), Collections.<JobVertexID>emptyList(), http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java new file mode 100644 index 0000000..c3524fa --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.tasks; + +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.junit.Test; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class JobCheckpointingSettingsTest { + + /** + * Tests that the settings are actually serializable. + */ + @Test + public void testIsJavaSerializable() throws Exception { + JobCheckpointingSettings settings = new JobCheckpointingSettings( + Arrays.asList(new JobVertexID(), new JobVertexID()), + Arrays.asList(new JobVertexID(), new JobVertexID()), + Arrays.asList(new JobVertexID(), new JobVertexID()), + 1231231, + 1231, + 112, + 12, + ExternalizedCheckpointSettings.externalizeCheckpoints(true), + new MemoryStateBackend(), + false); + + JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings); + assertEquals(settings.getVerticesToAcknowledge(), copy.getVerticesToAcknowledge()); + assertEquals(settings.getVerticesToConfirm(), copy.getVerticesToConfirm()); + assertEquals(settings.getVerticesToTrigger(), copy.getVerticesToTrigger()); + assertEquals(settings.getCheckpointInterval(), copy.getCheckpointInterval()); + assertEquals(settings.getCheckpointTimeout(), copy.getCheckpointTimeout()); + assertEquals(settings.getMinPauseBetweenCheckpoints(), copy.getMinPauseBetweenCheckpoints()); + assertEquals(settings.getMaxConcurrentCheckpoints(), copy.getMaxConcurrentCheckpoints()); + assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(), copy.getExternalizedCheckpointSettings().externalizeCheckpoints()); + assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation()); + assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce()); + assertNotNull(copy.getDefaultStateBackend()); + assertTrue(copy.getDefaultStateBackend().getClass() == MemoryStateBackend.class); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java deleted file mode 100644 index 2508d5c..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobgraph.tasks; - -import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.junit.Test; - -import java.util.Arrays; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -public class JobSnapshottingSettingsTest { - - /** - * Tests that the settings are actually serializable. - */ - @Test - public void testIsJavaSerializable() throws Exception { - JobSnapshottingSettings settings = new JobSnapshottingSettings( - Arrays.asList(new JobVertexID(), new JobVertexID()), - Arrays.asList(new JobVertexID(), new JobVertexID()), - Arrays.asList(new JobVertexID(), new JobVertexID()), - 1231231, - 1231, - 112, - 12, - ExternalizedCheckpointSettings.externalizeCheckpoints(true), - new MemoryStateBackend(), - false); - - JobSnapshottingSettings copy = CommonTestUtils.createCopySerializable(settings); - assertEquals(settings.getVerticesToAcknowledge(), copy.getVerticesToAcknowledge()); - assertEquals(settings.getVerticesToConfirm(), copy.getVerticesToConfirm()); - assertEquals(settings.getVerticesToTrigger(), copy.getVerticesToTrigger()); - assertEquals(settings.getCheckpointInterval(), copy.getCheckpointInterval()); - assertEquals(settings.getCheckpointTimeout(), copy.getCheckpointTimeout()); - assertEquals(settings.getMinPauseBetweenCheckpoints(), copy.getMinPauseBetweenCheckpoints()); - assertEquals(settings.getMaxConcurrentCheckpoints(), copy.getMaxConcurrentCheckpoints()); - assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(), copy.getExternalizedCheckpointSettings().externalizeCheckpoints()); - assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation()); - assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce()); - assertNotNull(copy.getDefaultStateBackend()); - assertTrue(copy.getDefaultStateBackend().getClass() == MemoryStateBackend.class); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index dcf4722..6eacaac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -61,7 +61,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.leaderelection.LeaderElectionService; @@ -220,7 +220,7 @@ public class JobManagerHARecoveryTest { JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex); List<JobVertexID> vertexId = Collections.singletonList(sourceJobVertex.getID()); - jobGraph.setSnapshotSettings(new JobSnapshottingSettings( + jobGraph.setSnapshotSettings(new JobCheckpointingSettings( vertexId, vertexId, vertexId, http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 4dec84b..d7fc71d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -48,7 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; @@ -821,7 +821,7 @@ public class JobManagerTest extends TestLogger { JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex); - JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings( + JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings( Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), @@ -947,7 +947,7 @@ public class JobManagerTest extends TestLogger { JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex); - JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings( + JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings( Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), @@ -1053,7 +1053,7 @@ public class JobManagerTest extends TestLogger { JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex); - JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings( + JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings( Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), @@ -1156,7 +1156,7 @@ public class JobManagerTest extends TestLogger { JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex); - JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings( + JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings( Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), @@ -1203,7 +1203,7 @@ public class JobManagerTest extends TestLogger { JobGraph newJobGraph = new JobGraph("NewTestingJob", newSourceVertex); - JobSnapshottingSettings newSnapshottingSettings = new JobSnapshottingSettings( + JobCheckpointingSettings newSnapshottingSettings = new JobCheckpointingSettings( Collections.singletonList(newSourceVertex.getID()), Collections.singletonList(newSourceVertex.getID()), Collections.singletonList(newSourceVertex.getID()), http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index ba5f973..bdff401 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -228,7 +228,7 @@ public class JobSubmitTest { List<JobVertexID> vertexIdList = Collections.singletonList(jobVertex.getID()); JobGraph jg = new JobGraph("test job", jobVertex); - jg.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, + jg.setSnapshotSettings(new JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList, 5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none(), null, true)); return jg; } http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 5374d01..ce8517e 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -27,7 +27,7 @@ import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedChec import org.apache.flink.runtime.client.JobExecutionException import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture import org.apache.flink.runtime.io.network.partition.ResultPartitionType -import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobSnapshottingSettings} +import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobCheckpointingSettings} import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode} import org.apache.flink.runtime.jobmanager.Tasks._ import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup} @@ -827,7 +827,7 @@ class JobManagerITCase(_system: ActorSystem) val jobVertex = new JobVertex("Blocking vertex") jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable]) val jobGraph = new JobGraph(jobVertex) - jobGraph.setSnapshotSettings(new JobSnapshottingSettings( + jobGraph.setSnapshotSettings(new JobCheckpointingSettings( java.util.Collections.emptyList(), java.util.Collections.emptyList(), java.util.Collections.emptyList(), @@ -887,7 +887,7 @@ class JobManagerITCase(_system: ActorSystem) val jobVertex = new JobVertex("Blocking vertex") jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable]) val jobGraph = new JobGraph(jobVertex) - jobGraph.setSnapshotSettings(new JobSnapshottingSettings( + jobGraph.setSnapshotSettings(new JobCheckpointingSettings( java.util.Collections.emptyList(), java.util.Collections.emptyList(), java.util.Collections.emptyList(), @@ -955,7 +955,7 @@ class JobManagerITCase(_system: ActorSystem) val jobVertex = new JobVertex("Blocking vertex") jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable]) val jobGraph = new JobGraph(jobVertex) - jobGraph.setSnapshotSettings(new JobSnapshottingSettings( + jobGraph.setSnapshotSettings(new JobCheckpointingSettings( java.util.Collections.emptyList(), java.util.Collections.emptyList(), java.util.Collections.emptyList(), http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 794de5a..7d62273 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -36,7 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.operators.util.TaskConfig; @@ -579,7 +579,7 @@ public class StreamingJobGraphGenerator { "exactly-once or at-least-once."); } - JobSnapshottingSettings settings = new JobSnapshottingSettings( + JobCheckpointingSettings settings = new JobCheckpointingSettings( triggerVertices, ackVertices, commitVertices, interval, cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(), cfg.getMaxConcurrentCheckpoints(), http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 6d2fcaa..2c71a07 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.IterativeStream; @@ -118,7 +118,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph); JobGraph jobGraph = jobGraphGenerator.createJobGraph(); - JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings(); + JobCheckpointingSettings snapshottingSettings = jobGraph.getCheckpointingSettings(); assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval()); }
