This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit abe0fbf433f66bb94f5b42ab46a00348ea951ade
Author: Weihua Hu <[email protected]>
AuthorDate: Thu Mar 23 14:40:31 2023 +0800

    [FLINK-31498][runtime] Only request redundant task manager when all pending slots registered.
---
 .../slotmanager/TaskExecutorManager.java           | 12 +++++--
 .../slotmanager/TaskExecutorManagerTest.java       | 38 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
index ae163ad0d26..221bd44d46c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
@@ -377,9 +377,15 @@ class TaskExecutorManager implements AutoCloseable {
 
         int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - getNumberFreeSlots();
         if (slotsDiff > 0) {
-            // Keep enough redundant taskManagers from time to time.
-            int requiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker);
-            allocateRedundantTaskManagers(requiredTaskManagers);
+            if (pendingSlots.isEmpty()) {
+                // Keep enough redundant taskManagers from time to time.
+                int requiredTaskManagers =
+                        MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker);
+                allocateRedundantTaskManagers(requiredTaskManagers);
+            } else {
+                LOG.debug(
+                        "There are some pending slots, skip allocate redundant task manager and wait them fulfilled.");
+            }
         } else {
             // second we trigger the release resource callback which can decide upon the
             // resource release
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
index a5e9dfc5158..cee5f7dd5ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
 import org.junit.jupiter.api.Test;
@@ -261,6 +262,43 @@ class TaskExecutorManagerTest {
         }
     }
 
+    @Test
+    void testRequestRedundantTaskManager() {
+        final ResourceProfile resourceProfile = ResourceProfile.newBuilder().setCpuCores(1).build();
+        final AtomicInteger declareResourceCount = new AtomicInteger(0);
+        final ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
+                        .setDeclareResourceNeededConsumer(
+                                (resourceDeclarations) -> declareResourceCount.getAndIncrement())
+                        .build();
+        ManuallyTriggeredScheduledExecutor taskRestartExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        try (final TaskExecutorManager taskExecutorManager =
+                new TaskExecutorManagerBuilder(taskRestartExecutor)
+                        .setRedundantTaskManagerNum(1)
+                        .setMaxNumSlots(10)
+                        .setResourceAllocator(resourceAllocator)
+                        .createTaskExecutorManager()) {
+
+            // do not check redundant task managers with no registered
+            taskRestartExecutor.triggerScheduledTasks();
+            assertThat(declareResourceCount).hasValue(0);
+
+            InstanceID taskExecutorId =
+                    createAndRegisterTaskExecutor(taskExecutorManager, 1, resourceProfile);
+            taskExecutorManager.occupySlot(taskExecutorId);
+            assertThat(declareResourceCount).hasValue(0);
+
+            // request 1 redundant task manager
+            taskRestartExecutor.triggerScheduledTasks();
+            assertThat(declareResourceCount).hasValue(1);
+
+            // will not trigger new redundant task managers when there are pending slots.
+            taskRestartExecutor.triggerScheduledTasks();
+            assertThat(declareResourceCount).hasValue(1);
+        }
+    }
+
     /**
      * Test that the task executor manager only allocates new workers if their worker spec can
      * fulfill the requested resource profile.
