This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 397ca61c238a25f59ec823bceedf254667dbb524 Author: Chesnay Schepler <[email protected]> AuthorDate: Thu Jan 13 12:04:11 2022 +0100 [hotfix][tests] Refactor scheduler factory methods Factory methods were refactored to first create a builder that sets various defaults and then override the defaults. --- .../runtime/scheduler/DefaultSchedulerTest.java | 41 ++++++++++++---------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 43beb6a..46742dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -1684,12 +1684,10 @@ public class DefaultSchedulerTest extends TestLogger { private DefaultScheduler createSchedulerAndStartScheduling( final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) { - final SchedulingStrategyFactory schedulingStrategyFactory = - new PipelinedRegionSchedulingStrategy.Factory(); try { final DefaultScheduler scheduler = - createScheduler(jobGraph, mainThreadExecutor, schedulingStrategyFactory); + createSchedulerBuilder(jobGraph, mainThreadExecutor).build(); mainThreadExecutor.execute(scheduler::startScheduling); return scheduler; } catch (Exception e) { @@ -1702,11 +1700,9 @@ public class DefaultSchedulerTest extends TestLogger { final ComponentMainThreadExecutor mainThreadExecutor, final SchedulingStrategyFactory schedulingStrategyFactory) throws Exception { - return createScheduler( - jobGraph, - mainThreadExecutor, - schedulingStrategyFactory, - new RestartPipelinedRegionFailoverStrategy.Factory()); + return createSchedulerBuilder(jobGraph, mainThreadExecutor) + .setSchedulingStrategyFactory(schedulingStrategyFactory) + .build(); } private DefaultScheduler createScheduler( @@ -1715,12 +1711,10 @@ public class DefaultSchedulerTest extends TestLogger { final SchedulingStrategyFactory schedulingStrategyFactory, final FailoverStrategy.Factory failoverStrategyFactory) throws Exception { - return createScheduler( - jobGraph, - mainThreadExecutor, - schedulingStrategyFactory, - failoverStrategyFactory, - taskRestartExecutor); + return createSchedulerBuilder(jobGraph, mainThreadExecutor) + .setSchedulingStrategyFactory(schedulingStrategyFactory) + .setFailoverStrategyFactory(failoverStrategyFactory) + .build(); } private DefaultScheduler createScheduler( @@ -1730,22 +1724,31 @@ public class DefaultSchedulerTest extends TestLogger { final FailoverStrategy.Factory failoverStrategyFactory, final ScheduledExecutor delayExecutor) throws Exception { + return createSchedulerBuilder(jobGraph, mainThreadExecutor) + .setDelayExecutor(delayExecutor) + .setSchedulingStrategyFactory(schedulingStrategyFactory) + .setFailoverStrategyFactory(failoverStrategyFactory) + .build(); + } + + private SchedulerTestingUtils.DefaultSchedulerBuilder createSchedulerBuilder( + final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) + throws Exception { return SchedulerTestingUtils.newSchedulerBuilder(jobGraph, mainThreadExecutor) .setLogger(log) .setIoExecutor(executor) .setJobMasterConfiguration(configuration) .setFutureExecutor(scheduledExecutorService) - .setDelayExecutor(delayExecutor) - .setSchedulingStrategyFactory(schedulingStrategyFactory) - .setFailoverStrategyFactory(failoverStrategyFactory) + .setDelayExecutor(taskRestartExecutor) + .setSchedulingStrategyFactory(new PipelinedRegionSchedulingStrategy.Factory()) + .setFailoverStrategyFactory(new RestartPipelinedRegionFailoverStrategy.Factory()) .setRestartBackoffTimeStrategy(testRestartBackoffTimeStrategy) .setExecutionVertexOperations(testExecutionVertexOperations) .setExecutionVertexVersioner(executionVertexVersioner) .setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory) .setShuffleMaster(shuffleMaster) .setPartitionTracker(partitionTracker) - .setRpcTimeout(timeout) - .build(); + .setRpcTimeout(timeout); } /**
