This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 08094878252304c41c964eb90876d2e5b45e7515 Author: Till Rohrmann <[email protected]> AuthorDate: Tue Mar 16 10:52:40 2021 +0100 [hotfix] Make SlotAllocator configurable for the AdaptiveScheduler --- .../runtime/scheduler/adaptive/AdaptiveScheduler.java | 8 ++------ .../scheduler/adaptive/AdaptiveSchedulerFactory.java | 13 +++++++++++++ .../adaptive/allocator/SlotSharingSlotAllocator.java | 10 +++++++++- .../scheduler/adaptive/AdaptiveSchedulerBuilder.java | 14 ++++++++++++++ .../adaptive/allocator/SlotSharingSlotAllocatorTest.java | 12 ++++++------ 5 files changed, 44 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index c1d0caa..3bac7a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -96,7 +96,6 @@ import org.apache.flink.runtime.scheduler.SchedulerUtils; import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener; import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; -import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ReactiveScaleUpController; import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ScaleUpController; @@ -208,6 +207,7 @@ public class AdaptiveScheduler JobGraph jobGraph, Configuration configuration, DeclarativeSlotPool declarativeSlotPool, + SlotAllocator slotAllocator, ScheduledExecutorService futureExecutor, Executor ioExecutor, ClassLoader userCodeClassLoader, @@ -254,11 +254,7 @@ public class AdaptiveScheduler jobGraph, checkpointRecoveryFactory); this.checkpointsCleaner = new CheckpointsCleaner(); - this.slotAllocator = - new SlotSharingSlotAllocator( - declarativeSlotPool::reserveFreeSlot, - declarativeSlotPool::freeReservedSlot, - declarativeSlotPool::containsFreeSlot); + this.slotAllocator = slotAllocator; declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java index c7eaf40..278baa0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.scheduler.SchedulerNG; import org.apache.flink.runtime.scheduler.SchedulerNGFactory; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.slf4j.Logger; @@ -88,10 +89,14 @@ public class AdaptiveSchedulerFactory implements SchedulerNGFactory { jobGraph.getName(), jobGraph.getJobID()); + final SlotSharingSlotAllocator slotAllocator = + createSlotSharingSlotAllocator(declarativeSlotPool); + return new AdaptiveScheduler( jobGraph, jobMasterConfiguration, declarativeSlotPool, + slotAllocator, futureExecutor, ioExecutor, userCodeLoader, @@ -113,4 +118,12 @@ public class AdaptiveSchedulerFactory implements SchedulerNGFactory { public JobManagerOptions.SchedulerType getSchedulerType() { return JobManagerOptions.SchedulerType.Adaptive; } + + public static SlotSharingSlotAllocator createSlotSharingSlotAllocator( + DeclarativeSlotPool declarativeSlotPool) { + return SlotSharingSlotAllocator.createSlotSharingSlotAllocator( + declarativeSlotPool::reserveFreeSlot, + declarativeSlotPool::freeReservedSlot, + declarativeSlotPool::containsFreeSlot); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java index fc72a45..edc3b7f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java @@ -50,7 +50,7 @@ public class SlotSharingSlotAllocator implements SlotAllocator { private final FreeSlotFunction freeSlotFunction; private final IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction; - public SlotSharingSlotAllocator( + private SlotSharingSlotAllocator( ReserveSlotFunction reserveSlot, FreeSlotFunction freeSlotFunction, IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction) { @@ -59,6 +59,14 @@ public class SlotSharingSlotAllocator implements SlotAllocator { this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction; } + public static SlotSharingSlotAllocator createSlotSharingSlotAllocator( + ReserveSlotFunction reserveSlot, + FreeSlotFunction freeSlotFunction, + IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction) { + return new SlotSharingSlotAllocator( + reserveSlot, freeSlotFunction, isSlotAvailableAndFreeFunction); + } + @Override public ResourceCounter calculateRequiredSlots( Iterable<JobInformation.VertexInformation> vertices) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java index 6d6257d..e005170 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java @@ -37,11 +37,14 @@ import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.FatalExitExceptionHandler; +import javax.annotation.Nullable; + import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -75,6 +78,8 @@ public class AdaptiveSchedulerBuilder { private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC, ignoredD) -> {}; private long initializationTimestamp = System.currentTimeMillis(); + @Nullable private SlotAllocator slotAllocator; + public AdaptiveSchedulerBuilder( final JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) { this.jobGraph = jobGraph; @@ -171,11 +176,20 @@ public class AdaptiveSchedulerBuilder { return this; } + public AdaptiveSchedulerBuilder setSlotAllocator(SlotAllocator slotAllocator) { + this.slotAllocator = slotAllocator; + return this; + } + public AdaptiveScheduler build() throws Exception { return new AdaptiveScheduler( jobGraph, jobMasterConfiguration, declarativeSlotPool, + slotAllocator == null + ? AdaptiveSchedulerFactory.createSlotSharingSlotAllocator( + declarativeSlotPool) + : slotAllocator, futureExecutor, ioExecutor, userCodeLoader, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java index 22b8ddd..a8dc02f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java @@ -68,7 +68,7 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { @Test public void testCalculateRequiredSlots() { final SlotSharingSlotAllocator slotAllocator = - new SlotSharingSlotAllocator( + SlotSharingSlotAllocator.createSlotSharingSlotAllocator( TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, TEST_IS_SLOT_FREE_FUNCTION); @@ -87,7 +87,7 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { @Test public void testDetermineParallelismWithMinimumSlots() { final SlotSharingSlotAllocator slotAllocator = - new SlotSharingSlotAllocator( + SlotSharingSlotAllocator.createSlotSharingSlotAllocator( TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, TEST_IS_SLOT_FREE_FUNCTION); @@ -109,7 +109,7 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { @Test public void testDetermineParallelismWithManySlots() { final SlotSharingSlotAllocator slotAllocator = - new SlotSharingSlotAllocator( + SlotSharingSlotAllocator.createSlotSharingSlotAllocator( TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, TEST_IS_SLOT_FREE_FUNCTION); @@ -137,7 +137,7 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { @Test public void testDetermineParallelismUnsuccessfulWithLessSlotsThanSlotSharingGroups() { final SlotSharingSlotAllocator slotAllocator = - new SlotSharingSlotAllocator( + SlotSharingSlotAllocator.createSlotSharingSlotAllocator( TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, TEST_IS_SLOT_FREE_FUNCTION); @@ -154,7 +154,7 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { @Test public void testReserveAvailableResources() { final SlotSharingSlotAllocator slotAllocator = - new SlotSharingSlotAllocator( + SlotSharingSlotAllocator.createSlotSharingSlotAllocator( TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, TEST_IS_SLOT_FREE_FUNCTION); @@ -193,7 +193,7 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { @Test public void testReserveUnavailableResources() { final SlotSharingSlotAllocator slotSharingSlotAllocator = - new SlotSharingSlotAllocator( + SlotSharingSlotAllocator.createSlotSharingSlotAllocator( TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, ignored -> false); final JobInformation jobInformation =
