tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280447981
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java
 ##########
 @@ -0,0 +1,639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A scheduler that delegates to the scheduling logic in the {@link 
ExecutionGraph}.
+ *
+ * @see ExecutionGraph#scheduleForExecution()
+ */
+public class LegacyScheduler implements SchedulerNG {
+
+       private final Logger log;
+
+       private final JobGraph jobGraph;
+
+       private final ExecutionGraph executionGraph;
+
+       private final BackPressureStatsTracker backPressureStatsTracker;
+
+       private final Executor ioExecutor;
+
+       private final Configuration jobMasterConfiguration;
+
+       private final SlotProvider slotProvider;
+
+       private final ScheduledExecutorService futureExecutor;
+
+       private final ClassLoader userCodeLoader;
+
+       private final CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+       private final Time rpcTimeout;
+
+       private final RestartStrategy restartStrategy;
+
+       private final BlobWriter blobWriter;
+
+       private final Time slotRequestTimeout;
+
+       private ComponentMainThreadExecutor mainThreadExecutor = new 
ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
+               "LegacyScheduler is not initialized with proper main thread 
executor. " +
+                       "Call to LegacyScheduler.setMainThreadExecutor(...) 
required.");
+
+       public LegacyScheduler(
+                       final Logger log,
+                       final JobGraph jobGraph,
+                       final BackPressureStatsTracker backPressureStatsTracker,
+                       final Executor ioExecutor,
+                       final Configuration jobMasterConfiguration,
+                       final SlotProvider slotProvider,
+                       final ScheduledExecutorService futureExecutor,
+                       final ClassLoader userCodeLoader,
+                       final CheckpointRecoveryFactory 
checkpointRecoveryFactory,
+                       final Time rpcTimeout,
+                       final RestartStrategyFactory restartStrategyFactory,
+                       final BlobWriter blobWriter,
+                       final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+                       final Time slotRequestTimeout) throws Exception {
+
+               this.log = checkNotNull(log);
+               this.jobGraph = checkNotNull(jobGraph);
+               this.backPressureStatsTracker = 
checkNotNull(backPressureStatsTracker);
+               this.ioExecutor = checkNotNull(ioExecutor);
+               this.jobMasterConfiguration = 
checkNotNull(jobMasterConfiguration);
+               this.slotProvider = checkNotNull(slotProvider);
+               this.futureExecutor = checkNotNull(futureExecutor);
+               this.userCodeLoader = checkNotNull(userCodeLoader);
+               this.checkpointRecoveryFactory = 
checkNotNull(checkpointRecoveryFactory);
+               this.rpcTimeout = checkNotNull(rpcTimeout);
+
+               final RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration =
+                       jobGraph.getSerializedExecutionConfig()
+                               .deserializeValue(userCodeLoader)
+                               .getRestartStrategy();
+
+               this.restartStrategy = 
RestartStrategyResolving.resolve(restartStrategyConfiguration,
+                       restartStrategyFactory,
+                       jobGraph.isCheckpointingEnabled());
+
+               log.info("Using restart strategy {} for {} ({}).", 
this.restartStrategy, jobGraph.getName(), jobGraph.getJobID());
+
+               this.blobWriter = checkNotNull(blobWriter);
+               this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
+
+               this.executionGraph = 
createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
+       }
+
+       private ExecutionGraph 
createAndRestoreExecutionGraph(JobManagerJobMetricGroup 
currentJobManagerJobMetricGroup) throws Exception {
+
+               ExecutionGraph newExecutionGraph = 
createExecutionGraph(currentJobManagerJobMetricGroup);
+
+               final CheckpointCoordinator checkpointCoordinator = 
newExecutionGraph.getCheckpointCoordinator();
+
+               if (checkpointCoordinator != null) {
+                       // check whether we find a valid checkpoint
+                       if 
(!checkpointCoordinator.restoreLatestCheckpointedState(
+                               newExecutionGraph.getAllVertices(),
+                               false,
+                               false)) {
+
+                               // check whether we can restore from a savepoint
+                               
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, 
jobGraph.getSavepointRestoreSettings());
+                       }
+               }
+
+               return newExecutionGraph;
+       }
+
+       private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup 
currentJobManagerJobMetricGroup) throws JobExecutionException, JobException {
+               return ExecutionGraphBuilder.buildGraph(
+                       null,
+                       jobGraph,
+                       jobMasterConfiguration,
+                       futureExecutor,
+                       ioExecutor,
+                       slotProvider,
+                       userCodeLoader,
+                       checkpointRecoveryFactory,
+                       rpcTimeout,
+                       restartStrategy,
+                       currentJobManagerJobMetricGroup,
+                       blobWriter,
+                       slotRequestTimeout,
+                       log);
+       }
+
+       /**
+        * Tries to restore the given {@link ExecutionGraph} from the provided 
{@link SavepointRestoreSettings}.
+        *
+        * @param executionGraphToRestore {@link ExecutionGraph} which is 
supposed to be restored
+        * @param savepointRestoreSettings {@link SavepointRestoreSettings} 
containing information about the savepoint to restore from
+        * @throws Exception if the {@link ExecutionGraph} could not be restored
+        */
+       private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph 
executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) 
throws Exception {
+               if (savepointRestoreSettings.restoreSavepoint()) {
+                       final CheckpointCoordinator checkpointCoordinator = 
executionGraphToRestore.getCheckpointCoordinator();
+                       if (checkpointCoordinator != null) {
+                               checkpointCoordinator.restoreSavepoint(
+                                       
savepointRestoreSettings.getRestorePath(),
+                                       
savepointRestoreSettings.allowNonRestoredState(),
+                                       
executionGraphToRestore.getAllVertices(),
+                                       userCodeLoader);
+                       }
+               }
+       }
+
+       @Override
+       public void setMainThreadExecutor(final ComponentMainThreadExecutor 
mainThreadExecutor) {
+               this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+               executionGraph.start(mainThreadExecutor);
+       }
+
+       @Override
+       public void registerJobStatusListener(final JobStatusListener 
jobStatusListener) {
+               executionGraph.registerJobStatusListener(jobStatusListener);
+       }
+
+       @Override
+       public void startScheduling() {
+               mainThreadExecutor.assertRunningInMainThread();
+
+               try {
+                       executionGraph.scheduleForExecution();
+               }
+               catch (Throwable t) {
+                       executionGraph.failGlobal(t);
+               }
+       }
+
+       @Override
+       public void suspend(Throwable cause) {
+               mainThreadExecutor.assertRunningInMainThread();
+               executionGraph.suspend(cause);
+       }
+
+       @Override
+       public void cancel() {
+               mainThreadExecutor.assertRunningInMainThread();
+               executionGraph.cancel();
+       }
+
+       @Override
+       public CompletableFuture<Void> getTerminationFuture() {
+               return 
executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
+       }
+
+       @Override
+       public boolean updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
+               mainThreadExecutor.assertRunningInMainThread();
+               return executionGraph.updateState(taskExecutionState);
+       }
+
+       @Override
+       public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, 
ExecutionAttemptID executionAttempt) throws IOException {
+               mainThreadExecutor.assertRunningInMainThread();
+
+               final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
+               if (execution == null) {
+                       // can happen when the JobManager has already unregistered this execution upon a task failure,
+                       // but the TaskManager only becomes aware of that situation after some delay
+                       if (log.isDebugEnabled()) {
+                               log.debug("Can not find Execution for attempt 
{}.", executionAttempt);
+                       }
+                       // but we should make the TaskManager aware of this
+                       throw new IllegalArgumentException("Can not find 
Execution for attempt " + executionAttempt);
+               }
+
+               final ExecutionJobVertex vertex = 
executionGraph.getJobVertex(vertexID);
+               if (vertex == null) {
+                       log.error("Cannot find execution vertex for vertex ID 
{}.", vertexID);
 
 Review comment:
   I think we could remove the error logging statements and log them on the 
`JobMaster` level.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to