This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new c71d880dd01 [FLINK-31297][Runtime] Use processResourceRequirements to 
allocate task manager in FineGrainedSlotManagerTest.
c71d880dd01 is described below

commit c71d880dd01bddf926b4cebc33beb6c92b9aa25d
Author: Weihua Hu <[email protected]>
AuthorDate: Thu Mar 2 23:27:28 2023 +0800

    [FLINK-31297][Runtime] Use processResourceRequirements to allocate task 
manager in FineGrainedSlotManagerTest.
    
    We use TaskManagerTracker.addPendingTaskManager to allocate a task manager
    in FineGrainedSlotManagerTest. This makes the resource requirements
    differ between FineGrainedSlotManager and TaskManagerTracker, and as a
    result the FineGrainedSlotManager may trigger resource recycling after
    the requirementCheckDelay.
    
    So we use processResourceRequirements to allocate the task manager
    instead, and set the requirementCheckDelay to 0 to stabilize this test.
    
    (cherry picked from commit bec6a4589703bb7619cfc04bf69822995b49893f)
---
 .../slotmanager/FineGrainedSlotManagerTest.java    | 30 +++++++++++++++++-----
 .../FineGrainedSlotManagerTestBase.java            |  6 -----
 2 files changed, 24 insertions(+), 12 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index f4cd9f7f1d0..2a41ab14dbf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -208,6 +208,26 @@ class FineGrainedSlotManagerTest extends 
FineGrainedSlotManagerTestBase {
                                 new AllocationID(), 
DEFAULT_SLOT_RESOURCE_PROFILE));
         new Context() {
             {
+                
resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
+                        (jobRequirements, ignore) -> {
+                            assertThat(jobRequirements).hasSize(1);
+                            JobID jobID = 
jobRequirements.keySet().stream().findFirst().get();
+                            ResourceAllocationResult.Builder builder =
+                                    ResourceAllocationResult.builder();
+                            PendingTaskManager pendingTaskManager =
+                                    new PendingTaskManager(
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_NUM_SLOTS_PER_WORKER);
+                            
builder.addPendingTaskManagerAllocate(pendingTaskManager);
+                            builder.addAllocationOnPendingResource(
+                                    jobID,
+                                    
pendingTaskManager.getPendingTaskManagerId(),
+                                    DEFAULT_SLOT_RESOURCE_PROFILE);
+                            return builder.build();
+                        });
+
+                
slotManagerConfigurationBuilder.setRequirementCheckDelay(Duration.ZERO);
+
                 runTest(
                         () -> {
                             final 
CompletableFuture<SlotManager.RegistrationResult>
@@ -218,11 +238,9 @@ class FineGrainedSlotManagerTest extends 
FineGrainedSlotManagerTestBase {
                                     registerTaskManagerFuture3 = new 
CompletableFuture<>();
                             runInMainThread(
                                     () -> {
-                                        getTaskManagerTracker()
-                                                .addPendingTaskManager(
-                                                        new PendingTaskManager(
-                                                                
DEFAULT_TOTAL_RESOURCE_PROFILE,
-                                                                
DEFAULT_NUM_SLOTS_PER_WORKER));
+                                        getSlotManager()
+                                                .processResourceRequirements(
+                                                        
createResourceRequirementsForSingleSlot());
                                         // task manager with allocated slot 
cannot deduct pending
                                         // task manager
                                         registerTaskManagerFuture1.complete(
@@ -576,7 +594,7 @@ class FineGrainedSlotManagerTest extends 
FineGrainedSlotManagerTestBase {
                             }
                             return ResourceAllocationResult.builder().build();
                         });
-                setRequirementCheckDelay(requirementCheckDelay);
+                
slotManagerConfigurationBuilder.setRequirementCheckDelay(requirementCheckDelay);
                 runTest(
                         () -> {
                             final ResourceRequirements resourceRequirements1 =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index f2e8e4f9d76..d63da233b47 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -44,7 +44,6 @@ import org.apache.flink.util.function.RunnableWithException;
 
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
@@ -152,7 +151,6 @@ abstract class FineGrainedSlotManagerTestBase {
                 new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
         private final Executor mainThreadExecutor = 
EXECUTOR_RESOURCE.getExecutor();
         private FineGrainedSlotManager slotManager;
-        private Duration requirementCheckDelay = Duration.ZERO;
 
         final TestingResourceAllocationStrategy.Builder 
resourceAllocationStrategyBuilder =
                 TestingResourceAllocationStrategy.newBuilder();
@@ -181,10 +179,6 @@ abstract class FineGrainedSlotManagerTestBase {
             return resourceManagerId;
         }
 
-        public void setRequirementCheckDelay(Duration requirementCheckDelay) {
-            this.requirementCheckDelay = requirementCheckDelay;
-        }
-
         public void setSlotManagerMetricGroup(SlotManagerMetricGroup 
slotManagerMetricGroup) {
             this.slotManagerMetricGroup = slotManagerMetricGroup;
         }

Reply via email to