This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ea752ff1129168d369f03cf25d5c81f661b4b405
Author: Roc Marshal <[email protected]>
AuthorDate: Wed Dec 13 13:19:06 2023 +0800

    [FLINK-33874][runtime] Support resource request wait mechanism at 
DefaultDeclarativeSlotPool side for Default Scheduler
---
 .../generated/expert_scheduling_section.html       |   6 +
 .../generated/job_manager_configuration.html       |   6 +
 .../flink/configuration/JobManagerOptions.java     |   7 +
 .../DefaultSlotPoolServiceSchedulerFactory.java    |  20 ++-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   5 +-
 .../jobmaster/SlotPoolServiceSchedulerFactory.java |   7 +-
 .../slotpool/AbstractSlotPoolServiceFactory.java   |   6 +-
 .../slotpool/BlocklistDeclarativeSlotPool.java     |  14 +-
 .../BlocklistDeclarativeSlotPoolFactory.java       |   9 +-
 .../slotpool/DeclarativeSlotPoolBridge.java        |  18 ++-
 .../DeclarativeSlotPoolBridgeServiceFactory.java   |  12 +-
 .../slotpool/DeclarativeSlotPoolFactory.java       |   5 +-
 .../slotpool/DeclarativeSlotPoolService.java       |  27 ++--
 .../DeclarativeSlotPoolServiceFactory.java         |  20 ++-
 .../slotpool/DefaultDeclarativeSlotPool.java       |  49 ++++++-
 .../DefaultDeclarativeSlotPoolFactory.java         |  11 +-
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |   7 +-
 .../jobmaster/slotpool/SlotPoolService.java        |   6 +-
 .../jobmaster/slotpool/SlotPoolServiceFactory.java |   5 +-
 .../executiongraph/ExecutionGraphRestartTest.java  |   2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   9 +-
 .../AbstractDeclarativeSlotPoolBridgeTest.java     | 107 +++++++++++++++
 .../slotpool/BlocklistDeclarativeSlotPoolTest.java |  53 ++++++--
 .../slotpool/DeclarativeSlotPoolBridgeBuilder.java |  42 +++++-
 ...tiveSlotPoolBridgePreferredAllocationsTest.java |  12 +-
 ...arativeSlotPoolBridgeRequestCompletionTest.java |   5 +-
 ...ativeSlotPoolBridgeResourceDeclarationTest.java |  47 +++----
 .../slotpool/DeclarativeSlotPoolBridgeTest.java    |  98 +++++---------
 .../slotpool/DeclarativeSlotPoolServiceTest.java   |  11 +-
 .../DefaultDeclarativeSlotPoolBuilder.java         |  22 +++-
 .../slotpool/DefaultDeclarativeSlotPoolTest.java   | 145 ++++++++++++++-------
 .../DefaultDeclarativeSlotPoolTestBase.java        |  45 +++++++
 .../slotpool/PhysicalSlotProviderExtension.java    |   5 +-
 ...erImplWithDefaultSlotSelectionStrategyTest.java |   4 +-
 ...lSlotProviderImplWithSpreadOutStrategyTest.java |   4 +-
 .../slotpool/SlotPoolBatchSlotRequestTest.java     |   6 +-
 .../slotpool/SlotPoolInteractionsTest.java         |   6 +-
 .../TestingDeclarativeSlotPoolFactory.java         |   7 +-
 .../jobmaster/slotpool/TestingSlotPoolService.java |  15 +--
 .../slotpool/TestingSlotPoolServiceBuilder.java    |   9 +-
 .../DefaultSchedulerBatchSchedulingTest.java       |   3 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    |   3 +-
 .../adaptive/AdaptiveSchedulerBuilder.java         |   4 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |   9 +-
 .../e2e/SchedulerEndToEndBenchmarkBase.java        |   5 +-
 45 files changed, 655 insertions(+), 263 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html 
b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 6be6547547c..5ea42dab4d1 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -152,6 +152,12 @@
             <td>Duration</td>
             <td>The timeout for a idle slot in Slot Pool.</td>
         </tr>
+        <tr>
+            <td><h5>slot.request.max-interval</h5></td>
+            <td style="word-wrap: break-word;">20 ms</td>
+            <td>Duration</td>
+            <td>The max interval duration for slots request.</td>
+        </tr>
         <tr>
             <td><h5>slot.request.timeout</h5></td>
             <td style="word-wrap: break-word;">5 min</td>
diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html 
b/docs/layouts/shortcodes/generated/job_manager_configuration.html
index 3b2ecf56ffe..f81b21209b0 100644
--- a/docs/layouts/shortcodes/generated/job_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html
@@ -224,6 +224,12 @@
             <td>Duration</td>
             <td>The timeout for a idle slot in Slot Pool.</td>
         </tr>
