This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 65fda6e15a1 [FLINK-27921][Runtime] Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager
65fda6e15a1 is described below

commit 65fda6e15a18eff8d81fbd60e8b4708527597c91
Author: Aitozi <[email protected]>
AuthorDate: Mon Jun 6 21:52:23 2022 +0800

    [FLINK-27921][Runtime] Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager
    
    This closes #19886.
---
 .../configuration/ResourceManagerOptions.java      |  8 ++++
 .../ResourceManagerRuntimeServices.java            |  7 +--
 .../slotmanager/DeclarativeSlotManager.java        | 56 +++++++++++++++++++---
 .../slotmanager/FineGrainedSlotManager.java        | 37 +++++++-------
 .../slotmanager/SlotManagerConfiguration.java      | 13 +++++
 .../slotmanager/DeclarativeSlotManagerBuilder.java |  9 ++++
 .../slotmanager/DeclarativeSlotManagerTest.java    | 42 ++++++++++++++++
 .../slotmanager/FineGrainedSlotManagerTest.java    |  7 +--
 .../FineGrainedSlotManagerTestBase.java            |  8 ++--
 .../SlotManagerConfigurationBuilder.java           | 11 +++++
 10 files changed, 162 insertions(+), 36 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index 7c07e9148fd..6f8c9ccdb94 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -142,6 +142,14 @@ public class ResourceManagerOptions {
                                     + START_WORKER_MAX_FAILURE_RATE.key()
                                     + "') is reached.");
 
