This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit abe0fbf433f66bb94f5b42ab46a00348ea951ade
Author: Weihua Hu <[email protected]>
AuthorDate: Thu Mar 23 14:40:31 2023 +0800

    [FLINK-31498][runtime] Only request redundant task managers when all pending slots are registered.
---
 .../slotmanager/TaskExecutorManager.java           | 12 +++++--
 .../slotmanager/TaskExecutorManagerTest.java       | 38 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
index ae163ad0d26..221bd44d46c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
@@ -377,9 +377,15 @@ class TaskExecutorManager implements AutoCloseable {
 
             int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - 
getNumberFreeSlots();
             if (slotsDiff > 0) {
-                // Keep enough redundant taskManagers from time to time.
-                int requiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, 
numSlotsPerWorker);
-                allocateRedundantTaskManagers(requiredTaskManagers);
+                if (pendingSlots.isEmpty()) {
+                    // Keep enough redundant taskManagers from time to time.
+                    int requiredTaskManagers =
+                            MathUtils.divideRoundUp(slotsDiff, 
numSlotsPerWorker);
+                    allocateRedundantTaskManagers(requiredTaskManagers);
+                } else {
+                    LOG.debug(
+                            "There are some pending slots, skip allocate 
redundant task manager and wait them fulfilled.");
+                }
             } else {
                 // second we trigger the release resource callback which can 
decide upon the
                 // resource release
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
index a5e9dfc5158..cee5f7dd5ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
 import org.junit.jupiter.api.Test;
@@ -261,6 +262,43 @@ class TaskExecutorManagerTest {
         }
     }
 
+    @Test
+    void testRequestRedundantTaskManager() {
+        final ResourceProfile resourceProfile = 
ResourceProfile.newBuilder().setCpuCores(1).build();
+        final AtomicInteger declareResourceCount = new AtomicInteger(0); // bumped on every resource declaration
+        final ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
+                        .setDeclareResourceNeededConsumer(
+                                (resourceDeclarations) -> 
declareResourceCount.getAndIncrement())
+                        .build();
+        ManuallyTriggeredScheduledExecutor taskRestartExecutor =
+                new ManuallyTriggeredScheduledExecutor(); // scheduled tasks run only on triggerScheduledTasks()
+        try (final TaskExecutorManager taskExecutorManager =
+                new TaskExecutorManagerBuilder(taskRestartExecutor)
+                        .setRedundantTaskManagerNum(1)
+                        .setMaxNumSlots(10)
+                        .setResourceAllocator(resourceAllocator)
+                        .createTaskExecutorManager()) {
+            // Each triggerScheduledTasks() runs the periodic redundant task manager check once.
+            // With no task executor registered yet, the check must not declare any resources.
+            taskRestartExecutor.triggerScheduledTasks();
+            assertThat(declareResourceCount).hasValue(0);
+
+            InstanceID taskExecutorId =
+                    createAndRegisterTaskExecutor(taskExecutorManager, 1, 
resourceProfile);
+            taskExecutorManager.occupySlot(taskExecutorId); // presumably occupies its only slot
+            assertThat(declareResourceCount).hasValue(0); // nothing is requested until the next check
+
+            // No free slot is left, so the next check requests one redundant task manager.
+            taskRestartExecutor.triggerScheduledTasks();
+            assertThat(declareResourceCount).hasValue(1);
+
+            // A further check must not request another redundant task manager while there are 
pending slots.
+            taskRestartExecutor.triggerScheduledTasks();
+            assertThat(declareResourceCount).hasValue(1);
+        }
+    }
+
     /**
      * Test that the task executor manager only allocates new workers if their 
worker spec can
      * fulfill the requested resource profile.

Reply via email to