This is an automated email from the ASF dual-hosted git repository.
guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 65fda6e15a1 [FLINK-27921][Runtime] Introduce the
checkResourceRequirementsWithDelay in DeclarativeSlotManager
65fda6e15a1 is described below
commit 65fda6e15a18eff8d81fbd60e8b4708527597c91
Author: Aitozi <[email protected]>
AuthorDate: Mon Jun 6 21:52:23 2022 +0800
[FLINK-27921][Runtime] Introduce the checkResourceRequirementsWithDelay in
DeclarativeSlotManager
This closes #19886.
---
.../configuration/ResourceManagerOptions.java | 8 ++++
.../ResourceManagerRuntimeServices.java | 7 +--
.../slotmanager/DeclarativeSlotManager.java | 56 +++++++++++++++++++---
.../slotmanager/FineGrainedSlotManager.java | 37 +++++++-------
.../slotmanager/SlotManagerConfiguration.java | 13 +++++
.../slotmanager/DeclarativeSlotManagerBuilder.java | 9 ++++
.../slotmanager/DeclarativeSlotManagerTest.java | 42 ++++++++++++++++
.../slotmanager/FineGrainedSlotManagerTest.java | 7 +--
.../FineGrainedSlotManagerTestBase.java | 8 ++--
.../SlotManagerConfigurationBuilder.java | 11 +++++
10 files changed, 162 insertions(+), 36 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index 7c07e9148fd..6f8c9ccdb94 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -142,6 +142,14 @@ public class ResourceManagerOptions {
+ START_WORKER_MAX_FAILURE_RATE.key()
+ "') is reached.");
+ @Documentation.ExcludeFromDocumentation(
+ "This is an expert option, that we do not want to expose in the
documentation")
+ public static final ConfigOption<Duration> REQUIREMENTS_CHECK_DELAY =
+ ConfigOptions.key("slotmanager.requirement-check.delay")
+ .durationType()
+ .defaultValue(Duration.ofMillis(50))
+ .withDescription("The delay of the resource requirements
check.");
+
/**
* The timeout for a slot request to be discarded, in milliseconds.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
index d0d537daa69..98919a7eed4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.resourcemanager;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
import
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager;
@@ -36,9 +35,6 @@ import org.apache.flink.util.concurrent.ScheduledExecutor;
/** Container class for the {@link ResourceManager} services. */
public class ResourceManagerRuntimeServices {
- // We currently make the delay of requirements check a constant time. This
delay might be
- // configurable by user in the future.
- private static final long REQUIREMENTS_CHECK_DELAY_MS = 50L;
private final SlotManager slotManager;
private final JobLeaderIdService jobLeaderIdService;
@@ -93,8 +89,7 @@ public class ResourceManagerRuntimeServices {
new DefaultResourceAllocationStrategy(
SlotManagerUtils.generateTaskManagerTotalResourceProfile(
slotManagerConfiguration.getDefaultWorkerResourceSpec()),
- slotManagerConfiguration.getNumSlotsPerWorker()),
- Time.milliseconds(REQUIREMENTS_CHECK_DELAY_MS));
+ slotManagerConfiguration.getNumSlotsPerWorker()));
} else {
return new DeclarativeSlotManager(
scheduledExecutor,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 73f349feb82..f8d2130fac0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -56,6 +57,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -79,8 +81,14 @@ public class DeclarativeSlotManager implements SlotManager {
private final Map<JobID, String> jobMasterTargetAddresses = new
HashMap<>();
private final Map<SlotID, AllocationID> pendingSlotAllocations;
+ /** Delay of the requirement change check in the slot manager. */
+ private final Duration requirementsCheckDelay;
+
private boolean sendNotEnoughResourceNotifications = true;
+ /** Scheduled executor used to schedule the delayed requirements check. */
+ private final ScheduledExecutor scheduledExecutor;
+
/** ResourceManager's id. */
@Nullable private ResourceManagerId resourceManagerId;
@@ -90,6 +98,9 @@ public class DeclarativeSlotManager implements SlotManager {
/** Callbacks for resource (de-)allocations. */
@Nullable private ResourceActions resourceActions;
+ /** The future of the requirements delay check. */
+ @Nullable private CompletableFuture<Void> requirementsCheckFuture;
+
/** True iff the component has been started. */
private boolean started;
@@ -104,6 +115,8 @@ public class DeclarativeSlotManager implements SlotManager {
this.taskManagerRequestTimeout =
slotManagerConfiguration.getTaskManagerRequestTimeout();
this.slotManagerMetricGroup =
Preconditions.checkNotNull(slotManagerMetricGroup);
this.resourceTracker = Preconditions.checkNotNull(resourceTracker);
+ this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
+ this.requirementsCheckDelay =
slotManagerConfiguration.getRequirementCheckDelay();
pendingSlotAllocations = new HashMap<>(16);
@@ -162,7 +175,7 @@ public class DeclarativeSlotManager implements SlotManager {
sendNotEnoughResourceNotifications = failUnfulfillableRequest;
if (failUnfulfillableRequest) {
- checkResourceRequirements();
+ checkResourceRequirementsWithDelay();
}
}
@@ -275,7 +288,7 @@ public class DeclarativeSlotManager implements SlotManager {
}
resourceTracker.notifyResourceRequirements(
resourceRequirements.getJobId(),
resourceRequirements.getResourceRequirements());
- checkResourceRequirements();
+ checkResourceRequirementsWithDelay();
}
private void maybeReclaimInactiveSlots(JobID jobId) {
@@ -341,7 +354,7 @@ public class DeclarativeSlotManager implements SlotManager {
slotStatus.getJobID());
}
- checkResourceRequirements();
+ checkResourceRequirementsWithDelay();
return true;
}
}
@@ -355,7 +368,7 @@ public class DeclarativeSlotManager implements SlotManager {
if (taskExecutorManager.isTaskManagerRegistered(instanceId)) {
slotTracker.removeSlots(taskExecutorManager.getSlotsOf(instanceId));
taskExecutorManager.unregisterTaskExecutor(instanceId);
- checkResourceRequirements();
+ checkResourceRequirementsWithDelay();
return true;
} else {
@@ -382,7 +395,7 @@ public class DeclarativeSlotManager implements SlotManager {
if (taskExecutorManager.isTaskManagerRegistered(instanceId)) {
if (slotTracker.notifySlotStatus(slotReport)) {
- checkResourceRequirements();
+ checkResourceRequirementsWithDelay();
}
return true;
} else {
@@ -407,13 +420,39 @@ public class DeclarativeSlotManager implements
SlotManager {
LOG.debug("Freeing slot {}.", slotId);
slotTracker.notifyFree(slotId);
- checkResourceRequirements();
+ checkResourceRequirementsWithDelay();
}
//
---------------------------------------------------------------------------------------------
// Requirement matching
//
---------------------------------------------------------------------------------------------
+ /**
+ * Depending on the implementation of {@link ResourceAllocationStrategy},
checking resource
+ * requirements and potentially making a re-allocation can be heavy. In
order to cover more
+ * changes with each check, and thus reduce the frequency of unnecessary
re-allocations, the checks
+ * are performed with a slight delay.
+ */
+ private void checkResourceRequirementsWithDelay() {
+ if (requirementsCheckDelay.toMillis() <= 0) {
+ checkResourceRequirements();
+ } else {
+ if (requirementsCheckFuture == null ||
requirementsCheckFuture.isDone()) {
+ requirementsCheckFuture = new CompletableFuture<>();
+ scheduledExecutor.schedule(
+ () ->
+ mainThreadExecutor.execute(
+ () -> {
+ checkResourceRequirements();
+
Preconditions.checkNotNull(requirementsCheckFuture)
+ .complete(null);
+ }),
+ requirementsCheckDelay.toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
/**
* Matches resource requirements against available resources. In a first
round requirements are
* matched against free slot, and any match results in a slot allocation.
The remaining
@@ -439,6 +478,9 @@ public class DeclarativeSlotManager implements SlotManager {
* absolute worst case, with J jobs, requiring R slots each with a unique
resource profile such
* each pair of these profiles is not matching, and S free/pending slots
that don't fulfill any
* requirement, then this method does a total of J*R*S resource profile
comparisons.
+ *
+ * <p>DO NOT call this method directly. Use {@link
#checkResourceRequirementsWithDelay()}
+ * instead.
*/
private void checkResourceRequirements() {
final Map<JobID, Collection<ResourceRequirement>> missingResources =
@@ -626,7 +668,7 @@ public class DeclarativeSlotManager implements SlotManager {
throwable);
slotTracker.notifyFree(slotId);
}
- checkResourceRequirements();
+ checkResourceRequirementsWithDelay();
}
return null;
},
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 289cc54d162..c7f4e5e4a1a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -80,7 +81,7 @@ public class FineGrainedSlotManager implements SlotManager {
private final Time taskManagerTimeout;
/** Delay of the requirement change check in the slot manager. */
- private final Time requirementsCheckDelay;
+ private final Duration requirementsCheckDelay;
private final SlotManagerMetricGroup slotManagerMetricGroup;
@@ -121,8 +122,7 @@ public class FineGrainedSlotManager implements SlotManager {
ResourceTracker resourceTracker,
TaskManagerTracker taskManagerTracker,
SlotStatusSyncer slotStatusSyncer,
- ResourceAllocationStrategy resourceAllocationStrategy,
- Time requirementCheckDelay) {
+ ResourceAllocationStrategy resourceAllocationStrategy) {
this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
@@ -130,7 +130,8 @@ public class FineGrainedSlotManager implements SlotManager {
this.taskManagerTimeout =
slotManagerConfiguration.getTaskManagerTimeout();
this.waitResultConsumedBeforeRelease =
slotManagerConfiguration.isWaitResultConsumedBeforeRelease();
- this.requirementsCheckDelay =
Preconditions.checkNotNull(requirementCheckDelay);
+ this.requirementsCheckDelay =
+
Preconditions.checkNotNull(slotManagerConfiguration.getRequirementCheckDelay());
this.slotManagerMetricGroup =
Preconditions.checkNotNull(slotManagerMetricGroup);
@@ -490,18 +491,22 @@ public class FineGrainedSlotManager implements
SlotManager {
* are performed with a slight delay.
*/
private void checkResourceRequirementsWithDelay() {
- if (requirementsCheckFuture == null ||
requirementsCheckFuture.isDone()) {
- requirementsCheckFuture = new CompletableFuture<>();
- scheduledExecutor.schedule(
- () ->
- mainThreadExecutor.execute(
- () -> {
- checkResourceRequirements();
-
Preconditions.checkNotNull(requirementsCheckFuture)
- .complete(null);
- }),
- requirementsCheckDelay.toMilliseconds(),
- TimeUnit.MILLISECONDS);
+ if (requirementsCheckDelay.toMillis() <= 0) {
+ checkResourceRequirements();
+ } else {
+ if (requirementsCheckFuture == null ||
requirementsCheckFuture.isDone()) {
+ requirementsCheckFuture = new CompletableFuture<>();
+ scheduledExecutor.schedule(
+ () ->
+ mainThreadExecutor.execute(
+ () -> {
+ checkResourceRequirements();
+
Preconditions.checkNotNull(requirementsCheckFuture)
+ .complete(null);
+ }),
+ requirementsCheckDelay.toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index 5703a5a173a..376c7070852 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -34,6 +34,8 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
+
/** Configuration for the {@link SlotManager}. */
public class SlotManagerConfiguration {
@@ -42,6 +44,7 @@ public class SlotManagerConfiguration {
private final Time taskManagerRequestTimeout;
private final Time slotRequestTimeout;
private final Time taskManagerTimeout;
+ private final Duration requirementCheckDelay;
private final boolean waitResultConsumedBeforeRelease;
private final SlotMatchingStrategy slotMatchingStrategy;
private final WorkerResourceSpec defaultWorkerResourceSpec;
@@ -55,6 +58,7 @@ public class SlotManagerConfiguration {
Time taskManagerRequestTimeout,
Time slotRequestTimeout,
Time taskManagerTimeout,
+ Duration requirementCheckDelay,
boolean waitResultConsumedBeforeRelease,
SlotMatchingStrategy slotMatchingStrategy,
WorkerResourceSpec defaultWorkerResourceSpec,
@@ -67,6 +71,7 @@ public class SlotManagerConfiguration {
this.taskManagerRequestTimeout =
Preconditions.checkNotNull(taskManagerRequestTimeout);
this.slotRequestTimeout =
Preconditions.checkNotNull(slotRequestTimeout);
this.taskManagerTimeout =
Preconditions.checkNotNull(taskManagerTimeout);
+ this.requirementCheckDelay =
Preconditions.checkNotNull(requirementCheckDelay);
this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
this.slotMatchingStrategy =
Preconditions.checkNotNull(slotMatchingStrategy);
this.defaultWorkerResourceSpec =
Preconditions.checkNotNull(defaultWorkerResourceSpec);
@@ -92,6 +97,10 @@ public class SlotManagerConfiguration {
return taskManagerTimeout;
}
+ public Duration getRequirementCheckDelay() {
+ return requirementCheckDelay;
+ }
+
public boolean isWaitResultConsumedBeforeRelease() {
return waitResultConsumedBeforeRelease;
}
@@ -136,6 +145,9 @@ public class SlotManagerConfiguration {
Time.milliseconds(
configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));
+ final Duration requirementCheckDelay =
+
configuration.get(ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY);
+
boolean waitResultConsumedBeforeRelease =
configuration.getBoolean(
ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);
@@ -158,6 +170,7 @@ public class SlotManagerConfiguration {
rpcTimeout,
slotRequestTimeout,
taskManagerTimeout,
+ requirementCheckDelay,
waitResultConsumedBeforeRelease,
slotMatchingStrategy,
defaultWorkerResourceSpec,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
index 4e6398456f9..1c3e51540fd 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
@@ -30,6 +30,7 @@ import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ScheduledExecutor;
+import java.time.Duration;
import java.util.concurrent.Executor;
/** Builder for {@link DeclarativeSlotManager}. */
@@ -47,6 +48,7 @@ public class DeclarativeSlotManagerBuilder {
private int redundantTaskManagerNum;
private ResourceTracker resourceTracker;
private SlotTracker slotTracker;
+ private Duration requirementCheckDelay;
private DeclarativeSlotManagerBuilder(ScheduledExecutor scheduledExecutor)
{
this.slotMatchingStrategy = AnyMatchingSlotMatchingStrategy.INSTANCE;
@@ -64,6 +66,7 @@ public class DeclarativeSlotManagerBuilder {
ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM.defaultValue();
this.resourceTracker = new DefaultResourceTracker();
this.slotTracker = new DefaultSlotTracker();
+ this.requirementCheckDelay = Duration.ZERO;
}
public static DeclarativeSlotManagerBuilder newBuilder(ScheduledExecutor
scheduledExecutor) {
@@ -135,12 +138,18 @@ public class DeclarativeSlotManagerBuilder {
return this;
}
+ public DeclarativeSlotManagerBuilder setRequirementCheckDelay(Duration
requirementCheckDelay) {
+ this.requirementCheckDelay = requirementCheckDelay;
+ return this;
+ }
+
public DeclarativeSlotManager build() {
final SlotManagerConfiguration slotManagerConfiguration =
new SlotManagerConfiguration(
taskManagerRequestTimeout,
slotRequestTimeout,
taskManagerTimeout,
+ requirementCheckDelay,
waitResultConsumedBeforeRelease,
slotMatchingStrategy,
defaultWorkerResourceSpec,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 5eb6cf7a264..75cfe80d95f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -60,7 +60,9 @@ import
org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -74,6 +76,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1424,6 +1427,45 @@ public class DeclarativeSlotManagerTest extends
TestLogger {
}
}
+ @Test
+ public void testProcessResourceRequirementsWithDelay() throws Exception {
+ final ResourceTracker resourceTracker = new DefaultResourceTracker();
+ final AtomicInteger allocatedResourceCounter = new AtomicInteger(0);
+ final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+ new ManuallyTriggeredScheduledExecutor();
+ final Duration delay = Duration.ofMillis(500);
+ try (final DeclarativeSlotManager slotManager =
+ createDeclarativeSlotManagerBuilder(scheduledExecutor)
+ .setResourceTracker(resourceTracker)
+ .setRequirementCheckDelay(delay)
+ .buildAndStartWithDirectExec(
+ ResourceManagerId.generate(),
+ new TestingResourceActionsBuilder()
+ .setAllocateResourceConsumer(
+ workerResourceSpec ->
+
allocatedResourceCounter.getAndIncrement())
+ .build())) {
+
+ final JobID jobId = new JobID();
+
+
slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
+ Assertions.assertEquals(0, allocatedResourceCounter.get());
+ Assertions.assertEquals(
+ 1,
scheduledExecutor.getActiveNonPeriodicScheduledTask().size());
+ final ScheduledFuture<?> future =
+
scheduledExecutor.getActiveNonPeriodicScheduledTask().iterator().next();
+ Assertions.assertEquals(delay.toMillis(),
future.getDelay(TimeUnit.MILLISECONDS));
+
+ // the second request is skipped
+
slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
+ Assertions.assertEquals(
+ 1,
scheduledExecutor.getActiveNonPeriodicScheduledTask().size());
+
+ scheduledExecutor.triggerNonPeriodicScheduledTask();
+ Assertions.assertEquals(1, allocatedResourceCounter.get());
+ }
+ }
+
@Test
public void testClearRequirementsClearsResourceTracker() throws Exception {
final ResourceTracker resourceTracker = new DefaultResourceTracker();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index 3357e3531b4..b06b024a208 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.Test;
import java.math.BigDecimal;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -576,7 +577,7 @@ public class FineGrainedSlotManagerTest extends
FineGrainedSlotManagerTestBase {
final List<CompletableFuture<Void>> checkRequirementFutures =
new ArrayList<>();
checkRequirementFutures.add(new CompletableFuture<>());
checkRequirementFutures.add(new CompletableFuture<>());
- final long requirementCheckDelay = 50;
+ final Duration requirementCheckDelay = Duration.ofMillis(50);
resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
(ignored1, ignored2) -> {
if (checkRequirementFutures.get(0).isDone()) {
@@ -619,13 +620,13 @@ public class FineGrainedSlotManagerTest extends
FineGrainedSlotManagerTestBase {
final long registrationTime = (System.nanoTime() -
start) / 1_000_000;
assumeTrue(
"The time of process requirement and
register task manager must not take longer than the requirement check delay. If
it does, then this indicates a very slow machine.",
- registrationTime < requirementCheckDelay);
+ registrationTime <
requirementCheckDelay.toMillis());
assertFutureCompleteAndReturn(checkRequirementFutures.get(0));
assertFutureNotComplete(checkRequirementFutures.get(1));
// checkTimes will not increase when there's no
events
- Thread.sleep(requirementCheckDelay * 2);
+ Thread.sleep(requirementCheckDelay.toMillis() * 2);
assertFutureNotComplete(checkRequirementFutures.get(1));
// checkTimes will increase again if there's
another
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 1013f8919cc..700c8f0f3c8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -44,6 +44,7 @@ import org.apache.flink.util.function.RunnableWithException;
import org.junit.ClassRule;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
@@ -150,7 +151,7 @@ public abstract class FineGrainedSlotManagerTestBase
extends TestLogger {
new
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
private final Executor mainThreadExecutor =
EXECUTOR_RESOURCE.getExecutor();
private FineGrainedSlotManager slotManager;
- private long requirementCheckDelay = 0;
+ private Duration requirementCheckDelay = Duration.ZERO;
final TestingResourceAllocationStrategy.Builder
resourceAllocationStrategyBuilder =
TestingResourceAllocationStrategy.newBuilder();
@@ -176,7 +177,7 @@ public abstract class FineGrainedSlotManagerTestBase
extends TestLogger {
return resourceManagerId;
}
- public void setRequirementCheckDelay(long requirementCheckDelay) {
+ public void setRequirementCheckDelay(Duration requirementCheckDelay) {
this.requirementCheckDelay = requirementCheckDelay;
}
@@ -208,8 +209,7 @@ public abstract class FineGrainedSlotManagerTestBase
extends TestLogger {
taskManagerTracker,
slotStatusSyncer,
getResourceAllocationStrategy()
-
.orElse(resourceAllocationStrategyBuilder.build()),
- Time.milliseconds(requirementCheckDelay));
+
.orElse(resourceAllocationStrategyBuilder.build()));
runInMainThreadAndWait(
() ->
slotManager.start(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
index 9f8035e497c..f87ce6238cc 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
@@ -25,11 +25,14 @@ import
org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.testutils.TestingUtils;
+import java.time.Duration;
+
/** Builder for {@link SlotManagerConfiguration}. */
public class SlotManagerConfigurationBuilder {
private Time taskManagerRequestTimeout;
private Time slotRequestTimeout;
private Time taskManagerTimeout;
+ private Duration requirementCheckDelay;
private boolean waitResultConsumedBeforeRelease;
private WorkerResourceSpec defaultWorkerResourceSpec;
private int numSlotsPerWorker;
@@ -42,6 +45,7 @@ public class SlotManagerConfigurationBuilder {
this.taskManagerRequestTimeout = TestingUtils.infiniteTime();
this.slotRequestTimeout = TestingUtils.infiniteTime();
this.taskManagerTimeout = TestingUtils.infiniteTime();
+ this.requirementCheckDelay =
ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY.defaultValue();
this.waitResultConsumedBeforeRelease = true;
this.defaultWorkerResourceSpec = WorkerResourceSpec.ZERO;
this.numSlotsPerWorker = 1;
@@ -72,6 +76,12 @@ public class SlotManagerConfigurationBuilder {
return this;
}
+ public SlotManagerConfigurationBuilder setRequirementCheckDelay(
+ Duration requirementCheckDelay) {
+ this.requirementCheckDelay = requirementCheckDelay;
+ return this;
+ }
+
public SlotManagerConfigurationBuilder setWaitResultConsumedBeforeRelease(
boolean waitResultConsumedBeforeRelease) {
this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
@@ -114,6 +124,7 @@ public class SlotManagerConfigurationBuilder {
taskManagerRequestTimeout,
slotRequestTimeout,
taskManagerTimeout,
+ requirementCheckDelay,
waitResultConsumedBeforeRelease,
AnyMatchingSlotMatchingStrategy.INSTANCE,
defaultWorkerResourceSpec,