This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e98d251  [FLINK-20901] Add DeclarativeSlotPool.setResourceRequirements
e98d251 is described below

commit e98d251acaa5cb73afca50571d7f9ff7ff4f9e36
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Jan 8 15:49:29 2021 +0100

    [FLINK-20901] Add DeclarativeSlotPool.setResourceRequirements
    
    This commits adds DeclarativeSlotPool.setResourceRequirements which sets 
the absolutely
    required resources. Hence, this method can be used to overwrite the 
currently set
    resource requirements.
    
    This closes #14589.
---
 .../jobmaster/slotpool/DeclarativeSlotPool.java    |  7 ++++++
 .../slotpool/DefaultDeclarativeSlotPool.java       |  7 ++++++
 .../slotpool/DefaultDeclarativeSlotPoolTest.java   | 29 ++++++++++++++++++++++
 .../slotpool/TestingDeclarativeSlotPool.java       | 11 +++++++-
 .../TestingDeclarativeSlotPoolBuilder.java         | 10 +++++++-
 5 files changed, 62 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
index 81dc05e..656a2ef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
@@ -56,6 +56,13 @@ public interface DeclarativeSlotPool {
     void decreaseResourceRequirementsBy(ResourceCounter decrement);
 
     /**
+     * Sets the resource requirements to the given resourceRequirements.
+     *
+     * @param resourceRequirements new resource requirements
+     */
+    void setResourceRequirements(ResourceCounter resourceRequirements);
+
+    /**
      * Returns the current resource requirements.
      *
      * @return current resource requirements
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
index bd8a27a..279992b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
@@ -124,6 +124,13 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
         declareResourceRequirements();
     }
 
+    @Override
+    public void setResourceRequirements(ResourceCounter resourceRequirements) {
+        totalResourceRequirements = resourceRequirements;
+
+        declareResourceRequirements();
+    }
+
     private void declareResourceRequirements() {
         final Collection<ResourceRequirement> resourceRequirements = 
getResourceRequirements();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
index 602aca8..60046cf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
@@ -515,6 +515,35 @@ public class DefaultDeclarativeSlotPoolTest extends 
TestLogger {
                 hasItems(ResourceRequirement.create(largeResourceProfile, 2)));
     }
 
+    @Test
+    public void testSetResourceRequirementsForInitialResourceRequirements() {
+        final DefaultDeclarativeSlotPool slotPool = new 
DefaultDeclarativeSlotPoolBuilder().build();
+
+        final ResourceCounter resourceRequirements =
+                ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);
+
+        slotPool.setResourceRequirements(resourceRequirements);
+
+        assertThat(
+                slotPool.getResourceRequirements(),
+                is(toResourceRequirements(resourceRequirements)));
+    }
+
+    @Test
+    public void testSetResourceRequirementsOverwritesPreviousValue() {
+        final DefaultDeclarativeSlotPool slotPool = new 
DefaultDeclarativeSlotPoolBuilder().build();
+
+        
slotPool.setResourceRequirements(ResourceCounter.withResource(RESOURCE_PROFILE_1,
 1));
+
+        final ResourceCounter resourceRequirements =
+                ResourceCounter.withResource(RESOURCE_PROFILE_2, 1);
+        slotPool.setResourceRequirements(resourceRequirements);
+
+        assertThat(
+                slotPool.getResourceRequirements(),
+                is(toResourceRequirements(resourceRequirements)));
+    }
+
     @Nonnull
     private static ResourceCounter createResourceRequirements() {
         final Map<ResourceProfile, Integer> requirements = new HashMap<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
index 4828bf2..a2bac2b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
@@ -72,6 +72,8 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
 
     private final LongConsumer releaseIdleSlotsConsumer;
 
+    private final Consumer<ResourceCounter> setResourceRequirementsConsumer;
+
     TestingDeclarativeSlotPool(
             Consumer<ResourceCounter> increaseResourceRequirementsByConsumer,
             Consumer<ResourceCounter> decreaseResourceRequirementsByConsumer,
@@ -90,7 +92,8 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
             BiFunction<AllocationID, ResourceProfile, PhysicalSlot> 
reserveFreeSlotFunction,
             TriFunction<AllocationID, Throwable, Long, ResourceCounter> 
freeReservedSlotFunction,
             Function<ResourceID, Boolean> containsSlotsFunction,
-            LongConsumer releaseIdleSlotsConsumer) {
+            LongConsumer releaseIdleSlotsConsumer,
+            Consumer<ResourceCounter> setResourceRequirementsConsumer) {
         this.increaseResourceRequirementsByConsumer = 
increaseResourceRequirementsByConsumer;
         this.decreaseResourceRequirementsByConsumer = 
decreaseResourceRequirementsByConsumer;
         this.getResourceRequirementsSupplier = getResourceRequirementsSupplier;
@@ -103,6 +106,7 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
         this.freeReservedSlotFunction = freeReservedSlotFunction;
         this.containsSlotsFunction = containsSlotsFunction;
         this.releaseIdleSlotsConsumer = releaseIdleSlotsConsumer;
+        this.setResourceRequirementsConsumer = setResourceRequirementsConsumer;
     }
 
     @Override
@@ -116,6 +120,11 @@ final class TestingDeclarativeSlotPool implements 
DeclarativeSlotPool {
     }
 
     @Override
+    public void setResourceRequirements(ResourceCounter resourceRequirements) {
+        setResourceRequirementsConsumer.accept(resourceRequirements);
+    }
+
+    @Override
     public Collection<ResourceRequirement> getResourceRequirements() {
         return getResourceRequirementsSupplier.get();
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
index b1bdfc4..84cf951 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
@@ -66,6 +66,7 @@ public class TestingDeclarativeSlotPoolBuilder {
             (ignoredA, ignoredB, ignoredC) -> ResourceCounter.empty();
     private Function<ResourceID, Boolean> containsSlotsFunction = ignored -> 
false;
     private LongConsumer returnIdleSlotsConsumer = ignored -> {};
+    private Consumer<ResourceCounter> setResourceRequirementsConsumer = 
ignored -> {};
 
     public TestingDeclarativeSlotPoolBuilder 
setIncreaseResourceRequirementsByConsumer(
             Consumer<ResourceCounter> increaseResourceRequirementsByConsumer) {
@@ -79,6 +80,12 @@ public class TestingDeclarativeSlotPoolBuilder {
         return this;
     }
 
+    public TestingDeclarativeSlotPoolBuilder 
setSetResourceRequirementsConsumer(
+            Consumer<ResourceCounter> setResourceRequirementsConsumer) {
+        this.setResourceRequirementsConsumer = setResourceRequirementsConsumer;
+        return this;
+    }
+
     public TestingDeclarativeSlotPoolBuilder 
setGetResourceRequirementsSupplier(
             Supplier<Collection<ResourceRequirement>> 
getResourceRequirementsSupplier) {
         this.getResourceRequirementsSupplier = getResourceRequirementsSupplier;
@@ -159,6 +166,7 @@ public class TestingDeclarativeSlotPoolBuilder {
                 reserveFreeSlotFunction,
                 freeReservedSlotFunction,
                 containsSlotsFunction,
-                returnIdleSlotsConsumer);
+                returnIdleSlotsConsumer,
+                setResourceRequirementsConsumer);
     }
 }

Reply via email to