+        <tr>
+            <td><h5>slot.request.max-interval</h5></td>
+            <td style="word-wrap: break-word;">20 ms</td>
+            <td>Duration</td>
+            <td>The max interval duration for slots request.</td>
+        </tr>
         <tr>
             <td><h5>slot.request.timeout</h5></td>
             <td style="word-wrap: break-word;">5 min</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index e1fdb25c48b..6bdb8db7f7c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -447,6 +447,13 @@ public class JobManagerOptions {
                     
.defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
                     .withDescription("The timeout for a idle slot in Slot 
Pool.");
 
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Duration> SLOT_REQUEST_MAX_INTERVAL =
+            key("slot.request.max-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(20L))
+                    .withDescription("The max interval duration for slots 
request.");
+
     /** Config parameter determining the scheduler implementation. */
     @Documentation.Section({
         Documentation.Sections.EXPERT_SCHEDULING,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 6474933f962..bd51b9049b1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -57,11 +57,15 @@ import org.apache.flink.util.clock.SystemClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static 
org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_MAX_INTERVAL;
+
 /** Default {@link SlotPoolServiceSchedulerFactory} implementation. */
 public final class DefaultSlotPoolServiceSchedulerFactory
         implements SlotPoolServiceSchedulerFactory {
@@ -86,8 +90,11 @@ public final class DefaultSlotPoolServiceSchedulerFactory
 
     @Override
     public SlotPoolService createSlotPoolService(
-            JobID jid, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) {
-        return slotPoolServiceFactory.createSlotPoolService(jid, 
declarativeSlotPoolFactory);
+            JobID jid,
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+            @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
+        return slotPoolServiceFactory.createSlotPoolService(
+                jid, declarativeSlotPoolFactory, componentMainThreadExecutor);
     }
 
     @Override
@@ -162,6 +169,8 @@ public final class DefaultSlotPoolServiceSchedulerFactory
         JobManagerOptions.SchedulerType schedulerType =
                 getSchedulerType(configuration, jobType, isDynamicGraph);
 
+        final Duration slotRequestMaxInterval = 
configuration.get(SLOT_REQUEST_MAX_INTERVAL);
+
         if (configuration
                 
.getOptional(JobManagerOptions.HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT)
                 .isPresent()) {
@@ -180,13 +189,17 @@ public final class DefaultSlotPoolServiceSchedulerFactory
                                 rpcTimeout,
                                 slotIdleTimeout,
                                 batchSlotTimeout,
+                                slotRequestMaxInterval,
                                 getRequestSlotMatchingStrategy(configuration, 
jobType));
                 break;
             case Adaptive:
                 schedulerNGFactory = new AdaptiveSchedulerFactory();
                 slotPoolServiceFactory =
                         new DeclarativeSlotPoolServiceFactory(
-                                SystemClock.getInstance(), slotIdleTimeout, 
rpcTimeout);
+                                SystemClock.getInstance(),
+                                slotIdleTimeout,
+                                rpcTimeout,
+                                slotRequestMaxInterval);
                 break;
             case AdaptiveBatch:
                 schedulerNGFactory = new AdaptiveBatchSchedulerFactory();
@@ -196,6 +209,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory
                                 rpcTimeout,
                                 slotIdleTimeout,
                                 batchSlotTimeout,
+                                slotRequestMaxInterval,
                                 getRequestSlotMatchingStrategy(configuration, 
jobType));
                 break;
             default:
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index bb681f481cd..8fb28695d2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -361,7 +361,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
                         .createSlotPoolService(
                                 jid,
                                 createDeclarativeSlotPoolFactory(
-                                        
jobMasterConfiguration.getConfiguration()));
+                                        
jobMasterConfiguration.getConfiguration()),
+                                getMainThreadExecutor());
 
         this.partitionTracker =
                 checkNotNull(partitionTrackerFactory)
@@ -1146,7 +1147,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
                     createResourceManagerHeartbeatManager(heartbeatServices);
 
             // start the slot pool make sure the slot pool now accepts 
messages for this leader
-            slotPoolService.start(getFencingToken(), getAddress(), 
getMainThreadExecutor());
+            slotPoolService.start(getFencingToken(), getAddress());
 
             // job is ready to go, try to establish connection with resource 
manager
             //   - activate leader retrieval for the resource manager
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
index 02a610988da..705625af9ef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
@@ -39,6 +39,8 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collection;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -51,10 +53,13 @@ public interface SlotPoolServiceSchedulerFactory {
      *
      * @param jid jid is the JobID to pass to the service
      * @param declarativeSlotPoolFactory the declarative slot pool factory
+     * @param componentMainThreadExecutor component main thread executor.
      * @return created SlotPoolService
      */
     SlotPoolService createSlotPoolService(
-            JobID jid, DeclarativeSlotPoolFactory declarativeSlotPoolFactory);
+            JobID jid,
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+            @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor);
 
     /**
      * Returns the scheduler type this factory is creating.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java
index ba281d1cd39..484e7fce3b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java
@@ -35,14 +35,18 @@ public abstract class AbstractSlotPoolServiceFactory 
implements SlotPoolServiceF
 
     @Nonnull protected final Duration batchSlotTimeout;
 
+    @Nonnull protected final Duration slotRequestMaxInterval;
+
     protected AbstractSlotPoolServiceFactory(
             @Nonnull Clock clock,
             @Nonnull Duration rpcTimeout,
             @Nonnull Duration slotIdleTimeout,
-            @Nonnull Duration batchSlotTimeout) {
+            @Nonnull Duration batchSlotTimeout,
+            @Nonnull Duration slotRequestMaxInterval) {
         this.clock = clock;
         this.rpcTimeout = rpcTimeout;
         this.slotIdleTimeout = slotIdleTimeout;
         this.batchSlotTimeout = batchSlotTimeout;
+        this.slotRequestMaxInterval = slotRequestMaxInterval;
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java
index c8f9ada9fdc..7eb95b0d678 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.slots.ResourceRequirement;
@@ -56,8 +57,17 @@ public class BlocklistDeclarativeSlotPool extends 
DefaultDeclarativeSlotPool {
             Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
             BlockedTaskManagerChecker blockedTaskManagerChecker,
             Duration idleSlotTimeout,
-            Duration rpcTimeout) {
-        super(jobId, slotPool, notifyNewResourceRequirements, idleSlotTimeout, 
rpcTimeout);
+            Duration rpcTimeout,
+            Duration slotRequestMaxInterval,
+            ComponentMainThreadExecutor componentMainThreadExecutor) {
+        super(
+                jobId,
+                slotPool,
+                notifyNewResourceRequirements,
+                idleSlotTimeout,
+                rpcTimeout,
+                slotRequestMaxInterval,
+                componentMainThreadExecutor);
         this.blockedTaskManagerChecker = 
checkNotNull(blockedTaskManagerChecker);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java
index 7b85f9e4ebd..ceb9b53d249 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
 import java.time.Duration;
@@ -43,13 +44,17 @@ public class BlocklistDeclarativeSlotPoolFactory implements 
DeclarativeSlotPoolF
             JobID jobId,
             Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
             Duration idleSlotTimeout,
-            Duration rpcTimeout) {
+            Duration rpcTimeout,
+            Duration slotRequestMaxInterval,
+            ComponentMainThreadExecutor componentMainThreadExecutor) {
         return new BlocklistDeclarativeSlotPool(
                 jobId,
                 new DefaultAllocatedSlotPool(),
                 notifyNewResourceRequirements,
                 blockedTaskManagerChecker,
                 idleSlotTimeout,
-                rpcTimeout);
+                rpcTimeout,
+                slotRequestMaxInterval,
+                componentMainThreadExecutor);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 444c2aa0759..b04dc25f615 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -66,8 +66,6 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
 
     private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;
 
-    @Nullable private ComponentMainThreadExecutor componentMainThreadExecutor;
-
     private final Duration batchSlotTimeout;
     private boolean isBatchSlotRequestTimeoutCheckDisabled;
 
@@ -80,8 +78,17 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
             Duration rpcTimeout,
             Duration idleSlotTimeout,
             Duration batchSlotTimeout,
-            RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
-        super(jobId, declarativeSlotPoolFactory, clock, idleSlotTimeout, 
rpcTimeout);
+            RequestSlotMatchingStrategy requestSlotMatchingStrategy,
+            Duration slotRequestMaxInterval,
+            @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
+        super(
+                jobId,
+                declarativeSlotPoolFactory,
+                clock,
+                idleSlotTimeout,
+                rpcTimeout,
+                slotRequestMaxInterval,
+                componentMainThreadExecutor);
 
         this.idleSlotTimeout = idleSlotTimeout;
         this.batchSlotTimeout = Preconditions.checkNotNull(batchSlotTimeout);
@@ -107,8 +114,7 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
     }
 
     @Override
-    protected void onStart(ComponentMainThreadExecutor 
componentMainThreadExecutor) {
-        this.componentMainThreadExecutor = componentMainThreadExecutor;
+    protected void onStart() {
 
         
getDeclarativeSlotPool().registerNewSlotsListener(this::newSlotsAreAvailable);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
index 279a2d58df3..c446c8727da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.util.clock.Clock;
 
 import javax.annotation.Nonnull;
@@ -35,15 +36,18 @@ public class DeclarativeSlotPoolBridgeServiceFactory 
extends AbstractSlotPoolSer
             @Nonnull Duration rpcTimeout,
             @Nonnull Duration slotIdleTimeout,
             @Nonnull Duration batchSlotTimeout,
+            @Nonnull Duration slotRequestMaxInterval,
             @Nonnull RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
-        super(clock, rpcTimeout, slotIdleTimeout, batchSlotTimeout);
+        super(clock, rpcTimeout, slotIdleTimeout, batchSlotTimeout, 
slotRequestMaxInterval);
         this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;
     }
 
     @Nonnull
     @Override
     public SlotPoolService createSlotPoolService(
-            @Nonnull JobID jobId, DeclarativeSlotPoolFactory 
declarativeSlotPoolFactory) {
+            @Nonnull JobID jobId,
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+            @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
         return new DeclarativeSlotPoolBridge(
                 jobId,
                 declarativeSlotPoolFactory,
@@ -51,6 +55,8 @@ public class DeclarativeSlotPoolBridgeServiceFactory extends 
AbstractSlotPoolSer
                 rpcTimeout,
                 slotIdleTimeout,
                 batchSlotTimeout,
-                requestSlotMatchingStrategy);
+                requestSlotMatchingStrategy,
+                slotRequestMaxInterval,
+                componentMainThreadExecutor);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java
index 6630254552c..8a231e267e0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
 import java.time.Duration;
@@ -31,5 +32,7 @@ public interface DeclarativeSlotPoolFactory {
             JobID jobId,
             Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
             Duration idleSlotTimeout,
-            Duration rpcTimeout);
+            Duration rpcTimeout,
+            Duration slotRequestMaxInterval,
+            ComponentMainThreadExecutor componentMainThreadExecutor);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
index 9f00b8ff0bb..2345a553eb7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -41,6 +40,7 @@ import org.apache.flink.util.clock.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
@@ -76,21 +76,30 @@ public class DeclarativeSlotPoolService implements 
SlotPoolService {
     @Nullable private String jobManagerAddress;
 
     private State state = State.CREATED;
+    protected final ComponentMainThreadExecutor componentMainThreadExecutor;
 
     public DeclarativeSlotPoolService(
             JobID jobId,
             DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
             Clock clock,
             Duration idleSlotTimeout,
-            Duration rpcTimeout) {
+            Duration rpcTimeout,
+            Duration slotRequestMaxInterval,
+            @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
         this.jobId = jobId;
         this.clock = clock;
         this.rpcTimeout = rpcTimeout;
         this.registeredTaskManagers = new HashSet<>();
+        this.componentMainThreadExecutor = componentMainThreadExecutor;
 
         this.declarativeSlotPool =
                 declarativeSlotPoolFactory.create(
-                        jobId, this::declareResourceRequirements, 
idleSlotTimeout, rpcTimeout);
+                        jobId,
+                        this::declareResourceRequirements,
+                        idleSlotTimeout,
+                        rpcTimeout,
+                        slotRequestMaxInterval,
+                        componentMainThreadExecutor);
     }
 
     protected DeclarativeSlotPool getDeclarativeSlotPool() {
@@ -111,9 +120,7 @@ public class DeclarativeSlotPoolService implements 
SlotPoolService {
     }
 
     @Override
-    public final void start(
-            JobMasterId jobMasterId, String address, 
ComponentMainThreadExecutor mainThreadExecutor)
-            throws Exception {
+    public final void start(JobMasterId jobMasterId, String address) throws 
Exception {
         Preconditions.checkState(
                 state == State.CREATED, "The DeclarativeSlotPoolService can 
only be started once.");
 
@@ -122,9 +129,9 @@ public class DeclarativeSlotPoolService implements 
SlotPoolService {
 
         this.resourceRequirementServiceConnectionManager =
                 
DefaultDeclareResourceRequirementServiceConnectionManager.create(
-                        mainThreadExecutor);
+                        componentMainThreadExecutor);
 
-        onStart(mainThreadExecutor);
+        onStart();
 
         state = State.STARTED;
     }
@@ -132,10 +139,8 @@ public class DeclarativeSlotPoolService implements 
SlotPoolService {
     /**
      * This method is called when the slot pool service is started. It can be 
overridden by
      * subclasses.
-     *
-     * @param componentMainThreadExecutor componentMainThreadExecutor used by 
this slot pool service
      */
-    protected void onStart(ComponentMainThreadExecutor 
componentMainThreadExecutor) {}
+    protected void onStart() {}
 
     protected void assertHasBeenStarted() {
         Preconditions.checkState(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java
index 0680a9587c3..be26dd496e8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.util.clock.Clock;
 
 import javax.annotation.Nonnull;
@@ -31,19 +32,32 @@ public class DeclarativeSlotPoolServiceFactory implements 
SlotPoolServiceFactory
     private final Clock clock;
     private final Duration idleSlotTimeout;
     private final Duration rpcTimeout;
+    private final @Nonnull Duration slotRequestMaxInterval;
 
     public DeclarativeSlotPoolServiceFactory(
-            Clock clock, Duration idleSlotTimeout, Duration rpcTimeout) {
+            Clock clock,
+            Duration idleSlotTimeout,
+            Duration rpcTimeout,
+            @Nonnull Duration slotRequestMaxInterval) {
         this.clock = clock;
         this.idleSlotTimeout = idleSlotTimeout;
         this.rpcTimeout = rpcTimeout;
+        this.slotRequestMaxInterval = slotRequestMaxInterval;
     }
 
     @Nonnull
     @Override
     public SlotPoolService createSlotPoolService(
-            @Nonnull JobID jobId, DeclarativeSlotPoolFactory 
declarativeSlotPoolFactory) {
+            @Nonnull JobID jobId,
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+            @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
         return new DeclarativeSlotPoolService(
-                jobId, declarativeSlotPoolFactory, clock, idleSlotTimeout, 
rpcTimeout);
+                jobId,
+                declarativeSlotPoolFactory,
+                clock,
+                idleSlotTimeout,
+                rpcTimeout,
+                slotRequestMaxInterval,
+                componentMainThreadExecutor);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
index 70061b67fa9..2689aaa606f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -49,8 +50,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -104,18 +108,28 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
 
     private final RequirementMatcher requirementMatcher = new 
DefaultRequirementMatcher();
 
+    @Nonnull private final ComponentMainThreadExecutor 
componentMainThreadExecutor;
+
+    // For slots(resources) requests by batch.
+    @Nonnull private final Duration slotRequestMaxInterval;
+    @Nullable private ScheduledFuture<?> slotRequestFuture;
+
     public DefaultDeclarativeSlotPool(
             JobID jobId,
             AllocatedSlotPool slotPool,
             Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
             Duration idleSlotTimeout,
-            Duration rpcTimeout) {
+            Duration rpcTimeout,
+            Duration slotRequestMaxInterval,
+            ComponentMainThreadExecutor componentMainThreadExecutor) {
 
         this.jobId = jobId;
         this.slotPool = slotPool;
         this.notifyNewResourceRequirements = notifyNewResourceRequirements;
         this.idleSlotTimeout = idleSlotTimeout;
         this.rpcTimeout = rpcTimeout;
+        this.componentMainThreadExecutor = 
Preconditions.checkNotNull(componentMainThreadExecutor);
+        this.slotRequestMaxInterval = 
Preconditions.checkNotNull(slotRequestMaxInterval);
         this.totalResourceRequirements = ResourceCounter.empty();
         this.fulfilledResourceRequirements = ResourceCounter.empty();
         this.slotToRequirementProfileMappings = new HashMap<>();
@@ -128,7 +142,7 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
         }
         totalResourceRequirements = totalResourceRequirements.add(increment);
 
-        declareResourceRequirements();
+        doDeclareResourceRequirements();
     }
 
     @Override
@@ -138,7 +152,25 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
         }
         totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
 
-        declareResourceRequirements();
+        doDeclareResourceRequirements();
+    }
+
+    private void doDeclareResourceRequirements() {
+        if (slotRequestMaxInterval.toMillis() <= 0L) {
+            declareResourceRequirements();
+            return;
+        }
+
+        if (slotRequestFuture != null
+                && !slotRequestFuture.isDone()
+                && !slotRequestFuture.isCancelled()) {
+            slotRequestFuture.cancel(true);
+        }
+        slotRequestFuture =
+                componentMainThreadExecutor.schedule(
+                        this::declareResourceRequirements,
+                        slotRequestMaxInterval.toMillis(),
+                        TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -603,4 +635,15 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
     ResourceCounter getFulfilledResourceRequirements() {
         return fulfilledResourceRequirements;
     }
+
+    @VisibleForTesting
+    void tryWaitSlotRequestIsDone() {
+        if (Objects.nonNull(slotRequestFuture)) {
+            try {
+                slotRequestFuture.get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java
index 95655b1fe3a..c476dae1aaf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
+import javax.annotation.Nonnull;
+
 import java.time.Duration;
 import java.util.Collection;
 import java.util.function.Consumer;
@@ -33,12 +36,16 @@ public class DefaultDeclarativeSlotPoolFactory implements 
DeclarativeSlotPoolFac
             JobID jobId,
             Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
             Duration idleSlotTimeout,
-            Duration rpcTimeout) {
+            Duration rpcTimeout,
+            Duration slotRequestMaxInterval,
+            @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
         return new DefaultDeclarativeSlotPool(
                 jobId,
                 new DefaultAllocatedSlotPool(),
                 notifyNewResourceRequirements,
                 idleSlotTimeout,
-                rpcTimeout);
+                rpcTimeout,
+                slotRequestMaxInterval,
+                componentMainThreadExecutor);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index edff664625c..e32ab5dc42a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
@@ -46,11 +45,7 @@ public interface SlotPool extends AllocatedSlotActions, 
AutoCloseable {
     //  lifecycle
     // ------------------------------------------------------------------------
 
-    void start(
-            JobMasterId jobMasterId,
-            String newJobManagerAddress,
-            ComponentMainThreadExecutor jmMainThreadScheduledExecutor)
-            throws Exception;
+    void start(JobMasterId jobMasterId, String newJobManagerAddress) throws 
Exception;
 
     void close();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java
index b0322dd2c5f..1a0b3513c96 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -59,12 +58,9 @@ public interface SlotPoolService extends AutoCloseable {
      *
      * @param jobMasterId jobMasterId to start the service with
      * @param address address of the owner
-     * @param mainThreadExecutor mainThreadExecutor to run actions in the main 
thread
      * @throws Exception if the service cannot be started
      */
-    void start(
-            JobMasterId jobMasterId, String address, 
ComponentMainThreadExecutor mainThreadExecutor)
-            throws Exception;
+    void start(JobMasterId jobMasterId, String address) throws Exception;
 
     /** Close the slot pool service. */
     void close();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
index bfbac79569c..f987f28a67f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 
 import javax.annotation.Nonnull;
 
@@ -27,5 +28,7 @@ public interface SlotPoolServiceFactory {
 
     @Nonnull
     SlotPoolService createSlotPoolService(
-            @Nonnull JobID jobId, DeclarativeSlotPoolFactory 
declarativeSlotPoolFactory);
+            @Nonnull JobID jobId,
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+            @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor);
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index f87f7a9172b..d4f9c22dd56 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -405,7 +405,7 @@ class ExecutionGraphRestartTest {
     private static void setupSlotPool(SlotPool slotPool) throws Exception {
         final String jobManagerAddress = "foobar";
         final ResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
-        slotPool.start(JobMasterId.generate(), jobManagerAddress, 
mainThreadExecutor);
+        slotPool.start(JobMasterId.generate(), jobManagerAddress);
         slotPool.connectToResourceManager(resourceManagerGateway);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index e1f60398a3a..55e252ad42a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -485,7 +485,9 @@ class JobMasterTest {
         @Nonnull
         @Override
         public SlotPoolService createSlotPoolService(
-                @Nonnull JobID jobId, DeclarativeSlotPoolFactory 
declarativeSlotPoolFactory) {
+                @Nonnull JobID jobId,
+                DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+                @Nonnull ComponentMainThreadExecutor 
componentMainThreadExecutor) {
             return new TestingSlotPool(jobId, hasReceivedSlotOffers);
         }
     }
@@ -505,10 +507,7 @@ class JobMasterTest {
         }
 
         @Override
-        public void start(
-                JobMasterId jobMasterId,
-                String newJobManagerAddress,
-                ComponentMainThreadExecutor jmMainThreadScheduledExecutor) {}
+        public void start(JobMasterId jobMasterId, String 
newJobManagerAddress) {}
 
         @Override
         public void close() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java
new file mode 100644
index 00000000000..6d9fff147ab
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.util.clock.SystemClock;
+
+import javax.annotation.Nonnull;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
+
+/** Test base class for {@link DeclarativeSlotPoolBridge}. */
+abstract class AbstractDeclarativeSlotPoolBridgeTest {
+
+    protected static final Duration RPC_TIMEOUT = Duration.ofSeconds(20);
+    protected static final JobID JOB_ID = new JobID();
+    protected static final JobMasterId JOB_MASTER_ID = JobMasterId.generate();
+    protected final ComponentMainThreadExecutor componentMainThreadExecutor = 
forMainThread();
+
+    @Parameter protected RequestSlotMatchingStrategy 
requestSlotMatchingStrategy;
+
+    @Parameter(1)
+    protected Duration slotRequestMaxInterval;
+
+    @Parameters(name = "requestSlotMatchingStrategy: {0}, 
slotRequestMaxInterval: {1}")
+    private static Collection<Object[]> data() {
+        return Arrays.asList(
+                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, 
Duration.ZERO},
+                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, 
Duration.ofMillis(50)},
+                new Object[] {
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, 
Duration.ZERO
+                },
+                new Object[] {
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, 
Duration.ofMillis(50)
+                });
+    }
+
+    @Nonnull
+    DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+            RequestSlotMatchingStrategy requestSlotMatchingStrategy,
+            Duration slotRequestMaxInterval) {
+        return createDeclarativeSlotPoolBridge(
+                declarativeSlotPoolFactory,
+                requestSlotMatchingStrategy,
+                slotRequestMaxInterval,
+                componentMainThreadExecutor);
+    }
+
+    @Nonnull
+    DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+            RequestSlotMatchingStrategy requestSlotMatchingStrategy,
+            Duration slotRequestMaxInterval,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        return new DeclarativeSlotPoolBridge(
+                JOB_ID,
+                declarativeSlotPoolFactory,
+                SystemClock.getInstance(),
+                RPC_TIMEOUT,
+                Duration.ofSeconds(20),
+                Duration.ofSeconds(20),
+                requestSlotMatchingStrategy,
+                slotRequestMaxInterval,
+                mainThreadExecutor);
+    }
+
+    static PhysicalSlot createAllocatedSlot(AllocationID allocationID) {
+        return new AllocatedSlot(
+                allocationID,
+                new LocalTaskManagerLocation(),
+                0,
+                ResourceProfile.ANY,
+                new RpcTaskManagerGateway(
+                        new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(),
+                        JobMasterId.generate()));
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java
index f0f7e6b719f..df8194f6b68 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
@@ -29,8 +30,11 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.ResourceCounter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -44,6 +48,7 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
 import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.FreeSlotConsumer;
 import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.NewSlotsService;
 import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createResourceRequirements;
@@ -54,17 +59,18 @@ import static 
org.apache.flink.shaded.guava32.com.google.common.collect.Iterable
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link BlocklistDeclarativeSlotPool}. */
-class BlocklistDeclarativeSlotPoolTest {
+@ExtendWith(ParameterizedTestExtension.class)
+class BlocklistDeclarativeSlotPoolTest extends 
DefaultDeclarativeSlotPoolTestBase {
 
     private static final ResourceProfile RESOURCE_PROFILE =
             ResourceProfile.newBuilder().setCpuCores(1.7).build();
 
-    @Test
+    @TestTemplate
     void testOfferSlotsFromBlockedTaskManager() throws Exception {
         testOfferSlots(true);
     }
 
-    @Test
+    @TestTemplate
     void testOfferSlotsFromUnblockedTaskManager() throws Exception {
         testOfferSlots(false);
     }
@@ -78,12 +84,16 @@ class BlocklistDeclarativeSlotPoolTest {
                 BlocklistDeclarativeSlotPoolBuilder.builder()
                         .setBlockedTaskManagerChecker(
                                 isBlocked ? 
taskManager.getResourceID()::equals : ignore -> false)
+                        .setSlotRequestMaxInterval(slotRequestMaxInterval)
+                        .setMainThreadExecutor(componentMainThreadExecutor)
                         .build();
         slotPool.registerNewSlotsListener(notifyNewSlots);
 
         final ResourceCounter resourceRequirements = 
createResourceRequirements();
         slotPool.increaseResourceRequirementsBy(resourceRequirements);
 
+        slotPool.tryWaitSlotRequestIsDone();
+
         // offer slots on the blocked task manager
         Collection<SlotOffer> slotOffers =
                 createSlotOffersForResourceRequirements(resourceRequirements);
@@ -107,7 +117,7 @@ class BlocklistDeclarativeSlotPoolTest {
         }
     }
 
-    @Test
+    @TestTemplate
     void testOfferDuplicateSlots() {
         final TaskManagerLocation taskManager = new LocalTaskManagerLocation();
         final List<ResourceID> blockedTaskManagers = new ArrayList<>();
@@ -115,12 +125,15 @@ class BlocklistDeclarativeSlotPoolTest {
         final BlocklistDeclarativeSlotPool slotPool =
                 BlocklistDeclarativeSlotPoolBuilder.builder()
                         
.setBlockedTaskManagerChecker(blockedTaskManagers::contains)
+                        .setSlotRequestMaxInterval(slotRequestMaxInterval)
+                        .setMainThreadExecutor(componentMainThreadExecutor)
                         .build();
-
         final ResourceCounter resourceRequirements =
                 ResourceCounter.withResource(RESOURCE_PROFILE, 2);
         slotPool.increaseResourceRequirementsBy(resourceRequirements);
 
+        slotPool.tryWaitSlotRequestIsDone();
+
         SlotOffer slot1 = new SlotOffer(new AllocationID(), 1, 
RESOURCE_PROFILE);
         SlotOffer slot2 = new SlotOffer(new AllocationID(), 1, 
RESOURCE_PROFILE);
 
@@ -154,8 +167,9 @@ class BlocklistDeclarativeSlotPoolTest {
                 BlocklistDeclarativeSlotPoolBuilder.builder()
                         .setBlockedTaskManagerChecker(
                                 isBlocked ? 
taskManager.getResourceID()::equals : ignore -> false)
+                        .setSlotRequestMaxInterval(Duration.ZERO)
+                        .setMainThreadExecutor(componentMainThreadExecutor)
                         .build();
-
         final int numberSlots = 10;
         final Collection<SlotOffer> slotOffers =
                 createSlotOffersForResourceRequirements(
@@ -195,8 +209,9 @@ class BlocklistDeclarativeSlotPoolTest {
         final BlocklistDeclarativeSlotPool slotPool =
                 BlocklistDeclarativeSlotPoolBuilder.builder()
                         
.setBlockedTaskManagerChecker(blockedTaskManagers::contains)
+                        .setSlotRequestMaxInterval(Duration.ZERO)
+                        .setMainThreadExecutor(componentMainThreadExecutor)
                         .build();
-
         SlotOffer slot1 = new SlotOffer(new AllocationID(), 1, 
RESOURCE_PROFILE);
         SlotOffer slot2 = new SlotOffer(new AllocationID(), 1, 
RESOURCE_PROFILE);
 
@@ -222,12 +237,12 @@ class BlocklistDeclarativeSlotPoolTest {
         assertThat(acceptedOffers).containsExactly(slot1);
     }
 
-    @Test
+    @TestTemplate
     void testFreeReservedSlotsOnBlockedTaskManager() throws Exception {
         testFreeReservedSlots(true);
     }
 
-    @Test
+    @TestTemplate
     void testFreeReservedSlotsOnUnblockedTaskManager() throws Exception {
         testFreeReservedSlots(false);
     }
@@ -244,6 +259,8 @@ class BlocklistDeclarativeSlotPoolTest {
         final BlocklistDeclarativeSlotPool slotPool =
                 BlocklistDeclarativeSlotPoolBuilder.builder()
                         
.setBlockedTaskManagerChecker(blockedTaskManagers::contains)
+                        .setSlotRequestMaxInterval(slotRequestMaxInterval)
+                        .setMainThreadExecutor(componentMainThreadExecutor)
                         .build();
         slotPool.registerNewSlotsListener(notifyNewSlots);
 
@@ -292,6 +309,8 @@ class BlocklistDeclarativeSlotPoolTest {
 
     private static class BlocklistDeclarativeSlotPoolBuilder {
         private BlockedTaskManagerChecker blockedTaskManagerChecker = 
resourceID -> false;
+        private Duration slotRequestMaxInterval = Duration.ZERO;
+        private ComponentMainThreadExecutor mainThreadExecutor = 
forMainThread();
 
         public BlocklistDeclarativeSlotPoolBuilder 
setBlockedTaskManagerChecker(
                 BlockedTaskManagerChecker blockedTaskManagerChecker) {
@@ -299,6 +318,18 @@ class BlocklistDeclarativeSlotPoolTest {
             return this;
         }
 
+        public BlocklistDeclarativeSlotPoolBuilder setSlotRequestMaxInterval(
+                Duration slotRequestMaxInterval) {
+            this.slotRequestMaxInterval = slotRequestMaxInterval;
+            return this;
+        }
+
+        public BlocklistDeclarativeSlotPoolBuilder setMainThreadExecutor(
+                ComponentMainThreadExecutor mainThreadExecutor) {
+            this.mainThreadExecutor = mainThreadExecutor;
+            return this;
+        }
+
         public BlocklistDeclarativeSlotPool build() {
             return new BlocklistDeclarativeSlotPool(
                     new JobID(),
@@ -306,7 +337,9 @@ class BlocklistDeclarativeSlotPoolTest {
                     ignored -> {},
                     blockedTaskManagerChecker,
                     Duration.ofSeconds(20),
-                    Duration.ofSeconds(20));
+                    Duration.ofSeconds(20),
+                    slotRequestMaxInterval,
+                    mainThreadExecutor);
         }
 
         public static BlocklistDeclarativeSlotPoolBuilder builder() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
index f3a31c4d552..6b267dafd6c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
@@ -33,6 +33,9 @@ import javax.annotation.Nullable;
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 
+import static 
org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_MAX_INTERVAL;
+import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
+
 /** Builder for a {@link DeclarativeSlotPoolBridge}. */
 public class DeclarativeSlotPoolBridgeBuilder {
 
@@ -40,6 +43,8 @@ public class DeclarativeSlotPoolBridgeBuilder {
     private Duration batchSlotTimeout = 
JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue();
     private Duration idleSlotTimeout = 
TestingUtils.infiniteTime().toDuration();
     private Clock clock = SystemClock.getInstance();
+    private Duration slotRequestMaxInterval = 
SLOT_REQUEST_MAX_INTERVAL.defaultValue();
+    private ComponentMainThreadExecutor mainThreadExecutor = forMainThread();
 
     @Nullable
     private ResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
@@ -63,6 +68,12 @@ public class DeclarativeSlotPoolBridgeBuilder {
         return this;
     }
 
+    public DeclarativeSlotPoolBridgeBuilder setSlotRequestMaxInterval(
+            Duration slotRequestMaxInterval) {
+        this.slotRequestMaxInterval = slotRequestMaxInterval;
+        return this;
+    }
+
     public DeclarativeSlotPoolBridgeBuilder setClock(Clock clock) {
         this.clock = clock;
         return this;
@@ -79,6 +90,12 @@ public class DeclarativeSlotPoolBridgeBuilder {
         return this;
     }
 
+    public DeclarativeSlotPoolBridgeBuilder setMainThreadExecutor(
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        this.mainThreadExecutor = mainThreadExecutor;
+        return this;
+    }
+
     public DeclarativeSlotPoolBridge build() {
         return new DeclarativeSlotPoolBridge(
                 jobId,
@@ -87,19 +104,30 @@ public class DeclarativeSlotPoolBridgeBuilder {
                 TestingUtils.infiniteTime().toDuration(),
                 idleSlotTimeout,
                 batchSlotTimeout,
-                requestSlotMatchingStrategy);
+                requestSlotMatchingStrategy,
+                slotRequestMaxInterval,
+                mainThreadExecutor);
     }
 
-    public DeclarativeSlotPoolBridge buildAndStart(
-            ComponentMainThreadExecutor componentMainThreadExecutor) throws 
Exception {
-        final DeclarativeSlotPoolBridge slotPool = build();
-
-        slotPool.start(JobMasterId.generate(), "foobar", 
componentMainThreadExecutor);
+    public DeclarativeSlotPoolBridge buildAndStart() throws Exception {
+        final DeclarativeSlotPoolBridge slotPool =
+                new DeclarativeSlotPoolBridge(
+                        jobId,
+                        new DefaultDeclarativeSlotPoolFactory(),
+                        clock,
+                        TestingUtils.infiniteTime().toDuration(),
+                        idleSlotTimeout,
+                        batchSlotTimeout,
+                        requestSlotMatchingStrategy,
+                        slotRequestMaxInterval,
+                        mainThreadExecutor);
+
+        slotPool.start(JobMasterId.generate(), "foobar");
 
         if (resourceManagerGateway != null) {
             CompletableFuture.runAsync(
                             () -> 
slotPool.connectToResourceManager(resourceManagerGateway),
-                            componentMainThreadExecutor)
+                            mainThreadExecutor)
                     .join();
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
index 90ee4b2fb8f..af691d21477 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
@@ -34,12 +33,14 @@ import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nonnull;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
+import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class DeclarativeSlotPoolBridgePreferredAllocationsTest {
@@ -54,12 +55,11 @@ class DeclarativeSlotPoolBridgePreferredAllocationsTest {
                         TestingUtils.infiniteTime().toDuration(),
                         TestingUtils.infiniteTime().toDuration(),
                         TestingUtils.infiniteTime().toDuration(),
-                        
PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
+                        
PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
+                        Duration.ZERO,
+                        forMainThread());
 
-        declarativeSlotPoolBridge.start(
-                JobMasterId.generate(),
-                "localhost",
-                ComponentMainThreadExecutorServiceAdapter.forMainThread());
+        declarativeSlotPoolBridge.start(JobMasterId.generate(), "localhost");
 
         final LocalTaskManagerLocation localTaskManagerLocation = new 
LocalTaskManagerLocation();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
index 6c8d5701952..147ef4a9885 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
@@ -125,7 +124,7 @@ class DeclarativeSlotPoolBridgeRequestCompletionTest {
     private SlotPool createAndSetUpSlotPool() throws Exception {
         return new DeclarativeSlotPoolBridgeBuilder()
                 .setResourceManagerGateway(resourceManagerGateway)
-                
.buildAndStart(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+                .buildAndStart();
     }
 
     private void connectToResourceManager(SlotPool slotPool) {
@@ -135,6 +134,6 @@ class DeclarativeSlotPoolBridgeRequestCompletionTest {
     private SlotPool createAndSetUpSlotPoolWithoutResourceManager() throws 
Exception {
         return new DeclarativeSlotPoolBridgeBuilder()
                 .setResourceManagerGateway(null)
-                
.buildAndStart(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+                .buildAndStart();
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
index 33774bac6ca..7aa18057536 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
@@ -23,56 +23,41 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
-import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-import java.io.IOException;
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
-import static 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeTest.createAllocatedSlot;
-import static 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeTest.createDeclarativeSlotPoolBridge;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link DeclarativeSlotPoolBridge}. */
 @ExtendWith(ParameterizedTestExtension.class)
-class DeclarativeSlotPoolBridgeResourceDeclarationTest {
-
-    private static final JobMasterId jobMasterId = JobMasterId.generate();
-    private final ComponentMainThreadExecutor mainThreadExecutor =
-            ComponentMainThreadExecutorServiceAdapter.forMainThread();
-
-    @Parameter private RequestSlotMatchingStrategy requestSlotMatchingStrategy;
+class DeclarativeSlotPoolBridgeResourceDeclarationTest
+        extends AbstractDeclarativeSlotPoolBridgeTest {
 
     private RequirementListener requirementListener;
     private DeclarativeSlotPoolBridge declarativeSlotPoolBridge;
 
-    @Parameters(name = "RequestSlotMatchingStrategy: {0}")
-    public static Collection<RequestSlotMatchingStrategy> data() throws 
IOException {
-        return Arrays.asList(
-                SimpleRequestSlotMatchingStrategy.INSTANCE,
-                PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
-    }
-
     @BeforeEach
     void setup() {
         requirementListener = new RequirementListener();
 
+        constructDeclarativeSlotPoolBridge(componentMainThreadExecutor);
+    }
+
+    private void constructDeclarativeSlotPoolBridge(
+            ComponentMainThreadExecutor mainThreadExecutor) {
         final TestingDeclarativeSlotPoolBuilder slotPoolBuilder =
                 TestingDeclarativeSlotPool.builder()
                         .setIncreaseResourceRequirementsByConsumer(
@@ -93,7 +78,10 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest {
                 new TestingDeclarativeSlotPoolFactory(slotPoolBuilder);
         declarativeSlotPoolBridge =
                 createDeclarativeSlotPoolBridge(
-                        declarativeSlotPoolFactory, 
requestSlotMatchingStrategy);
+                        declarativeSlotPoolFactory,
+                        requestSlotMatchingStrategy,
+                        slotRequestMaxInterval,
+                        mainThreadExecutor);
     }
 
     @AfterEach
@@ -105,7 +93,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest {
 
     @TestTemplate
     void testRequirementsIncreasedOnNewAllocation() throws Exception {
-        declarativeSlotPoolBridge.start(jobMasterId, "localhost", 
mainThreadExecutor);
+        declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
         // requesting the allocation of a new slot should increase the 
requirements
         declarativeSlotPoolBridge.requestNewAllocatedSlot(
@@ -122,7 +110,8 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest {
             ComponentMainThreadExecutor mainThreadExecutor =
                     
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                             scheduledExecutorService);
-            declarativeSlotPoolBridge.start(jobMasterId, "localhost", 
mainThreadExecutor);
+            constructDeclarativeSlotPoolBridge(mainThreadExecutor);
+            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
             // requesting the allocation of a new slot increases the 
requirements
             final CompletableFuture<PhysicalSlot> allocationFuture =
@@ -156,7 +145,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest {
 
     @TestTemplate
     void testRequirementsUnchangedOnNewSlotsNotification() throws Exception {
-        declarativeSlotPoolBridge.start(jobMasterId, "localhost", 
mainThreadExecutor);
+        declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
         // notifications about new slots should not affect requirements
         final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID());
@@ -167,7 +156,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest {
 
     @TestTemplate
     void testRequirementsIncreasedOnSlotReservation() throws Exception {
-        declarativeSlotPoolBridge.start(jobMasterId, "localhost", 
mainThreadExecutor);
+        declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
         final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID());
         
declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(newSlot));
@@ -182,7 +171,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest {
 
     @TestTemplate
     void testRequirementsDecreasedOnSlotFreeing() throws Exception {
-        declarativeSlotPoolBridge.start(jobMasterId, "localhost", 
mainThreadExecutor);
+        declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
         final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID());
         
declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(newSlot));
@@ -200,7 +189,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest {
 
     @TestTemplate
     void testRequirementsDecreasedOnSlotAllocationFailure() throws Exception {
-        declarativeSlotPoolBridge.start(jobMasterId, "localhost", 
mainThreadExecutor);
+        declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
         final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID());
         
declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(newSlot));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
index 2d97dc3b58b..28c91a46248 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
@@ -18,36 +18,23 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
-import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-import javax.annotation.Nonnull;
-
-import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -60,21 +47,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link DeclarativeSlotPoolBridge}. */
 @ExtendWith(ParameterizedTestExtension.class)
-class DeclarativeSlotPoolBridgeTest {
-
-    private static final Duration rpcTimeout = Duration.ofSeconds(20);
-    private static final JobID jobId = new JobID();
-    private static final JobMasterId jobMasterId = JobMasterId.generate();
-    private final ComponentMainThreadExecutor mainThreadExecutor =
-            ComponentMainThreadExecutorServiceAdapter.forMainThread();
-    @Parameter private RequestSlotMatchingStrategy requestSlotMatchingStrategy;
-
-    @Parameters(name = "RequestSlotMatchingStrategy: {0}")
-    public static Collection<RequestSlotMatchingStrategy> data() throws 
IOException {
-        return Arrays.asList(
-                SimpleRequestSlotMatchingStrategy.INSTANCE,
-                PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
-    }
+class DeclarativeSlotPoolBridgeTest extends 
AbstractDeclarativeSlotPoolBridgeTest {
 
     @TestTemplate
     void testSlotOffer() throws Exception {
@@ -86,9 +59,11 @@ class DeclarativeSlotPoolBridgeTest {
                 new 
TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                 createDeclarativeSlotPoolBridge(
-                        declarativeSlotPoolFactory, 
requestSlotMatchingStrategy)) {
+                        declarativeSlotPoolFactory,
+                        requestSlotMatchingStrategy,
+                        slotRequestMaxInterval)) {
 
-            declarativeSlotPoolBridge.start(jobMasterId, "localhost", 
mainThreadExecutor);
+            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
             CompletableFuture<PhysicalSlot> slotAllocationFuture =
                     declarativeSlotPoolBridge.requestNewAllocatedSlot(
@@ -108,9 +83,11 @@ class DeclarativeSlotPoolBridgeTest {
                 new 
TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                 createDeclarativeSlotPoolBridge(
-                        declarativeSlotPoolFactory, 
requestSlotMatchingStrategy)) {
+                        declarativeSlotPoolFactory,
+                        requestSlotMatchingStrategy,
+                        slotRequestMaxInterval)) {
 
-            declarativeSlotPoolBridge.start(jobMasterId, "localhost", 
mainThreadExecutor);
+            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
             CompletableFuture<PhysicalSlot> slotAllocationFuture =
                     CompletableFuture.supplyAsync(
@@ -119,10 +96,10 @@ class DeclarativeSlotPoolBridgeTest {
                                                     slotRequestId,
                                                     ResourceProfile.UNKNOWN,
                                                     Time.minutes(5)),
-                                    mainThreadExecutor)
+                                    componentMainThreadExecutor)
                             .get();
 
-            mainThreadExecutor.execute(
+            componentMainThreadExecutor.execute(
                     () ->
                             
declarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(
                                     Collections.emptyList()));
@@ -154,8 +131,10 @@ class DeclarativeSlotPoolBridgeTest {
                 new TestingDeclarativeSlotPoolFactory(builder);
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                 createDeclarativeSlotPoolBridge(
-                        declarativeSlotPoolFactory, 
requestSlotMatchingStrategy)) {
-            declarativeSlotPoolBridge.start(jobMasterId, "localhost", 
mainThreadExecutor);
+                        declarativeSlotPoolFactory,
+                        requestSlotMatchingStrategy,
+                        slotRequestMaxInterval)) {
+            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
             final SlotRequestId slotRequestId = new SlotRequestId();
 
@@ -171,9 +150,11 @@ class DeclarativeSlotPoolBridgeTest {
     void testNoConcurrentModificationWhenSuspendingAndReleasingSlot() throws 
Exception {
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                 createDeclarativeSlotPoolBridge(
-                        new DefaultDeclarativeSlotPoolFactory(), 
requestSlotMatchingStrategy)) {
+                        new DefaultDeclarativeSlotPoolFactory(),
+                        requestSlotMatchingStrategy,
+                        slotRequestMaxInterval)) {
 
-            declarativeSlotPoolBridge.start(jobMasterId, "localhost", 
mainThreadExecutor);
+            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
             final List<SlotRequestId> slotRequestIds =
                     Arrays.asList(new SlotRequestId(), new SlotRequestId());
@@ -186,7 +167,7 @@ class DeclarativeSlotPoolBridgeTest {
                                                 
declarativeSlotPoolBridge.requestNewAllocatedSlot(
                                                         slotRequestId,
                                                         
ResourceProfile.UNKNOWN,
-                                                        
Time.fromDuration(rpcTimeout));
+                                                        
Time.fromDuration(RPC_TIMEOUT));
                                         slotFuture.whenComplete(
                                                 (physicalSlot, throwable) -> {
                                                     if (throwable != null) {
@@ -210,15 +191,17 @@ class DeclarativeSlotPoolBridgeTest {
     void testAcceptingOfferedSlotsWithoutResourceManagerConnected() throws 
Exception {
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                 createDeclarativeSlotPoolBridge(
-                        new DefaultDeclarativeSlotPoolFactory(), 
requestSlotMatchingStrategy)) {
+                        new DefaultDeclarativeSlotPoolFactory(),
+                        requestSlotMatchingStrategy,
+                        slotRequestMaxInterval)) {
 
-            declarativeSlotPoolBridge.start(jobMasterId, "localhost", 
mainThreadExecutor);
+            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
             final CompletableFuture<PhysicalSlot> slotFuture =
                     declarativeSlotPoolBridge.requestNewAllocatedSlot(
                             new SlotRequestId(),
                             ResourceProfile.UNKNOWN,
-                            Time.fromDuration(rpcTimeout));
+                            Time.fromDuration(RPC_TIMEOUT));
 
             final LocalTaskManagerLocation localTaskManagerLocation =
                     new LocalTaskManagerLocation();
@@ -251,8 +234,10 @@ class DeclarativeSlotPoolBridgeTest {
 
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
                 createDeclarativeSlotPoolBridge(
-                        declarativeSlotPoolFactory, 
requestSlotMatchingStrategy)) {
-            declarativeSlotPoolBridge.start(jobMasterId, "localhost", 
mainThreadExecutor);
+                        declarativeSlotPoolFactory,
+                        requestSlotMatchingStrategy,
+                        slotRequestMaxInterval)) {
+            declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost");
 
             declarativeSlotPoolBridge.setIsJobRestarting(true);
 
@@ -270,29 +255,4 @@ class DeclarativeSlotPoolBridgeTest {
             registerSlotsCalledFuture.join();
         }
     }
-
-    @Nonnull
-    static DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
-            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
-            RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
-        return new DeclarativeSlotPoolBridge(
-                jobId,
-                declarativeSlotPoolFactory,
-                SystemClock.getInstance(),
-                rpcTimeout,
-                Duration.ofSeconds(20),
-                Duration.ofSeconds(20),
-                requestSlotMatchingStrategy);
-    }
-
-    static PhysicalSlot createAllocatedSlot(AllocationID allocationID) {
-        return new AllocatedSlot(
-                allocationID,
-                new LocalTaskManagerLocation(),
-                0,
-                ResourceProfile.ANY,
-                new RpcTaskManagerGateway(
-                        new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(),
-                        JobMasterId.generate()));
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
index 91d11eb5d04..7542f045f4d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
@@ -58,6 +57,7 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link DeclarativeSlotPoolService}. */
@@ -65,8 +65,7 @@ class DeclarativeSlotPoolServiceTest {
 
     private static final JobID jobId = new JobID();
     private static final JobMasterId jobMasterId = JobMasterId.generate();
-    private final ComponentMainThreadExecutor mainThreadExecutor =
-            ComponentMainThreadExecutorServiceAdapter.forMainThread();
+    private final ComponentMainThreadExecutor mainThreadExecutor = 
forMainThread();
     private static final String address = "localhost";
 
     @Test
@@ -360,9 +359,11 @@ class DeclarativeSlotPoolServiceTest {
                         declarativeSlotPoolFactory,
                         SystemClock.getInstance(),
                         Duration.ofSeconds(20L),
-                        Duration.ofSeconds(20L));
+                        Duration.ofSeconds(20L),
+                        Duration.ZERO,
+                        mainThreadExecutor);
 
-        declarativeSlotPoolService.start(jobMasterId, address, 
mainThreadExecutor);
+        declarativeSlotPoolService.start(jobMasterId, address);
 
         return declarativeSlotPoolService;
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java
index ea177ff6f85..f976eea5998 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java
@@ -18,12 +18,16 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
 import java.time.Duration;
 import java.util.Collection;
 import java.util.function.Consumer;
 
+import static 
org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_MAX_INTERVAL;
+import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
+
 /** Builder for {@link DefaultDeclarativeSlotPool}. */
 final class DefaultDeclarativeSlotPoolBuilder {
 
@@ -32,6 +36,8 @@ final class DefaultDeclarativeSlotPoolBuilder {
             ignored -> {};
     private Duration idleSlotTimeout = Duration.ofSeconds(20);
     private Duration rpcTimeout = Duration.ofSeconds(20);
+    private Duration slotRequestMaxInterval = 
SLOT_REQUEST_MAX_INTERVAL.defaultValue();
+    private ComponentMainThreadExecutor componentMainThreadExecutor = 
forMainThread();
 
     public DefaultDeclarativeSlotPoolBuilder setAllocatedSlotPool(
             AllocatedSlotPool allocatedSlotPool) {
@@ -50,13 +56,27 @@ final class DefaultDeclarativeSlotPoolBuilder {
         return this;
     }
 
+    public DefaultDeclarativeSlotPoolBuilder setSlotRequestMaxInterval(
+            Duration slotRequestMaxInterval) {
+        this.slotRequestMaxInterval = slotRequestMaxInterval;
+        return this;
+    }
+
+    public DefaultDeclarativeSlotPoolBuilder setComponentMainThreadExecutor(
+            ComponentMainThreadExecutor componentMainThreadExecutor) {
+        this.componentMainThreadExecutor = componentMainThreadExecutor;
+        return this;
+    }
+
     public DefaultDeclarativeSlotPool build() {
         return new DefaultDeclarativeSlotPool(
                 new JobID(),
                 allocatedSlotPool,
                 notifyNewResourceRequirements,
                 idleSlotTimeout,
-                rpcTimeout);
+                rpcTimeout,
+                slotRequestMaxInterval,
+                componentMainThreadExecutor);
     }
 
     public static DefaultDeclarativeSlotPoolBuilder builder() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
index 0244cbadf18..af3e9b66695 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
@@ -30,12 +30,15 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.ResourceCounter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.function.QuadConsumer;
 
 import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -58,45 +61,66 @@ import static 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.crea
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link DefaultDeclarativeSlotPool}. */
-class DefaultDeclarativeSlotPoolTest {
+@ExtendWith(ParameterizedTestExtension.class)
+class DefaultDeclarativeSlotPoolTest extends 
DefaultDeclarativeSlotPoolTestBase {
 
     private static final ResourceProfile RESOURCE_PROFILE_1 =
             ResourceProfile.newBuilder().setCpuCores(1.7).build();
     private static final ResourceProfile RESOURCE_PROFILE_2 =
             ResourceProfile.newBuilder().setManagedMemoryMB(100).build();
 
-    @Test
+    @TestTemplate
     void 
testIncreasingResourceRequirementsWillSendResourceRequirementNotification()
             throws InterruptedException {
         final NewResourceRequirementsService requirementsListener =
                 new NewResourceRequirementsService();
-        final DeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(requirementsListener);
+        final DefaultDeclarativeSlotPool slotPool =
+                createDefaultDeclarativeSlotPool(requirementsListener, 
slotRequestMaxInterval);
 
         final ResourceCounter increment1 = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
         final ResourceCounter increment2 = createResourceRequirements();
         slotPool.increaseResourceRequirementsBy(increment1);
         slotPool.increaseResourceRequirementsBy(increment2);
 
-        assertThat(requirementsListener.takeResourceRequirements())
-                .isEqualTo(toResourceRequirements(increment1));
+        slotPool.tryWaitSlotRequestIsDone();
 
-        final ResourceCounter totalResources = increment1.add(increment2);
         assertThat(requirementsListener.takeResourceRequirements())
-                .isEqualTo(toResourceRequirements(totalResources));
-        
assertThat(requirementsListener.hasNextResourceRequirements()).isFalse();
+                .isEqualTo(
+                        
toResourceRequirements(getExpectedResourceCounter(increment1, increment2)));
+
+        if (slotRequestMaxInterval.toMillis() <= 0L) {
+            final ResourceCounter totalResources = increment1.add(increment2);
+            assertThat(requirementsListener.takeResourceRequirements())
+                    .isEqualTo(toResourceRequirements(totalResources));
+            
assertThat(requirementsListener.hasNextResourceRequirements()).isFalse();
+        }
     }
 
-    @Test
+    private ResourceCounter getExpectedResourceCounter(ResourceCounter... 
resourceCounters) {
+        ResourceCounter result = ResourceCounter.empty();
+        if (slotRequestMaxInterval.toMillis() > 0L) {
+            for (ResourceCounter resourceCounter : resourceCounters) {
+                result = result.add(resourceCounter);
+            }
+            return result;
+        } else {
+            return resourceCounters[0];
+        }
+    }
+
+    @TestTemplate
     void 
testDecreasingResourceRequirementsWillSendResourceRequirementNotification()
             throws InterruptedException {
         final NewResourceRequirementsService requirementsListener =
                 new NewResourceRequirementsService();
         final DefaultDeclarativeSlotPool slotPool =
-                createDefaultDeclarativeSlotPool(requirementsListener);
+                createDefaultDeclarativeSlotPool(requirementsListener, 
slotRequestMaxInterval);
 
         final ResourceCounter increment = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 3);
         slotPool.increaseResourceRequirementsBy(increment);
 
+        slotPool.tryWaitSlotRequestIsDone();
+
         requirementsListener.takeResourceRequirements();
 
         final ResourceCounter decrement = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);
@@ -108,10 +132,9 @@ class DefaultDeclarativeSlotPoolTest {
         
assertThat(requirementsListener.hasNextResourceRequirements()).isFalse();
     }
 
-    @Test
+    @TestTemplate
     void testGetResourceRequirements() {
-        final DefaultDeclarativeSlotPool slotPool =
-                DefaultDeclarativeSlotPoolBuilder.builder().build();
+        final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool();
 
         assertThat(slotPool.getResourceRequirements()).isEmpty();
 
@@ -119,11 +142,13 @@ class DefaultDeclarativeSlotPoolTest {
 
         slotPool.increaseResourceRequirementsBy(resourceRequirements);
 
+        slotPool.tryWaitSlotRequestIsDone();
+
         assertThat(slotPool.getResourceRequirements())
                 .isEqualTo(toResourceRequirements(resourceRequirements));
     }
 
-    @Test
+    @TestTemplate
     void testOfferSlots() throws InterruptedException {
         final NewSlotsService notifyNewSlots = new NewSlotsService();
         final DefaultDeclarativeSlotPool slotPool =
@@ -133,6 +158,8 @@ class DefaultDeclarativeSlotPoolTest {
 
         slotPool.increaseResourceRequirementsBy(resourceRequirements);
 
+        slotPool.tryWaitSlotRequestIsDone();
+
         Collection<SlotOffer> slotOffers =
                 createSlotOffersForResourceRequirements(resourceRequirements);
 
@@ -177,7 +204,7 @@ class DefaultDeclarativeSlotPoolTest {
                         });
     }
 
-    @Test
+    @TestTemplate
     void testDuplicateSlotOfferings() throws InterruptedException {
         final NewSlotsService notifyNewSlots = new NewSlotsService();
         final DefaultDeclarativeSlotPool slotPool =
@@ -187,6 +214,8 @@ class DefaultDeclarativeSlotPoolTest {
 
         slotPool.increaseResourceRequirementsBy(resourceRequirements);
 
+        slotPool.tryWaitSlotRequestIsDone();
+
         final Collection<SlotOffer> slotOffers =
                 createSlotOffersForResourceRequirements(resourceRequirements);
 
@@ -202,7 +231,7 @@ class DefaultDeclarativeSlotPoolTest {
         assertThat(notifyNewSlots.hasNextNewSlots()).isFalse();
     }
 
-    @Test
+    @TestTemplate
     void testOfferingTooManySlotsWillRejectSuperfluousSlots() {
         final NewSlotsService notifyNewSlots = new NewSlotsService();
         final DefaultDeclarativeSlotPool slotPool =
@@ -212,6 +241,8 @@ class DefaultDeclarativeSlotPoolTest {
 
         slotPool.increaseResourceRequirementsBy(resourceRequirements);
 
+        slotPool.tryWaitSlotRequestIsDone();
+
         final ResourceCounter increasedRequirements =
                 resourceRequirements.add(RESOURCE_PROFILE_1, 2);
 
@@ -235,13 +266,13 @@ class DefaultDeclarativeSlotPoolTest {
                                         .isEqualTo((long) 
resourceCount.getValue()));
     }
 
-    @Test
+    @TestTemplate
     void testReleaseSlotsRemovesSlots() throws InterruptedException {
         final NewResourceRequirementsService notifyNewResourceRequirements =
                 new NewResourceRequirementsService();
         final DefaultDeclarativeSlotPool slotPool =
-                
createDefaultDeclarativeSlotPool(notifyNewResourceRequirements);
-
+                createDefaultDeclarativeSlotPool(
+                        notifyNewResourceRequirements, slotRequestMaxInterval);
         final LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
         increaseRequirementsAndOfferSlotsToSlotPool(
                 slotPool, createResourceRequirements(), taskManagerLocation);
@@ -253,11 +284,9 @@ class DefaultDeclarativeSlotPoolTest {
         assertThat(slotPool.getAllSlotsInformation()).isEmpty();
     }
 
-    @Test
+    @TestTemplate
     void testReleaseSlotsReturnsSlot() {
-        final DefaultDeclarativeSlotPool slotPool =
-                DefaultDeclarativeSlotPoolBuilder.builder().build();
-
+        final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool();
         final ResourceCounter resourceRequirements = 
createResourceRequirements();
 
         final LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
@@ -286,7 +315,7 @@ class DefaultDeclarativeSlotPoolTest {
                                 .collect(Collectors.toList()));
     }
 
-    @Test
+    @TestTemplate
     void testReleaseSlotsOnlyReturnsFulfilledRequirementsOfReservedSlots() {
         
withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles(
                 (slotPool, freeSlot, slotToReserve, taskManagerLocation) -> {
@@ -311,7 +340,7 @@ class DefaultDeclarativeSlotPoolTest {
                 });
     }
 
-    @Test
+    @TestTemplate
     void testReleaseSlotOnlyReturnsFulfilledRequirementsOfReservedSlots() {
         
withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles(
                 (slotPool, freeSlot, slotToReserve, ignored) -> {
@@ -336,11 +365,10 @@ class DefaultDeclarativeSlotPoolTest {
                 });
     }
 
-    private static void 
withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles(
+    private void 
withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles(
             QuadConsumer<DefaultDeclarativeSlotPool, SlotOffer, SlotOffer, 
TaskManagerLocation>
                     test) {
-        final DefaultDeclarativeSlotPool slotPool =
-                DefaultDeclarativeSlotPoolBuilder.builder().build();
+        final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool();
 
         final ResourceCounter resourceRequirements =
                 ResourceCounter.withResource(RESOURCE_PROFILE_1, 
1).add(RESOURCE_PROFILE_2, 1);
@@ -366,7 +394,7 @@ class DefaultDeclarativeSlotPoolTest {
         test.accept(slotPool, slot1, slot2, taskManagerLocation);
     }
 
-    @Test
+    @TestTemplate
     void testReleaseSlotDecreasesFulfilledResourceRequirements() throws 
InterruptedException {
         final NewSlotsService notifyNewSlots = new NewSlotsService();
         final DefaultDeclarativeSlotPool slotPool =
@@ -387,7 +415,7 @@ class DefaultDeclarativeSlotPoolTest {
                 .isEqualTo(finalResourceRequirements);
     }
 
-    @Test
+    @TestTemplate
     void testReleaseSlotReturnsSlot() throws InterruptedException {
         final NewSlotsService notifyNewSlots = new NewSlotsService();
         final DefaultDeclarativeSlotPool slotPool =
@@ -417,13 +445,14 @@ class DefaultDeclarativeSlotPoolTest {
         assertThat(freedSlot).isEqualTo(physicalSlot.getAllocationId());
     }
 
-    @Test
+    @TestTemplate
     void testReturnIdleSlotsAfterTimeout() {
         final Duration idleSlotTimeout = Duration.ofSeconds(10);
         final long offerTime = 0;
         final DefaultDeclarativeSlotPool slotPool =
                 DefaultDeclarativeSlotPoolBuilder.builder()
                         .setIdleSlotTimeout(idleSlotTimeout)
+                        
.setComponentMainThreadExecutor(componentMainThreadExecutor)
                         .build();
 
         final ResourceCounter resourceRequirements = 
createResourceRequirements();
@@ -461,13 +490,14 @@ class DefaultDeclarativeSlotPoolTest {
         assertThat(slotPool.getAllSlotsInformation()).isEmpty();
     }
 
-    @Test
+    @TestTemplate
     void testOnlyReturnExcessIdleSlots() {
         final Duration idleSlotTimeout = Duration.ofSeconds(10);
         final long offerTime = 0;
         final DefaultDeclarativeSlotPool slotPool =
                 DefaultDeclarativeSlotPoolBuilder.builder()
                         .setIdleSlotTimeout(idleSlotTimeout)
+                        
.setComponentMainThreadExecutor(componentMainThreadExecutor)
                         .build();
 
         final ResourceCounter resourceRequirements = 
createResourceRequirements();
@@ -475,6 +505,9 @@ class DefaultDeclarativeSlotPoolTest {
                 createSlotOffersForResourceRequirements(resourceRequirements);
 
         slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+        slotPool.tryWaitSlotRequestIsDone();
+
         final Collection<SlotOffer> acceptedSlots =
                 SlotPoolTestUtils.offerSlots(slotPool, slotOffers);
 
@@ -489,7 +522,7 @@ class DefaultDeclarativeSlotPoolTest {
         
assertThat(slotPool.getFulfilledResourceRequirements()).isEqualTo(requiredResources);
     }
 
-    @Test
+    @TestTemplate
     void 
testFreedSlotWillBeUsedToFulfillOutstandingResourceRequirementsOfSameProfile()
             throws InterruptedException {
         final NewSlotsService notifyNewSlots = new NewSlotsService();
@@ -553,9 +586,9 @@ class DefaultDeclarativeSlotPoolTest {
                                         .isEqualTo(RESOURCE_PROFILE_1));
     }
 
-    @Test
+    @TestTemplate
     void testFreedSlotWillRemainAssignedToMatchedResourceProfile() {
-        final DefaultDeclarativeSlotPool slotPool = new 
DefaultDeclarativeSlotPoolBuilder().build();
+        final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool();
 
         final ResourceProfile largeResourceProfile =
                 ResourceProfile.newBuilder().setManagedMemoryMB(1024).build();
@@ -564,6 +597,9 @@ class DefaultDeclarativeSlotPoolTest {
 
         slotPool.increaseResourceRequirementsBy(
                 ResourceCounter.withResource(largeResourceProfile, 1));
+
+        slotPool.tryWaitSlotRequestIsDone();
+
         SlotPoolTestUtils.offerSlots(
                 slotPool,
                 createSlotOffersForResourceRequirements(
@@ -580,6 +616,9 @@ class DefaultDeclarativeSlotPoolTest {
 
         slotPool.increaseResourceRequirementsBy(
                 ResourceCounter.withResource(smallResourceProfile, 1));
+
+        slotPool.tryWaitSlotRequestIsDone();
+
         slotPool.decreaseResourceRequirementsBy(
                 ResourceCounter.withResource(largeResourceProfile, 1));
 
@@ -598,9 +637,9 @@ class DefaultDeclarativeSlotPoolTest {
                 .isEqualTo(0);
     }
 
-    @Test
+    @TestTemplate
     void 
testReserveFreeSlotForResourceUpdatesAvailableResourcesAndRequirements() {
-        final DefaultDeclarativeSlotPool slotPool = new 
DefaultDeclarativeSlotPoolBuilder().build();
+        final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool();
 
         final ResourceProfile largeResourceProfile =
                 ResourceProfile.newBuilder().setManagedMemoryMB(1024).build();
@@ -608,6 +647,9 @@ class DefaultDeclarativeSlotPoolTest {
 
         slotPool.increaseResourceRequirementsBy(
                 ResourceCounter.withResource(largeResourceProfile, 1));
+
+        slotPool.tryWaitSlotRequestIsDone();
+
         SlotPoolTestUtils.offerSlots(
                 slotPool,
                 createSlotOffersForResourceRequirements(
@@ -615,6 +657,8 @@ class DefaultDeclarativeSlotPoolTest {
         slotPool.increaseResourceRequirementsBy(
                 ResourceCounter.withResource(smallResourceProfile, 1));
 
+        slotPool.tryWaitSlotRequestIsDone();
+
         final SlotInfo largeSlot =
                 
slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream()
                         .filter(slot -> 
slot.getResourceProfile().equals(largeResourceProfile))
@@ -638,7 +682,7 @@ class DefaultDeclarativeSlotPoolTest {
 
     @Test
     void testSetResourceRequirementsForInitialResourceRequirements() {
-        final DefaultDeclarativeSlotPool slotPool = new 
DefaultDeclarativeSlotPoolBuilder().build();
+        final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool();
 
         final ResourceCounter resourceRequirements =
                 ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);
@@ -651,7 +695,7 @@ class DefaultDeclarativeSlotPoolTest {
 
     @Test
     void testSetResourceRequirementsOverwritesPreviousValue() {
-        final DefaultDeclarativeSlotPool slotPool = new 
DefaultDeclarativeSlotPoolBuilder().build();
+        final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool();
 
         
slotPool.setResourceRequirements(ResourceCounter.withResource(RESOURCE_PROFILE_1,
 1));
 
@@ -663,9 +707,9 @@ class DefaultDeclarativeSlotPoolTest {
                 .isEqualTo(toResourceRequirements(resourceRequirements));
     }
 
-    @Test
+    @TestTemplate
     void testRegisterSlotsDoesNotAffectRequirements() {
-        final DefaultDeclarativeSlotPool slotPool = new 
DefaultDeclarativeSlotPoolBuilder().build();
+        final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool();
 
         final ResourceProfile slotProfile = RESOURCE_PROFILE_1;
         final ResourceProfile requestedProfile = ResourceProfile.UNKNOWN;
@@ -683,6 +727,9 @@ class DefaultDeclarativeSlotPoolTest {
         assertThat(slotPool.getResourceRequirements()).isEmpty();
 
         
slotPool.increaseResourceRequirementsBy(ResourceCounter.withResource(requestedProfile,
 1));
+
+        slotPool.tryWaitSlotRequestIsDone();
+
         slotPool.reserveFreeSlot(allocationId, requestedProfile);
         slotPool.freeReservedSlot(allocationId, null, 1L);
         
slotPool.decreaseResourceRequirementsBy(ResourceCounter.withResource(requestedProfile,
 1));
@@ -711,15 +758,17 @@ class DefaultDeclarativeSlotPoolTest {
     }
 
     @Nonnull
-    private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool(
-            NewResourceRequirementsService requirementsListener) {
+    private DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool(
+            NewResourceRequirementsService requirementsListener, Duration 
slotRequestMaxInterval) {
         return DefaultDeclarativeSlotPoolBuilder.builder()
+                .setSlotRequestMaxInterval(slotRequestMaxInterval)
                 .setNotifyNewResourceRequirements(requirementsListener)
+                .setComponentMainThreadExecutor(componentMainThreadExecutor)
                 .build();
     }
 
     @Nonnull
-    private static DefaultDeclarativeSlotPool 
createDefaultDeclarativeSlotPoolWithNewSlotsListener(
+    private DefaultDeclarativeSlotPool 
createDefaultDeclarativeSlotPoolWithNewSlotsListener(
             DeclarativeSlotPool.NewSlotsListener newSlotsListener) {
         final DefaultDeclarativeSlotPool declarativeSlotPool = 
createDefaultDeclarativeSlotPool();
 
@@ -728,8 +777,10 @@ class DefaultDeclarativeSlotPoolTest {
     }
 
     @Nonnull
-    private static DefaultDeclarativeSlotPool 
createDefaultDeclarativeSlotPool() {
-        return DefaultDeclarativeSlotPoolBuilder.builder().build();
+    private DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool() {
+        return DefaultDeclarativeSlotPoolBuilder.builder()
+                .setComponentMainThreadExecutor(componentMainThreadExecutor)
+                .build();
     }
 
     @Nonnull
@@ -752,6 +803,8 @@ class DefaultDeclarativeSlotPoolTest {
 
         slotPool.increaseResourceRequirementsBy(resourceRequirements);
 
+        slotPool.tryWaitSlotRequestIsDone();
+
         return slotPool.offerSlots(
                 slotOffers,
                 taskManagerLocation == null ? new LocalTaskManagerLocation() : 
taskManagerLocation,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java
index e69de29bb2d..4300dabdc6a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.assertj.core.util.Lists;
+
+import java.time.Duration;
+import java.util.List;
+
+/** Tests base class for the {@link DefaultDeclarativeSlotPool}. */
+abstract class DefaultDeclarativeSlotPoolTestBase {
+
+    // Enabled the slot batch request if the interval is greater than 
Duration.ZERO, disabled
+    // else.
+    @Parameter protected Duration slotRequestMaxInterval;
+
+    protected ComponentMainThreadExecutor componentMainThreadExecutor =
+            ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+    @Parameters(name = "slotRequestMaxInterval: {0}")
+    static List<Duration> getParametersCouples() {
+        return Lists.newArrayList(Duration.ofMillis(50L), Duration.ZERO);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java
index d717d61228e..bf4349e7f70 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java
@@ -63,7 +63,10 @@ public class PhysicalSlotProviderExtension implements 
BeforeEachCallback, AfterE
         this.mainThreadExecutor =
                 
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                         singleThreadScheduledExecutorService);
-        slotPool = new 
DeclarativeSlotPoolBridgeBuilder().buildAndStart(mainThreadExecutor);
+        slotPool =
+                new DeclarativeSlotPoolBridgeBuilder()
+                        .setMainThreadExecutor(mainThreadExecutor)
+                        .buildAndStart();
         physicalSlotProvider = new 
PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
index ae2d2e48ad6..e7117b3a23d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
@@ -66,7 +66,9 @@ class 
PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest {
             throws Exception {
         DeclarativeSlotPoolBridge slotPool =
                 new DeclarativeSlotPoolBridgeBuilder()
-                        
.buildAndStart(physicalSlotProviderExtension.getMainThreadExecutor());
+                        .setMainThreadExecutor(
+                                
physicalSlotProviderExtension.getMainThreadExecutor())
+                        .buildAndStart();
         assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled()).isTrue();
 
         final PhysicalSlotProvider slotProvider =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
index 0c9a808eaa6..848df3cc044 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
@@ -107,7 +107,9 @@ class PhysicalSlotProviderImplWithSpreadOutStrategyTest {
             throws Exception {
         DeclarativeSlotPoolBridge slotPool =
                 new DeclarativeSlotPoolBridgeBuilder()
-                        
.buildAndStart(physicalSlotProviderExtension.getMainThreadExecutor());
+                        .setMainThreadExecutor(
+                                
physicalSlotProviderExtension.getMainThreadExecutor())
+                        .buildAndStart();
         assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled()).isTrue();
 
         final PhysicalSlotProvider slotProvider =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
index 208e71aa5fb..524fc9064c6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
@@ -238,7 +238,8 @@ class SlotPoolBatchSlotRequestTest {
         return new DeclarativeSlotPoolBridgeBuilder()
                 .setResourceManagerGateway(resourceManagerGateway)
                 .setBatchSlotTimeout(batchSlotTimeout.toDuration())
-                .buildAndStart(componentMainThreadExecutor);
+                .setMainThreadExecutor(componentMainThreadExecutor)
+                .buildAndStart();
     }
 
     private DeclarativeSlotPoolBridge createAndSetUpSlotPool(
@@ -252,6 +253,7 @@ class SlotPoolBatchSlotRequestTest {
                 .setResourceManagerGateway(resourceManagerGateway)
                 .setBatchSlotTimeout(batchSlotTimeout.toDuration())
                 .setClock(clock)
-                .buildAndStart(componentMainThreadExecutor);
+                .setMainThreadExecutor(componentMainThreadExecutor)
+                .buildAndStart();
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
index a746a216348..a000bd251f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
@@ -116,13 +116,15 @@ class SlotPoolInteractionsTest {
 
     private DeclarativeSlotPoolBridge createAndSetUpSlotPool() throws 
Exception {
         return new DeclarativeSlotPoolBridgeBuilder()
-                .buildAndStart(testMainThreadExecutor.getMainThreadExecutor());
+                
.setMainThreadExecutor(testMainThreadExecutor.getMainThreadExecutor())
+                .buildAndStart();
     }
 
     private DeclarativeSlotPoolBridge 
createAndSetUpSlotPoolWithoutResourceManager()
             throws Exception {
         return new DeclarativeSlotPoolBridgeBuilder()
                 .setResourceManagerGateway(null)
-                .buildAndStart(testMainThreadExecutor.getMainThreadExecutor());
+                
.setMainThreadExecutor(testMainThreadExecutor.getMainThreadExecutor())
+                .buildAndStart();
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java
index 3ca17396654..69527b8b602 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
+import javax.annotation.Nonnull;
+
 import java.time.Duration;
 import java.util.Collection;
 import java.util.function.Consumer;
@@ -39,7 +42,9 @@ final class TestingDeclarativeSlotPoolFactory implements 
DeclarativeSlotPoolFact
             JobID jobId,
             Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
             Duration idleSlotTimeout,
-            Duration rpcTimeout) {
+            Duration rpcTimeout,
+            Duration slotRequestMaxInterval,
+            @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
         return builder.build();
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java
index 5d10289686b..9cafb578bc5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java
@@ -21,14 +21,12 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.function.TriConsumer;
 import org.apache.flink.util.function.TriFunction;
 
 import javax.annotation.Nullable;
@@ -44,9 +42,7 @@ public class TestingSlotPoolService implements 
SlotPoolService {
 
     private final JobID jobId;
 
-    private final TriConsumer<
-                    ? super JobMasterId, ? super String, ? super 
ComponentMainThreadExecutor>
-            startConsumer;
+    private final BiFunction<? super JobMasterId, ? super String, Void> 
startConsumer;
 
     private final Runnable closeRunnable;
 
@@ -78,8 +74,7 @@ public class TestingSlotPoolService implements 
SlotPoolService {
 
     public TestingSlotPoolService(
             JobID jobId,
-            TriConsumer<? super JobMasterId, ? super String, ? super 
ComponentMainThreadExecutor>
-                    startConsumer,
+            BiFunction<? super JobMasterId, ? super String, Void> 
startConsumer,
             Runnable closeRunnable,
             TriFunction<
                             ? super TaskManagerLocation,
@@ -112,10 +107,8 @@ public class TestingSlotPoolService implements 
SlotPoolService {
     }
 
     @Override
-    public void start(
-            JobMasterId jobMasterId, String address, 
ComponentMainThreadExecutor mainThreadExecutor)
-            throws Exception {
-        startConsumer.accept(jobMasterId, address, mainThreadExecutor);
+    public void start(JobMasterId jobMasterId, String address) throws 
Exception {
+        startConsumer.apply(jobMasterId, address);
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java
index aed1790d597..2a848b82abe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.function.TriConsumer;
 import org.apache.flink.util.function.TriFunction;
 
 import javax.annotation.Nonnull;
@@ -43,8 +42,8 @@ import java.util.function.Function;
 /** {@link SlotPoolServiceFactory} which creates a {@link 
TestingSlotPoolService}. */
 public class TestingSlotPoolServiceBuilder implements SlotPoolServiceFactory {
 
-    private TriConsumer<? super JobMasterId, ? super String, ? super 
ComponentMainThreadExecutor>
-            startConsumer = (ignoredA, ignoredB, ignoredC) -> {};
+    private BiFunction<? super JobMasterId, ? super String, Void> 
startConsumer =
+            (ignoredA, ignoredB) -> null;
     private Runnable closeRunnable = () -> {};
     private TriFunction<
                     ? super TaskManagerLocation,
@@ -71,7 +70,9 @@ public class TestingSlotPoolServiceBuilder implements 
SlotPoolServiceFactory {
     @Nonnull
     @Override
     public SlotPoolService createSlotPoolService(
-            @Nonnull JobID jobId, DeclarativeSlotPoolFactory 
declarativeSlotPoolFactory) {
+            @Nonnull JobID jobId,
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+            @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
         return new TestingSlotPoolService(
                 jobId,
                 startConsumer,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
index f7a22ac3b65..0693f6b2d89 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
@@ -186,7 +186,8 @@ class DefaultSchedulerBatchSchedulingTest {
             throws Exception {
         return new DeclarativeSlotPoolBridgeBuilder()
                 .setBatchSlotTimeout(batchSlotTimeout.toDuration())
-                .buildAndStart(mainThreadExecutor);
+                .setMainThreadExecutor(mainThreadExecutor)
+                .buildAndStart();
     }
 
     private JobGraph createBatchJobGraph(int parallelism) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 531d757948b..43384c7f2bc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -1600,7 +1600,8 @@ public class DefaultSchedulerTest {
         final SlotPool slotPool =
                 new DeclarativeSlotPoolBridgeBuilder()
                         .setBatchSlotTimeout(slotTimeout)
-                        .buildAndStart(singleThreadMainThreadExecutor);
+                        .setMainThreadExecutor(singleThreadMainThreadExecutor)
+                        .buildAndStart();
         final PhysicalSlotProvider slotProvider =
                 new PhysicalSlotProviderImpl(
                         
LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index b3207436a47..433c6d66a90 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -118,7 +118,9 @@ public class AdaptiveSchedulerBuilder {
                         new DefaultAllocatedSlotPool(),
                         ignored -> {},
                         DEFAULT_TIMEOUT,
-                        rpcTimeout);
+                        rpcTimeout,
+                        Duration.ZERO,
+                        mainThreadExecutor);
         this.executorService = executorService;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index f6e7246687b..2d035775f63 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -151,6 +151,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
 import static 
org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph;
@@ -536,7 +537,9 @@ public class AdaptiveSchedulerTest {
                         new DefaultAllocatedSlotPool(),
                         ignored -> {},
                         Duration.ofMinutes(10),
-                        Duration.ofMinutes(10));
+                        Duration.ofMinutes(10),
+                        Duration.ZERO,
+                        mainThreadExecutor);
 
         final Configuration configuration = 
createConfigurationWithNoTimeouts();
         configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
@@ -2312,7 +2315,9 @@ public class AdaptiveSchedulerTest {
                 new DefaultAllocatedSlotPool(),
                 ignored -> {},
                 idleSlotTimeout,
-                DEFAULT_TIMEOUT);
+                DEFAULT_TIMEOUT,
+                Duration.ZERO,
+                forMainThread());
     }
 
     private static JobGraph createJobGraph() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
index 425938dceaf..835880ced4f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
@@ -66,7 +66,10 @@ public class SchedulerEndToEndBenchmarkBase extends 
SchedulerBenchmarkBase {
         final List<JobVertex> jobVertices = 
createDefaultJobVertices(jobConfiguration);
         jobGraph = createJobGraph(jobVertices, jobConfiguration);
 
-        slotPool = new 
DeclarativeSlotPoolBridgeBuilder().buildAndStart(mainThreadExecutor);
+        slotPool =
+                new DeclarativeSlotPoolBridgeBuilder()
+                        .setMainThreadExecutor(mainThreadExecutor)
+                        .buildAndStart();
         SlotSelectionStrategy slotSelectionStrategy =
                 jobConfiguration.isEvenlySpreadOutSlots()
                         ? 
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()

Reply via email to