GJL commented on a change in pull request #11213: [FLINK-16276][tests] 
Introduce a builder and factory methods to create DefaultScheduler for testing
URL: https://github.com/apache/flink/pull/11213#discussion_r384407096
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
 ##########
 @@ -213,4 +267,161 @@ public static CheckpointCoordinator 
getCheckpointCoordinator(SchedulerBase sched
                        return operatorGateway.sendOperatorEventToTask(task, 
operator, evt);
                }
        }
+
+
+       /**
+        * Builder for {@link DefaultScheduler}.
+        */
+       public static class DefaultSchedulerBuilder {
+               private final JobGraph jobGraph;
+
+               private SchedulingStrategyFactory schedulingStrategyFactory;
+
+               private Logger log = LOG;
+               private BackPressureStatsTracker backPressureStatsTracker = 
VoidBackPressureStatsTracker.INSTANCE;
+               private Executor ioExecutor = 
java.util.concurrent.Executors.newSingleThreadExecutor();
+               private Configuration jobMasterConfiguration = new 
Configuration();
+               private ScheduledExecutorService futureExecutor = new 
DirectScheduledExecutorService();
+               private ScheduledExecutor delayExecutor = new 
ScheduledExecutorServiceAdapter(futureExecutor);
+               private ClassLoader userCodeLoader = 
getClass().getClassLoader();
+               private CheckpointRecoveryFactory checkpointRecoveryFactory = 
new StandaloneCheckpointRecoveryFactory();
+               private Time rpcTimeout = DEFAULT_TIMEOUT;
+               private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+               private JobManagerJobMetricGroup jobManagerJobMetricGroup = 
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+               private ShuffleMaster<?> shuffleMaster = 
NettyShuffleMaster.INSTANCE;
+               private JobMasterPartitionTracker partitionTracker = 
NoOpJobMasterPartitionTracker.INSTANCE;
+               private FailoverStrategy.Factory failoverStrategyFactory = new 
RestartPipelinedRegionFailoverStrategy.Factory();
+               private RestartBackoffTimeStrategy restartBackoffTimeStrategy = 
NoRestartBackoffTimeStrategy.INSTANCE;
+               private ExecutionVertexOperations executionVertexOperations = 
new DefaultExecutionVertexOperations();
+               private ExecutionVertexVersioner executionVertexVersioner = new 
ExecutionVertexVersioner();
+               private ExecutionSlotAllocatorFactory 
executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
+
+               private DefaultSchedulerBuilder(final JobGraph jobGraph) {
+                       this.jobGraph = jobGraph;
+
+                       // scheduling strategy is by default set according to 
the scheduleMode. It can be re-assigned later.
+                       this.schedulingStrategyFactory = 
DefaultSchedulerFactory.createSchedulingStrategyFactory(jobGraph.getScheduleMode());
+               }
+
+               public DefaultSchedulerBuilder setLogger(final Logger log) {
+                       this.log = log;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder 
setBackPressureStatsTracker(final BackPressureStatsTracker 
backPressureStatsTracker) {
+                       this.backPressureStatsTracker = 
backPressureStatsTracker;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder setIoExecutor(final Executor 
ioExecutor) {
+                       this.ioExecutor = ioExecutor;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder setJobMasterConfiguration(final 
Configuration jobMasterConfiguration) {
+                       this.jobMasterConfiguration = jobMasterConfiguration;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder setFutureExecutor(final 
ScheduledExecutorService futureExecutor) {
+                       this.futureExecutor = futureExecutor;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder setDelayExecutor(final 
ScheduledExecutor delayExecutor) {
+                       this.delayExecutor = delayExecutor;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder setUserCodeLoader(final 
ClassLoader userCodeLoader) {
+                       this.userCodeLoader = userCodeLoader;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder 
setCheckpointRecoveryFactory(final CheckpointRecoveryFactory 
checkpointRecoveryFactory) {
+                       this.checkpointRecoveryFactory = 
checkpointRecoveryFactory;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder setRpcTimeout(final Time 
rpcTimeout) {
+                       this.rpcTimeout = rpcTimeout;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder setBlobWriter(final BlobWriter 
blobWriter) {
+                       this.blobWriter = blobWriter;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder 
setJobManagerJobMetricGroup(final JobManagerJobMetricGroup 
jobManagerJobMetricGroup) {
+                       this.jobManagerJobMetricGroup = 
jobManagerJobMetricGroup;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder setShuffleMaster(final 
ShuffleMaster<?> shuffleMaster) {
+                       this.shuffleMaster = shuffleMaster;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder setPartitionTracker(final 
JobMasterPartitionTracker partitionTracker) {
+                       this.partitionTracker = partitionTracker;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder 
setSchedulingStrategyFactory(final SchedulingStrategyFactory 
schedulingStrategyFactory) {
+                       this.schedulingStrategyFactory = 
schedulingStrategyFactory;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder setFailoverStrategyFactory(final 
FailoverStrategy.Factory failoverStrategyFactory) {
+                       this.failoverStrategyFactory = failoverStrategyFactory;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder 
setRestartBackoffTimeStrategy(final RestartBackoffTimeStrategy 
restartBackoffTimeStrategy) {
+                       this.restartBackoffTimeStrategy = 
restartBackoffTimeStrategy;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder 
setExecutionVertexOperations(final ExecutionVertexOperations 
executionVertexOperations) {
+                       this.executionVertexOperations = 
executionVertexOperations;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder 
setExecutionVertexVersioner(final ExecutionVertexVersioner 
executionVertexVersioner) {
+                       this.executionVertexVersioner = 
executionVertexVersioner;
+                       return this;
+               }
+
+               public DefaultSchedulerBuilder 
setExecutionSlotAllocatorFactory(final ExecutionSlotAllocatorFactory 
executionSlotAllocatorFactory) {
+                       this.executionSlotAllocatorFactory = 
executionSlotAllocatorFactory;
+                       return this;
+               }
+
+               public DefaultScheduler build() throws Exception {
+                       return new DefaultScheduler(
+                               log,
+                               jobGraph,
+                               backPressureStatsTracker,
+                               ioExecutor,
+                               jobMasterConfiguration,
+                               new SimpleSlotProvider(jobGraph.getJobID(), 0), 
// this is not used any more in the new scheduler
 
 Review comment:
   I don't think the comment `// this is not used any more in the new 
scheduler` is needed. It would not be consistent to comment here because there 
is another invocation of this constructor where we do not comment. I would 
remove the parameters `slotProvider` and `slotRequestTimeout` from the 
`DefaultScheduler`, and if possible pass a _"throwing"_ implementation/invalid 
value to `SchedulerBase`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to