This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c7a4203b433267a4fa9c0073e3d61893639b3e3e Author: Till Rohrmann <[email protected]> AuthorDate: Mon Mar 15 12:12:13 2021 +0100 [FLINK-21602] Add DeclarativeSlotPool.containsFreeSlot and AllocatedSlotPool.containsFreeSlot The newly introduced method containsFreeSlot makes it possible to check whether the assumption that a given slot is still free still holds. Based on the result, the caller can decide whether to continue reserving slots from the DeclarativeSlotPool. --- .../jobmaster/slotpool/AllocatedSlotPool.java | 10 ++++++++ .../jobmaster/slotpool/DeclarativeSlotPool.java | 10 ++++++++ .../slotpool/DefaultAllocatedSlotPool.java | 5 ++++ .../slotpool/DefaultDeclarativeSlotPool.java | 5 ++++ .../slotpool/DefaultAllocatedSlotPoolTest.java | 28 ++++++++++++++++++++++ .../slotpool/TestingDeclarativeSlotPool.java | 9 +++++++ .../TestingDeclarativeSlotPoolBuilder.java | 8 +++++++ 7 files changed, 75 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java index 3eed88f..221fa9d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java @@ -74,6 +74,16 @@ public interface AllocatedSlotPool { boolean containsSlot(AllocationID allocationId); /** + * Checks whether the slot pool contains a slot with the given {@link AllocationID} and if it is + * free. + * + * @param allocationId allocationId specifies the slot to check for + * @return {@code true} if the slot pool contains a free slot registered under the given + * allocation id; otherwise {@code false} + */ + boolean containsFreeSlot(AllocationID allocationId); + + /** * Reserves the free slot specified by the given allocationId. 
* * @param allocationId allocationId identifying the free slot to reserve diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java index 752181f..39849bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java @@ -102,6 +102,16 @@ public interface DeclarativeSlotPool { Collection<? extends SlotInfo> getAllSlotsInformation(); /** + * Checks whether the slot pool contains a slot with the given {@link AllocationID} and if it is + * free. + * + * @param allocationId allocationId specifies the slot to check for + * @return {@code true} if the slot pool contains a free slot registered under the given + * allocation id; otherwise {@code false} + */ + boolean containsFreeSlot(AllocationID allocationId); + + /** * Reserves the free slot identified by the given allocationId and maps it to the given * requiredSlotProfile. 
* diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java index c188171..a585d6b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java @@ -140,6 +140,11 @@ public class DefaultAllocatedSlotPool implements AllocatedSlotPool { } @Override + public boolean containsFreeSlot(AllocationID allocationId) { + return freeSlotsSince.containsKey(allocationId); + } + + @Override public AllocatedSlot reserveFreeSlot(AllocationID allocationId) { LOG.debug("Reserve free slot with allocation id {}.", allocationId); Preconditions.checkState( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java index 24ad556..65a2f40 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java @@ -478,6 +478,11 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { } @Override + public boolean containsFreeSlot(AllocationID allocationId) { + return slotPool.containsFreeSlot(allocationId); + } + + @Override public boolean containsSlots(ResourceID owner) { return slotPool.containsSlots(owner); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java index 548dea0..2407eaa 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java @@ -234,6 +234,34 @@ public class DefaultAllocatedSlotPoolTest extends TestLogger { slotPool.removeSlots(ResourceID.generate()); } + @Test + public void testContainsFreeSlotReturnsTrueIfSlotIsFree() { + final DefaultAllocatedSlotPool slotPool = new DefaultAllocatedSlotPool(); + final AllocatedSlot allocatedSlot = createAllocatedSlot(ResourceID.generate()); + + slotPool.addSlots(Collections.singleton(allocatedSlot), 0); + + assertTrue(slotPool.containsFreeSlot(allocatedSlot.getAllocationId())); + } + + @Test + public void testContainsFreeSlotReturnsFalseIfSlotDoesNotExist() { + final DefaultAllocatedSlotPool slotPool = new DefaultAllocatedSlotPool(); + + assertFalse(slotPool.containsFreeSlot(new AllocationID())); + } + + @Test + public void testContainsFreeSlotReturnsFalseIfSlotIsReserved() { + final DefaultAllocatedSlotPool slotPool = new DefaultAllocatedSlotPool(); + final AllocatedSlot allocatedSlot = createAllocatedSlot(ResourceID.generate()); + + slotPool.addSlots(Collections.singleton(allocatedSlot), 0); + slotPool.reserveFreeSlot(allocatedSlot.getAllocationId()); + + assertFalse(slotPool.containsFreeSlot(allocatedSlot.getAllocationId())); + } + private void assertSlotPoolContainsSlots( DefaultAllocatedSlotPool slotPool, Collection<AllocatedSlot> slots) { assertThat(slotPool.getAllSlotsInformation(), hasSize(slots.size())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java index 40a5687..a22a877 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java @@ -71,6 +71,8 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { private final Function<ResourceID, Boolean> containsSlotsFunction; + private final Function<AllocationID, Boolean> containsFreeSlotFunction; + private final LongConsumer releaseIdleSlotsConsumer; private final Consumer<ResourceCounter> setResourceRequirementsConsumer; @@ -93,6 +95,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { BiFunction<AllocationID, ResourceProfile, PhysicalSlot> reserveFreeSlotFunction, TriFunction<AllocationID, Throwable, Long, ResourceCounter> freeReservedSlotFunction, Function<ResourceID, Boolean> containsSlotsFunction, + Function<AllocationID, Boolean> containsFreeSlotFunction, LongConsumer releaseIdleSlotsConsumer, Consumer<ResourceCounter> setResourceRequirementsConsumer) { this.increaseResourceRequirementsByConsumer = increaseResourceRequirementsByConsumer; @@ -106,6 +109,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { this.reserveFreeSlotFunction = reserveFreeSlotFunction; this.freeReservedSlotFunction = freeReservedSlotFunction; this.containsSlotsFunction = containsSlotsFunction; + this.containsFreeSlotFunction = containsFreeSlotFunction; this.releaseIdleSlotsConsumer = releaseIdleSlotsConsumer; this.setResourceRequirementsConsumer = setResourceRequirementsConsumer; } @@ -151,6 +155,11 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { } @Override + public boolean containsFreeSlot(AllocationID allocationId) { + return containsFreeSlotFunction.apply(allocationId); + } + + @Override public ResourceCounter releaseSlots(ResourceID owner, Exception cause) { return releaseSlotsFunction.apply(owner, cause); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java index 7982858..e2ecb34 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java @@ -68,6 +68,7 @@ public class TestingDeclarativeSlotPoolBuilder { private Function<ResourceID, Boolean> containsSlotsFunction = ignored -> false; private LongConsumer returnIdleSlotsConsumer = ignored -> {}; private Consumer<ResourceCounter> setResourceRequirementsConsumer = ignored -> {}; + private Function<AllocationID, Boolean> containsFreeSlotFunction = ignored -> false; public TestingDeclarativeSlotPoolBuilder setIncreaseResourceRequirementsByConsumer( Consumer<ResourceCounter> increaseResourceRequirementsByConsumer) { @@ -148,6 +149,12 @@ public class TestingDeclarativeSlotPoolBuilder { return this; } + public TestingDeclarativeSlotPoolBuilder setContainsFreeSlotFunction( + Function<AllocationID, Boolean> containsFreeSlotFunction) { + this.containsFreeSlotFunction = containsFreeSlotFunction; + return this; + } + public TestingDeclarativeSlotPoolBuilder setReturnIdleSlotsConsumer( LongConsumer returnIdleSlotsConsumer) { this.returnIdleSlotsConsumer = returnIdleSlotsConsumer; @@ -167,6 +174,7 @@ public class TestingDeclarativeSlotPoolBuilder { reserveFreeSlotFunction, freeReservedSlotFunction, containsSlotsFunction, + containsFreeSlotFunction, returnIdleSlotsConsumer, setResourceRequirementsConsumer); }
