This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c7a4203b433267a4fa9c0073e3d61893639b3e3e
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Mar 15 12:12:13 2021 +0100

    [FLINK-21602] Add DeclarativeSlotPool. and 
AllocatedSlotPool.containsFreeSlot
    
    The newly introduced method containsFreeSlot allows to check whether an 
assumption that
    a given slot is still free holds true or not. This allows to continue 
reserving slots
    from the DeclarativeSlotPool or not.
---
 .../jobmaster/slotpool/AllocatedSlotPool.java      | 10 ++++++++
 .../jobmaster/slotpool/DeclarativeSlotPool.java    | 10 ++++++++
 .../slotpool/DefaultAllocatedSlotPool.java         |  5 ++++
 .../slotpool/DefaultDeclarativeSlotPool.java       |  5 ++++
 .../slotpool/DefaultAllocatedSlotPoolTest.java     | 28 ++++++++++++++++++++++
 .../slotpool/TestingDeclarativeSlotPool.java       |  9 +++++++
 .../TestingDeclarativeSlotPoolBuilder.java         |  8 +++++++
 7 files changed, 75 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
index 3eed88f..221fa9d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java
@@ -74,6 +74,16 @@ public interface AllocatedSlotPool {
     boolean containsSlot(AllocationID allocationId);
 
     /**
+     * Checks whether the slot pool contains a slot with the given {@link 
AllocationID} and if it is
+     * free.
+     *
+     * @param allocationId allocationId specifies the slot to check for
+     * @return {@code true} if the slot pool contains a free slot registered 
under the given
+     *     allocation id; otherwise {@code false}
+     */
+    boolean containsFreeSlot(AllocationID allocationId);
+
+    /**
      * Reserves the free slot specified by the given allocationId.
      *
      * @param allocationId allocationId identifying the free slot to reserve
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
index 752181f..39849bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
@@ -102,6 +102,16 @@ public interface DeclarativeSlotPool {
     Collection<? extends SlotInfo> getAllSlotsInformation();
 
     /**
+     * Checks whether the slot pool contains a slot with the given {@link 
AllocationID} and if it is
+     * free.
+     *
+     * @param allocationId allocationId specifies the slot to check for
+     * @return {@code true} if the slot pool contains a free slot registered 
under the given
+     *     allocation id; otherwise {@code false}
+     */
+    boolean containsFreeSlot(AllocationID allocationId);
+
+    /**
      * Reserves the free slot identified by the given allocationId and maps it 
to the given
      * requiredSlotProfile.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
index c188171..a585d6b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java
@@ -140,6 +140,11 @@ public class DefaultAllocatedSlotPool implements 
AllocatedSlotPool {
     }
 
     @Override
+    public boolean containsFreeSlot(AllocationID allocationId) {
+        return freeSlotsSince.containsKey(allocationId);
+    }
+
+    @Override
     public AllocatedSlot reserveFreeSlot(AllocationID allocationId) {
         LOG.debug("Reserve free slot with allocation id {}.", allocationId);
         Preconditions.checkState(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
index 24ad556..65a2f40 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
@@ -478,6 +478,11 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
     }
 
     @Override
+    public boolean containsFreeSlot(AllocationID allocationId) {
+        return slotPool.containsFreeSlot(allocationId);
+    }
+
+    @Override
     public boolean containsSlots(ResourceID owner) {
         return slotPool.containsSlots(owner);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java
index 548dea0..2407eaa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java
@@ -234,6 +234,34 @@ public class DefaultAllocatedSlotPoolTest extends 
TestLogger {
         slotPool.removeSlots(ResourceID.generate());
     }
 
+    @Test
+    public void testContainsFreeSlotReturnsTrueIfSlotIsFree() {
+        final DefaultAllocatedSlotPool slotPool = new 
DefaultAllocatedSlotPool();
+        final AllocatedSlot allocatedSlot = 
createAllocatedSlot(ResourceID.generate());
+
+        slotPool.addSlots(Collections.singleton(allocatedSlot), 0);
+
+        assertTrue(slotPool.containsFreeSlot(allocatedSlot.getAllocationId()));
+    }
+
+    @Test
+    public void testContainsFreeSlotReturnsFalseIfSlotDoesNotExist() {
+        final DefaultAllocatedSlotPool slotPool = new 
DefaultAllocatedSlotPool();
+
+        assertFalse(slotPool.containsFreeSlot(new AllocationID()));
+    }
+
+    @Test
+    public void testContainsFreeSlotReturnsFalseIfSlotIsReserved() {
+        final DefaultAllocatedSlotPool slotPool = new 
DefaultAllocatedSlotPool();
+        final AllocatedSlot allocatedSlot = 
createAllocatedSlot(ResourceID.generate());
+
+        slotPool.addSlots(Collections.singleton(allocatedSlot), 0);
+        slotPool.reserveFreeSlot(allocatedSlot.getAllocationId());
+
+        
assertFalse(slotPool.containsFreeSlot(allocatedSlot.getAllocationId()));
+    }
+
     private void assertSlotPoolContainsSlots(
             DefaultAllocatedSlotPool slotPool, Collection<AllocatedSlot> 
slots) {
         assertThat(slotPool.getAllSlotsInformation(), hasSize(slots.size()));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
index 40a5687..a22a877 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
@@ -71,6 +71,8 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
 
     private final Function<ResourceID, Boolean> containsSlotsFunction;
 
+    private final Function<AllocationID, Boolean> containsFreeSlotFunction;
+
     private final LongConsumer releaseIdleSlotsConsumer;
 
     private final Consumer<ResourceCounter> setResourceRequirementsConsumer;
@@ -93,6 +95,7 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
             BiFunction<AllocationID, ResourceProfile, PhysicalSlot> 
reserveFreeSlotFunction,
             TriFunction<AllocationID, Throwable, Long, ResourceCounter> 
freeReservedSlotFunction,
             Function<ResourceID, Boolean> containsSlotsFunction,
+            Function<AllocationID, Boolean> containsFreeSlotFunction,
             LongConsumer releaseIdleSlotsConsumer,
             Consumer<ResourceCounter> setResourceRequirementsConsumer) {
         this.increaseResourceRequirementsByConsumer = 
increaseResourceRequirementsByConsumer;
@@ -106,6 +109,7 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
         this.reserveFreeSlotFunction = reserveFreeSlotFunction;
         this.freeReservedSlotFunction = freeReservedSlotFunction;
         this.containsSlotsFunction = containsSlotsFunction;
+        this.containsFreeSlotFunction = containsFreeSlotFunction;
         this.releaseIdleSlotsConsumer = releaseIdleSlotsConsumer;
         this.setResourceRequirementsConsumer = setResourceRequirementsConsumer;
     }
@@ -151,6 +155,11 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
     }
 
     @Override
+    public boolean containsFreeSlot(AllocationID allocationId) {
+        return containsFreeSlotFunction.apply(allocationId);
+    }
+
+    @Override
     public ResourceCounter releaseSlots(ResourceID owner, Exception cause) {
         return releaseSlotsFunction.apply(owner, cause);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
index 7982858..e2ecb34 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
@@ -68,6 +68,7 @@ public class TestingDeclarativeSlotPoolBuilder {
     private Function<ResourceID, Boolean> containsSlotsFunction = ignored -> 
false;
     private LongConsumer returnIdleSlotsConsumer = ignored -> {};
     private Consumer<ResourceCounter> setResourceRequirementsConsumer = 
ignored -> {};
+    private Function<AllocationID, Boolean> containsFreeSlotFunction = ignored 
-> false;
 
     public TestingDeclarativeSlotPoolBuilder 
setIncreaseResourceRequirementsByConsumer(
             Consumer<ResourceCounter> increaseResourceRequirementsByConsumer) {
@@ -148,6 +149,12 @@ public class TestingDeclarativeSlotPoolBuilder {
         return this;
     }
 
+    public TestingDeclarativeSlotPoolBuilder setContainsFreeSlotFunction(
+            Function<AllocationID, Boolean> containsFreeSlotFunction) {
+        this.containsFreeSlotFunction = containsFreeSlotFunction;
+        return this;
+    }
+
     public TestingDeclarativeSlotPoolBuilder setReturnIdleSlotsConsumer(
             LongConsumer returnIdleSlotsConsumer) {
         this.returnIdleSlotsConsumer = returnIdleSlotsConsumer;
@@ -167,6 +174,7 @@ public class TestingDeclarativeSlotPoolBuilder {
                 reserveFreeSlotFunction,
                 freeReservedSlotFunction,
                 containsSlotsFunction,
+                containsFreeSlotFunction,
                 returnIdleSlotsConsumer,
                 setResourceRequirementsConsumer);
     }

Reply via email to