This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fb94512ae02df71cf6ef9255e96852b3220f5c9a Author: Till Rohrmann <[email protected]> AuthorDate: Tue Mar 16 11:57:11 2021 +0100 [hotfix] Factor ExecutionGraph creation out into ExecutionGraphFactory Using the ExecutionGraphFactory for creating and restoring an ExecutionGraph allows to share the functionality between the DefaultScheduler and the AdaptiveScheduler. --- .../scheduler/DefaultExecutionGraphFactory.java | 169 +++++++++++++++++++++ .../flink/runtime/scheduler/DefaultScheduler.java | 24 +-- .../runtime/scheduler/DefaultSchedulerFactory.java | 22 ++- .../runtime/scheduler/ExecutionGraphFactory.java | 59 +++++++ .../flink/runtime/scheduler/SchedulerBase.java | 149 ++---------------- .../scheduler/adaptive/AdaptiveScheduler.java | 123 ++------------- .../adaptive/AdaptiveSchedulerFactory.java | 24 ++- .../DefaultExecutionGraphFactoryTest.java | 156 +++++++++++++++++++ .../runtime/scheduler/DefaultSchedulerTest.java | 88 ----------- .../runtime/scheduler/SchedulerTestingUtils.java | 22 ++- .../adaptive/AdaptiveSchedulerBuilder.java | 24 ++- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 144 ------------------ 12 files changed, 481 insertions(+), 523 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java new file mode 100644 index 0000000..3dbb80d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; +import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener; +import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore; +import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; +import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.shuffle.ShuffleMaster; + +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +/** Default {@link ExecutionGraphFactory} implementation. */ +public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { + + private final Configuration configuration; + private final ClassLoader userCodeClassLoader; + private final ExecutionDeploymentTracker executionDeploymentTracker; + private final ScheduledExecutorService futureExecutor; + private final Executor ioExecutor; + private final Time rpcTimeout; + private final JobManagerJobMetricGroup jobManagerJobMetricGroup; + private final BlobWriter blobWriter; + private final ShuffleMaster<?> shuffleMaster; + private final JobMasterPartitionTracker jobMasterPartitionTracker; + + public DefaultExecutionGraphFactory( + Configuration configuration, + ClassLoader userCodeClassLoader, + ExecutionDeploymentTracker executionDeploymentTracker, + ScheduledExecutorService futureExecutor, + Executor ioExecutor, + Time rpcTimeout, + JobManagerJobMetricGroup jobManagerJobMetricGroup, + BlobWriter blobWriter, + ShuffleMaster<?> shuffleMaster, + JobMasterPartitionTracker jobMasterPartitionTracker) { + this.configuration = configuration; + this.userCodeClassLoader = userCodeClassLoader; + this.executionDeploymentTracker = executionDeploymentTracker; + this.futureExecutor = futureExecutor; + this.ioExecutor = ioExecutor; + this.rpcTimeout = rpcTimeout; + this.jobManagerJobMetricGroup = jobManagerJobMetricGroup; + this.blobWriter = blobWriter; + this.shuffleMaster = shuffleMaster; + this.jobMasterPartitionTracker = jobMasterPartitionTracker; + } + + @Override + public ExecutionGraph createAndRestoreExecutionGraph( + JobGraph jobGraph, + CompletedCheckpointStore completedCheckpointStore, + CheckpointsCleaner checkpointsCleaner, + CheckpointIDCounter checkpointIdCounter, + TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint, + long initializationTimestamp, + VertexAttemptNumberStore vertexAttemptNumberStore, + Logger log) + throws Exception { + ExecutionDeploymentListener executionDeploymentListener = + new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker); + ExecutionStateUpdateListener executionStateUpdateListener = + (execution, newState) -> { + if (newState.isTerminal()) { + executionDeploymentTracker.stopTrackingDeploymentOf(execution); + } + }; + + final ExecutionGraph newExecutionGraph = + DefaultExecutionGraphBuilder.buildGraph( + jobGraph, + configuration, + futureExecutor, + ioExecutor, + userCodeClassLoader, + completedCheckpointStore, + checkpointsCleaner, + checkpointIdCounter, + rpcTimeout, + jobManagerJobMetricGroup, + blobWriter, + log, + shuffleMaster, + jobMasterPartitionTracker, + partitionLocationConstraint, + executionDeploymentListener, + executionStateUpdateListener, + initializationTimestamp, + vertexAttemptNumberStore); + + final CheckpointCoordinator checkpointCoordinator = + newExecutionGraph.getCheckpointCoordinator(); + + if (checkpointCoordinator != null) { + // check whether we find a valid checkpoint + if (!checkpointCoordinator.restoreInitialCheckpointIfPresent( + new HashSet<>(newExecutionGraph.getAllVertices().values()))) { + + // check whether we can restore from a savepoint + tryRestoreExecutionGraphFromSavepoint( + newExecutionGraph, jobGraph.getSavepointRestoreSettings()); + } + } + + return newExecutionGraph; + } + + /** + * Tries to restore the given {@link ExecutionGraph} from the provided {@link + * SavepointRestoreSettings}, iff checkpointing is enabled. + * + * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored + * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about + * the savepoint to restore from + * @throws Exception if the {@link ExecutionGraph} could not be restored + */ + private void tryRestoreExecutionGraphFromSavepoint( + ExecutionGraph executionGraphToRestore, + SavepointRestoreSettings savepointRestoreSettings) + throws Exception { + if (savepointRestoreSettings.restoreSavepoint()) { + final CheckpointCoordinator checkpointCoordinator = + executionGraphToRestore.getCheckpointCoordinator(); + if (checkpointCoordinator != null) { + checkpointCoordinator.restoreSavepoint( + savepointRestoreSettings.getRestorePath(), + savepointRestoreSettings.allowNonRestoredState(), + executionGraphToRestore.getAllVertices(), + userCodeClassLoader); + } + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 394d1cf..be29192 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -20,9 +20,7 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -38,13 +36,11 @@ import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHa import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy; import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult; import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; -import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; @@ -52,7 +48,6 @@ import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy; import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; -import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; @@ -70,7 +65,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; @@ -106,25 +100,20 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio final Executor ioExecutor, final Configuration jobMasterConfiguration, final Consumer<ComponentMainThreadExecutor> startUpAction, - final ScheduledExecutorService futureExecutor, final ScheduledExecutor delayExecutor, final ClassLoader userCodeLoader, final CheckpointRecoveryFactory checkpointRecoveryFactory, - final Time rpcTimeout, - final BlobWriter blobWriter, final JobManagerJobMetricGroup jobManagerJobMetricGroup, - final ShuffleMaster<?> shuffleMaster, - final JobMasterPartitionTracker partitionTracker, final SchedulingStrategyFactory schedulingStrategyFactory, final FailoverStrategy.Factory failoverStrategyFactory, final RestartBackoffTimeStrategy restartBackoffTimeStrategy, final ExecutionVertexOperations executionVertexOperations, final ExecutionVertexVersioner executionVertexVersioner, final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, - final ExecutionDeploymentTracker executionDeploymentTracker, long initializationTimestamp, final ComponentMainThreadExecutor mainThreadExecutor, - final JobStatusListener jobStatusListener) + final JobStatusListener jobStatusListener, + final ExecutionGraphFactory executionGraphFactory) throws Exception { super( @@ -132,19 +121,14 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio jobGraph, ioExecutor, jobMasterConfiguration, - futureExecutor, userCodeLoader, checkpointRecoveryFactory, - rpcTimeout, - blobWriter, jobManagerJobMetricGroup, - shuffleMaster, - partitionTracker, executionVertexVersioner, - executionDeploymentTracker, initializationTimestamp, mainThreadExecutor, - jobStatusListener); + jobStatusListener, + executionGraphFactory); this.log = log; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java index 217008f..cf00887 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java @@ -101,31 +101,39 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory { jobGraph.getName(), jobGraph.getJobID()); + final ExecutionGraphFactory executionGraphFactory = + new DefaultExecutionGraphFactory( + jobMasterConfiguration, + userCodeLoader, + executionDeploymentTracker, + futureExecutor, + ioExecutor, + rpcTimeout, + jobManagerJobMetricGroup, + blobWriter, + shuffleMaster, + partitionTracker); + return new DefaultScheduler( log, jobGraph, ioExecutor, jobMasterConfiguration, schedulerComponents.getStartUpAction(), - futureExecutor, new ScheduledExecutorServiceAdapter(futureExecutor), userCodeLoader, checkpointRecoveryFactory, - rpcTimeout, - blobWriter, jobManagerJobMetricGroup, - shuffleMaster, - partitionTracker, schedulerComponents.getSchedulingStrategyFactory(), FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration), restartBackoffTimeStrategy, new DefaultExecutionVertexOperations(), new ExecutionVertexVersioner(), schedulerComponents.getAllocatorFactory(), - executionDeploymentTracker, initializationTimestamp, mainThreadExecutor, - jobStatusListener); + jobStatusListener, + executionGraphFactory); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java new file mode 100644 index 0000000..f7fed50 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import org.slf4j.Logger; + +/** Factory for creating an {@link ExecutionGraph}. */ +public interface ExecutionGraphFactory { + + /** + * Create and restore {@link ExecutionGraph} from the given {@link JobGraph} and services. + * + * @param jobGraph jobGraph to initialize the ExecutionGraph with + * @param completedCheckpointStore completedCheckpointStore to pass to the CheckpointCoordinator + * @param checkpointsCleaner checkpointsCleaner to pass to the CheckpointCoordinator + * @param checkpointIdCounter checkpointIdCounter to pass to the CheckpointCoordinator + * @param partitionLocationConstraint partitionLocationConstraint for this job + * @param initializationTimestamp initializationTimestamp when the ExecutionGraph was created + * @param vertexAttemptNumberStore vertexAttemptNumberStore keeping information about the vertex + * attempts of previous runs + * @param log log to use for logging + * @return restored {@link ExecutionGraph} + * @throws Exception if the {@link ExecutionGraph} could not be created and restored + */ + ExecutionGraph createAndRestoreExecutionGraph( + JobGraph jobGraph, + CompletedCheckpointStore completedCheckpointStore, + CheckpointsCleaner checkpointsCleaner, + CheckpointIDCounter checkpointIdCounter, + TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint, + long initializationTimestamp, + VertexAttemptNumberStore vertexAttemptNumberStore, + Logger log) + throws Exception; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 87a1354..3ddb55f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -22,13 +22,10 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.queryablestate.KvStateID; -import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; -import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; @@ -40,37 +37,29 @@ import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder; import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore; import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult; import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker; -import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; -import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; -import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; @@ -89,7 +78,6 @@ import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTer import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; -import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.util.IntArrayList; import org.apache.flink.util.ExceptionUtils; @@ -115,7 +103,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -137,24 +124,12 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling protected final InputsLocationsRetriever inputsLocationsRetriever; - private final Executor ioExecutor; - - private final Configuration jobMasterConfiguration; - - private final ScheduledExecutorService futureExecutor; - - private final ClassLoader userCodeLoader; - private final CompletedCheckpointStore completedCheckpointStore; private final CheckpointsCleaner checkpointsCleaner; private final CheckpointIDCounter checkpointIdCounter; - private final Time rpcTimeout; - - private final BlobWriter blobWriter; - private final JobManagerJobMetricGroup jobManagerJobMetricGroup; protected final ExecutionVertexVersioner executionVertexVersioner; @@ -169,35 +144,27 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling private final List<ErrorInfo> taskFailureHistory = new ArrayList<>(); + private final ExecutionGraphFactory executionGraphFactory; + public SchedulerBase( final Logger log, final JobGraph jobGraph, final Executor ioExecutor, final Configuration jobMasterConfiguration, - final ScheduledExecutorService futureExecutor, final ClassLoader userCodeLoader, final CheckpointRecoveryFactory checkpointRecoveryFactory, - final Time rpcTimeout, - final BlobWriter blobWriter, final JobManagerJobMetricGroup jobManagerJobMetricGroup, - final ShuffleMaster<?> shuffleMaster, - final JobMasterPartitionTracker partitionTracker, final ExecutionVertexVersioner executionVertexVersioner, - final ExecutionDeploymentTracker executionDeploymentTracker, long initializationTimestamp, final ComponentMainThreadExecutor mainThreadExecutor, - final JobStatusListener jobStatusListener) + final JobStatusListener jobStatusListener, + final ExecutionGraphFactory executionGraphFactory) throws Exception { this.log = checkNotNull(log); this.jobGraph = checkNotNull(jobGraph); - this.ioExecutor = checkNotNull(ioExecutor); - this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration); - this.futureExecutor = checkNotNull(futureExecutor); - this.userCodeLoader = checkNotNull(userCodeLoader); - this.rpcTimeout = checkNotNull(rpcTimeout); + this.executionGraphFactory = executionGraphFactory; - this.blobWriter = checkNotNull(blobWriter); this.jobManagerJobMetricGroup = checkNotNull(jobManagerJobMetricGroup); this.executionVertexVersioner = checkNotNull(executionVertexVersioner); this.mainThreadExecutor = mainThreadExecutor; @@ -216,13 +183,9 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling this.executionGraph = createAndRestoreExecutionGraph( - jobManagerJobMetricGroup, completedCheckpointStore, checkpointsCleaner, checkpointIdCounter, - checkNotNull(shuffleMaster), - checkNotNull(partitionTracker), - checkNotNull(executionDeploymentTracker), initializationTimestamp, mainThreadExecutor, jobStatusListener); @@ -273,42 +236,25 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling } private ExecutionGraph createAndRestoreExecutionGraph( - JobManagerJobMetricGroup currentJobManagerJobMetricGroup, CompletedCheckpointStore completedCheckpointStore, CheckpointsCleaner checkpointsCleaner, CheckpointIDCounter checkpointIdCounter, - ShuffleMaster<?> shuffleMaster, - JobMasterPartitionTracker partitionTracker, - ExecutionDeploymentTracker executionDeploymentTracker, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener) throws Exception { - ExecutionGraph newExecutionGraph = - createExecutionGraph( - currentJobManagerJobMetricGroup, + final ExecutionGraph newExecutionGraph = + executionGraphFactory.createAndRestoreExecutionGraph( + jobGraph, completedCheckpointStore, checkpointsCleaner, checkpointIdCounter, - shuffleMaster, - partitionTracker, - executionDeploymentTracker, - initializationTimestamp); - - final CheckpointCoordinator checkpointCoordinator = - newExecutionGraph.getCheckpointCoordinator(); - - if (checkpointCoordinator != null) { - // check whether we find a valid checkpoint - if (!checkpointCoordinator.restoreInitialCheckpointIfPresent( - new HashSet<>(newExecutionGraph.getAllVertices().values()))) { - - // check whether we can restore from a savepoint - tryRestoreExecutionGraphFromSavepoint( - newExecutionGraph, jobGraph.getSavepointRestoreSettings()); - } - } + TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType( + jobGraph.getJobType()), + initializationTimestamp, + new DefaultVertexAttemptNumberStore(), + log); newExecutionGraph.setInternalTaskFailuresListener( new UpdateSchedulerNgOnInternalFailuresListener(this)); @@ -318,75 +264,6 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling return newExecutionGraph; } - private ExecutionGraph createExecutionGraph( - JobManagerJobMetricGroup currentJobManagerJobMetricGroup, - CompletedCheckpointStore completedCheckpointStore, - CheckpointsCleaner checkpointsCleaner, - CheckpointIDCounter checkpointIdCounter, - ShuffleMaster<?> shuffleMaster, - final JobMasterPartitionTracker partitionTracker, - ExecutionDeploymentTracker executionDeploymentTracker, - long initializationTimestamp) - throws JobExecutionException, JobException { - - ExecutionDeploymentListener executionDeploymentListener = - new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker); - ExecutionStateUpdateListener executionStateUpdateListener = - (execution, newState) -> { - if (newState.isTerminal()) { - executionDeploymentTracker.stopTrackingDeploymentOf(execution); - } - }; - - return DefaultExecutionGraphBuilder.buildGraph( - jobGraph, - jobMasterConfiguration, - futureExecutor, - ioExecutor, - userCodeLoader, - completedCheckpointStore, - checkpointsCleaner, - checkpointIdCounter, - rpcTimeout, - currentJobManagerJobMetricGroup, - blobWriter, - log, - shuffleMaster, - partitionTracker, - TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType( - jobGraph.getJobType()), - executionDeploymentListener, - executionStateUpdateListener, - initializationTimestamp, - new DefaultVertexAttemptNumberStore()); - } - - /** - * Tries to restore the given {@link ExecutionGraph} from the provided {@link - * SavepointRestoreSettings}. - * - * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored - * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about - * the savepoint to restore from - * @throws Exception if the {@link ExecutionGraph} could not be restored - */ - private void tryRestoreExecutionGraphFromSavepoint( - ExecutionGraph executionGraphToRestore, - SavepointRestoreSettings savepointRestoreSettings) - throws Exception { - if (savepointRestoreSettings.restoreSavepoint()) { - final CheckpointCoordinator checkpointCoordinator = - executionGraphToRestore.getCheckpointCoordinator(); - if (checkpointCoordinator != null) { - checkpointCoordinator.restoreSavepoint( - savepointRestoreSettings.getRestorePath(), - savepointRestoreSettings.allowNonRestoredState(), - executionGraphToRestore.getAllVertices(), - userCodeLoader); - } - } - } - protected void resetForNewExecutions(final Collection<ExecutionVertexID> vertices) { vertices.stream() .map(this::getExecutionVertex) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index badad27..a7783db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -21,15 +21,12 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.metrics.Gauge; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; -import org.apache.flink.runtime.blob.BlobWriter; -import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -45,20 +42,16 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder; import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.MutableVertexAttemptNumberStore; import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler; import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; -import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobEdge; @@ -67,10 +60,7 @@ import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; -import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; -import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.jobmaster.SlotInfo; @@ -88,6 +78,7 @@ import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.scheduler.ExecutionGraphFactory; import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; @@ -99,7 +90,6 @@ import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ReactiveScaleUpController; import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ScaleUpController; -import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.ExceptionUtils; @@ -118,13 +108,11 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.time.Duration; import java.util.Collection; -import java.util.HashSet; import java.util.Iterator; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -164,15 +152,8 @@ public class AdaptiveScheduler private final long initializationTimestamp; - private final Configuration configuration; - private final ScheduledExecutorService futureExecutor; private final Executor ioExecutor; private final ClassLoader userCodeClassLoader; - private final Time rpcTimeout; - private final BlobWriter blobWriter; - private final ShuffleMaster<?> shuffleMaster; - private final JobMasterPartitionTracker partitionTracker; - private final ExecutionDeploymentTracker executionDeploymentTracker; private final JobManagerJobMetricGroup jobManagerJobMetricGroup; private final CompletedCheckpointStore completedCheckpointStore; @@ -194,6 +175,8 @@ public class AdaptiveScheduler private final Duration resourceTimeout; + private final ExecutionGraphFactory executionGraphFactory; + private State state = new Created(this, LOG); private boolean isTransitioningState = false; @@ -208,21 +191,16 @@ public class AdaptiveScheduler Configuration configuration, DeclarativeSlotPool declarativeSlotPool, SlotAllocator slotAllocator, - ScheduledExecutorService futureExecutor, Executor ioExecutor, ClassLoader userCodeClassLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, - Time rpcTimeout, - BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, - ShuffleMaster<?> shuffleMaster, - JobMasterPartitionTracker partitionTracker, RestartBackoffTimeStrategy restartBackoffTimeStrategy, - ExecutionDeploymentTracker executionDeploymentTracker, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, FatalErrorHandler fatalErrorHandler, - JobStatusListener jobStatusListener) + JobStatusListener jobStatusListener, + ExecutionGraphFactory executionGraphFactory) throws JobExecutionException { ensureFullyPipelinedStreamingJob(jobGraph); @@ -230,16 +208,9 @@ public class AdaptiveScheduler this.jobInformation = new JobGraphJobInformation(jobGraph); this.declarativeSlotPool = declarativeSlotPool; this.initializationTimestamp = initializationTimestamp; - this.configuration = configuration; - this.futureExecutor = futureExecutor; this.ioExecutor = ioExecutor; this.userCodeClassLoader = userCodeClassLoader; - this.rpcTimeout = rpcTimeout; - this.blobWriter = blobWriter; - this.shuffleMaster = shuffleMaster; - this.partitionTracker = partitionTracker; this.restartBackoffTimeStrategy = restartBackoffTimeStrategy; - this.executionDeploymentTracker = executionDeploymentTracker; this.jobManagerJobMetricGroup = jobManagerJobMetricGroup; this.fatalErrorHandler = fatalErrorHandler; this.completedCheckpointStore = @@ -265,6 +236,8 @@ public class AdaptiveScheduler this.resourceTimeout = configuration.get(JobManagerOptions.RESOURCE_WAIT_TIMEOUT); + this.executionGraphFactory = executionGraphFactory; + registerMetrics(); } @@ -794,79 +767,15 @@ public class AdaptiveScheduler @Nonnull private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph adjustedJobGraph) throws Exception { - ExecutionDeploymentListener executionDeploymentListener = - new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker); - ExecutionStateUpdateListener executionStateUpdateListener = - (execution, newState) -> { - if (newState.isTerminal()) { - executionDeploymentTracker.stopTrackingDeploymentOf(execution); - } - }; - - final ExecutionGraph newExecutionGraph = - DefaultExecutionGraphBuilder.buildGraph( - adjustedJobGraph, - configuration, - futureExecutor, - ioExecutor, - userCodeClassLoader, - completedCheckpointStore, - checkpointsCleaner, - checkpointIdCounter, - rpcTimeout, - jobManagerJobMetricGroup, - blobWriter, - LOG, - shuffleMaster, - partitionTracker, - TaskDeploymentDescriptorFactory.PartitionLocationConstraint - .MUST_BE_KNOWN, // AdaptiveScheduler only supports streaming jobs - executionDeploymentListener, - executionStateUpdateListener, - initializationTimestamp, - vertexAttemptNumberStore); - - final CheckpointCoordinator checkpointCoordinator = - newExecutionGraph.getCheckpointCoordinator(); - - if (checkpointCoordinator != null) { - // check whether we find a valid checkpoint - if (!checkpointCoordinator.restoreInitialCheckpointIfPresent( - new HashSet<>(newExecutionGraph.getAllVertices().values()))) { - - // check whether we can restore from a savepoint - tryRestoreExecutionGraphFromSavepoint( - newExecutionGraph, adjustedJobGraph.getSavepointRestoreSettings()); - } - } - - return newExecutionGraph; - } - - /** - * Tries to restore the given {@link ExecutionGraph} from the provided {@link - * SavepointRestoreSettings}, iff checkpointing is enabled. - * - * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored - * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about - * the savepoint to restore from - * @throws Exception if the {@link ExecutionGraph} could not be restored - */ - private void tryRestoreExecutionGraphFromSavepoint( - ExecutionGraph executionGraphToRestore, - SavepointRestoreSettings savepointRestoreSettings) - throws Exception { - if (savepointRestoreSettings.restoreSavepoint()) { - final CheckpointCoordinator checkpointCoordinator = - executionGraphToRestore.getCheckpointCoordinator(); - if (checkpointCoordinator != null) { - checkpointCoordinator.restoreSavepoint( - savepointRestoreSettings.getRestorePath(), - savepointRestoreSettings.allowNonRestoredState(), - executionGraphToRestore.getAllVertices(), - userCodeClassLoader); - } - } + return executionGraphFactory.createAndRestoreExecutionGraph( + adjustedJobGraph, + completedCheckpointStore, + checkpointsCleaner, + checkpointIdCounter, + TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN, + initializationTimestamp, + vertexAttemptNumberStore, + LOG); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java index 278baa0..4c790cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java @@ -34,6 +34,8 @@ import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory; +import org.apache.flink.runtime.scheduler.ExecutionGraphFactory; import org.apache.flink.runtime.scheduler.SchedulerNG; import org.apache.flink.runtime.scheduler.SchedulerNGFactory; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator; @@ -92,26 +94,34 @@ public class AdaptiveSchedulerFactory implements SchedulerNGFactory { final SlotSharingSlotAllocator slotAllocator = createSlotSharingSlotAllocator(declarativeSlotPool); + final ExecutionGraphFactory executionGraphFactory = + new DefaultExecutionGraphFactory( + jobMasterConfiguration, + userCodeLoader, + executionDeploymentTracker, + futureExecutor, + ioExecutor, + rpcTimeout, + jobManagerJobMetricGroup, + blobWriter, + shuffleMaster, + partitionTracker); + return new AdaptiveScheduler( jobGraph, jobMasterConfiguration, declarativeSlotPool, slotAllocator, - futureExecutor, ioExecutor, userCodeLoader, checkpointRecoveryFactory, - rpcTimeout, - blobWriter, jobManagerJobMetricGroup, - shuffleMaster, - partitionTracker, restartBackoffTimeStrategy, - executionDeploymentTracker, initializationTimestamp, mainThreadExecutor, fatalErrorHandler, - jobStatusListener); + jobStatusListener, + executionGraphFactory); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java new file mode 100644 index 0000000..2a57d04 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; +import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore; +import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker; +import org.apache.flink.runtime.jobmaster.TestUtils; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; + +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** Tests for the {@link DefaultExecutionGraphFactory}. */ +public class DefaultExecutionGraphFactoryTest extends TestLogger { + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Test + public void testRestoringModifiedJobFromSavepointFails() throws Exception { + final JobGraph jobGraphWithNewOperator = createJobGraphWithSavepoint(false, 42L); + + final ExecutionGraphFactory executionGraphFactory = createExecutionGraphFactory(); + + try { + executionGraphFactory.createAndRestoreExecutionGraph( + jobGraphWithNewOperator, + new StandaloneCompletedCheckpointStore(1), + new CheckpointsCleaner(), + new StandaloneCheckpointIDCounter(), + TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, + 0L, + new DefaultVertexAttemptNumberStore(), + log); + fail("Expected ExecutionGraph creation to fail because of non restored state."); + } catch (Exception e) { + assertThat( + e, FlinkMatchers.containsMessage("Failed to rollback to checkpoint/savepoint")); + } + } + + @Test + public void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds() + throws Exception { + // create savepoint data + final long savepointId = 42L; + final JobGraph jobGraphWithNewOperator = createJobGraphWithSavepoint(true, savepointId); + + final ExecutionGraphFactory executionGraphFactory = createExecutionGraphFactory(); + + final StandaloneCompletedCheckpointStore completedCheckpointStore = + new StandaloneCompletedCheckpointStore(1); + executionGraphFactory.createAndRestoreExecutionGraph( + jobGraphWithNewOperator, + completedCheckpointStore, + new CheckpointsCleaner(), + new StandaloneCheckpointIDCounter(), + TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, + 0L, + new DefaultVertexAttemptNumberStore(), + log); + + final CompletedCheckpoint savepoint = completedCheckpointStore.getLatestCheckpoint(false); + + MatcherAssert.assertThat(savepoint, notNullValue()); + + MatcherAssert.assertThat(savepoint.getCheckpointID(), Matchers.is(savepointId)); + } + + @Nonnull + private ExecutionGraphFactory createExecutionGraphFactory() { + final ExecutionGraphFactory executionGraphFactory = + new DefaultExecutionGraphFactory( + new Configuration(), + ClassLoader.getSystemClassLoader(), + new DefaultExecutionDeploymentTracker(), + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + Time.milliseconds(0L), + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(), + VoidBlobWriter.getInstance(), + NettyShuffleMaster.INSTANCE, + NoOpJobMasterPartitionTracker.INSTANCE); + return executionGraphFactory; + } + + @Nonnull + private JobGraph createJobGraphWithSavepoint(boolean allowNonRestoredState, long savepointId) + throws IOException { + // create savepoint data + final OperatorID operatorID = new OperatorID(); + final File savepointFile = + TestUtils.createSavepointWithOperatorState( + TEMPORARY_FOLDER.newFile(), savepointId, operatorID); + + // set savepoint settings which don't allow non restored state + final SavepointRestoreSettings savepointRestoreSettings = + SavepointRestoreSettings.forPath( + savepointFile.getAbsolutePath(), allowNonRestoredState); + + // create a new operator + final JobVertex jobVertex = new JobVertex("New operator"); + jobVertex.setInvokableClass(NoOpInvokable.class); + jobVertex.setParallelism(1); + + // this test will fail in the end due to the previously created Savepoint having a state for + // a given OperatorID that does not match any operator of the newly created JobGraph + return TestUtils.createJobGraphFromJobVerticesWithCheckpointing( + savepointRestoreSettings, jobVertex); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 3be6b25..e924538 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -24,10 +24,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; -import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; @@ -49,12 +45,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.TestUtils; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; @@ -72,7 +65,6 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; import org.apache.flink.shaded.guava18.com.google.common.collect.Range; -import org.hamcrest.MatcherAssert; import org.hamcrest.core.Is; import org.junit.After; import org.junit.Before; @@ -80,7 +72,6 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.File; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -94,7 +85,6 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator; @@ -1141,84 +1131,6 @@ public class DefaultSchedulerTest extends TestLogger { lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint())); } - @Test - public void testRestoringModifiedJobFromSavepointFails() throws Exception { - // create savepoint data - final long savepointId = 42L; - final OperatorID operatorID = new OperatorID(); - final File savepointFile = - TestUtils.createSavepointWithOperatorState( - TEMPORARY_FOLDER.newFile(), savepointId, operatorID); - - // set savepoint settings which don't allow non restored state - final SavepointRestoreSettings savepointRestoreSettings = - SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), false); - - // create a new operator - final JobVertex jobVertex = new JobVertex("New operator"); - jobVertex.setInvokableClass(NoOpInvokable.class); - - // this test will fail in the end due to the previously created Savepoint having a state for - // a given OperatorID that does not match any operator of the newly created JobGraph - final JobGraph jobGraphWithNewOperator = - TestUtils.createJobGraphFromJobVerticesWithCheckpointing( - savepointRestoreSettings, jobVertex); - - try { - // creating the DefaultScheduler should try to restore the ExecutionGraph - SchedulerTestingUtils.newSchedulerBuilder( - jobGraphWithNewOperator, - ComponentMainThreadExecutorServiceAdapter.forMainThread()) - .build(); - fail("Expected JobMaster creation to fail because of restore failure."); - } catch (IllegalStateException ise) { - assertThat( - ise, - FlinkMatchers.containsMessage("Failed to rollback to checkpoint/savepoint")); - } - } - - @Test - public void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds() - throws Exception { - // create savepoint data - final long savepointId = 42L; - final OperatorID operatorID = new OperatorID(); - final File savepointFile = - TestUtils.createSavepointWithOperatorState( - TEMPORARY_FOLDER.newFile(), savepointId, operatorID); - - // allow for non restored state - final SavepointRestoreSettings savepointRestoreSettings = - SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true); - - // create a new operator - final JobVertex jobVertex = new JobVertex("New operator"); - jobVertex.setInvokableClass(NoOpInvokable.class); - final JobGraph jobGraphWithNewOperator = - TestUtils.createJobGraphFromJobVerticesWithCheckpointing( - savepointRestoreSettings, jobVertex); - - final StandaloneCompletedCheckpointStore completedCheckpointStore = - new StandaloneCompletedCheckpointStore(1); - final CheckpointRecoveryFactory testingCheckpointRecoveryFactory = - useSameServicesForAllJobs( - completedCheckpointStore, new StandaloneCheckpointIDCounter()); - - SchedulerTestingUtils.newSchedulerBuilder( - jobGraphWithNewOperator, - ComponentMainThreadExecutorServiceAdapter.forMainThread()) - .setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory) - .build(); - - // creating the DefaultScheduler should have read the savepoint - final CompletedCheckpoint savepoint = completedCheckpointStore.getLatestCheckpoint(false); - - MatcherAssert.assertThat(savepoint, notNullValue()); - - MatcherAssert.assertThat(savepoint.getCheckpointID(), is(savepointId)); - } - private static TaskExecutionState createFailedTaskExecutionState( ExecutionAttemptID executionAttemptID) { return new TaskExecutionState( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java index 1901953..9f7d993 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java @@ -523,31 +523,39 @@ public class SchedulerTestingUtils { } public DefaultScheduler build() throws Exception { + final ExecutionGraphFactory executionGraphFactory = + new DefaultExecutionGraphFactory( + jobMasterConfiguration, + userCodeLoader, + new DefaultExecutionDeploymentTracker(), + futureExecutor, + ioExecutor, + rpcTimeout, + jobManagerJobMetricGroup, + blobWriter, + shuffleMaster, + partitionTracker); + return new DefaultScheduler( log, jobGraph, ioExecutor, jobMasterConfiguration, componentMainThreadExecutor -> {}, - futureExecutor, delayExecutor, userCodeLoader, checkpointRecoveryFactory, - rpcTimeout, - blobWriter, jobManagerJobMetricGroup, - shuffleMaster, - partitionTracker, schedulingStrategyFactory, failoverStrategyFactory, restartBackoffTimeStrategy, executionVertexOperations, executionVertexVersioner, executionSlotAllocatorFactory, - new DefaultExecutionDeploymentTracker(), System.currentTimeMillis(), mainThreadExecutor, - jobStatusListener); + jobStatusListener, + executionGraphFactory); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java index e005170..9435ccd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java @@ -37,6 +37,8 @@ import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory; +import org.apache.flink.runtime.scheduler.ExecutionGraphFactory; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.shuffle.ShuffleMaster; @@ -182,6 +184,19 @@ public class AdaptiveSchedulerBuilder { } public AdaptiveScheduler build() throws Exception { + final ExecutionGraphFactory executionGraphFactory = + new DefaultExecutionGraphFactory( + jobMasterConfiguration, + userCodeLoader, + new DefaultExecutionDeploymentTracker(), + futureExecutor, + ioExecutor, + rpcTimeout, + jobManagerJobMetricGroup, + blobWriter, + shuffleMaster, + partitionTracker); + return new AdaptiveScheduler( jobGraph, jobMasterConfiguration, @@ -190,20 +205,15 @@ public class AdaptiveSchedulerBuilder { ? AdaptiveSchedulerFactory.createSlotSharingSlotAllocator( declarativeSlotPool) : slotAllocator, - futureExecutor, ioExecutor, userCodeLoader, checkpointRecoveryFactory, - rpcTimeout, - blobWriter, jobManagerJobMetricGroup, - shuffleMaster, - partitionTracker, restartBackoffTimeStrategy, - new DefaultExecutionDeploymentTracker(), initializationTimestamp, mainThreadExecutor, fatalErrorHandler, - jobStatusListener); + jobStatusListener, + executionGraphFactory); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index c4ba684..9bbf7a5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -22,16 +22,11 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; -import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore; @@ -54,13 +49,11 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.TestUtils; import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool; import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils; @@ -73,22 +66,17 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.apache.flink.runtime.operators.coordination.TestOperatorEvent; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; -import org.apache.flink.runtime.scheduler.GloballyTerminalJobStatusListener; import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator; import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.testutils.executor.TestExecutorResource; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; -import org.apache.flink.util.function.FunctionUtils; -import org.hamcrest.MatcherAssert; -import org.hamcrest.Matchers; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -97,7 +85,6 @@ import org.slf4j.Logger; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.io.File; import java.io.IOException; import java.time.Duration; import java.util.Optional; @@ -111,13 +98,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith; -import static org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs; import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements; import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots; import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -764,135 +749,6 @@ public class AdaptiveSchedulerTest extends TestLogger { scheduler.requestPartitionState(new IntermediateDataSetID(), new ResultPartitionID()); } - @Test - public void testRestoringModifiedJobFromSavepointFails() throws Exception { - // create savepoint data - final long savepointId = 42L; - final OperatorID operatorID = new OperatorID(); - final File savepointFile = - TestUtils.createSavepointWithOperatorState( - TEMPORARY_FOLDER.newFile(), savepointId, operatorID); - - // set savepoint settings which don't allow non restored state - final SavepointRestoreSettings savepointRestoreSettings = - SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), false); - - // create a new operator - final JobVertex jobVertex = new JobVertex("New operator"); - jobVertex.setInvokableClass(NoOpInvokable.class); - jobVertex.setParallelism(1); - - // this test will fail in the end due to the previously created Savepoint having a state for - // a given OperatorID that does not match any operator of the newly created JobGraph - final JobGraph jobGraphWithNewOperator = - TestUtils.createJobGraphFromJobVerticesWithCheckpointing( - savepointRestoreSettings, jobVertex); - - final DefaultDeclarativeSlotPool declarativeSlotPool = - createDeclarativeSlotPool(jobGraphWithNewOperator.getJobID()); - - final GloballyTerminalJobStatusListener jobStatusListener = - new GloballyTerminalJobStatusListener(); - - final AdaptiveScheduler adaptiveScheduler = - new AdaptiveSchedulerBuilder( - jobGraphWithNewOperator, singleThreadMainThreadExecutor) - .setDeclarativeSlotPool(declarativeSlotPool) - .setJobStatusListener(jobStatusListener) - .build(); - - singleThreadMainThreadExecutor.execute( - () -> { - adaptiveScheduler.startScheduling(); - offerSlots( - declarativeSlotPool, - createSlotOffersForResourceRequirements( - ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1))); - }); - - assertThat(jobStatusListener.getTerminationFuture().join(), is(JobStatus.FAILED)); - - final ArchivedExecutionGraph archivedExecutionGraph = - CompletableFuture.supplyAsync( - () -> adaptiveScheduler.requestJob().getArchivedExecutionGraph(), - singleThreadMainThreadExecutor) - .join(); - - assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); - assertThat( - archivedExecutionGraph.getFailureInfo().getException(), - FlinkMatchers.containsMessage("Failed to rollback to checkpoint/savepoint")); - } - - @Test - public void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds() - throws Exception { - // create savepoint data - final long savepointId = 42L; - final OperatorID operatorID = new OperatorID(); - final File savepointFile = - TestUtils.createSavepointWithOperatorState( - TEMPORARY_FOLDER.newFile(), savepointId, operatorID); - - // allow for non restored state - final SavepointRestoreSettings savepointRestoreSettings = - SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true); - - // create a new operator - final JobVertex jobVertex = new JobVertex("New operator"); - jobVertex.setInvokableClass(NoOpInvokable.class); - jobVertex.setParallelism(1); - - final JobGraph jobGraphWithNewOperator = - TestUtils.createJobGraphFromJobVerticesWithCheckpointing( - savepointRestoreSettings, jobVertex); - - final StandaloneCompletedCheckpointStore completedCheckpointStore = - new StandaloneCompletedCheckpointStore(1); - final CheckpointRecoveryFactory testingCheckpointRecoveryFactory = - useSameServicesForAllJobs( - completedCheckpointStore, new StandaloneCheckpointIDCounter()); - - final DefaultDeclarativeSlotPool declarativeSlotPool = - createDeclarativeSlotPool(jobGraphWithNewOperator.getJobID()); - - AdaptiveScheduler adaptiveScheduler = - new AdaptiveSchedulerBuilder( - jobGraphWithNewOperator, singleThreadMainThreadExecutor) - .setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory) - .setDeclarativeSlotPool(declarativeSlotPool) - .build(); - - final OneShotLatch submitTaskLatch = new OneShotLatch(); - final TaskManagerGateway taskManagerGateway = - createWaitingForTaskSubmissionTaskManagerGateway(submitTaskLatch); - - singleThreadMainThreadExecutor.execute( - () -> { - adaptiveScheduler.startScheduling(); - - offerSlots( - declarativeSlotPool, - createSlotOffersForResourceRequirements( - ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), - taskManagerGateway); - }); - - submitTaskLatch.await(); - - // starting and offering the required slots should trigger the ExecutionGraph creation - final CompletedCheckpoint savepoint = - CompletableFuture.supplyAsync( - FunctionUtils.uncheckedSupplier( - () -> completedCheckpointStore.getLatestCheckpoint(false)), - singleThreadMainThreadExecutor) - .join(); - - MatcherAssert.assertThat(savepoint, notNullValue()); - - MatcherAssert.assertThat(savepoint.getCheckpointID(), Matchers.is(savepointId)); - } - @Nonnull private TaskManagerGateway createWaitingForTaskSubmissionTaskManagerGateway( OneShotLatch submitTaskLatch) {
