zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests]
Introduce a builder and factory methods to create DefaultScheduler for testing
URL: https://github.com/apache/flink/pull/11213#discussion_r384935971
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
##########
@@ -213,4 +267,161 @@ public static CheckpointCoordinator
getCheckpointCoordinator(SchedulerBase sched
return operatorGateway.sendOperatorEventToTask(task,
operator, evt);
}
}
+
+
+ /**
+ * Builder for {@link DefaultScheduler}.
+ */
+ public static class DefaultSchedulerBuilder {
+ private final JobGraph jobGraph;
+
+ private SchedulingStrategyFactory schedulingStrategyFactory;
+
+ private Logger log = LOG;
+ private BackPressureStatsTracker backPressureStatsTracker =
VoidBackPressureStatsTracker.INSTANCE;
+ private Executor ioExecutor =
java.util.concurrent.Executors.newSingleThreadExecutor();
+ private Configuration jobMasterConfiguration = new
Configuration();
+ private ScheduledExecutorService futureExecutor = new
DirectScheduledExecutorService();
+ private ScheduledExecutor delayExecutor = new
ScheduledExecutorServiceAdapter(futureExecutor);
+ private ClassLoader userCodeLoader =
getClass().getClassLoader();
+ private CheckpointRecoveryFactory checkpointRecoveryFactory =
new StandaloneCheckpointRecoveryFactory();
+ private Time rpcTimeout = DEFAULT_TIMEOUT;
+ private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+ private JobManagerJobMetricGroup jobManagerJobMetricGroup =
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+ private ShuffleMaster<?> shuffleMaster =
NettyShuffleMaster.INSTANCE;
+ private JobMasterPartitionTracker partitionTracker =
NoOpJobMasterPartitionTracker.INSTANCE;
+ private FailoverStrategy.Factory failoverStrategyFactory = new
RestartPipelinedRegionFailoverStrategy.Factory();
+ private RestartBackoffTimeStrategy restartBackoffTimeStrategy =
NoRestartBackoffTimeStrategy.INSTANCE;
+ private ExecutionVertexOperations executionVertexOperations =
new DefaultExecutionVertexOperations();
+ private ExecutionVertexVersioner executionVertexVersioner = new
ExecutionVertexVersioner();
+ private ExecutionSlotAllocatorFactory
executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
+
+ private DefaultSchedulerBuilder(final JobGraph jobGraph) {
+ this.jobGraph = jobGraph;
+
+ // scheduling strategy is by default set according to
the scheduleMode. It can be re-assigned later.
+ this.schedulingStrategyFactory =
DefaultSchedulerFactory.createSchedulingStrategyFactory(jobGraph.getScheduleMode());
+ }
+
+ public DefaultSchedulerBuilder setLogger(final Logger log) {
+ this.log = log;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder
setBackPressureStatsTracker(final BackPressureStatsTracker
backPressureStatsTracker) {
+ this.backPressureStatsTracker =
backPressureStatsTracker;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setIoExecutor(final Executor
ioExecutor) {
+ this.ioExecutor = ioExecutor;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setJobMasterConfiguration(final
Configuration jobMasterConfiguration) {
+ this.jobMasterConfiguration = jobMasterConfiguration;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setFutureExecutor(final
ScheduledExecutorService futureExecutor) {
+ this.futureExecutor = futureExecutor;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setDelayExecutor(final
ScheduledExecutor delayExecutor) {
+ this.delayExecutor = delayExecutor;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setUserCodeLoader(final
ClassLoader userCodeLoader) {
+ this.userCodeLoader = userCodeLoader;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder
setCheckpointRecoveryFactory(final CheckpointRecoveryFactory
checkpointRecoveryFactory) {
+ this.checkpointRecoveryFactory =
checkpointRecoveryFactory;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setRpcTimeout(final Time
rpcTimeout) {
+ this.rpcTimeout = rpcTimeout;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setBlobWriter(final BlobWriter
blobWriter) {
+ this.blobWriter = blobWriter;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder
setJobManagerJobMetricGroup(final JobManagerJobMetricGroup
jobManagerJobMetricGroup) {
+ this.jobManagerJobMetricGroup =
jobManagerJobMetricGroup;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setShuffleMaster(final
ShuffleMaster<?> shuffleMaster) {
+ this.shuffleMaster = shuffleMaster;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setPartitionTracker(final
JobMasterPartitionTracker partitionTracker) {
+ this.partitionTracker = partitionTracker;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder
setSchedulingStrategyFactory(final SchedulingStrategyFactory
schedulingStrategyFactory) {
+ this.schedulingStrategyFactory =
schedulingStrategyFactory;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder setFailoverStrategyFactory(final
FailoverStrategy.Factory failoverStrategyFactory) {
+ this.failoverStrategyFactory = failoverStrategyFactory;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder
setRestartBackoffTimeStrategy(final RestartBackoffTimeStrategy
restartBackoffTimeStrategy) {
+ this.restartBackoffTimeStrategy =
restartBackoffTimeStrategy;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder
setExecutionVertexOperations(final ExecutionVertexOperations
executionVertexOperations) {
+ this.executionVertexOperations =
executionVertexOperations;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder
setExecutionVertexVersioner(final ExecutionVertexVersioner
executionVertexVersioner) {
+ this.executionVertexVersioner =
executionVertexVersioner;
+ return this;
+ }
+
+ public DefaultSchedulerBuilder
setExecutionSlotAllocatorFactory(final ExecutionSlotAllocatorFactory
executionSlotAllocatorFactory) {
+ this.executionSlotAllocatorFactory =
executionSlotAllocatorFactory;
+ return this;
+ }
+
+ public DefaultScheduler build() throws Exception {
+ return new DefaultScheduler(
+ log,
+ jobGraph,
+ backPressureStatsTracker,
+ ioExecutor,
+ jobMasterConfiguration,
+ new SimpleSlotProvider(jobGraph.getJobID(), 0),
// this is not used any more in the new scheduler
Review comment:
Good suggestion. Commit 1b292e3cce846b600e841ba0fdab9668031edfd0 was added to
remove these two params of DefaultScheduler.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services