tillrohrmann commented on a change in pull request #15611:
URL: https://github.com/apache/flink/pull/15611#discussion_r613181429



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
##########
@@ -91,6 +92,15 @@ void start(
     /** Suspends the component. This clears the internal state of the slot manager. */
     void suspend();
 
+    /**
+     * Notifies the slot manager that the resource requirements for the given should job should be

Review comment:
       ```suggestion
        * Notifies the slot manager that the resource requirements for the given job should be
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
##########
@@ -1407,16 +1404,52 @@ public void testReclaimInactiveSlotsOnEmptyRequirements() throws Exception {
             slotManager.processResourceRequirements(createResourceRequirements(jobId, 2));
             assertThat(freeInactiveSlotsJobIdFuture.isDone(), is(false));
 
-            // decrease requirements, which should not trigger slots being reclaimed
-            slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
+            // set requirements to 0, which should not trigger slots being reclaimed
+            slotManager.processResourceRequirements(ResourceRequirements.empty(jobId, "foobar"));
             assertThat(freeInactiveSlotsJobIdFuture.isDone(), is(false));
 
             // clear requirements, which should trigger slots being reclaimed
-            slotManager.processResourceRequirements(ResourceRequirements.empty(jobId, "foobar"));
+            slotManager.clearResourceRequirements(jobId);
             assertThat(freeInactiveSlotsJobIdFuture.get(), is(jobId));
         }
     }
 
+    @Test
+    public void testClearRequirements() throws Exception {

Review comment:
       Maybe: `testClearRequirementsAlsoClearsResourceTracker`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -254,10 +262,6 @@ public void processResourceRequirements(ResourceRequirements resourceRequirement
                 resourceRequirements.getResourceRequirements());
 
         if (resourceRequirements.getResourceRequirements().isEmpty()) {
-            jobMasterTargetAddresses.remove(resourceRequirements.getJobId());
-
-            maybeReclaimInactiveSlots(resourceRequirements.getJobId());
-        } else {
             jobMasterTargetAddresses.put(
                     resourceRequirements.getJobId(), resourceRequirements.getTargetAddress());

Review comment:
       Why are we only adding the `JobMasterTargetAddress` if the resource requirements are empty?
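
       To illustrate the concern, here is a minimal, self-contained sketch of the
       behavior the condition was probably meant to have, assuming the check should
       simply be negated. `TargetAddressSketch`, `hasTargetAddress`, and the
       String-based job ID and requirement types are hypothetical stand-ins, not the
       real `DeclarativeSlotManager` API. That `clearResourceRequirements` is where
       the address gets removed is also an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the slot manager; not the real Flink class. */
public class TargetAddressSketch {

    // Maps a job ID to the address of its JobMaster.
    private final Map<String, String> jobMasterTargetAddresses = new HashMap<>();

    public void processResourceRequirements(
            String jobId, String targetAddress, List<String> requirements) {
        // Assumed intent: remember the address only while the job still
        // declares requirements (note the negation of the isEmpty() check).
        if (!requirements.isEmpty()) {
            jobMasterTargetAddresses.put(jobId, targetAddress);
        }
    }

    public void clearResourceRequirements(String jobId) {
        // Assumption: clearing the requirements also forgets the address.
        jobMasterTargetAddresses.remove(jobId);
    }

    public boolean hasTargetAddress(String jobId) {
        return jobMasterTargetAddresses.containsKey(jobId);
    }
}
```

       With the un-negated check from the diff, the address would be stored only
       when a job declares no requirements at all, which looks unintended.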

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -245,6 +245,14 @@ public void close() throws Exception {
     // Public API
     // ---------------------------------------------------------------------------------------------
 
+    @Override
+    public void clearResourceRequirements(JobID jobId) {

Review comment:
       I think this method will only be called by tests.





