This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 397ca61c238a25f59ec823bceedf254667dbb524
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Jan 13 12:04:11 2022 +0100

    [hotfix][tests] Refactor scheduler factory methods
    
    Factory methods were refactored to first create a builder that sets various
    defaults and then override the defaults.
---
 .../runtime/scheduler/DefaultSchedulerTest.java    | 41 ++++++++++++----------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 43beb6a..46742dc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -1684,12 +1684,10 @@ public class DefaultSchedulerTest extends TestLogger {
 
     private DefaultScheduler createSchedulerAndStartScheduling(
             final JobGraph jobGraph, final ComponentMainThreadExecutor 
mainThreadExecutor) {
-        final SchedulingStrategyFactory schedulingStrategyFactory =
-                new PipelinedRegionSchedulingStrategy.Factory();
 
         try {
             final DefaultScheduler scheduler =
-                    createScheduler(jobGraph, mainThreadExecutor, 
schedulingStrategyFactory);
+                    createSchedulerBuilder(jobGraph, 
mainThreadExecutor).build();
             mainThreadExecutor.execute(scheduler::startScheduling);
             return scheduler;
         } catch (Exception e) {
@@ -1702,11 +1700,9 @@ public class DefaultSchedulerTest extends TestLogger {
             final ComponentMainThreadExecutor mainThreadExecutor,
             final SchedulingStrategyFactory schedulingStrategyFactory)
             throws Exception {
-        return createScheduler(
-                jobGraph,
-                mainThreadExecutor,
-                schedulingStrategyFactory,
-                new RestartPipelinedRegionFailoverStrategy.Factory());
+        return createSchedulerBuilder(jobGraph, mainThreadExecutor)
+                .setSchedulingStrategyFactory(schedulingStrategyFactory)
+                .build();
     }
 
     private DefaultScheduler createScheduler(
@@ -1715,12 +1711,10 @@ public class DefaultSchedulerTest extends TestLogger {
             final SchedulingStrategyFactory schedulingStrategyFactory,
             final FailoverStrategy.Factory failoverStrategyFactory)
             throws Exception {
-        return createScheduler(
-                jobGraph,
-                mainThreadExecutor,
-                schedulingStrategyFactory,
-                failoverStrategyFactory,
-                taskRestartExecutor);
+        return createSchedulerBuilder(jobGraph, mainThreadExecutor)
+                .setSchedulingStrategyFactory(schedulingStrategyFactory)
+                .setFailoverStrategyFactory(failoverStrategyFactory)
+                .build();
     }
 
     private DefaultScheduler createScheduler(
@@ -1730,22 +1724,31 @@ public class DefaultSchedulerTest extends TestLogger {
             final FailoverStrategy.Factory failoverStrategyFactory,
             final ScheduledExecutor delayExecutor)
             throws Exception {
+        return createSchedulerBuilder(jobGraph, mainThreadExecutor)
+                .setDelayExecutor(delayExecutor)
+                .setSchedulingStrategyFactory(schedulingStrategyFactory)
+                .setFailoverStrategyFactory(failoverStrategyFactory)
+                .build();
+    }
+
+    private SchedulerTestingUtils.DefaultSchedulerBuilder 
createSchedulerBuilder(
+            final JobGraph jobGraph, final ComponentMainThreadExecutor 
mainThreadExecutor)
+            throws Exception {
         return SchedulerTestingUtils.newSchedulerBuilder(jobGraph, 
mainThreadExecutor)
                 .setLogger(log)
                 .setIoExecutor(executor)
                 .setJobMasterConfiguration(configuration)
                 .setFutureExecutor(scheduledExecutorService)
-                .setDelayExecutor(delayExecutor)
-                .setSchedulingStrategyFactory(schedulingStrategyFactory)
-                .setFailoverStrategyFactory(failoverStrategyFactory)
+                .setDelayExecutor(taskRestartExecutor)
+                .setSchedulingStrategyFactory(new 
PipelinedRegionSchedulingStrategy.Factory())
+                .setFailoverStrategyFactory(new 
RestartPipelinedRegionFailoverStrategy.Factory())
                 .setRestartBackoffTimeStrategy(testRestartBackoffTimeStrategy)
                 .setExecutionVertexOperations(testExecutionVertexOperations)
                 .setExecutionVertexVersioner(executionVertexVersioner)
                 
.setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
                 .setShuffleMaster(shuffleMaster)
                 .setPartitionTracker(partitionTracker)
-                .setRpcTimeout(timeout)
-                .build();
+                .setRpcTimeout(timeout);
     }
 
     /**

Reply via email to