This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 08094878252304c41c964eb90876d2e5b45e7515
Author: Till Rohrmann <[email protected]>
AuthorDate: Tue Mar 16 10:52:40 2021 +0100

    [hotfix] Make SlotAllocator configurable for the AdaptiveScheduler
---
 .../runtime/scheduler/adaptive/AdaptiveScheduler.java      |  8 ++------
 .../scheduler/adaptive/AdaptiveSchedulerFactory.java       | 13 +++++++++++++
 .../adaptive/allocator/SlotSharingSlotAllocator.java       | 10 +++++++++-
 .../scheduler/adaptive/AdaptiveSchedulerBuilder.java       | 14 ++++++++++++++
 .../adaptive/allocator/SlotSharingSlotAllocatorTest.java   | 12 ++++++------
 5 files changed, 44 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index c1d0caa..3bac7a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -96,7 +96,6 @@ import org.apache.flink.runtime.scheduler.SchedulerUtils;
 import 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
-import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
 import 
org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ReactiveScaleUpController;
 import 
org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ScaleUpController;
@@ -208,6 +207,7 @@ public class AdaptiveScheduler
             JobGraph jobGraph,
             Configuration configuration,
             DeclarativeSlotPool declarativeSlotPool,
+            SlotAllocator slotAllocator,
             ScheduledExecutorService futureExecutor,
             Executor ioExecutor,
             ClassLoader userCodeClassLoader,
@@ -254,11 +254,7 @@ public class AdaptiveScheduler
                         jobGraph, checkpointRecoveryFactory);
         this.checkpointsCleaner = new CheckpointsCleaner();
 
-        this.slotAllocator =
-                new SlotSharingSlotAllocator(
-                        declarativeSlotPool::reserveFreeSlot,
-                        declarativeSlotPool::freeReservedSlot,
-                        declarativeSlotPool::containsFreeSlot);
+        this.slotAllocator = slotAllocator;
 
         
declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index c7eaf40..278baa0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
@@ -88,10 +89,14 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
                 jobGraph.getName(),
                 jobGraph.getJobID());
 
+        final SlotSharingSlotAllocator slotAllocator =
+                createSlotSharingSlotAllocator(declarativeSlotPool);
+
         return new AdaptiveScheduler(
                 jobGraph,
                 jobMasterConfiguration,
                 declarativeSlotPool,
+                slotAllocator,
                 futureExecutor,
                 ioExecutor,
                 userCodeLoader,
@@ -113,4 +118,12 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
     public JobManagerOptions.SchedulerType getSchedulerType() {
         return JobManagerOptions.SchedulerType.Adaptive;
     }
+
+    public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
+            DeclarativeSlotPool declarativeSlotPool) {
+        return SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
+                declarativeSlotPool::reserveFreeSlot,
+                declarativeSlotPool::freeReservedSlot,
+                declarativeSlotPool::containsFreeSlot);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index fc72a45..edc3b7f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -50,7 +50,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     private final FreeSlotFunction freeSlotFunction;
     private final IsSlotAvailableAndFreeFunction 
isSlotAvailableAndFreeFunction;
 
-    public SlotSharingSlotAllocator(
+    private SlotSharingSlotAllocator(
             ReserveSlotFunction reserveSlot,
             FreeSlotFunction freeSlotFunction,
             IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction) {
@@ -59,6 +59,14 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
         this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
     }
 
+    public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
+            ReserveSlotFunction reserveSlot,
+            FreeSlotFunction freeSlotFunction,
+            IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction) {
+        return new SlotSharingSlotAllocator(
+                reserveSlot, freeSlotFunction, isSlotAvailableAndFreeFunction);
+    }
+
     @Override
     public ResourceCounter calculateRequiredSlots(
             Iterable<JobInformation.VertexInformation> vertices) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 6d6257d..e005170 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -37,11 +37,14 @@ import 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -75,6 +78,8 @@ public class AdaptiveSchedulerBuilder {
     private JobStatusListener jobStatusListener = (ignoredA, ignoredB, 
ignoredC, ignoredD) -> {};
     private long initializationTimestamp = System.currentTimeMillis();
 
+    @Nullable private SlotAllocator slotAllocator;
+
     public AdaptiveSchedulerBuilder(
             final JobGraph jobGraph, ComponentMainThreadExecutor 
mainThreadExecutor) {
         this.jobGraph = jobGraph;
@@ -171,11 +176,20 @@ public class AdaptiveSchedulerBuilder {
         return this;
     }
 
+    public AdaptiveSchedulerBuilder setSlotAllocator(SlotAllocator 
slotAllocator) {
+        this.slotAllocator = slotAllocator;
+        return this;
+    }
+
     public AdaptiveScheduler build() throws Exception {
         return new AdaptiveScheduler(
                 jobGraph,
                 jobMasterConfiguration,
                 declarativeSlotPool,
+                slotAllocator == null
+                        ? 
AdaptiveSchedulerFactory.createSlotSharingSlotAllocator(
+                                declarativeSlotPool)
+                        : slotAllocator,
                 futureExecutor,
                 ioExecutor,
                 userCodeLoader,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index 22b8ddd..a8dc02f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -68,7 +68,7 @@ public class SlotSharingSlotAllocatorTest extends TestLogger {
     @Test
     public void testCalculateRequiredSlots() {
         final SlotSharingSlotAllocator slotAllocator =
-                new SlotSharingSlotAllocator(
+                SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION);
@@ -87,7 +87,7 @@ public class SlotSharingSlotAllocatorTest extends TestLogger {
     @Test
     public void testDetermineParallelismWithMinimumSlots() {
         final SlotSharingSlotAllocator slotAllocator =
-                new SlotSharingSlotAllocator(
+                SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION);
@@ -109,7 +109,7 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
     @Test
     public void testDetermineParallelismWithManySlots() {
         final SlotSharingSlotAllocator slotAllocator =
-                new SlotSharingSlotAllocator(
+                SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION);
@@ -137,7 +137,7 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
     @Test
     public void 
testDetermineParallelismUnsuccessfulWithLessSlotsThanSlotSharingGroups() {
         final SlotSharingSlotAllocator slotAllocator =
-                new SlotSharingSlotAllocator(
+                SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION);
@@ -154,7 +154,7 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
     @Test
     public void testReserveAvailableResources() {
         final SlotSharingSlotAllocator slotAllocator =
-                new SlotSharingSlotAllocator(
+                SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
                         TEST_RESERVE_SLOT_FUNCTION,
                         TEST_FREE_SLOT_FUNCTION,
                         TEST_IS_SLOT_FREE_FUNCTION);
@@ -193,7 +193,7 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
     @Test
     public void testReserveUnavailableResources() {
         final SlotSharingSlotAllocator slotSharingSlotAllocator =
-                new SlotSharingSlotAllocator(
+                SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
                         TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, 
ignored -> false);
 
         final JobInformation jobInformation =

Reply via email to