xintongsong commented on a change in pull request #12958:
URL: https://github.com/apache/flink/pull/12958#discussion_r461996579



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -299,6 +303,8 @@ public void start(ResourceManagerId newResourceManagerId, Executor newMainThread
                        TimeUnit.MILLISECONDS);
 
                registerSlotManagerMetrics();
+
+               allocateRedundantSlots(redundantSlotNum);

Review comment:
   I'd like to explain a bit more about the outcome of the offline
discussion, for public visibility.
   
   We do not allocate redundant task managers immediately when starting the
resource manager, so that task managers from previous attempts have time to
register. Otherwise, we may end up holding more task managers than expected
(required + redundant), and those extra task managers may not be easily
released if they hold allocated slots.
   
   Instead of allocating redundant task managers immediately, we periodically
check the number of task managers, together with the task manager idleness
check, and allocate redundant task managers if needed.
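
   To make the periodic check concrete, here is a minimal sketch of the decision it could make on each run. It is not the code in this PR: `RedundantTaskManagerSketch`, `TaskManagerView` and `taskManagersToRequest` are hypothetical names, and the sketch assumes that redundancy is counted in free slots and that every task manager offers the same number of slots.

```java
import java.util.List;

/** Illustrative sketch only; none of these names are Flink classes. */
final class RedundantTaskManagerSketch {

    /** Hypothetical view of a registered task manager: total and allocated slots. */
    static final class TaskManagerView {
        final int totalSlots;
        final int allocatedSlots;

        TaskManagerView(int totalSlots, int allocatedSlots) {
            this.totalSlots = totalSlots;
            this.allocatedSlots = allocatedSlots;
        }

        int freeSlots() {
            return totalSlots - allocatedSlots;
        }
    }

    /**
     * Returns how many extra task managers a periodic check would request so that
     * the free slots cover redundantTaskManagerNum * slotsPerTaskManager.
     * Assumption: redundancy is measured in free slots, not physical resources.
     */
    static int taskManagersToRequest(
            List<TaskManagerView> registered,
            int redundantTaskManagerNum,
            int slotsPerTaskManager) {
        int freeSlots = 0;
        for (TaskManagerView tm : registered) {
            freeSlots += tm.freeSlots();
        }
        int missingSlots = redundantTaskManagerNum * slotsPerTaskManager - freeSlots;
        if (missingSlots <= 0) {
            // Enough free slots already; task managers from a previous attempt
            // that registered in the meantime are counted here.
            return 0;
        }
        // Round up to whole task managers.
        return (missingSlots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }
}
```

   Because the count is recomputed on every run from what is currently registered, task managers that re-register after a resource manager restart reduce the number requested, which is the reason for not allocating eagerly in `start`.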

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
##########
@@ -976,6 +976,7 @@ public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
 
                try (final SlotManager slotManager = createSlotManagerBuilder()
                        .setTaskManagerTimeout(taskManagerTimeout)
+                       .setRedundantSlotNum(0)

Review comment:
   Makes sense to me.

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -67,6 +67,21 @@
                        "for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take " +
                        "effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");
 
+       /**
+        * Define the number of redundant taskmanagers.
+        * They are used to speed up failover in case that taskmanager is killed.
+        * Enough redundant taskmanagers are maintained when check timeout taskManagers from time to time.
+        * Internally, redundant taskmanagers are calculated based on slots instead of physical resources.
+        */
+       public static final ConfigOption<Integer> REDUNDANT_TASK_MANAGER_NUM = ConfigOptions
+               .key("slotmanager.redundant-taskmanager-num")
+               .intType()
+               .defaultValue(0)
+               .withDescription("Define the number of redundant taskmanagers. " +
+                       "They are used to speed up failover in case that taskmanager is killed. " +
+                       "Enough redundant taskmanagers are maintained when check timeout taskManagers from time to time. " +
+                       "Internally, redundant taskmanagers are calculated based on slots instead of physical resources.");

Review comment:
   I think users do not need to understand the internal details of how this
feature works. I would suggest the following for the description.
   
   > The number of redundant task managers. Redundant task managers are extra
task managers started by Flink, in order to speed up job recovery in case of
failures due to task manager loss. Note that this feature is available only to
the active deployments (native K8s, Yarn and Mesos).

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.java
##########
@@ -135,14 +216,80 @@ public void testTaskManagerIsNotReleasedInCaseOfConcurrentAllocation() throws Ex
                }
        }
 
+       /**
+        * Register four taskManagers that all have two slots.
+        * The difference between the taskManagers is whether the slot is allocated.
+        * To maintain redundantTaskManagerNum, SlotManagerImpl may release or allocate taskManagers.
+        * @param redundantTaskManagerNum
+        * @throws Exception
+        */
+       private void registerAndCheckMultiTaskManagers(int redundantTaskManagerNum) throws Exception {
+               SlotManagerImpl slotManager = createAndStartSlotManager(redundantTaskManagerNum, 2);
+
+               // Both slots are free.
+               AtomicReference<CompletableFuture<Boolean>> canBeReleasedFuture0 = new AtomicReference<>();
+               registerTaskManagerWithTwoSlots(slotManager, canBeReleasedFuture0, true, true);
+
+               // Both slots are allocated.
+               AtomicReference<CompletableFuture<Boolean>> canBeReleasedFuture1 = new AtomicReference<>();
+               registerTaskManagerWithTwoSlots(slotManager, canBeReleasedFuture1, false, false);
+
+               // One slot is allocated, the other is free.
+               AtomicReference<CompletableFuture<Boolean>> canBeReleasedFuture2 = new AtomicReference<>();
+               registerTaskManagerWithTwoSlots(slotManager, canBeReleasedFuture2, false, true);
+
+               // One slot is free, the other is allocated.
+               AtomicReference<CompletableFuture<Boolean>> canBeReleasedFuture3 = new AtomicReference<>();
+               registerTaskManagerWithTwoSlots(slotManager, canBeReleasedFuture3, true, false);
+
+               checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true, () -> {},
+                       Arrays.asList(canBeReleasedFuture0, canBeReleasedFuture1, canBeReleasedFuture2, canBeReleasedFuture3));

Review comment:
   It seems we don't need a custom `canBeReleasedFuture`. The tests should be
a lot simpler with the default `canBeReleasedFuture` of
`TestingTaskExecutorGatewayBuilder`, by simply adding a
`checkTaskManagerTimeout` without the custom future argument.



