This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d75f186d99e8699a2dac802de803b25fdf612e01 Author: Zhu Zhu <[email protected]> AuthorDate: Sat Jun 6 13:26:28 2020 +0800 [FLINK-17018][runtime] Use OneSlotPerExecutionSlotAllocator on pipelined region scheduling --- .../runtime/scheduler/DefaultSchedulerFactory.java | 34 +++++++++++-- .../OneSlotPerExecutionSlotAllocatorFactory.java | 55 ++++++++++++++++++++++ 2 files changed, 84 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java index 1671b5e..333472b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy; import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy; +import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy; import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; import org.apache.flink.runtime.shuffle.ShuffleMaster; @@ -79,10 +80,12 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory { .create(); log.info("Using restart back off time strategy {} for {} ({}).", restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID()); - final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from( - jobGraph.getScheduleMode(), - slotProvider, - slotRequestTimeout); + final ExecutionSlotAllocatorFactory slotAllocatorFactory = + createExecutionSlotAllocatorFactory( + jobGraph.getScheduleMode(), + slotProvider, + slotRequestTimeout, + schedulingStrategyFactory); return new DefaultScheduler( log, @@ -104,7 +107,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory { restartBackoffTimeStrategy, new DefaultExecutionVertexOperations(), new ExecutionVertexVersioner(), - new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy)); + slotAllocatorFactory); } static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) { @@ -118,4 +121,25 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory { throw new IllegalStateException("Unsupported schedule mode " + scheduleMode); } } + + private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory( + final ScheduleMode scheduleMode, + final SlotProvider slotProvider, + final Time slotRequestTimeout, + final SchedulingStrategyFactory schedulingStrategyFactory) { + + if (schedulingStrategyFactory instanceof PipelinedRegionSchedulingStrategy.Factory) { + return new OneSlotPerExecutionSlotAllocatorFactory( + slotProvider, + scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST, + slotRequestTimeout); + } else { + final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from( + scheduleMode, + slotProvider, + slotRequestTimeout); + + return new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java new file mode 100644 index 0000000..8412b05 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Factory for {@link OneSlotPerExecutionSlotAllocator}. + */ +class OneSlotPerExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorFactory { + + private final SlotProvider slotProvider; + + private final boolean slotWillBeOccupiedIndefinitely; + + private final Time allocationTimeout; + + OneSlotPerExecutionSlotAllocatorFactory( + final SlotProvider slotProvider, + final boolean slotWillBeOccupiedIndefinitely, + final Time allocationTimeout) { + this.slotProvider = checkNotNull(slotProvider); + this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely; + this.allocationTimeout = checkNotNull(allocationTimeout); + } + + @Override + public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever preferredLocationsRetriever) { + return new OneSlotPerExecutionSlotAllocator( + slotProvider, + preferredLocationsRetriever, + slotWillBeOccupiedIndefinitely, + allocationTimeout); + } +}
