This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d75f186d99e8699a2dac802de803b25fdf612e01
Author: Zhu Zhu <[email protected]>
AuthorDate: Sat Jun 6 13:26:28 2020 +0800

    [FLINK-17018][runtime] Use OneSlotPerExecutionSlotAllocator on pipelined region scheduling
---
 .../runtime/scheduler/DefaultSchedulerFactory.java | 34 +++++++++++--
 .../OneSlotPerExecutionSlotAllocatorFactory.java   | 55 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 1671b5e..333472b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
 import 
org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import 
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
@@ -79,10 +80,12 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
                        .create();
                log.info("Using restart back off time strategy {} for {} 
({}).", restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID());
 
-               final SlotProviderStrategy slotProviderStrategy = 
SlotProviderStrategy.from(
-                       jobGraph.getScheduleMode(),
-                       slotProvider,
-                       slotRequestTimeout);
+               final ExecutionSlotAllocatorFactory slotAllocatorFactory =
+                       createExecutionSlotAllocatorFactory(
+                               jobGraph.getScheduleMode(),
+                               slotProvider,
+                               slotRequestTimeout,
+                               schedulingStrategyFactory);
 
                return new DefaultScheduler(
                        log,
@@ -104,7 +107,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
                        restartBackoffTimeStrategy,
                        new DefaultExecutionVertexOperations(),
                        new ExecutionVertexVersioner(),
-                       new 
DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
+                       slotAllocatorFactory);
        }
 
        static SchedulingStrategyFactory createSchedulingStrategyFactory(final 
ScheduleMode scheduleMode) {
@@ -118,4 +121,25 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
                                throw new IllegalStateException("Unsupported 
schedule mode " + scheduleMode);
                }
        }
+
+       private static ExecutionSlotAllocatorFactory 
createExecutionSlotAllocatorFactory(
+                       final ScheduleMode scheduleMode,
+                       final SlotProvider slotProvider,
+                       final Time slotRequestTimeout,
+                       final SchedulingStrategyFactory 
schedulingStrategyFactory) {
+
+               if (schedulingStrategyFactory instanceof 
PipelinedRegionSchedulingStrategy.Factory) {
+                       return new OneSlotPerExecutionSlotAllocatorFactory(
+                               slotProvider,
+                               scheduleMode != 
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+                               slotRequestTimeout);
+               } else {
+                       final SlotProviderStrategy slotProviderStrategy = 
SlotProviderStrategy.from(
+                               scheduleMode,
+                               slotProvider,
+                               slotRequestTimeout);
+
+                       return new 
DefaultExecutionSlotAllocatorFactory(slotProviderStrategy);
+               }
+       }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
new file mode 100644
index 0000000..8412b05
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Factory for {@link OneSlotPerExecutionSlotAllocator}.
+ */
+class OneSlotPerExecutionSlotAllocatorFactory implements 
ExecutionSlotAllocatorFactory {
+
+       private final SlotProvider slotProvider;
+
+       private final boolean slotWillBeOccupiedIndefinitely;
+
+       private final Time allocationTimeout;
+
+       OneSlotPerExecutionSlotAllocatorFactory(
+                       final SlotProvider slotProvider,
+                       final boolean slotWillBeOccupiedIndefinitely,
+                       final Time allocationTimeout) {
+               this.slotProvider = checkNotNull(slotProvider);
+               this.slotWillBeOccupiedIndefinitely = 
slotWillBeOccupiedIndefinitely;
+               this.allocationTimeout = checkNotNull(allocationTimeout);
+       }
+
+       @Override
+       public ExecutionSlotAllocator createInstance(final 
PreferredLocationsRetriever preferredLocationsRetriever) {
+               return new OneSlotPerExecutionSlotAllocator(
+                       slotProvider,
+                       preferredLocationsRetriever,
+                       slotWillBeOccupiedIndefinitely,
+                       allocationTimeout);
+       }
+}

Reply via email to