This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e98d251 [FLINK-20901] Add DeclarativeSlotPool.setResourceRequirements
e98d251 is described below
commit e98d251acaa5cb73afca50571d7f9ff7ff4f9e36
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Jan 8 15:49:29 2021 +0100
[FLINK-20901] Add DeclarativeSlotPool.setResourceRequirements
This commits adds DeclarativeSlotPool.setResourceRequirements which sets
the absolutely
required resources. Hence, this method can be used to overwrite the
currently set
resource requirements.
This closes #14589.
---
.../jobmaster/slotpool/DeclarativeSlotPool.java | 7 ++++++
.../slotpool/DefaultDeclarativeSlotPool.java | 7 ++++++
.../slotpool/DefaultDeclarativeSlotPoolTest.java | 29 ++++++++++++++++++++++
.../slotpool/TestingDeclarativeSlotPool.java | 11 +++++++-
.../TestingDeclarativeSlotPoolBuilder.java | 10 +++++++-
5 files changed, 62 insertions(+), 2 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
index 81dc05e..656a2ef 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
@@ -56,6 +56,13 @@ public interface DeclarativeSlotPool {
void decreaseResourceRequirementsBy(ResourceCounter decrement);
/**
+ * Sets the resource requirements to the given resourceRequirements.
+ *
+ * @param resourceRequirements new resource requirements
+ */
+ void setResourceRequirements(ResourceCounter resourceRequirements);
+
+ /**
* Returns the current resource requirements.
*
* @return current resource requirements
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
index bd8a27a..279992b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
@@ -124,6 +124,13 @@ public class DefaultDeclarativeSlotPool implements
DeclarativeSlotPool {
declareResourceRequirements();
}
+ @Override
+ public void setResourceRequirements(ResourceCounter resourceRequirements) {
+ totalResourceRequirements = resourceRequirements;
+
+ declareResourceRequirements();
+ }
+
private void declareResourceRequirements() {
final Collection<ResourceRequirement> resourceRequirements =
getResourceRequirements();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
index 602aca8..60046cf 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
@@ -515,6 +515,35 @@ public class DefaultDeclarativeSlotPoolTest extends
TestLogger {
hasItems(ResourceRequirement.create(largeResourceProfile, 2)));
}
+ @Test
+ public void testSetResourceRequirementsForInitialResourceRequirements() {
+ final DefaultDeclarativeSlotPool slotPool = new
DefaultDeclarativeSlotPoolBuilder().build();
+
+ final ResourceCounter resourceRequirements =
+ ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);
+
+ slotPool.setResourceRequirements(resourceRequirements);
+
+ assertThat(
+ slotPool.getResourceRequirements(),
+ is(toResourceRequirements(resourceRequirements)));
+ }
+
+ @Test
+ public void testSetResourceRequirementsOverwritesPreviousValue() {
+ final DefaultDeclarativeSlotPool slotPool = new
DefaultDeclarativeSlotPoolBuilder().build();
+
+
slotPool.setResourceRequirements(ResourceCounter.withResource(RESOURCE_PROFILE_1,
1));
+
+ final ResourceCounter resourceRequirements =
+ ResourceCounter.withResource(RESOURCE_PROFILE_2, 1);
+ slotPool.setResourceRequirements(resourceRequirements);
+
+ assertThat(
+ slotPool.getResourceRequirements(),
+ is(toResourceRequirements(resourceRequirements)));
+ }
+
@Nonnull
private static ResourceCounter createResourceRequirements() {
final Map<ResourceProfile, Integer> requirements = new HashMap<>();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
index 4828bf2..a2bac2b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java
@@ -72,6 +72,8 @@ final class TestingDeclarativeSlotPool implements
DeclarativeSlotPool {
private final LongConsumer releaseIdleSlotsConsumer;
+ private final Consumer<ResourceCounter> setResourceRequirementsConsumer;
+
TestingDeclarativeSlotPool(
Consumer<ResourceCounter> increaseResourceRequirementsByConsumer,
Consumer<ResourceCounter> decreaseResourceRequirementsByConsumer,
@@ -90,7 +92,8 @@ final class TestingDeclarativeSlotPool implements
DeclarativeSlotPool {
BiFunction<AllocationID, ResourceProfile, PhysicalSlot>
reserveFreeSlotFunction,
TriFunction<AllocationID, Throwable, Long, ResourceCounter>
freeReservedSlotFunction,
Function<ResourceID, Boolean> containsSlotsFunction,
- LongConsumer releaseIdleSlotsConsumer) {
+ LongConsumer releaseIdleSlotsConsumer,
+ Consumer<ResourceCounter> setResourceRequirementsConsumer) {
this.increaseResourceRequirementsByConsumer =
increaseResourceRequirementsByConsumer;
this.decreaseResourceRequirementsByConsumer =
decreaseResourceRequirementsByConsumer;
this.getResourceRequirementsSupplier = getResourceRequirementsSupplier;
@@ -103,6 +106,7 @@ final class TestingDeclarativeSlotPool implements
DeclarativeSlotPool {
this.freeReservedSlotFunction = freeReservedSlotFunction;
this.containsSlotsFunction = containsSlotsFunction;
this.releaseIdleSlotsConsumer = releaseIdleSlotsConsumer;
+ this.setResourceRequirementsConsumer = setResourceRequirementsConsumer;
}
@Override
@@ -116,6 +120,11 @@ final class TestingDeclarativeSlotPool implements
DeclarativeSlotPool {
}
@Override
+ public void setResourceRequirements(ResourceCounter resourceRequirements) {
+ setResourceRequirementsConsumer.accept(resourceRequirements);
+ }
+
+ @Override
public Collection<ResourceRequirement> getResourceRequirements() {
return getResourceRequirementsSupplier.get();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
index b1bdfc4..84cf951 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java
@@ -66,6 +66,7 @@ public class TestingDeclarativeSlotPoolBuilder {
(ignoredA, ignoredB, ignoredC) -> ResourceCounter.empty();
private Function<ResourceID, Boolean> containsSlotsFunction = ignored ->
false;
private LongConsumer returnIdleSlotsConsumer = ignored -> {};
+ private Consumer<ResourceCounter> setResourceRequirementsConsumer =
ignored -> {};
public TestingDeclarativeSlotPoolBuilder
setIncreaseResourceRequirementsByConsumer(
Consumer<ResourceCounter> increaseResourceRequirementsByConsumer) {
@@ -79,6 +80,12 @@ public class TestingDeclarativeSlotPoolBuilder {
return this;
}
+ public TestingDeclarativeSlotPoolBuilder
setSetResourceRequirementsConsumer(
+ Consumer<ResourceCounter> setResourceRequirementsConsumer) {
+ this.setResourceRequirementsConsumer = setResourceRequirementsConsumer;
+ return this;
+ }
+
public TestingDeclarativeSlotPoolBuilder
setGetResourceRequirementsSupplier(
Supplier<Collection<ResourceRequirement>>
getResourceRequirementsSupplier) {
this.getResourceRequirementsSupplier = getResourceRequirementsSupplier;
@@ -159,6 +166,7 @@ public class TestingDeclarativeSlotPoolBuilder {
reserveFreeSlotFunction,
freeReservedSlotFunction,
containsSlotsFunction,
- returnIdleSlotsConsumer);
+ returnIdleSlotsConsumer,
+ setResourceRequirementsConsumer);
}
}