tillrohrmann commented on a change in pull request #15611:
URL: https://github.com/apache/flink/pull/15611#discussion_r613181429
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
##########
@@ -91,6 +92,15 @@ void start(
/** Suspends the component. This clears the internal state of the slot manager. */
void suspend();
+ /**
+ * Notifies the slot manager that the resource requirements for the given should job should be
Review comment:
```suggestion
* Notifies the slot manager that the resource requirements for the given job should be
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
##########
@@ -1407,16 +1404,52 @@ public void testReclaimInactiveSlotsOnEmptyRequirements() throws Exception {
slotManager.processResourceRequirements(createResourceRequirements(jobId, 2));
assertThat(freeInactiveSlotsJobIdFuture.isDone(), is(false));
- // decrease requirements, which should not trigger slots being reclaimed
- slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
+ // set requirements to 0, which should not trigger slots being reclaimed
+ slotManager.processResourceRequirements(ResourceRequirements.empty(jobId, "foobar"));
assertThat(freeInactiveSlotsJobIdFuture.isDone(), is(false));
// clear requirements, which should trigger slots being reclaimed
- slotManager.processResourceRequirements(ResourceRequirements.empty(jobId, "foobar"));
+ slotManager.clearResourceRequirements(jobId);
assertThat(freeInactiveSlotsJobIdFuture.get(), is(jobId));
}
}
+ @Test
+ public void testClearRequirements() throws Exception {
Review comment:
Maybe: `testClearRequirementsAlsoClearsResourceTracker`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -254,10 +262,6 @@ public void processResourceRequirements(ResourceRequirements resourceRequirement
resourceRequirements.getResourceRequirements());
if (resourceRequirements.getResourceRequirements().isEmpty()) {
- jobMasterTargetAddresses.remove(resourceRequirements.getJobId());
-
- maybeReclaimInactiveSlots(resourceRequirements.getJobId());
- } else {
jobMasterTargetAddresses.put(
resourceRequirements.getJobId(),
resourceRequirements.getTargetAddress());
Review comment:
Why are we only adding the `JobMasterTargetAddress` if the resource
requirements are empty?
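To make the question concrete, here is a minimal sketch, not the actual Flink code. `TargetAddressSketch`, `processAsInDiff`, `processWithNegatedCheck`, and `hasAddress` are hypothetical names, and requirements are modelled as a plain list. It assumes the intent is to register the address whenever requirements are present, which the condition as it reads in this hunk would not do.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the address bookkeeping; not the DeclarativeSlotManager itself.
class TargetAddressSketch {
    private final Map<String, String> jobMasterTargetAddresses = new HashMap<>();

    // Condition as it reads in the hunk after the removal:
    // the address is stored only when the requirements are empty.
    void processAsInDiff(String jobId, String targetAddress, List<Integer> requirements) {
        if (requirements.isEmpty()) {
            jobMasterTargetAddresses.put(jobId, targetAddress);
        }
    }

    // Assumed intent: store the address when there are requirements to fulfil.
    void processWithNegatedCheck(String jobId, String targetAddress, List<Integer> requirements) {
        if (!requirements.isEmpty()) {
            jobMasterTargetAddresses.put(jobId, targetAddress);
        }
    }

    boolean hasAddress(String jobId) {
        return jobMasterTargetAddresses.containsKey(jobId);
    }

    public static void main(String[] args) {
        TargetAddressSketch asInDiff = new TargetAddressSketch();
        asInDiff.processAsInDiff("job-1", "akka://jm", Arrays.asList(1, 2));
        System.out.println("as in diff, non-empty requirements: " + asInDiff.hasAddress("job-1"));

        TargetAddressSketch negated = new TargetAddressSketch();
        negated.processWithNegatedCheck("job-1", "akka://jm", Arrays.asList(1, 2));
        System.out.println("negated check, non-empty requirements: " + negated.hasAddress("job-1"));

        TargetAddressSketch empty = new TargetAddressSketch();
        empty.processAsInDiff("job-2", "akka://jm", Collections.emptyList());
        System.out.println("as in diff, empty requirements: " + empty.hasAddress("job-2"));
    }
}
```

If the sketch matches the intent, the check in `processResourceRequirements` would need to be negated, or the `put` moved out of the `if` entirely.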
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -245,6 +245,14 @@ public void close() throws Exception {
// Public API
// ---------------------------------------------------------------------------------------------
+ @Override
+ public void clearResourceRequirements(JobID jobId) {
Review comment:
I think this method will only be called by tests.