This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bec6a458970 [FLINK-31297][Runtime] Use processResourceRequirements to
allocate task manager in FineGrainedSlotManagerTest.
bec6a458970 is described below
commit bec6a4589703bb7619cfc04bf69822995b49893f
Author: Weihua Hu <[email protected]>
AuthorDate: Thu Mar 2 23:27:28 2023 +0800
[FLINK-31297][Runtime] Use processResourceRequirements to allocate task
manager in FineGrainedSlotManagerTest.
We used TaskManagerTracker.addPendingTaskManager to allocate the task manager
in FineGrainedSlotManagerTest. This makes the resource requirements
differ between FineGrainedSlotManager and TaskManagerTracker, and the
FineGrainedSlotManager may trigger resource recycling after the
requirementCheckDelay.
So we now use processResourceRequirements to allocate the task manager, and
set the requirementCheckDelay to 0 to stabilize this test.
---
.../slotmanager/FineGrainedSlotManagerTest.java | 30 +++++++++++++++++-----
.../FineGrainedSlotManagerTestBase.java | 6 -----
2 files changed, 24 insertions(+), 12 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index f4cd9f7f1d0..2a41ab14dbf 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -208,6 +208,26 @@ class FineGrainedSlotManagerTest extends
FineGrainedSlotManagerTestBase {
new AllocationID(),
DEFAULT_SLOT_RESOURCE_PROFILE));
new Context() {
{
+
resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
+ (jobRequirements, ignore) -> {
+ assertThat(jobRequirements).hasSize(1);
+ JobID jobID =
jobRequirements.keySet().stream().findFirst().get();
+ ResourceAllocationResult.Builder builder =
+ ResourceAllocationResult.builder();
+ PendingTaskManager pendingTaskManager =
+ new PendingTaskManager(
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_NUM_SLOTS_PER_WORKER);
+
builder.addPendingTaskManagerAllocate(pendingTaskManager);
+ builder.addAllocationOnPendingResource(
+ jobID,
+
pendingTaskManager.getPendingTaskManagerId(),
+ DEFAULT_SLOT_RESOURCE_PROFILE);
+ return builder.build();
+ });
+
+
slotManagerConfigurationBuilder.setRequirementCheckDelay(Duration.ZERO);
+
runTest(
() -> {
final
CompletableFuture<SlotManager.RegistrationResult>
@@ -218,11 +238,9 @@ class FineGrainedSlotManagerTest extends
FineGrainedSlotManagerTestBase {
registerTaskManagerFuture3 = new
CompletableFuture<>();
runInMainThread(
() -> {
- getTaskManagerTracker()
- .addPendingTaskManager(
- new PendingTaskManager(
-
DEFAULT_TOTAL_RESOURCE_PROFILE,
-
DEFAULT_NUM_SLOTS_PER_WORKER));
+ getSlotManager()
+ .processResourceRequirements(
+
createResourceRequirementsForSingleSlot());
// task manager with allocated slot
cannot deduct pending
// task manager
registerTaskManagerFuture1.complete(
@@ -576,7 +594,7 @@ class FineGrainedSlotManagerTest extends
FineGrainedSlotManagerTestBase {
}
return ResourceAllocationResult.builder().build();
});
- setRequirementCheckDelay(requirementCheckDelay);
+
slotManagerConfigurationBuilder.setRequirementCheckDelay(requirementCheckDelay);
runTest(
() -> {
final ResourceRequirements resourceRequirements1 =
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index f2e8e4f9d76..d63da233b47 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -44,7 +44,6 @@ import org.apache.flink.util.function.RunnableWithException;
import org.junit.jupiter.api.extension.RegisterExtension;
-import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
@@ -152,7 +151,6 @@ abstract class FineGrainedSlotManagerTestBase {
new
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
private final Executor mainThreadExecutor =
EXECUTOR_RESOURCE.getExecutor();
private FineGrainedSlotManager slotManager;
- private Duration requirementCheckDelay = Duration.ZERO;
final TestingResourceAllocationStrategy.Builder
resourceAllocationStrategyBuilder =
TestingResourceAllocationStrategy.newBuilder();
@@ -181,10 +179,6 @@ abstract class FineGrainedSlotManagerTestBase {
return resourceManagerId;
}
- public void setRequirementCheckDelay(Duration requirementCheckDelay) {
- this.requirementCheckDelay = requirementCheckDelay;
- }
-
public void setSlotManagerMetricGroup(SlotManagerMetricGroup
slotManagerMetricGroup) {
this.slotManagerMetricGroup = slotManagerMetricGroup;
}