+    @Documentation.ExcludeFromDocumentation(
+            "This is an expert option, that we do not want to expose in the 
documentation")
+    public static final ConfigOption<Duration> REQUIREMENTS_CHECK_DELAY =
+            ConfigOptions.key("slotmanager.requirement-check.delay")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(50))
+                    .withDescription("The delay of the resource requirements 
check.");
+
     /**
      * The timeout for a slot request to be discarded, in milliseconds.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
index d0d537daa69..98919a7eed4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
 import 
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager;
@@ -36,9 +35,6 @@ import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 /** Container class for the {@link ResourceManager} services. */
 public class ResourceManagerRuntimeServices {
-    // We currently make the delay of requirements check a constant time. This 
delay might be
-    // configurable by user in the future.
-    private static final long REQUIREMENTS_CHECK_DELAY_MS = 50L;
 
     private final SlotManager slotManager;
     private final JobLeaderIdService jobLeaderIdService;
@@ -93,8 +89,7 @@ public class ResourceManagerRuntimeServices {
                     new DefaultResourceAllocationStrategy(
                             
SlotManagerUtils.generateTaskManagerTotalResourceProfile(
                                     
slotManagerConfiguration.getDefaultWorkerResourceSpec()),
-                            slotManagerConfiguration.getNumSlotsPerWorker()),
-                    Time.milliseconds(REQUIREMENTS_CHECK_DELAY_MS));
+                            slotManagerConfiguration.getNumSlotsPerWorker()));
         } else {
             return new DeclarativeSlotManager(
                     scheduledExecutor,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 73f349feb82..f8d2130fac0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -56,6 +57,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
@@ -79,8 +81,14 @@ public class DeclarativeSlotManager implements SlotManager {
     private final Map<JobID, String> jobMasterTargetAddresses = new 
HashMap<>();
     private final Map<SlotID, AllocationID> pendingSlotAllocations;
 
+    /** Delay of the requirement change check in the slot manager. */
+    private final Duration requirementsCheckDelay;
+
     private boolean sendNotEnoughResourceNotifications = true;
 
+    /** Scheduled executor for timeouts. */
+    private final ScheduledExecutor scheduledExecutor;
+
     /** ResourceManager's id. */
     @Nullable private ResourceManagerId resourceManagerId;
 
@@ -90,6 +98,9 @@ public class DeclarativeSlotManager implements SlotManager {
     /** Callbacks for resource (de-)allocations. */
     @Nullable private ResourceActions resourceActions;
 
+    /** The future of the requirements delay check. */
+    @Nullable private CompletableFuture<Void> requirementsCheckFuture;
+
     /** True iff the component has been started. */
     private boolean started;
 
@@ -104,6 +115,8 @@ public class DeclarativeSlotManager implements SlotManager {
         this.taskManagerRequestTimeout = 
slotManagerConfiguration.getTaskManagerRequestTimeout();
         this.slotManagerMetricGroup = 
Preconditions.checkNotNull(slotManagerMetricGroup);
         this.resourceTracker = Preconditions.checkNotNull(resourceTracker);
+        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
+        this.requirementsCheckDelay = 
slotManagerConfiguration.getRequirementCheckDelay();
 
         pendingSlotAllocations = new HashMap<>(16);
 
@@ -162,7 +175,7 @@ public class DeclarativeSlotManager implements SlotManager {
         sendNotEnoughResourceNotifications = failUnfulfillableRequest;
 
         if (failUnfulfillableRequest) {
-            checkResourceRequirements();
+            checkResourceRequirementsWithDelay();
         }
     }
 
@@ -275,7 +288,7 @@ public class DeclarativeSlotManager implements SlotManager {
         }
         resourceTracker.notifyResourceRequirements(
                 resourceRequirements.getJobId(), 
resourceRequirements.getResourceRequirements());
-        checkResourceRequirements();
+        checkResourceRequirementsWithDelay();
     }
 
     private void maybeReclaimInactiveSlots(JobID jobId) {
@@ -341,7 +354,7 @@ public class DeclarativeSlotManager implements SlotManager {
                         slotStatus.getJobID());
             }
 
-            checkResourceRequirements();
+            checkResourceRequirementsWithDelay();
             return true;
         }
     }
@@ -355,7 +368,7 @@ public class DeclarativeSlotManager implements SlotManager {
         if (taskExecutorManager.isTaskManagerRegistered(instanceId)) {
             
slotTracker.removeSlots(taskExecutorManager.getSlotsOf(instanceId));
             taskExecutorManager.unregisterTaskExecutor(instanceId);
-            checkResourceRequirements();
+            checkResourceRequirementsWithDelay();
 
             return true;
         } else {
@@ -382,7 +395,7 @@ public class DeclarativeSlotManager implements SlotManager {
 
         if (taskExecutorManager.isTaskManagerRegistered(instanceId)) {
             if (slotTracker.notifySlotStatus(slotReport)) {
-                checkResourceRequirements();
+                checkResourceRequirementsWithDelay();
             }
             return true;
         } else {
@@ -407,13 +420,39 @@ public class DeclarativeSlotManager implements 
SlotManager {
         LOG.debug("Freeing slot {}.", slotId);
 
         slotTracker.notifyFree(slotId);
-        checkResourceRequirements();
+        checkResourceRequirementsWithDelay();
     }
 
     // 
---------------------------------------------------------------------------------------------
     // Requirement matching
     // 
---------------------------------------------------------------------------------------------
 
+    /**
+     * Depending on the implementation of {@link ResourceAllocationStrategy}, 
checking resource
+     * requirements and potentially making a re-allocation can be heavy. In 
order to cover more
+     * changes with each check, thus reduce the frequency of unnecessary 
re-allocations, the checks
+     * are performed with a slight delay.
+     */
+    private void checkResourceRequirementsWithDelay() {
+        if (requirementsCheckDelay.toMillis() <= 0) {
+            checkResourceRequirements();
+        } else {
+            if (requirementsCheckFuture == null || 
requirementsCheckFuture.isDone()) {
+                requirementsCheckFuture = new CompletableFuture<>();
+                scheduledExecutor.schedule(
+                        () ->
+                                mainThreadExecutor.execute(
+                                        () -> {
+                                            checkResourceRequirements();
+                                            
Preconditions.checkNotNull(requirementsCheckFuture)
+                                                    .complete(null);
+                                        }),
+                        requirementsCheckDelay.toMillis(),
+                        TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
     /**
      * Matches resource requirements against available resources. In a first 
round requirements are
      * matched against free slot, and any match results in a slot allocation. 
The remaining
@@ -439,6 +478,9 @@ public class DeclarativeSlotManager implements SlotManager {
      * absolute worst case, with J jobs, requiring R slots each with a unique 
resource profile such
      * each pair of these profiles is not matching, and S free/pending slots 
that don't fulfill any
      * requirement, then this method does a total of J*R*S resource profile 
comparisons.
+     *
+     * <p>DO NOT call this method directly. Use {@link 
#checkResourceRequirementsWithDelay()}
+     * instead.
      */
     private void checkResourceRequirements() {
         final Map<JobID, Collection<ResourceRequirement>> missingResources =
@@ -626,7 +668,7 @@ public class DeclarativeSlotManager implements SlotManager {
                                             throwable);
                                     slotTracker.notifyFree(slotId);
                                 }
-                                checkResourceRequirements();
+                                checkResourceRequirementsWithDelay();
                             }
                             return null;
                         },
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 289cc54d162..c7f4e5e4a1a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -80,7 +81,7 @@ public class FineGrainedSlotManager implements SlotManager {
     private final Time taskManagerTimeout;
 
     /** Delay of the requirement change check in the slot manager. */
-    private final Time requirementsCheckDelay;
+    private final Duration requirementsCheckDelay;
 
     private final SlotManagerMetricGroup slotManagerMetricGroup;
 
@@ -121,8 +122,7 @@ public class FineGrainedSlotManager implements SlotManager {
             ResourceTracker resourceTracker,
             TaskManagerTracker taskManagerTracker,
             SlotStatusSyncer slotStatusSyncer,
-            ResourceAllocationStrategy resourceAllocationStrategy,
-            Time requirementCheckDelay) {
+            ResourceAllocationStrategy resourceAllocationStrategy) {
 
         this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
 
@@ -130,7 +130,8 @@ public class FineGrainedSlotManager implements SlotManager {
         this.taskManagerTimeout = 
slotManagerConfiguration.getTaskManagerTimeout();
         this.waitResultConsumedBeforeRelease =
                 slotManagerConfiguration.isWaitResultConsumedBeforeRelease();
-        this.requirementsCheckDelay = 
Preconditions.checkNotNull(requirementCheckDelay);
+        this.requirementsCheckDelay =
+                
Preconditions.checkNotNull(slotManagerConfiguration.getRequirementCheckDelay());
 
         this.slotManagerMetricGroup = 
Preconditions.checkNotNull(slotManagerMetricGroup);
 
@@ -490,18 +491,22 @@ public class FineGrainedSlotManager implements 
SlotManager {
      * are performed with a slight delay.
      */
     private void checkResourceRequirementsWithDelay() {
-        if (requirementsCheckFuture == null || 
requirementsCheckFuture.isDone()) {
-            requirementsCheckFuture = new CompletableFuture<>();
-            scheduledExecutor.schedule(
-                    () ->
-                            mainThreadExecutor.execute(
-                                    () -> {
-                                        checkResourceRequirements();
-                                        
Preconditions.checkNotNull(requirementsCheckFuture)
-                                                .complete(null);
-                                    }),
-                    requirementsCheckDelay.toMilliseconds(),
-                    TimeUnit.MILLISECONDS);
+        if (requirementsCheckDelay.toMillis() <= 0) {
+            checkResourceRequirements();
+        } else {
+            if (requirementsCheckFuture == null || 
requirementsCheckFuture.isDone()) {
+                requirementsCheckFuture = new CompletableFuture<>();
+                scheduledExecutor.schedule(
+                        () ->
+                                mainThreadExecutor.execute(
+                                        () -> {
+                                            checkResourceRequirements();
+                                            
Preconditions.checkNotNull(requirementsCheckFuture)
+                                                    .complete(null);
+                                        }),
+                        requirementsCheckDelay.toMillis(),
+                        TimeUnit.MILLISECONDS);
+            }
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index 5703a5a173a..376c7070852 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -34,6 +34,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
+
 /** Configuration for the {@link SlotManager}. */
 public class SlotManagerConfiguration {
 
@@ -42,6 +44,7 @@ public class SlotManagerConfiguration {
     private final Time taskManagerRequestTimeout;
     private final Time slotRequestTimeout;
     private final Time taskManagerTimeout;
+    private final Duration requirementCheckDelay;
     private final boolean waitResultConsumedBeforeRelease;
     private final SlotMatchingStrategy slotMatchingStrategy;
     private final WorkerResourceSpec defaultWorkerResourceSpec;
@@ -55,6 +58,7 @@ public class SlotManagerConfiguration {
             Time taskManagerRequestTimeout,
             Time slotRequestTimeout,
             Time taskManagerTimeout,
+            Duration requirementCheckDelay,
             boolean waitResultConsumedBeforeRelease,
             SlotMatchingStrategy slotMatchingStrategy,
             WorkerResourceSpec defaultWorkerResourceSpec,
@@ -67,6 +71,7 @@ public class SlotManagerConfiguration {
         this.taskManagerRequestTimeout = 
Preconditions.checkNotNull(taskManagerRequestTimeout);
         this.slotRequestTimeout = 
Preconditions.checkNotNull(slotRequestTimeout);
         this.taskManagerTimeout = 
Preconditions.checkNotNull(taskManagerTimeout);
+        this.requirementCheckDelay = 
Preconditions.checkNotNull(requirementCheckDelay);
         this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
         this.slotMatchingStrategy = 
Preconditions.checkNotNull(slotMatchingStrategy);
         this.defaultWorkerResourceSpec = 
Preconditions.checkNotNull(defaultWorkerResourceSpec);
@@ -92,6 +97,10 @@ public class SlotManagerConfiguration {
         return taskManagerTimeout;
     }
 
+    public Duration getRequirementCheckDelay() {
+        return requirementCheckDelay;
+    }
+
     public boolean isWaitResultConsumedBeforeRelease() {
         return waitResultConsumedBeforeRelease;
     }
@@ -136,6 +145,9 @@ public class SlotManagerConfiguration {
                 Time.milliseconds(
                         
configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));
 
+        final Duration requirementCheckDelay =
+                
configuration.get(ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY);
+
         boolean waitResultConsumedBeforeRelease =
                 configuration.getBoolean(
                         
ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);
@@ -158,6 +170,7 @@ public class SlotManagerConfiguration {
                 rpcTimeout,
                 slotRequestTimeout,
                 taskManagerTimeout,
+                requirementCheckDelay,
                 waitResultConsumedBeforeRelease,
                 slotMatchingStrategy,
                 defaultWorkerResourceSpec,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
index 4e6398456f9..1c3e51540fd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
@@ -30,6 +30,7 @@ import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
+import java.time.Duration;
 import java.util.concurrent.Executor;
 
 /** Builder for {@link DeclarativeSlotManager}. */
@@ -47,6 +48,7 @@ public class DeclarativeSlotManagerBuilder {
     private int redundantTaskManagerNum;
     private ResourceTracker resourceTracker;
     private SlotTracker slotTracker;
+    private Duration requirementCheckDelay;
 
     private DeclarativeSlotManagerBuilder(ScheduledExecutor scheduledExecutor) 
{
         this.slotMatchingStrategy = AnyMatchingSlotMatchingStrategy.INSTANCE;
@@ -64,6 +66,7 @@ public class DeclarativeSlotManagerBuilder {
                 
ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM.defaultValue();
         this.resourceTracker = new DefaultResourceTracker();
         this.slotTracker = new DefaultSlotTracker();
+        this.requirementCheckDelay = Duration.ZERO;
     }
 
     public static DeclarativeSlotManagerBuilder newBuilder(ScheduledExecutor 
scheduledExecutor) {
@@ -135,12 +138,18 @@ public class DeclarativeSlotManagerBuilder {
         return this;
     }
 
+    public DeclarativeSlotManagerBuilder setRequirementCheckDelay(Duration 
requirementCheckDelay) {
+        this.requirementCheckDelay = requirementCheckDelay;
+        return this;
+    }
+
     public DeclarativeSlotManager build() {
         final SlotManagerConfiguration slotManagerConfiguration =
                 new SlotManagerConfiguration(
                         taskManagerRequestTimeout,
                         slotRequestTimeout,
                         taskManagerTimeout,
+                        requirementCheckDelay,
                         waitResultConsumedBeforeRelease,
                         slotMatchingStrategy,
                         defaultWorkerResourceSpec,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 5eb6cf7a264..75cfe80d95f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -60,7 +60,9 @@ import 
org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -74,6 +76,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1424,6 +1427,45 @@ public class DeclarativeSlotManagerTest extends 
TestLogger {
         }
     }
 
+    @Test
+    public void testProcessResourceRequirementsWithDelay() throws Exception {
+        final ResourceTracker resourceTracker = new DefaultResourceTracker();
+        final AtomicInteger allocatedResourceCounter = new AtomicInteger(0);
+        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        final Duration delay = Duration.ofMillis(500);
+        try (final DeclarativeSlotManager slotManager =
+                createDeclarativeSlotManagerBuilder(scheduledExecutor)
+                        .setResourceTracker(resourceTracker)
+                        .setRequirementCheckDelay(delay)
+                        .buildAndStartWithDirectExec(
+                                ResourceManagerId.generate(),
+                                new TestingResourceActionsBuilder()
+                                        .setAllocateResourceConsumer(
+                                                workerResourceSpec ->
+                                                        
allocatedResourceCounter.getAndIncrement())
+                                        .build())) {
+
+            final JobID jobId = new JobID();
+
+            
slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
+            Assertions.assertEquals(0, allocatedResourceCounter.get());
+            Assertions.assertEquals(
+                    1, 
scheduledExecutor.getActiveNonPeriodicScheduledTask().size());
+            final ScheduledFuture<?> future =
+                    
scheduledExecutor.getActiveNonPeriodicScheduledTask().iterator().next();
+            Assertions.assertEquals(delay.toMillis(), 
future.getDelay(TimeUnit.MILLISECONDS));
+
+            // the second request is skipped
+            
slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
+            Assertions.assertEquals(
+                    1, 
scheduledExecutor.getActiveNonPeriodicScheduledTask().size());
+
+            scheduledExecutor.triggerNonPeriodicScheduledTask();
+            Assertions.assertEquals(1, allocatedResourceCounter.get());
+        }
+    }
+
     @Test
     public void testClearRequirementsClearsResourceTracker() throws Exception {
         final ResourceTracker resourceTracker = new DefaultResourceTracker();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index 3357e3531b4..b06b024a208 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.util.function.ThrowingConsumer;
 import org.junit.Test;
 
 import java.math.BigDecimal;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -576,7 +577,7 @@ public class FineGrainedSlotManagerTest extends 
FineGrainedSlotManagerTestBase {
                 final List<CompletableFuture<Void>> checkRequirementFutures = 
new ArrayList<>();
                 checkRequirementFutures.add(new CompletableFuture<>());
                 checkRequirementFutures.add(new CompletableFuture<>());
-                final long requirementCheckDelay = 50;
+                final Duration requirementCheckDelay = Duration.ofMillis(50);
                 
resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
                         (ignored1, ignored2) -> {
                             if (checkRequirementFutures.get(0).isDone()) {
@@ -619,13 +620,13 @@ public class FineGrainedSlotManagerTest extends 
FineGrainedSlotManagerTestBase {
                             final long registrationTime = (System.nanoTime() - 
start) / 1_000_000;
                             assumeTrue(
                                     "The time of process requirement and 
register task manager must not take longer than the requirement check delay. If 
it does, then this indicates a very slow machine.",
-                                    registrationTime < requirementCheckDelay);
+                                    registrationTime < 
requirementCheckDelay.toMillis());
 
                             
assertFutureCompleteAndReturn(checkRequirementFutures.get(0));
                             
assertFutureNotComplete(checkRequirementFutures.get(1));
 
                             // checkTimes will not increase when there's no 
events
-                            Thread.sleep(requirementCheckDelay * 2);
+                            Thread.sleep(requirementCheckDelay.toMillis() * 2);
                             
assertFutureNotComplete(checkRequirementFutures.get(1));
 
                             // checkTimes will increase again if there's 
another
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 1013f8919cc..700c8f0f3c8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -44,6 +44,7 @@ import org.apache.flink.util.function.RunnableWithException;
 
 import org.junit.ClassRule;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
@@ -150,7 +151,7 @@ public abstract class FineGrainedSlotManagerTestBase 
extends TestLogger {
                 new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
         private final Executor mainThreadExecutor = 
EXECUTOR_RESOURCE.getExecutor();
         private FineGrainedSlotManager slotManager;
-        private long requirementCheckDelay = 0;
+        private Duration requirementCheckDelay = Duration.ZERO;
 
         final TestingResourceAllocationStrategy.Builder 
resourceAllocationStrategyBuilder =
                 TestingResourceAllocationStrategy.newBuilder();
@@ -176,7 +177,7 @@ public abstract class FineGrainedSlotManagerTestBase 
extends TestLogger {
             return resourceManagerId;
         }
 
-        public void setRequirementCheckDelay(long requirementCheckDelay) {
+        public void setRequirementCheckDelay(Duration requirementCheckDelay) {
             this.requirementCheckDelay = requirementCheckDelay;
         }
 
@@ -208,8 +209,7 @@ public abstract class FineGrainedSlotManagerTestBase 
extends TestLogger {
                             taskManagerTracker,
                             slotStatusSyncer,
                             getResourceAllocationStrategy()
-                                    
.orElse(resourceAllocationStrategyBuilder.build()),
-                            Time.milliseconds(requirementCheckDelay));
+                                    
.orElse(resourceAllocationStrategyBuilder.build()));
             runInMainThreadAndWait(
                     () ->
                             slotManager.start(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
index 9f8035e497c..f87ce6238cc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
@@ -25,11 +25,14 @@ import 
org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.testutils.TestingUtils;
 
+import java.time.Duration;
+
 /** Builder for {@link SlotManagerConfiguration}. */
 public class SlotManagerConfigurationBuilder {
     private Time taskManagerRequestTimeout;
     private Time slotRequestTimeout;
     private Time taskManagerTimeout;
+    private Duration requirementCheckDelay;
     private boolean waitResultConsumedBeforeRelease;
     private WorkerResourceSpec defaultWorkerResourceSpec;
     private int numSlotsPerWorker;
@@ -42,6 +45,7 @@ public class SlotManagerConfigurationBuilder {
         this.taskManagerRequestTimeout = TestingUtils.infiniteTime();
         this.slotRequestTimeout = TestingUtils.infiniteTime();
         this.taskManagerTimeout = TestingUtils.infiniteTime();
+        this.requirementCheckDelay = 
ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY.defaultValue();
         this.waitResultConsumedBeforeRelease = true;
         this.defaultWorkerResourceSpec = WorkerResourceSpec.ZERO;
         this.numSlotsPerWorker = 1;
@@ -72,6 +76,12 @@ public class SlotManagerConfigurationBuilder {
         return this;
     }
 
+    public SlotManagerConfigurationBuilder setRequirementCheckDelay(
+            Duration requirementCheckDelay) {
+        this.requirementCheckDelay = requirementCheckDelay;
+        return this;
+    }
+
     public SlotManagerConfigurationBuilder setWaitResultConsumedBeforeRelease(
             boolean waitResultConsumedBeforeRelease) {
         this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
@@ -114,6 +124,7 @@ public class SlotManagerConfigurationBuilder {
                 taskManagerRequestTimeout,
                 slotRequestTimeout,
                 taskManagerTimeout,
+                requirementCheckDelay,
                 waitResultConsumedBeforeRelease,
                 AnyMatchingSlotMatchingStrategy.INSTANCE,
                 defaultWorkerResourceSpec,

Reply via email to