This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2f80cfde34749e882f2171bb9ef070feb1624847 Author: Chesnay Schepler <[email protected]> AuthorDate: Tue Nov 2 16:58:53 2021 +0100 [FLINK-24749][coordination] Reuse CheckpointStatsTracker --- .../DefaultExecutionGraphBuilder.java | 13 +-- .../scheduler/DefaultExecutionGraphFactory.java | 15 ++- .../TestingDefaultExecutionGraphBuilder.java | 4 +- .../adaptive/AdaptiveSchedulerClusterITCase.java | 104 +++++++++++++++++++++ 4 files changed, 125 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java index 4a61dd3..d1c6d6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.configuration.WebOptions; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobWriter; @@ -64,6 +63,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -93,7 +93,8 @@ public class DefaultExecutionGraphBuilder { ExecutionStateUpdateListener executionStateUpdateListener, long initializationTimestamp, VertexAttemptNumberStore vertexAttemptNumberStore, - VertexParallelismStore vertexParallelismStore) + VertexParallelismStore vertexParallelismStore, + Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory) throws JobExecutionException, JobException { checkNotNull(jobGraph, "job graph cannot be null"); @@ -204,12 +205,6 @@ public class DefaultExecutionGraphBuilder { if (isCheckpointingEnabled(jobGraph)) { JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings(); - // Maximum number of remembered checkpoints - int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE); - - CheckpointStatsTracker checkpointStatsTracker = - new CheckpointStatsTracker(historySize, metrics); - // load the state backend from the application settings final StateBackend applicationConfiguredBackend; final SerializedValue<StateBackend> serializedAppConfigured = @@ -316,7 +311,7 @@ public class DefaultExecutionGraphBuilder { completedCheckpointStore, rootBackend, rootStorage, - checkpointStatsTracker, + checkpointStatsTrackerFactory.get(), checkpointsCleaner); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java index f1721c7..0f49909 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java @@ -20,9 +20,11 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; @@ -38,12 +40,14 @@ import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.util.function.CachingSupplier; import org.slf4j.Logger; import java.util.HashSet; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; /** Default {@link ExecutionGraphFactory} implementation. */ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { @@ -58,6 +62,7 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { private final BlobWriter blobWriter; private final ShuffleMaster<?> shuffleMaster; private final JobMasterPartitionTracker jobMasterPartitionTracker; + private final Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory; public DefaultExecutionGraphFactory( Configuration configuration, @@ -80,6 +85,13 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { this.blobWriter = blobWriter; this.shuffleMaster = shuffleMaster; this.jobMasterPartitionTracker = jobMasterPartitionTracker; + this.checkpointStatsTrackerFactory = + new CachingSupplier<>( + () -> + new CheckpointStatsTracker( + configuration.getInteger( + WebOptions.CHECKPOINTS_HISTORY_SIZE), + jobManagerJobMetricGroup)); } @Override @@ -124,7 +136,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { executionStateUpdateListener, initializationTimestamp, vertexAttemptNumberStore, - vertexParallelismStore); + vertexParallelismStore, + checkpointStatsTrackerFactory); final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java index bde0cbb..edf85c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; @@ -185,6 +186,7 @@ public class TestingDefaultExecutionGraphBuilder { System.currentTimeMillis(), new DefaultVertexAttemptNumberStore(), Optional.ofNullable(vertexParallelismStore) - .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph))); + .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)), + () -> new CheckpointStatsTracker(0, new UnregisteredMetricsGroup())); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java index 9865395..4d1d7e44 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java @@ -25,11 +25,20 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable; @@ -44,8 +53,14 @@ import org.junit.Test; import java.io.IOException; import java.time.Duration; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertTrue; /** @@ -76,6 +91,8 @@ public class AdaptiveSchedulerClusterITCase extends TestLogger { configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive); configuration.set( JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(100L)); + // required for #testCheckpointStatsPersistedAcrossRescale + configuration.set(WebOptions.CHECKPOINTS_HISTORY_SIZE, Integer.MAX_VALUE); return configuration; } @@ -141,6 +158,93 @@ public class AdaptiveSchedulerClusterITCase extends TestLogger { assertTrue(jobResultFuture.join().isSuccess()); } + @Test + public void testCheckpointStatsPersistedAcrossRescale() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + + JobVertex jobVertex = new JobVertex("jobVertex", JOB_VERTEX_ID); + jobVertex.setInvokableClass(CheckpointingNoOpInvokable.class); + jobVertex.setParallelism(PARALLELISM); + + final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex); + jobGraph.setSnapshotSettings( + new JobCheckpointingSettings( + CheckpointCoordinatorConfiguration.builder() + .setCheckpointInterval(100) + .setCheckpointTimeout(1000) + .build(), + null)); + + miniCluster.submitJob(jobGraph).join(); + + // wait until some checkpoints have been completed + CommonTestUtils.waitUntilCondition( + () -> + miniCluster + .getExecutionGraph(jobGraph.getJobID()) + .thenApply( + eg -> + eg.getCheckpointStatsSnapshot() + .getCounts() + .getNumberOfCompletedCheckpoints() + > 0) + .get(), + Deadline.fromNow(Duration.ofHours(1))); + + miniCluster.terminateTaskManager(0); + + waitUntilParallelismForVertexReached( + jobGraph.getJobID(), + JOB_VERTEX_ID, + NUMBER_SLOTS_PER_TASK_MANAGER * (NUMBER_TASK_MANAGERS - 1)); + + // check that the very first checkpoint is still accessible + final List<AbstractCheckpointStats> checkpointHistory = + miniCluster + .getExecutionGraph(jobGraph.getJobID()) + .thenApply( + eg -> eg.getCheckpointStatsSnapshot().getHistory().getCheckpoints()) + .get(); + assertThat(checkpointHistory.get(checkpointHistory.size() - 1).getCheckpointId(), is(1L)); + } + + /** An invokable that doesn't do anything interesting, but does support checkpointing. */ + public static class CheckpointingNoOpInvokable extends AbstractInvokable { + + private static final long CANCEL_SIGNAL = -2L; + private final BlockingQueue<Long> checkpointsToConfirm = new ArrayBlockingQueue<>(1); + + public CheckpointingNoOpInvokable(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + long signal = checkpointsToConfirm.take(); + while (signal != CANCEL_SIGNAL) { + getEnvironment().acknowledgeCheckpoint(signal, new CheckpointMetrics()); + signal = checkpointsToConfirm.take(); + } + } + + @Override + public void cancel() throws Exception { + checkpointsToConfirm.add(CANCEL_SIGNAL); + } + + @Override + public CompletableFuture<Boolean> triggerCheckpointAsync( + CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) { + checkpointsToConfirm.add(checkpointMetaData.getCheckpointId()); + return CompletableFuture.completedFuture(true); + } + + @Override + public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) { + return CompletableFuture.completedFuture(null); + } + } + private JobGraph createBlockingJobGraph(int parallelism) throws IOException { final JobVertex blockingOperator = new JobVertex("Blocking operator", JOB_VERTEX_ID);
