tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it URL: https://github.com/apache/flink/pull/8318#discussion_r280447981
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java ########## @@ -0,0 +1,639 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.client.JobExecutionException; +import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.query.KvStateLocation; +import 
org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.function.FunctionUtils; + +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A scheduler that delegates to the scheduling logic in the {@link ExecutionGraph}. 
+ * + * @see ExecutionGraph#scheduleForExecution() + */ +public class LegacyScheduler implements SchedulerNG { + + private final Logger log; + + private final JobGraph jobGraph; + + private final ExecutionGraph executionGraph; + + private final BackPressureStatsTracker backPressureStatsTracker; + + private final Executor ioExecutor; + + private final Configuration jobMasterConfiguration; + + private final SlotProvider slotProvider; + + private final ScheduledExecutorService futureExecutor; + + private final ClassLoader userCodeLoader; + + private final CheckpointRecoveryFactory checkpointRecoveryFactory; + + private final Time rpcTimeout; + + private final RestartStrategy restartStrategy; + + private final BlobWriter blobWriter; + + private final Time slotRequestTimeout; + + private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor( + "LegacyScheduler is not initialized with proper main thread executor. " + + "Call to LegacyScheduler.setMainThreadExecutor(...) 
required."); + + public LegacyScheduler( + final Logger log, + final JobGraph jobGraph, + final BackPressureStatsTracker backPressureStatsTracker, + final Executor ioExecutor, + final Configuration jobMasterConfiguration, + final SlotProvider slotProvider, + final ScheduledExecutorService futureExecutor, + final ClassLoader userCodeLoader, + final CheckpointRecoveryFactory checkpointRecoveryFactory, + final Time rpcTimeout, + final RestartStrategyFactory restartStrategyFactory, + final BlobWriter blobWriter, + final JobManagerJobMetricGroup jobManagerJobMetricGroup, + final Time slotRequestTimeout) throws Exception { + + this.log = checkNotNull(log); + this.jobGraph = checkNotNull(jobGraph); + this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker); + this.ioExecutor = checkNotNull(ioExecutor); + this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration); + this.slotProvider = checkNotNull(slotProvider); + this.futureExecutor = checkNotNull(futureExecutor); + this.userCodeLoader = checkNotNull(userCodeLoader); + this.checkpointRecoveryFactory = checkNotNull(checkpointRecoveryFactory); + this.rpcTimeout = checkNotNull(rpcTimeout); + + final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = + jobGraph.getSerializedExecutionConfig() + .deserializeValue(userCodeLoader) + .getRestartStrategy(); + + this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration, + restartStrategyFactory, + jobGraph.isCheckpointingEnabled()); + + log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobGraph.getName(), jobGraph.getJobID()); + + this.blobWriter = checkNotNull(blobWriter); + this.slotRequestTimeout = checkNotNull(slotRequestTimeout); + + this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup); + } + + private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception { + + ExecutionGraph 
newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup); + + final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator(); + + if (checkpointCoordinator != null) { + // check whether we find a valid checkpoint + if (!checkpointCoordinator.restoreLatestCheckpointedState( + newExecutionGraph.getAllVertices(), + false, + false)) { + + // check whether we can restore from a savepoint + tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings()); + } + } + + return newExecutionGraph; + } + + private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws JobExecutionException, JobException { + return ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + jobMasterConfiguration, + futureExecutor, + ioExecutor, + slotProvider, + userCodeLoader, + checkpointRecoveryFactory, + rpcTimeout, + restartStrategy, + currentJobManagerJobMetricGroup, + blobWriter, + slotRequestTimeout, + log); + } + + /** + * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}. 
+ * + * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored + * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from + * @throws Exception if the {@link ExecutionGraph} could not be restored + */ + private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception { + if (savepointRestoreSettings.restoreSavepoint()) { + final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator(); + if (checkpointCoordinator != null) { + checkpointCoordinator.restoreSavepoint( + savepointRestoreSettings.getRestorePath(), + savepointRestoreSettings.allowNonRestoredState(), + executionGraphToRestore.getAllVertices(), + userCodeLoader); + } + } + } + + @Override + public void setMainThreadExecutor(final ComponentMainThreadExecutor mainThreadExecutor) { + this.mainThreadExecutor = checkNotNull(mainThreadExecutor); + executionGraph.start(mainThreadExecutor); + } + + @Override + public void registerJobStatusListener(final JobStatusListener jobStatusListener) { + executionGraph.registerJobStatusListener(jobStatusListener); + } + + @Override + public void startScheduling() { + mainThreadExecutor.assertRunningInMainThread(); + + try { + executionGraph.scheduleForExecution(); + } + catch (Throwable t) { + executionGraph.failGlobal(t); + } + } + + @Override + public void suspend(Throwable cause) { + mainThreadExecutor.assertRunningInMainThread(); + executionGraph.suspend(cause); + } + + @Override + public void cancel() { + mainThreadExecutor.assertRunningInMainThread(); + executionGraph.cancel(); + } + + @Override + public CompletableFuture<Void> getTerminationFuture() { + return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn()); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { 
+ mainThreadExecutor.assertRunningInMainThread(); + return executionGraph.updateState(taskExecutionState); + } + + @Override + public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException { + mainThreadExecutor.assertRunningInMainThread(); + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + // can happen when JobManager had already unregistered this execution upon on task failure, + // but TaskManager get some delay to aware of that situation + if (log.isDebugEnabled()) { + log.debug("Can not find Execution for attempt {}.", executionAttempt); + } + // but we should TaskManager be aware of this + throw new IllegalArgumentException("Can not find Execution for attempt " + executionAttempt); + } + + final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); + if (vertex == null) { + log.error("Cannot find execution vertex for vertex ID {}.", vertexID); Review comment: I think we could remove the error logging statements and log them on the `JobMaster` level. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
