xintongsong commented on a change in pull request #12958:
URL: https://github.com/apache/flink/pull/12958#discussion_r459236877



##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -67,6 +67,12 @@
                        "for streaming workloads, which may fail if there are 
not enough slots. Note that this configuration option does not take " +
                        "effect for standalone clusters, where how many slots 
are allocated is not controlled by Flink.");
 
+       public static final ConfigOption<Integer> REDUNDANT_SLOT_NUM = 
ConfigOptions

Review comment:
       JavaDoc is missing
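   Just to illustrate the kind of thing I have in mind (the wording is only a suggestion, based on my understanding of what the option does):
   ```java
   /**
    * The number of redundant slots that the slot manager tries to keep allocated,
    * in addition to the slots required by the currently running jobs, so that
    * tasks can be recovered more quickly after failures.
    */
   public static final ConfigOption<Integer> REDUNDANT_SLOT_NUM = ConfigOptions
   ```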

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -67,6 +67,12 @@
                        "for streaming workloads, which may fail if there are 
not enough slots. Note that this configuration option does not take " +
                        "effect for standalone clusters, where how many slots 
are allocated is not controlled by Flink.");
 
+       public static final ConfigOption<Integer> REDUNDANT_SLOT_NUM = 
ConfigOptions

Review comment:
       I thought the design was to let users configure the number of redundant task managers, rather than the number of redundant slots.
   
   What I suggested in the JIRA discussion was to use the number of redundant slots for deciding whether there are enough redundant task managers. We can calculate the number of redundant slots from the number of redundant task managers and the number of slots per task manager. I did not mean having users directly configure the number of redundant slots. Sorry for not making my point clear.
   
   IMO, the number of redundant task managers is more straightforward for users to reason about, because job failures are more likely caused by task manager failures than by individual slot failures. Otherwise, users would have to work out the relationship between the number of redundant task managers and the number of redundant slots themselves. A rough sketch of what I mean follows below.
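   ```java
   // Sketch only: users configure the number of redundant task managers (via a
   // hypothetical new config option), and the slot manager derives the redundant
   // slot count from it internally.
   int redundantSlotNum = redundantTaskManagerNum * numSlotsPerWorker;
   ```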

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -299,6 +303,8 @@ public void start(ResourceManagerId newResourceManagerId, Executor newMainThread
                        TimeUnit.MILLISECONDS);
 
                registerSlotManagerMetrics();
+
+               allocateRedundantSlots(redundantSlotNum);

Review comment:
       What happens if there's a JM failover without any TMs being lost?
   It could happen that the SM first requests the redundant TMs, and then all the TMs (needed & redundant) from the previous attempt are recovered and register with the SM. As a result, we may end up with twice as many redundant TMs as expected.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -1208,15 +1233,16 @@ public static ResourceProfile generateDefaultSlotResourceProfile(WorkerResourceS
        }
 
        // ---------------------------------------------------------------------------------------------
-       // Internal timeout methods
+       // Internal periodic check methods
        // ---------------------------------------------------------------------------------------------
 
        @VisibleForTesting
-       void checkTaskManagerTimeouts() {
+       void checkValidTaskManagers() {

Review comment:
       The method name is not very descriptive.
   I would suggest something like `checkTaskManagerTimeoutsAndRedundancy`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerValidateInSlotManagerTest.java
##########
@@ -69,13 +70,15 @@
        private CompletableFuture<InstanceID> releaseFuture;
        private ResourceActions resourceManagerActions;
        private ManuallyTriggeredScheduledExecutor mainThreadExecutor;
+       private final AtomicInteger allocateResourceCalls = new AtomicInteger(0);

Review comment:
       We should reset this to `0` in `setup()`.
   Please be aware that there's no guarantee on the order in which the test cases are executed.
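   For example (assuming `setup()` is the method annotated with `@Before` in this test):
   ```java
   @Before
   public void setup() throws Exception {
           // Reset the counter so that a test method never observes allocateResource
           // calls made by previously executed test methods.
           allocateResourceCalls.set(0);
           // ... existing setup logic ...
   }
   ```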

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -915,6 +921,25 @@ private boolean isMaxSlotNumExceededAfterAdding(int numNewSlot) {
                return getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numNewSlot > maxSlotNum;
        }
 
+       private void allocateRedundantSlots(int slotNum) {
+               int workerNum = (slotNum + numSlotsPerWorker - 1) / numSlotsPerWorker;

Review comment:
       There's a util method `MathUtils#divideRoundUp` that we can reuse.
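   I.e. something like (`MathUtils` is in `org.apache.flink.util`):
   ```java
   int workerNum = MathUtils.divideRoundUp(slotNum, numSlotsPerWorker);
   ```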

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -67,6 +67,12 @@
                        "for streaming workloads, which may fail if there are 
not enough slots. Note that this configuration option does not take " +
                        "effect for standalone clusters, where how many slots 
are allocated is not controlled by Flink.");
 
+       public static final ConfigOption<Integer> REDUNDANT_SLOT_NUM = 
ConfigOptions
+               .key("slotmanager.redundant-slot-num")
+               .intType()
+               .defaultValue(0)
+               .withDescription("Defines the number of redundant slots");

Review comment:
       The description appears in the user docs, so it is important to explain a bit more about how this option works. Without any background, the term `redundant slots` alone is hard for users to understand.
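   Just as an illustration of the level of detail I'm thinking of (the exact wording is of course up to you, this is only my understanding of the feature):
   ```java
   .withDescription("The number of redundant task slots that the cluster tries to keep" +
           " available in addition to the slots needed by the running jobs. When a task" +
           " manager is lost, these spare slots allow the affected tasks to be redeployed" +
           " without waiting for new task managers to be started.")
   ```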

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
##########
@@ -976,6 +976,7 @@ public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
 
                try (final SlotManager slotManager = createSlotManagerBuilder()
                        .setTaskManagerTimeout(taskManagerTimeout)
+                       .setRedundantSlotNum(0)

Review comment:
       Why do we need this? Isn't the default already 0?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -915,6 +921,25 @@ private boolean isMaxSlotNumExceededAfterAdding(int numNewSlot) {
                return getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numNewSlot > maxSlotNum;
        }
 
+       private void allocateRedundantSlots(int slotNum) {
+               int workerNum = (slotNum + numSlotsPerWorker - 1) / numSlotsPerWorker;
+               int allocatedWorkerNum = allocateResources(workerNum);
+               if (workerNum != allocatedWorkerNum) {
+                       LOG.warn("Expect to allocate {} workers for {} redundant slots. Actually allocate {} workers.",
+                               workerNum, slotNum, allocatedWorkerNum);
+               }
+       }
+
+       private int allocateResources(int workerNum) {
+               int allocatedWorkerNum = 0;
+               for (int i = 0; i < workerNum; ++i) {
+                       if (allocateResource(defaultSlotResourceProfile).isPresent()) {

Review comment:
       Should we break the for-loop early if the condition is not met?
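   I.e. roughly (the reason given for the failed allocation is just my assumption, please adjust the comment as appropriate):
   ```java
   private int allocateResources(int workerNum) {
           int allocatedWorkerNum = 0;
           for (int i = 0; i < workerNum; ++i) {
                   if (allocateResource(defaultSlotResourceProfile).isPresent()) {
                           ++allocatedWorkerNum;
                   } else {
                           // Allocation failed (e.g. the max slot limit has been reached).
                           // Further attempts in this round are unlikely to succeed, so stop early.
                           break;
                   }
           }
           return allocatedWorkerNum;
   }
   ```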

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -1225,15 +1251,27 @@ void checkTaskManagerTimeouts() {
                                        // ResourceActions.releaseResource call
                                        timedOutTaskManagers.add(taskManagerRegistration);
                                }
+                               totalNumberFreeSlots += taskManagerRegistration.getNumberFreeSlots();

Review comment:
       No need to calculate `totalNumberFreeSlots`. We can get it with 
`freeSlots.size()`.
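   I.e. instead of accumulating it per task manager:
   ```java
   int totalNumberFreeSlots = freeSlots.size();
   ```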




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

