This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1538eb23930fedc5e7e0a1334a15e7ec86c79a06 Author: Till Rohrmann <[email protected]> AuthorDate: Mon Jan 4 15:35:41 2021 +0100 [FLINK-20846] Move checkpoint service shut down out of CheckpointCoordinator By moving the shut down of checkpoint services out of the CheckpointCoordinator, it is now possible to reuse these services across different CheckpointCoordinators. This closes #14553. --- .../runtime/checkpoint/CheckpointCoordinator.java | 8 -- .../runtime/executiongraph/ExecutionGraph.java | 5 +- .../executiongraph/ExecutionGraphBuilder.java | 5 +- .../flink/runtime/scheduler/SchedulerBase.java | 118 ++++++++++++++++----- .../checkpoint/CheckpointCoordinatorTest.java | 6 +- .../TestingExecutionGraphBuilder.java | 2 + 6 files changed, 107 insertions(+), 37 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 0da6f00..558f16a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -423,14 +423,6 @@ public class CheckpointCoordinator { CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN); // clear queued requests and in-flight checkpoints abortPendingAndQueuedCheckpoints(reason); - - completedCheckpointStore.shutdown( - jobStatus, - checkpointsCleaner, - () -> { - // don't schedule anything on shutdown - }); - checkpointIdCounter.shutdown(jobStatus); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 3b93800..13a4c03 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -416,7 +416,8 @@ public class ExecutionGraph implements AccessExecutionGraph { CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, StateBackend checkpointStateBackend, - CheckpointStatsTracker statsTracker) { + CheckpointStatsTracker statsTracker, + CheckpointsCleaner checkpointsCleaner) { checkState(state == JobStatus.CREATED, "Job must be in CREATED state"); checkState(checkpointCoordinator == null, "checkpointing already enabled"); @@ -470,7 +471,7 @@ public class ExecutionGraph implements AccessExecutionGraph { checkpointStore, checkpointStateBackend, ioExecutor, - new CheckpointsCleaner(), + checkpointsCleaner, new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer), SharedStateRegistry.DEFAULT_FACTORY, failureManager); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index d63b045..86c26e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; @@ -80,6 +81,7 @@ public class ExecutionGraphBuilder { SlotProvider slotProvider, ClassLoader classLoader, CompletedCheckpointStore completedCheckpointStore, + CheckpointsCleaner checkpointsCleaner, CheckpointIDCounter checkpointIdCounter, Time rpcTimeout, MetricGroup metrics, @@ -289,7 +291,8 @@ public class ExecutionGraphBuilder { checkpointIdCounter, completedCheckpointStore, rootBackend, - checkpointStatsTracker); + checkpointStatsTracker, + checkpointsCleaner); } // create all the metrics for the Execution Graph diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 15ac47a..cf93cdc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointCompletedCheckpointStore; @@ -157,6 +158,12 @@ public abstract class SchedulerBase implements SchedulerNG { private final CheckpointRecoveryFactory checkpointRecoveryFactory; + private final CompletedCheckpointStore completedCheckpointStore; + + private final CheckpointsCleaner checkpointsCleaner; + + private final CheckpointIDCounter checkpointIdCounter; + private final Time rpcTimeout; private final BlobWriter blobWriter; @@ -211,14 +218,23 @@ public abstract class SchedulerBase implements SchedulerNG { this.slotRequestTimeout = checkNotNull(slotRequestTimeout); this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + this.checkpointsCleaner = new CheckpointsCleaner(); + this.completedCheckpointStore = createCompletedCheckpointStore(); + this.checkpointIdCounter = createCheckpointIdCounter(); + this.executionGraph = createAndRestoreExecutionGraph( jobManagerJobMetricGroup, + completedCheckpointStore, + checkpointsCleaner, + checkpointIdCounter, checkNotNull(shuffleMaster), checkNotNull(partitionTracker), checkNotNull(executionDeploymentTracker), initializationTimestamp); + registerShutDownCheckpointServicesOnExecutionGraphTermination(executionGraph); + this.schedulingTopology = executionGraph.getSchedulingTopology(); stateLocationRetriever = @@ -230,8 +246,78 @@ public abstract class SchedulerBase implements SchedulerNG { this.coordinatorMap = createCoordinatorMap(); } + private void registerShutDownCheckpointServicesOnExecutionGraphTermination( + ExecutionGraph executionGraph) { + FutureUtils.assertNoException( + executionGraph.getTerminationFuture().thenAccept(this::shutDownCheckpointServices)); + } + + private void shutDownCheckpointServices(JobStatus jobStatus) { + Exception exception = null; + + try { + completedCheckpointStore.shutdown( + jobStatus, + checkpointsCleaner, + () -> { + // don't schedule anything on shutdown + }); + } catch (Exception e) { + exception = e; + } + + try { + checkpointIdCounter.shutdown(jobStatus); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + log.error("Error while shutting down checkpoint services.", exception); + } + } + + private CompletedCheckpointStore createCompletedCheckpointStore() throws JobExecutionException { + final JobID jobId = jobGraph.getJobID(); + if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { + try { + return ExecutionGraphBuilder.createCompletedCheckpointStore( + jobMasterConfiguration, + userCodeLoader, + checkpointRecoveryFactory, + log, + jobId); + } catch (Exception e) { + throw new JobExecutionException( + jobId, + "Failed to initialize high-availability completed checkpoint store", + e); + } + } else { + return DeactivatedCheckpointCompletedCheckpointStore.INSTANCE; + } + } + + private CheckpointIDCounter createCheckpointIdCounter() throws JobExecutionException { + final JobID jobId = jobGraph.getJobID(); + if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { + try { + return ExecutionGraphBuilder.createCheckpointIdCounter( + checkpointRecoveryFactory, jobId); + } catch (Exception e) { + throw new JobExecutionException( + jobId, "Failed to initialize high-availability checkpoint id counter", e); + } + } else { + return DeactivatedCheckpointIDCounter.INSTANCE; + } + } + private ExecutionGraph createAndRestoreExecutionGraph( JobManagerJobMetricGroup currentJobManagerJobMetricGroup, + CompletedCheckpointStore completedCheckpointStore, + CheckpointsCleaner checkpointsCleaner, + CheckpointIDCounter checkpointIdCounter, ShuffleMaster<?> shuffleMaster, JobMasterPartitionTracker partitionTracker, ExecutionDeploymentTracker executionDeploymentTracker, @@ -241,6 +327,9 @@ public abstract class SchedulerBase implements SchedulerNG { ExecutionGraph newExecutionGraph = createExecutionGraph( currentJobManagerJobMetricGroup, + completedCheckpointStore, + checkpointsCleaner, + checkpointIdCounter, shuffleMaster, partitionTracker, executionDeploymentTracker, @@ -265,6 +354,9 @@ public abstract class SchedulerBase implements SchedulerNG { private ExecutionGraph createExecutionGraph( JobManagerJobMetricGroup currentJobManagerJobMetricGroup, + CompletedCheckpointStore completedCheckpointStore, + CheckpointsCleaner checkpointsCleaner, + CheckpointIDCounter checkpointIdCounter, ShuffleMaster<?> shuffleMaster, final JobMasterPartitionTracker partitionTracker, ExecutionDeploymentTracker executionDeploymentTracker, @@ -280,31 +372,6 @@ public abstract class SchedulerBase implements SchedulerNG { } }; - final JobID jobId = jobGraph.getJobID(); - final CheckpointIDCounter checkpointIdCounter; - final CompletedCheckpointStore completedCheckpointStore; - - if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { - try { - checkpointIdCounter = - ExecutionGraphBuilder.createCheckpointIdCounter( - checkpointRecoveryFactory, jobId); - completedCheckpointStore = - ExecutionGraphBuilder.createCompletedCheckpointStore( - jobMasterConfiguration, - userCodeLoader, - checkpointRecoveryFactory, - log, - jobId); - } catch (Exception e) { - throw new JobExecutionException( - jobId, "Failed to initialize high-availability checkpoint handler", e); - } - } else { - checkpointIdCounter = DeactivatedCheckpointIDCounter.INSTANCE; - completedCheckpointStore = DeactivatedCheckpointCompletedCheckpointStore.INSTANCE; - } - return ExecutionGraphBuilder.buildGraph( jobGraph, jobMasterConfiguration, @@ -313,6 +380,7 @@ public abstract class SchedulerBase implements SchedulerNG { slotProvider, userCodeLoader, completedCheckpointStore, + checkpointsCleaner, checkpointIdCounter, rpcTimeout, currentJobManagerJobMetricGroup, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index dd26126..c7d66fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -983,6 +983,8 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); // set up the coordinator and validate the initial state + final StandaloneCompletedCheckpointStore completedCheckpointStore = + new StandaloneCompletedCheckpointStore(10); CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() .setJobId(jobId) @@ -995,7 +997,7 @@ public class CheckpointCoordinatorTest extends TestLogger { .setTasksToWaitFor( new ExecutionVertex[] {ackVertex1, ackVertex2, ackVertex3}) .setTasksToCommitTo(new ExecutionVertex[] {commitVertex}) - .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(10)) + .setCompletedCheckpointStore(completedCheckpointStore) .setTimer(manuallyTriggeredScheduledExecutor) .build(); @@ -1168,6 +1170,8 @@ public class CheckpointCoordinatorTest extends TestLogger { verify(subtaskState13, times(1)).discardState(); checkpointCoordinator.shutdown(JobStatus.FINISHED); + completedCheckpointStore.shutdown( + JobStatus.FINISHED, new CheckpointsCleaner(), () -> {}); // validate that the states in the second checkpoint have been discarded verify(subtaskState21, times(1)).discardState(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java index 0add2f6..59360ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; @@ -175,6 +176,7 @@ public class TestingExecutionGraphBuilder { slotProvider, userClassLoader, completedCheckpointStore, + new CheckpointsCleaner(), checkpointIdCounter, rpcTimeout, metricGroup,
