zhuzhurk commented on a change in pull request #9663: 
[WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r324505819
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##########
 @@ -0,0 +1,727 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskFailureListener;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import 
org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base class which can be used to implement {@link SchedulerNG}.
+ */
+public abstract class SchedulerBase implements SchedulerNG {
+
+       private final Logger log;
+
+       private final JobGraph jobGraph;
+
+       private final ExecutionGraph executionGraph;
+
+       private final BackPressureStatsTracker backPressureStatsTracker;
+
+       private final Executor ioExecutor;
+
+       private final Configuration jobMasterConfiguration;
+
+       private final SlotProvider slotProvider;
+
+       private final ScheduledExecutorService futureExecutor;
+
+       private final ClassLoader userCodeLoader;
+
+       private final CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+       private final Time rpcTimeout;
+
+       private final RestartStrategy restartStrategy;
+
+       private final BlobWriter blobWriter;
+
+       private final Time slotRequestTimeout;
+
+       private ComponentMainThreadExecutor mainThreadExecutor = new 
ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
+               "SchedulerBase is not initialized with proper main thread 
executor. " +
+                       "Call to SchedulerBase.setMainThreadExecutor(...) 
required.");
+
+       public SchedulerBase(
+               final Logger log,
+               final JobGraph jobGraph,
+               final BackPressureStatsTracker backPressureStatsTracker,
+               final Executor ioExecutor,
+               final Configuration jobMasterConfiguration,
+               final SlotProvider slotProvider,
+               final ScheduledExecutorService futureExecutor,
+               final ClassLoader userCodeLoader,
+               final CheckpointRecoveryFactory checkpointRecoveryFactory,
+               final Time rpcTimeout,
+               final RestartStrategyFactory restartStrategyFactory,
+               final BlobWriter blobWriter,
+               final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+               final Time slotRequestTimeout,
+               final ShuffleMaster<?> shuffleMaster,
+               final PartitionTracker partitionTracker) throws Exception {
+
+               this.log = checkNotNull(log);
+               this.jobGraph = checkNotNull(jobGraph);
+               this.backPressureStatsTracker = 
checkNotNull(backPressureStatsTracker);
+               this.ioExecutor = checkNotNull(ioExecutor);
+               this.jobMasterConfiguration = 
checkNotNull(jobMasterConfiguration);
+               this.slotProvider = checkNotNull(slotProvider);
+               this.futureExecutor = checkNotNull(futureExecutor);
+               this.userCodeLoader = checkNotNull(userCodeLoader);
+               this.checkpointRecoveryFactory = 
checkNotNull(checkpointRecoveryFactory);
+               this.rpcTimeout = checkNotNull(rpcTimeout);
+
+               final RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration =
+                       jobGraph.getSerializedExecutionConfig()
+                               .deserializeValue(userCodeLoader)
+                               .getRestartStrategy();
+
+               this.restartStrategy = 
RestartStrategyResolving.resolve(restartStrategyConfiguration,
+                       restartStrategyFactory,
+                       jobGraph.isCheckpointingEnabled());
+
+               log.info("Using restart strategy {} for {} ({}).", 
this.restartStrategy, jobGraph.getName(), jobGraph.getJobID());
+
+               this.blobWriter = checkNotNull(blobWriter);
+               this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
+
+               this.executionGraph = 
createAndRestoreExecutionGraph(jobManagerJobMetricGroup, 
checkNotNull(shuffleMaster), checkNotNull(partitionTracker));
+       }
+
+       private ExecutionGraph createAndRestoreExecutionGraph(
+               JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
+               ShuffleMaster<?> shuffleMaster,
+               PartitionTracker partitionTracker) throws Exception {
+
+               ExecutionGraph newExecutionGraph = 
createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, 
partitionTracker);
+
+               final CheckpointCoordinator checkpointCoordinator = 
newExecutionGraph.getCheckpointCoordinator();
+
+               if (checkpointCoordinator != null) {
+                       // check whether we find a valid checkpoint
+                       if 
(!checkpointCoordinator.restoreLatestCheckpointedState(
+                               newExecutionGraph.getAllVertices(),
+                               false,
+                               false)) {
+
+                               // check whether we can restore from a savepoint
+                               
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, 
jobGraph.getSavepointRestoreSettings());
+                       }
+               }
+
+               return newExecutionGraph;
+       }
+
+       private ExecutionGraph createExecutionGraph(
+               JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
+               ShuffleMaster<?> shuffleMaster,
+               final PartitionTracker partitionTracker) throws 
JobExecutionException, JobException {
+               return ExecutionGraphBuilder.buildGraph(
+                       null,
+                       jobGraph,
+                       jobMasterConfiguration,
+                       futureExecutor,
+                       ioExecutor,
+                       slotProvider,
+                       userCodeLoader,
+                       checkpointRecoveryFactory,
+                       rpcTimeout,
+                       restartStrategy,
+                       currentJobManagerJobMetricGroup,
+                       blobWriter,
+                       slotRequestTimeout,
+                       log,
+                       shuffleMaster,
+                       partitionTracker);
+       }
+
+       /**
+        * Tries to restore the given {@link ExecutionGraph} from the provided 
{@link SavepointRestoreSettings}.
+        *
+        * @param executionGraphToRestore {@link ExecutionGraph} which is 
supposed to be restored
+        * @param savepointRestoreSettings {@link SavepointRestoreSettings} 
containing information about the savepoint to restore from
+        * @throws Exception if the {@link ExecutionGraph} could not be restored
+        */
+       private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph 
executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) 
throws Exception {
+               if (savepointRestoreSettings.restoreSavepoint()) {
+                       final CheckpointCoordinator checkpointCoordinator = 
executionGraphToRestore.getCheckpointCoordinator();
+                       if (checkpointCoordinator != null) {
+                               checkpointCoordinator.restoreSavepoint(
+                                       
savepointRestoreSettings.getRestorePath(),
+                                       
savepointRestoreSettings.allowNonRestoredState(),
+                                       
executionGraphToRestore.getAllVertices(),
+                                       userCodeLoader);
+                       }
+               }
+       }
+
+       @Deprecated
+       protected ExecutionGraph getExecutionGraph() {
+               return executionGraph;
+       }
+
+       protected void setTaskFailureListener(TaskFailureListener 
taskFailureListener) {
+               executionGraph.setTaskFailureListener(taskFailureListener);
+       }
+
+       protected void resetForNewExecution(final Collection<ExecutionVertexID> 
verticesToDeploy) {
+               verticesToDeploy.forEach(executionVertexId -> 
getExecutionVertex(executionVertexId)
+                       .resetForNewExecutionIfInTerminalState());
+       }
+
+       protected void transitionToScheduled(final 
Collection<ExecutionVertexID> verticesToDeploy) {
+               verticesToDeploy.forEach(executionVertexId -> 
getExecutionVertex(executionVertexId)
+                       .getCurrentExecutionAttempt()
+                       .transitionState(ExecutionState.SCHEDULED));
+       }
+
+       protected void updateConsumers(ResultPartitionID resultPartitionId) {
+               try {
+                       
executionGraph.scheduleOrUpdateConsumers(resultPartitionId);
+               } catch (ExecutionGraphException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       protected ComponentMainThreadExecutor getMainThreadExecutor() {
+               return mainThreadExecutor;
+       }
+
+       protected void failJob(Throwable cause) {
+               executionGraph.failJob(cause);
+       }
+
+       protected void updateState(TaskExecutionState taskExecutionState) {
+               executionGraph.updateState(taskExecutionState);
+       }
+
+       protected FailoverTopology getFailoverTopology() {
+               return new DefaultFailoverTopology(executionGraph);
+       }
+
+       protected ExecutionGraphToSchedulingTopologyAdapter 
getSchedulingTopology() {
+               return new 
ExecutionGraphToSchedulingTopologyAdapter(executionGraph);
 
 Review comment:
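   It would be better to cache the created topology (e.g. in a field) rather than constructing a new `ExecutionGraphToSchedulingTopologyAdapter` on every call to `getSchedulingTopology()`.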

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to