This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ea752ff1129168d369f03cf25d5c81f661b4b405 Author: Roc Marshal <[email protected]> AuthorDate: Wed Dec 13 13:19:06 2023 +0800 [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler --- .../generated/expert_scheduling_section.html | 6 + .../generated/job_manager_configuration.html | 6 + .../flink/configuration/JobManagerOptions.java | 7 + .../DefaultSlotPoolServiceSchedulerFactory.java | 20 ++- .../apache/flink/runtime/jobmaster/JobMaster.java | 5 +- .../jobmaster/SlotPoolServiceSchedulerFactory.java | 7 +- .../slotpool/AbstractSlotPoolServiceFactory.java | 6 +- .../slotpool/BlocklistDeclarativeSlotPool.java | 14 +- .../BlocklistDeclarativeSlotPoolFactory.java | 9 +- .../slotpool/DeclarativeSlotPoolBridge.java | 18 ++- .../DeclarativeSlotPoolBridgeServiceFactory.java | 12 +- .../slotpool/DeclarativeSlotPoolFactory.java | 5 +- .../slotpool/DeclarativeSlotPoolService.java | 27 ++-- .../DeclarativeSlotPoolServiceFactory.java | 20 ++- .../slotpool/DefaultDeclarativeSlotPool.java | 49 ++++++- .../DefaultDeclarativeSlotPoolFactory.java | 11 +- .../flink/runtime/jobmaster/slotpool/SlotPool.java | 7 +- .../jobmaster/slotpool/SlotPoolService.java | 6 +- .../jobmaster/slotpool/SlotPoolServiceFactory.java | 5 +- .../executiongraph/ExecutionGraphRestartTest.java | 2 +- .../flink/runtime/jobmaster/JobMasterTest.java | 9 +- .../AbstractDeclarativeSlotPoolBridgeTest.java | 107 +++++++++++++++ .../slotpool/BlocklistDeclarativeSlotPoolTest.java | 53 ++++++-- .../slotpool/DeclarativeSlotPoolBridgeBuilder.java | 42 +++++- ...tiveSlotPoolBridgePreferredAllocationsTest.java | 12 +- ...arativeSlotPoolBridgeRequestCompletionTest.java | 5 +- ...ativeSlotPoolBridgeResourceDeclarationTest.java | 47 +++---- .../slotpool/DeclarativeSlotPoolBridgeTest.java | 98 +++++--------- .../slotpool/DeclarativeSlotPoolServiceTest.java | 11 +- .../DefaultDeclarativeSlotPoolBuilder.java | 22 +++- 
.../slotpool/DefaultDeclarativeSlotPoolTest.java | 145 ++++++++++++++------- .../DefaultDeclarativeSlotPoolTestBase.java | 45 +++++++ .../slotpool/PhysicalSlotProviderExtension.java | 5 +- ...erImplWithDefaultSlotSelectionStrategyTest.java | 4 +- ...lSlotProviderImplWithSpreadOutStrategyTest.java | 4 +- .../slotpool/SlotPoolBatchSlotRequestTest.java | 6 +- .../slotpool/SlotPoolInteractionsTest.java | 6 +- .../TestingDeclarativeSlotPoolFactory.java | 7 +- .../jobmaster/slotpool/TestingSlotPoolService.java | 15 +-- .../slotpool/TestingSlotPoolServiceBuilder.java | 9 +- .../DefaultSchedulerBatchSchedulingTest.java | 3 +- .../runtime/scheduler/DefaultSchedulerTest.java | 3 +- .../adaptive/AdaptiveSchedulerBuilder.java | 4 +- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 9 +- .../e2e/SchedulerEndToEndBenchmarkBase.java | 5 +- 45 files changed, 655 insertions(+), 263 deletions(-) diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html b/docs/layouts/shortcodes/generated/expert_scheduling_section.html index 6be6547547c..5ea42dab4d1 100644 --- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html +++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html @@ -152,6 +152,12 @@ <td>Duration</td> <td>The timeout for a idle slot in Slot Pool.</td> </tr> + <tr> + <td><h5>slot.request.max-interval</h5></td> + <td style="word-wrap: break-word;">20 ms</td> + <td>Duration</td> + <td>The max interval duration for slots request.</td> + </tr> <tr> <td><h5>slot.request.timeout</h5></td> <td style="word-wrap: break-word;">5 min</td> diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html b/docs/layouts/shortcodes/generated/job_manager_configuration.html index 3b2ecf56ffe..f81b21209b0 100644 --- a/docs/layouts/shortcodes/generated/job_manager_configuration.html +++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html @@ -224,6 +224,12 @@ <td>Duration</td> <td>The timeout for a idle slot in Slot 
Pool.</td> </tr> + <tr> + <td><h5>slot.request.max-interval</h5></td> + <td style="word-wrap: break-word;">20 ms</td> + <td>Duration</td> + <td>The max interval duration for slots request.</td> + </tr> <tr> <td><h5>slot.request.timeout</h5></td> <td style="word-wrap: break-word;">5 min</td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index e1fdb25c48b..6bdb8db7f7c 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -447,6 +447,13 @@ public class JobManagerOptions { .defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue()) .withDescription("The timeout for a idle slot in Slot Pool."); + @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING) + public static final ConfigOption<Duration> SLOT_REQUEST_MAX_INTERVAL = + key("slot.request.max-interval") + .durationType() + .defaultValue(Duration.ofMillis(20L)) + .withDescription("The max interval duration for slots request."); + /** Config parameter determining the scheduler implementation. 
*/ @Documentation.Section({ Documentation.Sections.EXPERT_SCHEDULING, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java index 6474933f962..bd51b9049b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java @@ -57,11 +57,15 @@ import org.apache.flink.util.clock.SystemClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.time.Duration; import java.util.Collection; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_MAX_INTERVAL; + /** Default {@link SlotPoolServiceSchedulerFactory} implementation. 
*/ public final class DefaultSlotPoolServiceSchedulerFactory implements SlotPoolServiceSchedulerFactory { @@ -86,8 +90,11 @@ public final class DefaultSlotPoolServiceSchedulerFactory @Override public SlotPoolService createSlotPoolService( - JobID jid, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) { - return slotPoolServiceFactory.createSlotPoolService(jid, declarativeSlotPoolFactory); + JobID jid, + DeclarativeSlotPoolFactory declarativeSlotPoolFactory, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { + return slotPoolServiceFactory.createSlotPoolService( + jid, declarativeSlotPoolFactory, componentMainThreadExecutor); } @Override @@ -162,6 +169,8 @@ public final class DefaultSlotPoolServiceSchedulerFactory JobManagerOptions.SchedulerType schedulerType = getSchedulerType(configuration, jobType, isDynamicGraph); + final Duration slotRequestMaxInterval = configuration.get(SLOT_REQUEST_MAX_INTERVAL); + if (configuration .getOptional(JobManagerOptions.HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT) .isPresent()) { @@ -180,13 +189,17 @@ public final class DefaultSlotPoolServiceSchedulerFactory rpcTimeout, slotIdleTimeout, batchSlotTimeout, + slotRequestMaxInterval, getRequestSlotMatchingStrategy(configuration, jobType)); break; case Adaptive: schedulerNGFactory = new AdaptiveSchedulerFactory(); slotPoolServiceFactory = new DeclarativeSlotPoolServiceFactory( - SystemClock.getInstance(), slotIdleTimeout, rpcTimeout); + SystemClock.getInstance(), + slotIdleTimeout, + rpcTimeout, + slotRequestMaxInterval); break; case AdaptiveBatch: schedulerNGFactory = new AdaptiveBatchSchedulerFactory(); @@ -196,6 +209,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory rpcTimeout, slotIdleTimeout, batchSlotTimeout, + slotRequestMaxInterval, getRequestSlotMatchingStrategy(configuration, jobType)); break; default: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index bb681f481cd..8fb28695d2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -361,7 +361,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> .createSlotPoolService( jid, createDeclarativeSlotPoolFactory( - jobMasterConfiguration.getConfiguration())); + jobMasterConfiguration.getConfiguration()), + getMainThreadExecutor()); this.partitionTracker = checkNotNull(partitionTrackerFactory) @@ -1146,7 +1147,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> createResourceManagerHeartbeatManager(heartbeatServices); // start the slot pool make sure the slot pool now accepts messages for this leader - slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor()); + slotPoolService.start(getFencingToken(), getAddress()); // job is ready to go, try to establish connection with resource manager // - activate leader retrieval for the resource manager diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java index 02a610988da..705625af9ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java @@ -39,6 +39,8 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.slf4j.Logger; +import javax.annotation.Nonnull; + import java.util.Collection; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -51,10 +53,13 @@ public interface SlotPoolServiceSchedulerFactory { * * @param jid jid is the JobID to pass to the service * @param declarativeSlotPoolFactory the 
declarative slot pool factory + * @param componentMainThreadExecutor component main thread executor. * @return created SlotPoolService */ SlotPoolService createSlotPoolService( - JobID jid, DeclarativeSlotPoolFactory declarativeSlotPoolFactory); + JobID jid, + DeclarativeSlotPoolFactory declarativeSlotPoolFactory, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor); /** * Returns the scheduler type this factory is creating. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java index ba281d1cd39..484e7fce3b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java @@ -35,14 +35,18 @@ public abstract class AbstractSlotPoolServiceFactory implements SlotPoolServiceF @Nonnull protected final Duration batchSlotTimeout; + @Nonnull protected final Duration slotRequestMaxInterval; + protected AbstractSlotPoolServiceFactory( @Nonnull Clock clock, @Nonnull Duration rpcTimeout, @Nonnull Duration slotIdleTimeout, - @Nonnull Duration batchSlotTimeout) { + @Nonnull Duration batchSlotTimeout, + @Nonnull Duration slotRequestMaxInterval) { this.clock = clock; this.rpcTimeout = rpcTimeout; this.slotIdleTimeout = slotIdleTimeout; this.batchSlotTimeout = batchSlotTimeout; + this.slotRequestMaxInterval = slotRequestMaxInterval; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java index c8f9ada9fdc..7eb95b0d678 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.SlotInfo; import org.apache.flink.runtime.slots.ResourceRequirement; @@ -56,8 +57,17 @@ public class BlocklistDeclarativeSlotPool extends DefaultDeclarativeSlotPool { Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, BlockedTaskManagerChecker blockedTaskManagerChecker, Duration idleSlotTimeout, - Duration rpcTimeout) { - super(jobId, slotPool, notifyNewResourceRequirements, idleSlotTimeout, rpcTimeout); + Duration rpcTimeout, + Duration slotRequestMaxInterval, + ComponentMainThreadExecutor componentMainThreadExecutor) { + super( + jobId, + slotPool, + notifyNewResourceRequirements, + idleSlotTimeout, + rpcTimeout, + slotRequestMaxInterval, + componentMainThreadExecutor); this.blockedTaskManagerChecker = checkNotNull(blockedTaskManagerChecker); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java index 7b85f9e4ebd..ceb9b53d249 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; import 
org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.slots.ResourceRequirement; import java.time.Duration; @@ -43,13 +44,17 @@ public class BlocklistDeclarativeSlotPoolFactory implements DeclarativeSlotPoolF JobID jobId, Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, Duration idleSlotTimeout, - Duration rpcTimeout) { + Duration rpcTimeout, + Duration slotRequestMaxInterval, + ComponentMainThreadExecutor componentMainThreadExecutor) { return new BlocklistDeclarativeSlotPool( jobId, new DefaultAllocatedSlotPool(), notifyNewResourceRequirements, blockedTaskManagerChecker, idleSlotTimeout, - rpcTimeout); + rpcTimeout, + slotRequestMaxInterval, + componentMainThreadExecutor); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java index 444c2aa0759..b04dc25f615 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java @@ -66,8 +66,6 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem private final RequestSlotMatchingStrategy requestSlotMatchingStrategy; - @Nullable private ComponentMainThreadExecutor componentMainThreadExecutor; - private final Duration batchSlotTimeout; private boolean isBatchSlotRequestTimeoutCheckDisabled; @@ -80,8 +78,17 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem Duration rpcTimeout, Duration idleSlotTimeout, Duration batchSlotTimeout, - RequestSlotMatchingStrategy requestSlotMatchingStrategy) { - super(jobId, declarativeSlotPoolFactory, clock, idleSlotTimeout, rpcTimeout); + RequestSlotMatchingStrategy 
requestSlotMatchingStrategy, + Duration slotRequestMaxInterval, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { + super( + jobId, + declarativeSlotPoolFactory, + clock, + idleSlotTimeout, + rpcTimeout, + slotRequestMaxInterval, + componentMainThreadExecutor); this.idleSlotTimeout = idleSlotTimeout; this.batchSlotTimeout = Preconditions.checkNotNull(batchSlotTimeout); @@ -107,8 +114,7 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem } @Override - protected void onStart(ComponentMainThreadExecutor componentMainThreadExecutor) { - this.componentMainThreadExecutor = componentMainThreadExecutor; + protected void onStart() { getDeclarativeSlotPool().registerNewSlotsListener(this::newSlotsAreAvailable); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java index 279a2d58df3..c446c8727da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.util.clock.Clock; import javax.annotation.Nonnull; @@ -35,15 +36,18 @@ public class DeclarativeSlotPoolBridgeServiceFactory extends AbstractSlotPoolSer @Nonnull Duration rpcTimeout, @Nonnull Duration slotIdleTimeout, @Nonnull Duration batchSlotTimeout, + @Nonnull Duration slotRequestMaxInterval, @Nonnull RequestSlotMatchingStrategy requestSlotMatchingStrategy) { - super(clock, rpcTimeout, slotIdleTimeout, batchSlotTimeout); + super(clock, rpcTimeout, slotIdleTimeout, batchSlotTimeout, 
slotRequestMaxInterval); this.requestSlotMatchingStrategy = requestSlotMatchingStrategy; } @Nonnull @Override public SlotPoolService createSlotPoolService( - @Nonnull JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) { + @Nonnull JobID jobId, + DeclarativeSlotPoolFactory declarativeSlotPoolFactory, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { return new DeclarativeSlotPoolBridge( jobId, declarativeSlotPoolFactory, @@ -51,6 +55,8 @@ public class DeclarativeSlotPoolBridgeServiceFactory extends AbstractSlotPoolSer rpcTimeout, slotIdleTimeout, batchSlotTimeout, - requestSlotMatchingStrategy); + requestSlotMatchingStrategy, + slotRequestMaxInterval, + componentMainThreadExecutor); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java index 6630254552c..8a231e267e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.slots.ResourceRequirement; import java.time.Duration; @@ -31,5 +32,7 @@ public interface DeclarativeSlotPoolFactory { JobID jobId, Consumer<? 
super Collection<ResourceRequirement>> notifyNewResourceRequirements, Duration idleSlotTimeout, - Duration rpcTimeout); + Duration rpcTimeout, + Duration slotRequestMaxInterval, + ComponentMainThreadExecutor componentMainThreadExecutor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java index 9f00b8ff0bb..2345a553eb7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; @@ -41,6 +40,7 @@ import org.apache.flink.util.clock.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.time.Duration; @@ -76,21 +76,30 @@ public class DeclarativeSlotPoolService implements SlotPoolService { @Nullable private String jobManagerAddress; private State state = State.CREATED; + protected final ComponentMainThreadExecutor componentMainThreadExecutor; public DeclarativeSlotPoolService( JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory, Clock clock, Duration idleSlotTimeout, - Duration rpcTimeout) { + Duration rpcTimeout, + Duration slotRequestMaxInterval, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { this.jobId = jobId; this.clock = clock; this.rpcTimeout = rpcTimeout; this.registeredTaskManagers = new HashSet<>(); + this.componentMainThreadExecutor = 
componentMainThreadExecutor; this.declarativeSlotPool = declarativeSlotPoolFactory.create( - jobId, this::declareResourceRequirements, idleSlotTimeout, rpcTimeout); + jobId, + this::declareResourceRequirements, + idleSlotTimeout, + rpcTimeout, + slotRequestMaxInterval, + componentMainThreadExecutor); } protected DeclarativeSlotPool getDeclarativeSlotPool() { @@ -111,9 +120,7 @@ public class DeclarativeSlotPoolService implements SlotPoolService { } @Override - public final void start( - JobMasterId jobMasterId, String address, ComponentMainThreadExecutor mainThreadExecutor) - throws Exception { + public final void start(JobMasterId jobMasterId, String address) throws Exception { Preconditions.checkState( state == State.CREATED, "The DeclarativeSlotPoolService can only be started once."); @@ -122,9 +129,9 @@ public class DeclarativeSlotPoolService implements SlotPoolService { this.resourceRequirementServiceConnectionManager = DefaultDeclareResourceRequirementServiceConnectionManager.create( - mainThreadExecutor); + componentMainThreadExecutor); - onStart(mainThreadExecutor); + onStart(); state = State.STARTED; } @@ -132,10 +139,8 @@ public class DeclarativeSlotPoolService implements SlotPoolService { /** * This method is called when the slot pool service is started. It can be overridden by * subclasses. 
- * - * @param componentMainThreadExecutor componentMainThreadExecutor used by this slot pool service */ - protected void onStart(ComponentMainThreadExecutor componentMainThreadExecutor) {} + protected void onStart() {} protected void assertHasBeenStarted() { Preconditions.checkState( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java index 0680a9587c3..be26dd496e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.util.clock.Clock; import javax.annotation.Nonnull; @@ -31,19 +32,32 @@ public class DeclarativeSlotPoolServiceFactory implements SlotPoolServiceFactory private final Clock clock; private final Duration idleSlotTimeout; private final Duration rpcTimeout; + private final @Nonnull Duration slotRequestMaxInterval; public DeclarativeSlotPoolServiceFactory( - Clock clock, Duration idleSlotTimeout, Duration rpcTimeout) { + Clock clock, + Duration idleSlotTimeout, + Duration rpcTimeout, + @Nonnull Duration slotRequestMaxInterval) { this.clock = clock; this.idleSlotTimeout = idleSlotTimeout; this.rpcTimeout = rpcTimeout; + this.slotRequestMaxInterval = slotRequestMaxInterval; } @Nonnull @Override public SlotPoolService createSlotPoolService( - @Nonnull JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) { + @Nonnull JobID jobId, + DeclarativeSlotPoolFactory declarativeSlotPoolFactory, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { return new 
DeclarativeSlotPoolService( - jobId, declarativeSlotPoolFactory, clock, idleSlotTimeout, rpcTimeout); + jobId, + declarativeSlotPoolFactory, + clock, + idleSlotTimeout, + rpcTimeout, + slotRequestMaxInterval, + componentMainThreadExecutor); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java index 70061b67fa9..2689aaa606f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.SlotInfo; import org.apache.flink.runtime.messages.Acknowledge; @@ -49,8 +50,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -104,18 +108,28 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher(); + @Nonnull private final ComponentMainThreadExecutor componentMainThreadExecutor; + + // For slots(resources) requests by batch. 
+ @Nonnull private final Duration slotRequestMaxInterval; + @Nullable private ScheduledFuture<?> slotRequestFuture; + public DefaultDeclarativeSlotPool( JobID jobId, AllocatedSlotPool slotPool, Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, Duration idleSlotTimeout, - Duration rpcTimeout) { + Duration rpcTimeout, + Duration slotRequestMaxInterval, + ComponentMainThreadExecutor componentMainThreadExecutor) { this.jobId = jobId; this.slotPool = slotPool; this.notifyNewResourceRequirements = notifyNewResourceRequirements; this.idleSlotTimeout = idleSlotTimeout; this.rpcTimeout = rpcTimeout; + this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor); + this.slotRequestMaxInterval = Preconditions.checkNotNull(slotRequestMaxInterval); this.totalResourceRequirements = ResourceCounter.empty(); this.fulfilledResourceRequirements = ResourceCounter.empty(); this.slotToRequirementProfileMappings = new HashMap<>(); @@ -128,7 +142,7 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { } totalResourceRequirements = totalResourceRequirements.add(increment); - declareResourceRequirements(); + doDeclareResourceRequirements(); } @Override @@ -138,7 +152,25 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { } totalResourceRequirements = totalResourceRequirements.subtract(decrement); - declareResourceRequirements(); + doDeclareResourceRequirements(); + } + + private void doDeclareResourceRequirements() { + if (slotRequestMaxInterval.toMillis() <= 0L) { + declareResourceRequirements(); + return; + } + + if (slotRequestFuture != null + && !slotRequestFuture.isDone() + && !slotRequestFuture.isCancelled()) { + slotRequestFuture.cancel(true); + } + slotRequestFuture = + componentMainThreadExecutor.schedule( + this::declareResourceRequirements, + slotRequestMaxInterval.toMillis(), + TimeUnit.MILLISECONDS); } @Override @@ -603,4 +635,15 @@ public class DefaultDeclarativeSlotPool 
implements DeclarativeSlotPool { ResourceCounter getFulfilledResourceRequirements() { return fulfilledResourceRequirements; } + + @VisibleForTesting + void tryWaitSlotRequestIsDone() { + if (Objects.nonNull(slotRequestFuture)) { + try { + slotRequestFuture.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java index 95655b1fe3a..c476dae1aaf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java @@ -19,8 +19,11 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.slots.ResourceRequirement; +import javax.annotation.Nonnull; + import java.time.Duration; import java.util.Collection; import java.util.function.Consumer; @@ -33,12 +36,16 @@ public class DefaultDeclarativeSlotPoolFactory implements DeclarativeSlotPoolFac JobID jobId, Consumer<? 
super Collection<ResourceRequirement>> notifyNewResourceRequirements, Duration idleSlotTimeout, - Duration rpcTimeout) { + Duration rpcTimeout, + Duration slotRequestMaxInterval, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { return new DefaultDeclarativeSlotPool( jobId, new DefaultAllocatedSlotPool(), notifyNewResourceRequirements, idleSlotTimeout, - rpcTimeout); + rpcTimeout, + slotRequestMaxInterval, + componentMainThreadExecutor); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index edff664625c..e32ab5dc42a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.AllocatedSlotReport; import org.apache.flink.runtime.jobmaster.JobMasterId; @@ -46,11 +45,7 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable { // lifecycle // ------------------------------------------------------------------------ - void start( - JobMasterId jobMasterId, - String newJobManagerAddress, - ComponentMainThreadExecutor jmMainThreadScheduledExecutor) - throws Exception; + void start(JobMasterId jobMasterId, String newJobManagerAddress) throws Exception; void close(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java index b0322dd2c5f..1a0b3513c96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.AllocatedSlotReport; import org.apache.flink.runtime.jobmaster.JobMaster; @@ -59,12 +58,9 @@ public interface SlotPoolService extends AutoCloseable { * * @param jobMasterId jobMasterId to start the service with * @param address address of the owner - * @param mainThreadExecutor mainThreadExecutor to run actions in the main thread * @throws Exception if the service cannot be started */ - void start( - JobMasterId jobMasterId, String address, ComponentMainThreadExecutor mainThreadExecutor) - throws Exception; + void start(JobMasterId jobMasterId, String address) throws Exception; /** Close the slot pool service. 
*/ void close(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java index bfbac79569c..f987f28a67f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import javax.annotation.Nonnull; @@ -27,5 +28,7 @@ public interface SlotPoolServiceFactory { @Nonnull SlotPoolService createSlotPoolService( - @Nonnull JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory); + @Nonnull JobID jobId, + DeclarativeSlotPoolFactory declarativeSlotPoolFactory, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index f87f7a9172b..d4f9c22dd56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -405,7 +405,7 @@ class ExecutionGraphRestartTest { private static void setupSlotPool(SlotPool slotPool) throws Exception { final String jobManagerAddress = "foobar"; final ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); - slotPool.start(JobMasterId.generate(), jobManagerAddress, mainThreadExecutor); + slotPool.start(JobMasterId.generate(), jobManagerAddress); slotPool.connectToResourceManager(resourceManagerGateway); } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index e1f60398a3a..55e252ad42a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -485,7 +485,9 @@ class JobMasterTest { @Nonnull @Override public SlotPoolService createSlotPoolService( - @Nonnull JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) { + @Nonnull JobID jobId, + DeclarativeSlotPoolFactory declarativeSlotPoolFactory, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { return new TestingSlotPool(jobId, hasReceivedSlotOffers); } } @@ -505,10 +507,7 @@ class JobMasterTest { } @Override - public void start( - JobMasterId jobMasterId, - String newJobManagerAddress, - ComponentMainThreadExecutor jmMainThreadScheduledExecutor) {} + public void start(JobMasterId jobMasterId, String newJobManagerAddress) {} @Override public void close() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java new file mode 100644 index 00000000000..6d9fff147ab --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; +import org.apache.flink.util.clock.SystemClock; + +import javax.annotation.Nonnull; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; + +import static org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread; + +/** Test base class for {@link DeclarativeSlotPoolBridge}. 
*/ +abstract class AbstractDeclarativeSlotPoolBridgeTest { + + protected static final Duration RPC_TIMEOUT = Duration.ofSeconds(20); + protected static final JobID JOB_ID = new JobID(); + protected static final JobMasterId JOB_MASTER_ID = JobMasterId.generate(); + protected final ComponentMainThreadExecutor componentMainThreadExecutor = forMainThread(); + + @Parameter protected RequestSlotMatchingStrategy requestSlotMatchingStrategy; + + @Parameter(1) + protected Duration slotRequestMaxInterval; + + @Parameters(name = "requestSlotMatchingStrategy: {0}, slotRequestMaxInterval: {1}") + private static Collection<Object[]> data() { + return Arrays.asList( + new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO}, + new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(50)}, + new Object[] { + PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO + }, + new Object[] { + PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(50) + }); + } + + @Nonnull + DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge( + DeclarativeSlotPoolFactory declarativeSlotPoolFactory, + RequestSlotMatchingStrategy requestSlotMatchingStrategy, + Duration slotRequestMaxInterval) { + return createDeclarativeSlotPoolBridge( + declarativeSlotPoolFactory, + requestSlotMatchingStrategy, + slotRequestMaxInterval, + componentMainThreadExecutor); + } + + @Nonnull + DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge( + DeclarativeSlotPoolFactory declarativeSlotPoolFactory, + RequestSlotMatchingStrategy requestSlotMatchingStrategy, + Duration slotRequestMaxInterval, + ComponentMainThreadExecutor mainThreadExecutor) { + return new DeclarativeSlotPoolBridge( + JOB_ID, + declarativeSlotPoolFactory, + SystemClock.getInstance(), + RPC_TIMEOUT, + Duration.ofSeconds(20), + Duration.ofSeconds(20), + requestSlotMatchingStrategy, + slotRequestMaxInterval, + mainThreadExecutor); + } + + static PhysicalSlot 
createAllocatedSlot(AllocationID allocationID) { + return new AllocatedSlot( + allocationID, + new LocalTaskManagerLocation(), + 0, + ResourceProfile.ANY, + new RpcTaskManagerGateway( + new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(), + JobMasterId.generate())); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java index f0f7e6b719f..df8194f6b68 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.jobmaster.SlotInfo; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; @@ -29,8 +30,11 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.ResourceCounter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import java.time.Duration; import java.util.ArrayList; @@ -44,6 +48,7 @@ import java.util.Set; import java.util.function.Function; 
import java.util.stream.Collectors; +import static org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread; import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.FreeSlotConsumer; import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.NewSlotsService; import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createResourceRequirements; @@ -54,17 +59,18 @@ import static org.apache.flink.shaded.guava32.com.google.common.collect.Iterable import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link BlocklistDeclarativeSlotPool}. */ -class BlocklistDeclarativeSlotPoolTest { +@ExtendWith(ParameterizedTestExtension.class) +class BlocklistDeclarativeSlotPoolTest extends DefaultDeclarativeSlotPoolTestBase { private static final ResourceProfile RESOURCE_PROFILE = ResourceProfile.newBuilder().setCpuCores(1.7).build(); - @Test + @TestTemplate void testOfferSlotsFromBlockedTaskManager() throws Exception { testOfferSlots(true); } - @Test + @TestTemplate void testOfferSlotsFromUnblockedTaskManager() throws Exception { testOfferSlots(false); } @@ -78,12 +84,16 @@ class BlocklistDeclarativeSlotPoolTest { BlocklistDeclarativeSlotPoolBuilder.builder() .setBlockedTaskManagerChecker( isBlocked ? 
taskManager.getResourceID()::equals : ignore -> false) + .setSlotRequestMaxInterval(slotRequestMaxInterval) + .setMainThreadExecutor(componentMainThreadExecutor) .build(); slotPool.registerNewSlotsListener(notifyNewSlots); final ResourceCounter resourceRequirements = createResourceRequirements(); slotPool.increaseResourceRequirementsBy(resourceRequirements); + slotPool.tryWaitSlotRequestIsDone(); + // offer slots on the blocked task manager Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); @@ -107,7 +117,7 @@ class BlocklistDeclarativeSlotPoolTest { } } - @Test + @TestTemplate void testOfferDuplicateSlots() { final TaskManagerLocation taskManager = new LocalTaskManagerLocation(); final List<ResourceID> blockedTaskManagers = new ArrayList<>(); @@ -115,12 +125,15 @@ class BlocklistDeclarativeSlotPoolTest { final BlocklistDeclarativeSlotPool slotPool = BlocklistDeclarativeSlotPoolBuilder.builder() .setBlockedTaskManagerChecker(blockedTaskManagers::contains) + .setSlotRequestMaxInterval(slotRequestMaxInterval) + .setMainThreadExecutor(componentMainThreadExecutor) .build(); - final ResourceCounter resourceRequirements = ResourceCounter.withResource(RESOURCE_PROFILE, 2); slotPool.increaseResourceRequirementsBy(resourceRequirements); + slotPool.tryWaitSlotRequestIsDone(); + SlotOffer slot1 = new SlotOffer(new AllocationID(), 1, RESOURCE_PROFILE); SlotOffer slot2 = new SlotOffer(new AllocationID(), 1, RESOURCE_PROFILE); @@ -154,8 +167,9 @@ class BlocklistDeclarativeSlotPoolTest { BlocklistDeclarativeSlotPoolBuilder.builder() .setBlockedTaskManagerChecker( isBlocked ? 
taskManager.getResourceID()::equals : ignore -> false) + .setSlotRequestMaxInterval(Duration.ZERO) + .setMainThreadExecutor(componentMainThreadExecutor) .build(); - final int numberSlots = 10; final Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements( @@ -195,8 +209,9 @@ class BlocklistDeclarativeSlotPoolTest { final BlocklistDeclarativeSlotPool slotPool = BlocklistDeclarativeSlotPoolBuilder.builder() .setBlockedTaskManagerChecker(blockedTaskManagers::contains) + .setSlotRequestMaxInterval(Duration.ZERO) + .setMainThreadExecutor(componentMainThreadExecutor) .build(); - SlotOffer slot1 = new SlotOffer(new AllocationID(), 1, RESOURCE_PROFILE); SlotOffer slot2 = new SlotOffer(new AllocationID(), 1, RESOURCE_PROFILE); @@ -222,12 +237,12 @@ class BlocklistDeclarativeSlotPoolTest { assertThat(acceptedOffers).containsExactly(slot1); } - @Test + @TestTemplate void testFreeReservedSlotsOnBlockedTaskManager() throws Exception { testFreeReservedSlots(true); } - @Test + @TestTemplate void testFreeReservedSlotsOnUnblockedTaskManager() throws Exception { testFreeReservedSlots(false); } @@ -244,6 +259,8 @@ class BlocklistDeclarativeSlotPoolTest { final BlocklistDeclarativeSlotPool slotPool = BlocklistDeclarativeSlotPoolBuilder.builder() .setBlockedTaskManagerChecker(blockedTaskManagers::contains) + .setSlotRequestMaxInterval(slotRequestMaxInterval) + .setMainThreadExecutor(componentMainThreadExecutor) .build(); slotPool.registerNewSlotsListener(notifyNewSlots); @@ -292,6 +309,8 @@ class BlocklistDeclarativeSlotPoolTest { private static class BlocklistDeclarativeSlotPoolBuilder { private BlockedTaskManagerChecker blockedTaskManagerChecker = resourceID -> false; + private Duration slotRequestMaxInterval = Duration.ZERO; + private ComponentMainThreadExecutor mainThreadExecutor = forMainThread(); public BlocklistDeclarativeSlotPoolBuilder setBlockedTaskManagerChecker( BlockedTaskManagerChecker blockedTaskManagerChecker) { @@ -299,6 +318,18 @@ class 
BlocklistDeclarativeSlotPoolTest { return this; } + public BlocklistDeclarativeSlotPoolBuilder setSlotRequestMaxInterval( + Duration slotRequestMaxInterval) { + this.slotRequestMaxInterval = slotRequestMaxInterval; + return this; + } + + public BlocklistDeclarativeSlotPoolBuilder setMainThreadExecutor( + ComponentMainThreadExecutor mainThreadExecutor) { + this.mainThreadExecutor = mainThreadExecutor; + return this; + } + public BlocklistDeclarativeSlotPool build() { return new BlocklistDeclarativeSlotPool( new JobID(), @@ -306,7 +337,9 @@ class BlocklistDeclarativeSlotPoolTest { ignored -> {}, blockedTaskManagerChecker, Duration.ofSeconds(20), - Duration.ofSeconds(20)); + Duration.ofSeconds(20), + slotRequestMaxInterval, + mainThreadExecutor); } public static BlocklistDeclarativeSlotPoolBuilder builder() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java index f3a31c4d552..6b267dafd6c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java @@ -33,6 +33,9 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.concurrent.CompletableFuture; +import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_MAX_INTERVAL; +import static org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread; + /** Builder for a {@link DeclarativeSlotPoolBridge}. 
*/ public class DeclarativeSlotPoolBridgeBuilder { @@ -40,6 +43,8 @@ public class DeclarativeSlotPoolBridgeBuilder { private Duration batchSlotTimeout = JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue(); private Duration idleSlotTimeout = TestingUtils.infiniteTime().toDuration(); private Clock clock = SystemClock.getInstance(); + private Duration slotRequestMaxInterval = SLOT_REQUEST_MAX_INTERVAL.defaultValue(); + private ComponentMainThreadExecutor mainThreadExecutor = forMainThread(); @Nullable private ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); @@ -63,6 +68,12 @@ public class DeclarativeSlotPoolBridgeBuilder { return this; } + public DeclarativeSlotPoolBridgeBuilder setSlotRequestMaxInterval( + Duration slotRequestMaxInterval) { + this.slotRequestMaxInterval = slotRequestMaxInterval; + return this; + } + public DeclarativeSlotPoolBridgeBuilder setClock(Clock clock) { this.clock = clock; return this; @@ -79,6 +90,12 @@ public class DeclarativeSlotPoolBridgeBuilder { return this; } + public DeclarativeSlotPoolBridgeBuilder setMainThreadExecutor( + ComponentMainThreadExecutor mainThreadExecutor) { + this.mainThreadExecutor = mainThreadExecutor; + return this; + } + public DeclarativeSlotPoolBridge build() { return new DeclarativeSlotPoolBridge( jobId, @@ -87,19 +104,30 @@ public class DeclarativeSlotPoolBridgeBuilder { TestingUtils.infiniteTime().toDuration(), idleSlotTimeout, batchSlotTimeout, - requestSlotMatchingStrategy); + requestSlotMatchingStrategy, + slotRequestMaxInterval, + mainThreadExecutor); } - public DeclarativeSlotPoolBridge buildAndStart( - ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception { - final DeclarativeSlotPoolBridge slotPool = build(); - - slotPool.start(JobMasterId.generate(), "foobar", componentMainThreadExecutor); + public DeclarativeSlotPoolBridge buildAndStart() throws Exception { + final DeclarativeSlotPoolBridge slotPool = + new DeclarativeSlotPoolBridge( + jobId, + 
new DefaultDeclarativeSlotPoolFactory(), + clock, + TestingUtils.infiniteTime().toDuration(), + idleSlotTimeout, + batchSlotTimeout, + requestSlotMatchingStrategy, + slotRequestMaxInterval, + mainThreadExecutor); + + slotPool.start(JobMasterId.generate(), "foobar"); if (resourceManagerGateway != null) { CompletableFuture.runAsync( () -> slotPool.connectToResourceManager(resourceManagerGateway), - componentMainThreadExecutor) + mainThreadExecutor) .join(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java index 90ee4b2fb8f..af691d21477 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.SlotRequestId; @@ -34,12 +33,14 @@ import org.junit.jupiter.api.Test; import javax.annotation.Nonnull; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Set; import java.util.concurrent.CompletableFuture; +import static org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread; import static 
org.assertj.core.api.Assertions.assertThat; class DeclarativeSlotPoolBridgePreferredAllocationsTest { @@ -54,12 +55,11 @@ class DeclarativeSlotPoolBridgePreferredAllocationsTest { TestingUtils.infiniteTime().toDuration(), TestingUtils.infiniteTime().toDuration(), TestingUtils.infiniteTime().toDuration(), - PreferredAllocationRequestSlotMatchingStrategy.INSTANCE); + PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, + Duration.ZERO, + forMainThread()); - declarativeSlotPoolBridge.start( - JobMasterId.generate(), - "localhost", - ComponentMainThreadExecutorServiceAdapter.forMainThread()); + declarativeSlotPoolBridge.start(JobMasterId.generate(), "localhost"); final LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java index 6c8d5701952..147ef4a9885 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; @@ -125,7 +124,7 @@ class DeclarativeSlotPoolBridgeRequestCompletionTest { private SlotPool 
createAndSetUpSlotPool() throws Exception { return new DeclarativeSlotPoolBridgeBuilder() .setResourceManagerGateway(resourceManagerGateway) - .buildAndStart(ComponentMainThreadExecutorServiceAdapter.forMainThread()); + .buildAndStart(); } private void connectToResourceManager(SlotPool slotPool) { @@ -135,6 +134,6 @@ class DeclarativeSlotPoolBridgeRequestCompletionTest { private SlotPool createAndSetUpSlotPoolWithoutResourceManager() throws Exception { return new DeclarativeSlotPoolBridgeBuilder() .setResourceManagerGateway(null) - .buildAndStart(ComponentMainThreadExecutorServiceAdapter.forMainThread()); + .buildAndStart(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java index 33774bac6ca..7aa18057536 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java @@ -23,56 +23,41 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; -import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.util.ResourceCounter; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.junit.jupiter.api.AfterEach; import 
org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; -import java.io.IOException; import java.time.Duration; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; -import static org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeTest.createAllocatedSlot; -import static org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeTest.createDeclarativeSlotPoolBridge; import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link DeclarativeSlotPoolBridge}. */ @ExtendWith(ParameterizedTestExtension.class) -class DeclarativeSlotPoolBridgeResourceDeclarationTest { - - private static final JobMasterId jobMasterId = JobMasterId.generate(); - private final ComponentMainThreadExecutor mainThreadExecutor = - ComponentMainThreadExecutorServiceAdapter.forMainThread(); - - @Parameter private RequestSlotMatchingStrategy requestSlotMatchingStrategy; +class DeclarativeSlotPoolBridgeResourceDeclarationTest + extends AbstractDeclarativeSlotPoolBridgeTest { private RequirementListener requirementListener; private DeclarativeSlotPoolBridge declarativeSlotPoolBridge; - @Parameters(name = "RequestSlotMatchingStrategy: {0}") - public static Collection<RequestSlotMatchingStrategy> data() throws IOException { - return Arrays.asList( - SimpleRequestSlotMatchingStrategy.INSTANCE, - PreferredAllocationRequestSlotMatchingStrategy.INSTANCE); - } - @BeforeEach void setup() { requirementListener = new RequirementListener(); + constructDeclarativeSlotPoolBridge(componentMainThreadExecutor); + } + + private void constructDeclarativeSlotPoolBridge( + ComponentMainThreadExecutor mainThreadExecutor) { final 
TestingDeclarativeSlotPoolBuilder slotPoolBuilder = TestingDeclarativeSlotPool.builder() .setIncreaseResourceRequirementsByConsumer( @@ -93,7 +78,10 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest { new TestingDeclarativeSlotPoolFactory(slotPoolBuilder); declarativeSlotPoolBridge = createDeclarativeSlotPoolBridge( - declarativeSlotPoolFactory, requestSlotMatchingStrategy); + declarativeSlotPoolFactory, + requestSlotMatchingStrategy, + slotRequestMaxInterval, + mainThreadExecutor); } @AfterEach @@ -105,7 +93,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest { @TestTemplate void testRequirementsIncreasedOnNewAllocation() throws Exception { - declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor); + declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); // requesting the allocation of a new slot should increase the requirements declarativeSlotPoolBridge.requestNewAllocatedSlot( @@ -122,7 +110,8 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest { ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( scheduledExecutorService); - declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor); + constructDeclarativeSlotPoolBridge(mainThreadExecutor); + declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); // requesting the allocation of a new slot increases the requirements final CompletableFuture<PhysicalSlot> allocationFuture = @@ -156,7 +145,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest { @TestTemplate void testRequirementsUnchangedOnNewSlotsNotification() throws Exception { - declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor); + declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); // notifications about new slots should not affect requirements final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID()); @@ -167,7 +156,7 @@ class 
DeclarativeSlotPoolBridgeResourceDeclarationTest { @TestTemplate void testRequirementsIncreasedOnSlotReservation() throws Exception { - declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor); + declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID()); declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(newSlot)); @@ -182,7 +171,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest { @TestTemplate void testRequirementsDecreasedOnSlotFreeing() throws Exception { - declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor); + declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID()); declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(newSlot)); @@ -200,7 +189,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest { @TestTemplate void testRequirementsDecreasedOnSlotAllocationFailure() throws Exception { - declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor); + declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID()); declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(newSlot)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java index 2d97dc3b58b..28c91a46248 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java @@ -18,36 +18,23 @@ package org.apache.flink.runtime.jobmaster.slotpool; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import 
org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.jobmaster.JobMasterId; -import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.util.ResourceCounter; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; -import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.FutureUtils; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; -import javax.annotation.Nonnull; - -import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -60,21 +47,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link DeclarativeSlotPoolBridge}. 
*/ @ExtendWith(ParameterizedTestExtension.class) -class DeclarativeSlotPoolBridgeTest { - - private static final Duration rpcTimeout = Duration.ofSeconds(20); - private static final JobID jobId = new JobID(); - private static final JobMasterId jobMasterId = JobMasterId.generate(); - private final ComponentMainThreadExecutor mainThreadExecutor = - ComponentMainThreadExecutorServiceAdapter.forMainThread(); - @Parameter private RequestSlotMatchingStrategy requestSlotMatchingStrategy; - - @Parameters(name = "RequestSlotMatchingStrategy: {0}") - public static Collection<RequestSlotMatchingStrategy> data() throws IOException { - return Arrays.asList( - SimpleRequestSlotMatchingStrategy.INSTANCE, - PreferredAllocationRequestSlotMatchingStrategy.INSTANCE); - } +class DeclarativeSlotPoolBridgeTest extends AbstractDeclarativeSlotPoolBridgeTest { @TestTemplate void testSlotOffer() throws Exception { @@ -86,9 +59,11 @@ class DeclarativeSlotPoolBridgeTest { new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder()); try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = createDeclarativeSlotPoolBridge( - declarativeSlotPoolFactory, requestSlotMatchingStrategy)) { + declarativeSlotPoolFactory, + requestSlotMatchingStrategy, + slotRequestMaxInterval)) { - declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor); + declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); CompletableFuture<PhysicalSlot> slotAllocationFuture = declarativeSlotPoolBridge.requestNewAllocatedSlot( @@ -108,9 +83,11 @@ class DeclarativeSlotPoolBridgeTest { new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder()); try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = createDeclarativeSlotPoolBridge( - declarativeSlotPoolFactory, requestSlotMatchingStrategy)) { + declarativeSlotPoolFactory, + requestSlotMatchingStrategy, + slotRequestMaxInterval)) { - declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor); + 
declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); CompletableFuture<PhysicalSlot> slotAllocationFuture = CompletableFuture.supplyAsync( @@ -119,10 +96,10 @@ class DeclarativeSlotPoolBridgeTest { slotRequestId, ResourceProfile.UNKNOWN, Time.minutes(5)), - mainThreadExecutor) + componentMainThreadExecutor) .get(); - mainThreadExecutor.execute( + componentMainThreadExecutor.execute( () -> declarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable( Collections.emptyList())); @@ -154,8 +131,10 @@ class DeclarativeSlotPoolBridgeTest { new TestingDeclarativeSlotPoolFactory(builder); try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = createDeclarativeSlotPoolBridge( - declarativeSlotPoolFactory, requestSlotMatchingStrategy)) { - declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor); + declarativeSlotPoolFactory, + requestSlotMatchingStrategy, + slotRequestMaxInterval)) { + declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); final SlotRequestId slotRequestId = new SlotRequestId(); @@ -171,9 +150,11 @@ class DeclarativeSlotPoolBridgeTest { void testNoConcurrentModificationWhenSuspendingAndReleasingSlot() throws Exception { try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = createDeclarativeSlotPoolBridge( - new DefaultDeclarativeSlotPoolFactory(), requestSlotMatchingStrategy)) { + new DefaultDeclarativeSlotPoolFactory(), + requestSlotMatchingStrategy, + slotRequestMaxInterval)) { - declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor); + declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); final List<SlotRequestId> slotRequestIds = Arrays.asList(new SlotRequestId(), new SlotRequestId()); @@ -186,7 +167,7 @@ class DeclarativeSlotPoolBridgeTest { declarativeSlotPoolBridge.requestNewAllocatedSlot( slotRequestId, ResourceProfile.UNKNOWN, - Time.fromDuration(rpcTimeout)); + Time.fromDuration(RPC_TIMEOUT)); slotFuture.whenComplete( (physicalSlot, throwable) -> { if (throwable != null) { @@ 
-210,15 +191,17 @@ class DeclarativeSlotPoolBridgeTest { void testAcceptingOfferedSlotsWithoutResourceManagerConnected() throws Exception { try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = createDeclarativeSlotPoolBridge( - new DefaultDeclarativeSlotPoolFactory(), requestSlotMatchingStrategy)) { + new DefaultDeclarativeSlotPoolFactory(), + requestSlotMatchingStrategy, + slotRequestMaxInterval)) { - declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor); + declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); final CompletableFuture<PhysicalSlot> slotFuture = declarativeSlotPoolBridge.requestNewAllocatedSlot( new SlotRequestId(), ResourceProfile.UNKNOWN, - Time.fromDuration(rpcTimeout)); + Time.fromDuration(RPC_TIMEOUT)); final LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation(); @@ -251,8 +234,10 @@ class DeclarativeSlotPoolBridgeTest { try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = createDeclarativeSlotPoolBridge( - declarativeSlotPoolFactory, requestSlotMatchingStrategy)) { - declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor); + declarativeSlotPoolFactory, + requestSlotMatchingStrategy, + slotRequestMaxInterval)) { + declarativeSlotPoolBridge.start(JOB_MASTER_ID, "localhost"); declarativeSlotPoolBridge.setIsJobRestarting(true); @@ -270,29 +255,4 @@ class DeclarativeSlotPoolBridgeTest { registerSlotsCalledFuture.join(); } } - - @Nonnull - static DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge( - DeclarativeSlotPoolFactory declarativeSlotPoolFactory, - RequestSlotMatchingStrategy requestSlotMatchingStrategy) { - return new DeclarativeSlotPoolBridge( - jobId, - declarativeSlotPoolFactory, - SystemClock.getInstance(), - rpcTimeout, - Duration.ofSeconds(20), - Duration.ofSeconds(20), - requestSlotMatchingStrategy); - } - - static PhysicalSlot createAllocatedSlot(AllocationID allocationID) { - return new AllocatedSlot( - allocationID, - new 
LocalTaskManagerLocation(), - 0, - ResourceProfile.ANY, - new RpcTaskManagerGateway( - new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(), - JobMasterId.generate())); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java index 91d11eb5d04..7542f045f4d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.jobmaster.AllocatedSlotReport; import org.apache.flink.runtime.jobmaster.JobMasterId; @@ -58,6 +57,7 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread; import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link DeclarativeSlotPoolService}. 
*/ @@ -65,8 +65,7 @@ class DeclarativeSlotPoolServiceTest { private static final JobID jobId = new JobID(); private static final JobMasterId jobMasterId = JobMasterId.generate(); - private final ComponentMainThreadExecutor mainThreadExecutor = - ComponentMainThreadExecutorServiceAdapter.forMainThread(); + private final ComponentMainThreadExecutor mainThreadExecutor = forMainThread(); private static final String address = "localhost"; @Test @@ -360,9 +359,11 @@ class DeclarativeSlotPoolServiceTest { declarativeSlotPoolFactory, SystemClock.getInstance(), Duration.ofSeconds(20L), - Duration.ofSeconds(20L)); + Duration.ofSeconds(20L), + Duration.ZERO, + mainThreadExecutor); - declarativeSlotPoolService.start(jobMasterId, address, mainThreadExecutor); + declarativeSlotPoolService.start(jobMasterId, address); return declarativeSlotPoolService; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java index ea177ff6f85..f976eea5998 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java @@ -18,12 +18,16 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.slots.ResourceRequirement; import java.time.Duration; import java.util.Collection; import java.util.function.Consumer; +import static org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_MAX_INTERVAL; +import static org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread; + /** Builder for {@link DefaultDeclarativeSlotPool}. 
*/ final class DefaultDeclarativeSlotPoolBuilder { @@ -32,6 +36,8 @@ final class DefaultDeclarativeSlotPoolBuilder { ignored -> {}; private Duration idleSlotTimeout = Duration.ofSeconds(20); private Duration rpcTimeout = Duration.ofSeconds(20); + private Duration slotRequestMaxInterval = SLOT_REQUEST_MAX_INTERVAL.defaultValue(); + private ComponentMainThreadExecutor componentMainThreadExecutor = forMainThread(); public DefaultDeclarativeSlotPoolBuilder setAllocatedSlotPool( AllocatedSlotPool allocatedSlotPool) { @@ -50,13 +56,27 @@ final class DefaultDeclarativeSlotPoolBuilder { return this; } + public DefaultDeclarativeSlotPoolBuilder setSlotRequestMaxInterval( + Duration slotRequestMaxInterval) { + this.slotRequestMaxInterval = slotRequestMaxInterval; + return this; + } + + public DefaultDeclarativeSlotPoolBuilder setComponentMainThreadExecutor( + ComponentMainThreadExecutor componentMainThreadExecutor) { + this.componentMainThreadExecutor = componentMainThreadExecutor; + return this; + } + public DefaultDeclarativeSlotPool build() { return new DefaultDeclarativeSlotPool( new JobID(), allocatedSlotPool, notifyNewResourceRequirements, idleSlotTimeout, - rpcTimeout); + rpcTimeout, + slotRequestMaxInterval, + componentMainThreadExecutor); } public static DefaultDeclarativeSlotPoolBuilder builder() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java index 0244cbadf18..af3e9b66695 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java @@ -30,12 +30,15 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.ResourceCounter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.util.FlinkException; import org.apache.flink.util.function.QuadConsumer; import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -58,45 +61,66 @@ import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.crea import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link DefaultDeclarativeSlotPool}. */ -class DefaultDeclarativeSlotPoolTest { +@ExtendWith(ParameterizedTestExtension.class) +class DefaultDeclarativeSlotPoolTest extends DefaultDeclarativeSlotPoolTestBase { private static final ResourceProfile RESOURCE_PROFILE_1 = ResourceProfile.newBuilder().setCpuCores(1.7).build(); private static final ResourceProfile RESOURCE_PROFILE_2 = ResourceProfile.newBuilder().setManagedMemoryMB(100).build(); - @Test + @TestTemplate void testIncreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException { final NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService(); - final DeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(requirementsListener); + final DefaultDeclarativeSlotPool slotPool = + createDefaultDeclarativeSlotPool(requirementsListener, slotRequestMaxInterval); final ResourceCounter increment1 = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1); final ResourceCounter increment2 = createResourceRequirements(); slotPool.increaseResourceRequirementsBy(increment1); slotPool.increaseResourceRequirementsBy(increment2); - assertThat(requirementsListener.takeResourceRequirements()) - 
.isEqualTo(toResourceRequirements(increment1)); + slotPool.tryWaitSlotRequestIsDone(); - final ResourceCounter totalResources = increment1.add(increment2); assertThat(requirementsListener.takeResourceRequirements()) - .isEqualTo(toResourceRequirements(totalResources)); - assertThat(requirementsListener.hasNextResourceRequirements()).isFalse(); + .isEqualTo( + toResourceRequirements(getExpectedResourceCounter(increment1, increment2))); + + if (slotRequestMaxInterval.toMillis() <= 0L) { + final ResourceCounter totalResources = increment1.add(increment2); + assertThat(requirementsListener.takeResourceRequirements()) + .isEqualTo(toResourceRequirements(totalResources)); + assertThat(requirementsListener.hasNextResourceRequirements()).isFalse(); + } } - @Test + private ResourceCounter getExpectedResourceCounter(ResourceCounter... resourceCounters) { + ResourceCounter result = ResourceCounter.empty(); + if (slotRequestMaxInterval.toMillis() > 0L) { + for (ResourceCounter resourceCounter : resourceCounters) { + result = result.add(resourceCounter); + } + return result; + } else { + return resourceCounters[0]; + } + } + + @TestTemplate void testDecreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException { final NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService(); final DefaultDeclarativeSlotPool slotPool = - createDefaultDeclarativeSlotPool(requirementsListener); + createDefaultDeclarativeSlotPool(requirementsListener, slotRequestMaxInterval); final ResourceCounter increment = ResourceCounter.withResource(RESOURCE_PROFILE_1, 3); slotPool.increaseResourceRequirementsBy(increment); + slotPool.tryWaitSlotRequestIsDone(); + requirementsListener.takeResourceRequirements(); final ResourceCounter decrement = ResourceCounter.withResource(RESOURCE_PROFILE_1, 2); @@ -108,10 +132,9 @@ class DefaultDeclarativeSlotPoolTest { assertThat(requirementsListener.hasNextResourceRequirements()).isFalse(); } - @Test 
+ @TestTemplate void testGetResourceRequirements() { - final DefaultDeclarativeSlotPool slotPool = - DefaultDeclarativeSlotPoolBuilder.builder().build(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(); assertThat(slotPool.getResourceRequirements()).isEmpty(); @@ -119,11 +142,13 @@ class DefaultDeclarativeSlotPoolTest { slotPool.increaseResourceRequirementsBy(resourceRequirements); + slotPool.tryWaitSlotRequestIsDone(); + assertThat(slotPool.getResourceRequirements()) .isEqualTo(toResourceRequirements(resourceRequirements)); } - @Test + @TestTemplate void testOfferSlots() throws InterruptedException { final NewSlotsService notifyNewSlots = new NewSlotsService(); final DefaultDeclarativeSlotPool slotPool = @@ -133,6 +158,8 @@ class DefaultDeclarativeSlotPoolTest { slotPool.increaseResourceRequirementsBy(resourceRequirements); + slotPool.tryWaitSlotRequestIsDone(); + Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); @@ -177,7 +204,7 @@ class DefaultDeclarativeSlotPoolTest { }); } - @Test + @TestTemplate void testDuplicateSlotOfferings() throws InterruptedException { final NewSlotsService notifyNewSlots = new NewSlotsService(); final DefaultDeclarativeSlotPool slotPool = @@ -187,6 +214,8 @@ class DefaultDeclarativeSlotPoolTest { slotPool.increaseResourceRequirementsBy(resourceRequirements); + slotPool.tryWaitSlotRequestIsDone(); + final Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); @@ -202,7 +231,7 @@ class DefaultDeclarativeSlotPoolTest { assertThat(notifyNewSlots.hasNextNewSlots()).isFalse(); } - @Test + @TestTemplate void testOfferingTooManySlotsWillRejectSuperfluousSlots() { final NewSlotsService notifyNewSlots = new NewSlotsService(); final DefaultDeclarativeSlotPool slotPool = @@ -212,6 +241,8 @@ class DefaultDeclarativeSlotPoolTest { slotPool.increaseResourceRequirementsBy(resourceRequirements); + 
slotPool.tryWaitSlotRequestIsDone(); + final ResourceCounter increasedRequirements = resourceRequirements.add(RESOURCE_PROFILE_1, 2); @@ -235,13 +266,13 @@ class DefaultDeclarativeSlotPoolTest { .isEqualTo((long) resourceCount.getValue())); } - @Test + @TestTemplate void testReleaseSlotsRemovesSlots() throws InterruptedException { final NewResourceRequirementsService notifyNewResourceRequirements = new NewResourceRequirementsService(); final DefaultDeclarativeSlotPool slotPool = - createDefaultDeclarativeSlotPool(notifyNewResourceRequirements); - + createDefaultDeclarativeSlotPool( + notifyNewResourceRequirements, slotRequestMaxInterval); final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); increaseRequirementsAndOfferSlotsToSlotPool( slotPool, createResourceRequirements(), taskManagerLocation); @@ -253,11 +284,9 @@ class DefaultDeclarativeSlotPoolTest { assertThat(slotPool.getAllSlotsInformation()).isEmpty(); } - @Test + @TestTemplate void testReleaseSlotsReturnsSlot() { - final DefaultDeclarativeSlotPool slotPool = - DefaultDeclarativeSlotPoolBuilder.builder().build(); - + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(); final ResourceCounter resourceRequirements = createResourceRequirements(); final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); @@ -286,7 +315,7 @@ class DefaultDeclarativeSlotPoolTest { .collect(Collectors.toList())); } - @Test + @TestTemplate void testReleaseSlotsOnlyReturnsFulfilledRequirementsOfReservedSlots() { withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles( (slotPool, freeSlot, slotToReserve, taskManagerLocation) -> { @@ -311,7 +340,7 @@ class DefaultDeclarativeSlotPoolTest { }); } - @Test + @TestTemplate void testReleaseSlotOnlyReturnsFulfilledRequirementsOfReservedSlots() { withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles( (slotPool, freeSlot, slotToReserve, ignored) -> { @@ -336,11 +365,10 
@@ class DefaultDeclarativeSlotPoolTest { }); } - private static void withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles( + private void withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles( QuadConsumer<DefaultDeclarativeSlotPool, SlotOffer, SlotOffer, TaskManagerLocation> test) { - final DefaultDeclarativeSlotPool slotPool = - DefaultDeclarativeSlotPoolBuilder.builder().build(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(); final ResourceCounter resourceRequirements = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1).add(RESOURCE_PROFILE_2, 1); @@ -366,7 +394,7 @@ class DefaultDeclarativeSlotPoolTest { test.accept(slotPool, slot1, slot2, taskManagerLocation); } - @Test + @TestTemplate void testReleaseSlotDecreasesFulfilledResourceRequirements() throws InterruptedException { final NewSlotsService notifyNewSlots = new NewSlotsService(); final DefaultDeclarativeSlotPool slotPool = @@ -387,7 +415,7 @@ class DefaultDeclarativeSlotPoolTest { .isEqualTo(finalResourceRequirements); } - @Test + @TestTemplate void testReleaseSlotReturnsSlot() throws InterruptedException { final NewSlotsService notifyNewSlots = new NewSlotsService(); final DefaultDeclarativeSlotPool slotPool = @@ -417,13 +445,14 @@ class DefaultDeclarativeSlotPoolTest { assertThat(freedSlot).isEqualTo(physicalSlot.getAllocationId()); } - @Test + @TestTemplate void testReturnIdleSlotsAfterTimeout() { final Duration idleSlotTimeout = Duration.ofSeconds(10); final long offerTime = 0; final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder() .setIdleSlotTimeout(idleSlotTimeout) + .setComponentMainThreadExecutor(componentMainThreadExecutor) .build(); final ResourceCounter resourceRequirements = createResourceRequirements(); @@ -461,13 +490,14 @@ class DefaultDeclarativeSlotPoolTest { assertThat(slotPool.getAllSlotsInformation()).isEmpty(); } - @Test + @TestTemplate void 
testOnlyReturnExcessIdleSlots() { final Duration idleSlotTimeout = Duration.ofSeconds(10); final long offerTime = 0; final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder() .setIdleSlotTimeout(idleSlotTimeout) + .setComponentMainThreadExecutor(componentMainThreadExecutor) .build(); final ResourceCounter resourceRequirements = createResourceRequirements(); @@ -475,6 +505,9 @@ class DefaultDeclarativeSlotPoolTest { createSlotOffersForResourceRequirements(resourceRequirements); slotPool.increaseResourceRequirementsBy(resourceRequirements); + + slotPool.tryWaitSlotRequestIsDone(); + final Collection<SlotOffer> acceptedSlots = SlotPoolTestUtils.offerSlots(slotPool, slotOffers); @@ -489,7 +522,7 @@ class DefaultDeclarativeSlotPoolTest { assertThat(slotPool.getFulfilledResourceRequirements()).isEqualTo(requiredResources); } - @Test + @TestTemplate void testFreedSlotWillBeUsedToFulfillOutstandingResourceRequirementsOfSameProfile() throws InterruptedException { final NewSlotsService notifyNewSlots = new NewSlotsService(); @@ -553,9 +586,9 @@ class DefaultDeclarativeSlotPoolTest { .isEqualTo(RESOURCE_PROFILE_1)); } - @Test + @TestTemplate void testFreedSlotWillRemainAssignedToMatchedResourceProfile() { - final DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(); final ResourceProfile largeResourceProfile = ResourceProfile.newBuilder().setManagedMemoryMB(1024).build(); @@ -564,6 +597,9 @@ class DefaultDeclarativeSlotPoolTest { slotPool.increaseResourceRequirementsBy( ResourceCounter.withResource(largeResourceProfile, 1)); + + slotPool.tryWaitSlotRequestIsDone(); + SlotPoolTestUtils.offerSlots( slotPool, createSlotOffersForResourceRequirements( @@ -580,6 +616,9 @@ class DefaultDeclarativeSlotPoolTest { slotPool.increaseResourceRequirementsBy( ResourceCounter.withResource(smallResourceProfile, 1)); + + 
slotPool.tryWaitSlotRequestIsDone(); + slotPool.decreaseResourceRequirementsBy( ResourceCounter.withResource(largeResourceProfile, 1)); @@ -598,9 +637,9 @@ class DefaultDeclarativeSlotPoolTest { .isEqualTo(0); } - @Test + @TestTemplate void testReserveFreeSlotForResourceUpdatesAvailableResourcesAndRequirements() { - final DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(); final ResourceProfile largeResourceProfile = ResourceProfile.newBuilder().setManagedMemoryMB(1024).build(); @@ -608,6 +647,9 @@ class DefaultDeclarativeSlotPoolTest { slotPool.increaseResourceRequirementsBy( ResourceCounter.withResource(largeResourceProfile, 1)); + + slotPool.tryWaitSlotRequestIsDone(); + SlotPoolTestUtils.offerSlots( slotPool, createSlotOffersForResourceRequirements( @@ -615,6 +657,8 @@ class DefaultDeclarativeSlotPoolTest { slotPool.increaseResourceRequirementsBy( ResourceCounter.withResource(smallResourceProfile, 1)); + slotPool.tryWaitSlotRequestIsDone(); + final SlotInfo largeSlot = slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream() .filter(slot -> slot.getResourceProfile().equals(largeResourceProfile)) @@ -638,7 +682,7 @@ class DefaultDeclarativeSlotPoolTest { @Test void testSetResourceRequirementsForInitialResourceRequirements() { - final DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(); final ResourceCounter resourceRequirements = ResourceCounter.withResource(RESOURCE_PROFILE_1, 2); @@ -651,7 +695,7 @@ class DefaultDeclarativeSlotPoolTest { @Test void testSetResourceRequirementsOverwritesPreviousValue() { - final DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(); 
slotPool.setResourceRequirements(ResourceCounter.withResource(RESOURCE_PROFILE_1, 1)); @@ -663,9 +707,9 @@ class DefaultDeclarativeSlotPoolTest { .isEqualTo(toResourceRequirements(resourceRequirements)); } - @Test + @TestTemplate void testRegisterSlotsDoesNotAffectRequirements() { - final DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(); final ResourceProfile slotProfile = RESOURCE_PROFILE_1; final ResourceProfile requestedProfile = ResourceProfile.UNKNOWN; @@ -683,6 +727,9 @@ class DefaultDeclarativeSlotPoolTest { assertThat(slotPool.getResourceRequirements()).isEmpty(); slotPool.increaseResourceRequirementsBy(ResourceCounter.withResource(requestedProfile, 1)); + + slotPool.tryWaitSlotRequestIsDone(); + slotPool.reserveFreeSlot(allocationId, requestedProfile); slotPool.freeReservedSlot(allocationId, null, 1L); slotPool.decreaseResourceRequirementsBy(ResourceCounter.withResource(requestedProfile, 1)); @@ -711,15 +758,17 @@ class DefaultDeclarativeSlotPoolTest { } @Nonnull - private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool( - NewResourceRequirementsService requirementsListener) { + private DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool( + NewResourceRequirementsService requirementsListener, Duration slotRequestMaxInterval) { return DefaultDeclarativeSlotPoolBuilder.builder() + .setSlotRequestMaxInterval(slotRequestMaxInterval) .setNotifyNewResourceRequirements(requirementsListener) + .setComponentMainThreadExecutor(componentMainThreadExecutor) .build(); } @Nonnull - private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPoolWithNewSlotsListener( + private DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPoolWithNewSlotsListener( DeclarativeSlotPool.NewSlotsListener newSlotsListener) { final DefaultDeclarativeSlotPool declarativeSlotPool = createDefaultDeclarativeSlotPool(); @@ -728,8 
+777,10 @@ class DefaultDeclarativeSlotPoolTest { } @Nonnull - private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool() { - return DefaultDeclarativeSlotPoolBuilder.builder().build(); + private DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool() { + return DefaultDeclarativeSlotPoolBuilder.builder() + .setComponentMainThreadExecutor(componentMainThreadExecutor) + .build(); } @Nonnull @@ -752,6 +803,8 @@ class DefaultDeclarativeSlotPoolTest { slotPool.increaseResourceRequirementsBy(resourceRequirements); + slotPool.tryWaitSlotRequestIsDone(); + return slotPool.offerSlots( slotOffers, taskManagerLocation == null ? new LocalTaskManagerLocation() : taskManagerLocation, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java index e69de29bb2d..4300dabdc6a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import org.assertj.core.util.Lists; + +import java.time.Duration; +import java.util.List; + +/** Test base class for the {@link DefaultDeclarativeSlotPool}. */ +abstract class DefaultDeclarativeSlotPoolTestBase { + + // The slot batch request is enabled if the interval is greater than Duration.ZERO, and + // disabled otherwise. + @Parameter protected Duration slotRequestMaxInterval; + + protected ComponentMainThreadExecutor componentMainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forMainThread(); + + @Parameters(name = "slotRequestMaxInterval: {0}") + static List<Duration> getParametersCouples() { + return Lists.newArrayList(Duration.ofMillis(50L), Duration.ZERO); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java index d717d61228e..bf4349e7f70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java @@ -63,7 +63,10 @@ public class PhysicalSlotProviderExtension implements BeforeEachCallback, AfterE this.mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( singleThreadScheduledExecutorService); - slotPool = new DeclarativeSlotPoolBridgeBuilder().buildAndStart(mainThreadExecutor); + slotPool =
+ new DeclarativeSlotPoolBridgeBuilder() + .setMainThreadExecutor(mainThreadExecutor) + .buildAndStart(); physicalSlotProvider = new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java index ae2d2e48ad6..e7117b3a23d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java @@ -66,7 +66,9 @@ class PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest { throws Exception { DeclarativeSlotPoolBridge slotPool = new DeclarativeSlotPoolBridgeBuilder() - .buildAndStart(physicalSlotProviderExtension.getMainThreadExecutor()); + .setMainThreadExecutor( + physicalSlotProviderExtension.getMainThreadExecutor()) + .buildAndStart(); assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled()).isTrue(); final PhysicalSlotProvider slotProvider = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java index 0c9a808eaa6..848df3cc044 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java @@ -107,7 +107,9 @@ class PhysicalSlotProviderImplWithSpreadOutStrategyTest { throws Exception { DeclarativeSlotPoolBridge slotPool = new 
DeclarativeSlotPoolBridgeBuilder() - .buildAndStart(physicalSlotProviderExtension.getMainThreadExecutor()); + .setMainThreadExecutor( + physicalSlotProviderExtension.getMainThreadExecutor()) + .buildAndStart(); assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled()).isTrue(); final PhysicalSlotProvider slotProvider = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java index 208e71aa5fb..524fc9064c6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java @@ -238,7 +238,8 @@ class SlotPoolBatchSlotRequestTest { return new DeclarativeSlotPoolBridgeBuilder() .setResourceManagerGateway(resourceManagerGateway) .setBatchSlotTimeout(batchSlotTimeout.toDuration()) - .buildAndStart(componentMainThreadExecutor); + .setMainThreadExecutor(componentMainThreadExecutor) + .buildAndStart(); } private DeclarativeSlotPoolBridge createAndSetUpSlotPool( @@ -252,6 +253,7 @@ class SlotPoolBatchSlotRequestTest { .setResourceManagerGateway(resourceManagerGateway) .setBatchSlotTimeout(batchSlotTimeout.toDuration()) .setClock(clock) - .buildAndStart(componentMainThreadExecutor); + .setMainThreadExecutor(componentMainThreadExecutor) + .buildAndStart(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java index a746a216348..a000bd251f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java @@ -116,13 +116,15 @@ 
class SlotPoolInteractionsTest { private DeclarativeSlotPoolBridge createAndSetUpSlotPool() throws Exception { return new DeclarativeSlotPoolBridgeBuilder() - .buildAndStart(testMainThreadExecutor.getMainThreadExecutor()); + .setMainThreadExecutor(testMainThreadExecutor.getMainThreadExecutor()) + .buildAndStart(); } private DeclarativeSlotPoolBridge createAndSetUpSlotPoolWithoutResourceManager() throws Exception { return new DeclarativeSlotPoolBridgeBuilder() .setResourceManagerGateway(null) - .buildAndStart(testMainThreadExecutor.getMainThreadExecutor()); + .setMainThreadExecutor(testMainThreadExecutor.getMainThreadExecutor()) + .buildAndStart(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java index 3ca17396654..69527b8b602 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java @@ -19,8 +19,11 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.slots.ResourceRequirement; +import javax.annotation.Nonnull; + import java.time.Duration; import java.util.Collection; import java.util.function.Consumer; @@ -39,7 +42,9 @@ final class TestingDeclarativeSlotPoolFactory implements DeclarativeSlotPoolFact JobID jobId, Consumer<? 
super Collection<ResourceRequirement>> notifyNewResourceRequirements, Duration idleSlotTimeout, - Duration rpcTimeout) { + Duration rpcTimeout, + Duration slotRequestMaxInterval, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { return builder.build(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java index 5d10289686b..9cafb578bc5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java @@ -21,14 +21,12 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.AllocatedSlotReport; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.function.TriConsumer; import org.apache.flink.util.function.TriFunction; import javax.annotation.Nullable; @@ -44,9 +42,7 @@ public class TestingSlotPoolService implements SlotPoolService { private final JobID jobId; - private final TriConsumer< - ? super JobMasterId, ? super String, ? super ComponentMainThreadExecutor> - startConsumer; + private final BiFunction<? super JobMasterId, ? 
super String, Void> startConsumer; private final Runnable closeRunnable; @@ -78,8 +74,7 @@ public class TestingSlotPoolService implements SlotPoolService { public TestingSlotPoolService( JobID jobId, - TriConsumer<? super JobMasterId, ? super String, ? super ComponentMainThreadExecutor> - startConsumer, + BiFunction<? super JobMasterId, ? super String, Void> startConsumer, Runnable closeRunnable, TriFunction< ? super TaskManagerLocation, @@ -112,10 +107,8 @@ public class TestingSlotPoolService implements SlotPoolService { } @Override - public void start( - JobMasterId jobMasterId, String address, ComponentMainThreadExecutor mainThreadExecutor) - throws Exception { - startConsumer.accept(jobMasterId, address, mainThreadExecutor); + public void start(JobMasterId jobMasterId, String address) throws Exception { + startConsumer.apply(jobMasterId, address); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java index aed1790d597..2a848b82abe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolServiceBuilder.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.function.TriConsumer; import org.apache.flink.util.function.TriFunction; import javax.annotation.Nonnull; @@ -43,8 +42,8 @@ import java.util.function.Function; /** {@link SlotPoolServiceFactory} which creates a {@link TestingSlotPoolService}. 
*/ public class TestingSlotPoolServiceBuilder implements SlotPoolServiceFactory { - private TriConsumer<? super JobMasterId, ? super String, ? super ComponentMainThreadExecutor> - startConsumer = (ignoredA, ignoredB, ignoredC) -> {}; + private BiFunction<? super JobMasterId, ? super String, Void> startConsumer = + (ignoredA, ignoredB) -> null; private Runnable closeRunnable = () -> {}; private TriFunction< ? super TaskManagerLocation, @@ -71,7 +70,9 @@ public class TestingSlotPoolServiceBuilder implements SlotPoolServiceFactory { @Nonnull @Override public SlotPoolService createSlotPoolService( - @Nonnull JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) { + @Nonnull JobID jobId, + DeclarativeSlotPoolFactory declarativeSlotPoolFactory, + @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { return new TestingSlotPoolService( jobId, startConsumer, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java index f7a22ac3b65..0693f6b2d89 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java @@ -186,7 +186,8 @@ class DefaultSchedulerBatchSchedulingTest { throws Exception { return new DeclarativeSlotPoolBridgeBuilder() .setBatchSlotTimeout(batchSlotTimeout.toDuration()) - .buildAndStart(mainThreadExecutor); + .setMainThreadExecutor(mainThreadExecutor) + .buildAndStart(); } private JobGraph createBatchJobGraph(int parallelism) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 531d757948b..43384c7f2bc 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -1600,7 +1600,8 @@ public class DefaultSchedulerTest { final SlotPool slotPool = new DeclarativeSlotPoolBridgeBuilder() .setBatchSlotTimeout(slotTimeout) - .buildAndStart(singleThreadMainThreadExecutor); + .setMainThreadExecutor(singleThreadMainThreadExecutor) + .buildAndStart(); final PhysicalSlotProvider slotProvider = new PhysicalSlotProviderImpl( LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java index b3207436a47..433c6d66a90 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java @@ -118,7 +118,9 @@ public class AdaptiveSchedulerBuilder { new DefaultAllocatedSlotPool(), ignored -> {}, DEFAULT_TIMEOUT, - rpcTimeout); + rpcTimeout, + Duration.ZERO, + mainThreadExecutor); this.executorService = executorService; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index f6e7246687b..2d035775f63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -151,6 +151,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; +import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph; @@ -536,7 +537,9 @@ public class AdaptiveSchedulerTest { new DefaultAllocatedSlotPool(), ignored -> {}, Duration.ofMinutes(10), - Duration.ofMinutes(10)); + Duration.ofMinutes(10), + Duration.ZERO, + mainThreadExecutor); final Configuration configuration = createConfigurationWithNoTimeouts(); configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1); @@ -2312,7 +2315,9 @@ public class AdaptiveSchedulerTest { new DefaultAllocatedSlotPool(), ignored -> {}, idleSlotTimeout, - DEFAULT_TIMEOUT); + DEFAULT_TIMEOUT, + Duration.ZERO, + forMainThread()); } private static JobGraph createJobGraph() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java index 425938dceaf..835880ced4f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java @@ -66,7 +66,10 @@ public class SchedulerEndToEndBenchmarkBase extends SchedulerBenchmarkBase { final List<JobVertex> jobVertices = createDefaultJobVertices(jobConfiguration); jobGraph = createJobGraph(jobVertices, jobConfiguration); - slotPool = new DeclarativeSlotPoolBridgeBuilder().buildAndStart(mainThreadExecutor); + slotPool = + new DeclarativeSlotPoolBridgeBuilder() + .setMainThreadExecutor(mainThreadExecutor) + .buildAndStart(); SlotSelectionStrategy slotSelectionStrategy = 
jobConfiguration.isEvenlySpreadOutSlots() ? LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()
