wanglijie95 commented on code in PR #20218:
URL: https://github.com/apache/flink/pull/20218#discussion_r925391448


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java:
##########
@@ -150,4 +150,31 @@ void testUnfulfillableRequirement() {
         assertThat(result.getUnfulfillableJobs()).containsExactly(jobId);
         assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
     }
+
+    /** Tests that blocked task manager cannot fulfill requirements. */
+    @Test
+    void testBlockedTaskManagerCannotFulfillRequirements() {
+        final TaskManagerInfo taskManager =
+                new TestingTaskManagerInfo(
+                        DEFAULT_SLOT_RESOURCE.multiply(10),
+                        DEFAULT_SLOT_RESOURCE.multiply(10),
+                        DEFAULT_SLOT_RESOURCE);
+        final JobID jobId = new JobID();
+        final List<ResourceRequirement> requirements = new ArrayList<>();
+        final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+                TestingTaskManagerResourceInfoProvider.newBuilder()
+                        .setRegisteredTaskManagersSupplier(() -> Collections.singleton(taskManager))
+                        .build();
+        requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 10));
+
+        final ResourceAllocationResult result =
+                STRATEGY.tryFulfillRequirements(
+                        Collections.singletonMap(jobId, requirements),
+                        taskManagerResourceInfoProvider,
+                        taskManager.getTaskExecutorConnection().getResourceID()::equals);
+
+        assertThat(result.getUnfulfillableJobs()).isEmpty();
+        assertThat(result.getAllocationsOnRegisteredResources().keySet()).isEmpty();
+        assertThat(result.getPendingTaskManagersToAllocate()).hasSize(2);

Review Comment:
   The expected size is 2 because each task manager provides 5 slots (`NUM_OF_SLOTS`) and the requirement is for 10 slots. I have changed this test to make it easier to understand.
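
The expected count of 2 follows from ceiling division of the required slots by the slots per task manager. A minimal sketch of that arithmetic, assuming the blocked task manager contributes nothing and each newly allocated task manager offers exactly `NUM_OF_SLOTS` slots; `requiredTaskManagers` is a hypothetical helper for illustration, not a Flink API:

```java
public class PendingTaskManagerCount {

    /**
     * Hypothetical helper (not part of Flink): the smallest number of task
     * managers whose combined slots cover the requested slot count.
     */
    public static int requiredTaskManagers(int requiredSlots, int slotsPerTaskManager) {
        if (requiredSlots < 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slot counts must be positive");
        }
        // Integer ceiling division: round up when there is a remainder.
        return (requiredSlots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // 10 required slots, 5 slots per task manager (NUM_OF_SLOTS in the test).
        System.out.println(requiredTaskManagers(10, 5)); // prints 2
    }
}
```

With a requirement that is not a multiple of the slot count, for example 11 slots, the same formula would give 3.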



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##########
@@ -207,6 +208,49 @@ void testRequirementDeclarationWithoutFreeSlotsTriggersWorkerAllocation() throws
         }
     }
 
+    /**
+     * Tests that blocked slots cannot be used to fulfill requirements, will trigger the new
+     * resource allocation.
+     */
+    @Test
+    void testRequirementDeclarationWithBlockedSlotsTriggersWorkerAllocation() throws Exception {
+

Review Comment:
   Fixed



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java:
##########
@@ -204,6 +204,49 @@ private void testRequirementDeclaration(RequirementDeclarationScenario scenario)
         };
     }
 
+    /**
+     * Tests that blocked slots cannot be used to fulfill requirements, will trigger the new
+     * resource allocation.
+     */
+    @Test
+    void testRequirementDeclarationWithBlockedSlotsTriggersWorkerAllocation() throws Exception {
+

Review Comment:
   Fixed



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java:
##########
@@ -79,11 +80,27 @@ public interface SlotManager extends AutoCloseable {
      * @param newResourceManagerId to use for communication with the task managers
      * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
      * @param newResourceActions to use for resource (de-)allocations
+     * @param newBlockedTaskManagerChecker to query whether a task manager is blocked
      */
     void start(
             ResourceManagerId newResourceManagerId,
             Executor newMainThreadExecutor,
-            ResourceActions newResourceActions);
+            ResourceActions newResourceActions,
+            BlockedTaskManagerChecker newBlockedTaskManagerChecker);
+
+    /**
+     * Starts the slot manager with the given leader id and resource manager actions.
+     *
+     * @param newResourceManagerId to use for communication with the task managers
+     * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
+     * @param newResourceActions to use for resource (de-)allocations
+     */
+    default void start(

Review Comment:
   Fixed
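
For context, a common way to keep an existing signature usable when a parameter is added to an interface method is a default overload that delegates with a neutral argument. The body of the `default void start(` overload is cut off in the hunk above, so the sketch below is not the PR's code: every type in it (`Manager`, `BlockedChecker`, `RecordingManager`) is a hypothetical stand-in, and the choice of a checker that never reports a task manager as blocked is an assumption.

```java
public class DefaultOverloadSketch {

    /** Hypothetical stand-in for a blocked-task-manager checker; not Flink's interface. */
    @FunctionalInterface
    public interface BlockedChecker {
        boolean isBlocked(String taskManagerId);
    }

    /** Hypothetical stand-in for an interface that gained a new start parameter. */
    public interface Manager {
        void start(String leaderId, BlockedChecker checker);

        // Convenience overload (assumption): callers that do not care about
        // blocking get a checker that treats no task manager as blocked.
        default void start(String leaderId) {
            start(leaderId, taskManagerId -> false);
        }
    }

    /** Minimal implementation that records the checker it was started with. */
    public static final class RecordingManager implements Manager {
        public BlockedChecker checker;

        @Override
        public void start(String leaderId, BlockedChecker checker) {
            this.checker = checker;
        }
    }

    public static void main(String[] args) {
        RecordingManager manager = new RecordingManager();
        manager.start("leader-1"); // uses the default overload
        System.out.println(manager.checker.isBlocked("tm-1")); // prints false
    }
}
```

Existing callers keep compiling against the shorter signature, while implementations only have to provide the full one.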



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
