xintongsong commented on a change in pull request #12958:
URL: https://github.com/apache/flink/pull/12958#discussion_r459236877
##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -67,6 +67,12 @@
 		"for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take " +
 		"effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");
+	public static final ConfigOption<Integer> REDUNDANT_SLOT_NUM = ConfigOptions
Review comment:
JavaDoc is missing
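For example, a short JavaDoc along these lines (the wording is only a rough suggestion) would already help:

```java
/**
 * The number of redundant slots that the SlotManager tries to keep available,
 * in addition to the slots required by the currently running jobs.
 */
public static final ConfigOption<Integer> REDUNDANT_SLOT_NUM = ConfigOptions
	.key("slotmanager.redundant-slot-num")
	.intType()
	.defaultValue(0)
	.withDescription("Defines the number of redundant slots");
```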
##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -67,6 +67,12 @@
 		"for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take " +
 		"effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");
+	public static final ConfigOption<Integer> REDUNDANT_SLOT_NUM = ConfigOptions
Review comment:
I thought the design was to let users configure the number of redundant task managers, rather than redundant slots.
What I suggested in the JIRA discussion was to use the number of redundant slots for deciding whether there are enough redundant task managers. We can calculate the number of redundant slots from the number of redundant task managers and the number of slots per task manager. I did not mean having the users directly configure the number of redundant slots. Sorry for not making my point clear.
IMO, the number of redundant task managers should be more straightforward for users to decide, because job failures are more likely related to task manager failures than to individual slot failures. Otherwise, users would need to work out the relationship between the numbers of redundant task managers and slots themselves.
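Roughly what I had in mind (the option and variable names below are only illustrative, nothing like this exists yet):

```java
// Hypothetical sketch: the user configures redundant task managers,
// and the SlotManager derives the number of redundant slots internally.
int redundantTaskManagerNum = configuration.getInteger(REDUNDANT_TASK_MANAGER_NUM); // assumed new option
int redundantSlotNum = redundantTaskManagerNum * numSlotsPerWorker;
```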
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -299,6 +303,8 @@ public void start(ResourceManagerId newResourceManagerId, Executor newMainThread
 			TimeUnit.MILLISECONDS);
 		registerSlotManagerMetrics();
+
+		allocateRedundantSlots(redundantSlotNum);
Review comment:
What happens if there's a JM failover without any TM lost?
It could happen that the SM first requests the redundant TMs, and then all the TMs (needed & redundant) from the previous attempt are recovered and register with the SM. As a result, we may end up with twice as many redundant TMs as expected.
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -1208,15 +1233,16 @@ public static ResourceProfile generateDefaultSlotResourceProfile(WorkerResourceS
 	}
 	// ---------------------------------------------------------------------------------------------
-	// Internal timeout methods
+	// Internal periodic check methods
 	// ---------------------------------------------------------------------------------------------
 	@VisibleForTesting
-	void checkTaskManagerTimeouts() {
+	void checkValidTaskManagers() {
Review comment:
The method name is not very descriptive.
I would suggest something like `checkTaskManagerTimeoutsAndRedundancy`.
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerValidateInSlotManagerTest.java
##########
@@ -69,13 +70,15 @@
 	private CompletableFuture<InstanceID> releaseFuture;
 	private ResourceActions resourceManagerActions;
 	private ManuallyTriggeredScheduledExecutor mainThreadExecutor;
+	private final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
Review comment:
We should reset this to `0` in `setup()`.
Please be aware that there's no guarantee on the order in which the test cases are executed.
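For example (assuming the test class already has a JUnit `setup()` method annotated with `@Before`):

```java
@Before
public void setup() throws Exception {
	// Reset the shared counter so earlier test cases cannot leak state into later ones.
	allocateResourceCalls.set(0);
	// ... the rest of the existing setup ...
}
```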
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -915,6 +921,25 @@ private boolean isMaxSlotNumExceededAfterAdding(int numNewSlot) {
 		return getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numNewSlot > maxSlotNum;
 	}
+	private void allocateRedundantSlots(int slotNum) {
+		int workerNum = (slotNum + numSlotsPerWorker - 1) / numSlotsPerWorker;
Review comment:
There's a util method `MathUtils#divideRoundUp` that we can reuse.
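i.e., roughly (the util should live in `org.apache.flink.util.MathUtils`, if I remember correctly):

```java
int workerNum = MathUtils.divideRoundUp(slotNum, numSlotsPerWorker);
```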
##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -67,6 +67,12 @@
 		"for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take " +
 		"effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");
+	public static final ConfigOption<Integer> REDUNDANT_SLOT_NUM = ConfigOptions
+		.key("slotmanager.redundant-slot-num")
+		.intType()
+		.defaultValue(0)
+		.withDescription("Defines the number of redundant slots");
Review comment:
The description appears in user docs. It is important to explain a bit
more about how this option works. Without any background, the term `redundant
slots` alone is hard for users to understand.
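As a starting point, maybe something along these lines (the wording is only a suggestion and should be adjusted once we settle on redundant TMs vs. slots):

```java
.withDescription("The number of redundant slots that the SlotManager keeps available in addition to" +
	" the slots required by running jobs, so that lost task managers can be replaced more quickly." +
	" The default value 0 disables this feature.");
```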
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
##########
@@ -976,6 +976,7 @@ public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
 		try (final SlotManager slotManager = createSlotManagerBuilder()
 			.setTaskManagerTimeout(taskManagerTimeout)
+			.setRedundantSlotNum(0)
Review comment:
Why do we need this? Isn't the default already 0?
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -915,6 +921,25 @@ private boolean isMaxSlotNumExceededAfterAdding(int numNewSlot) {
 		return getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numNewSlot > maxSlotNum;
 	}
+	private void allocateRedundantSlots(int slotNum) {
+		int workerNum = (slotNum + numSlotsPerWorker - 1) / numSlotsPerWorker;
+		int allocatedWorkerNum = allocateResources(workerNum);
+		if (workerNum != allocatedWorkerNum) {
+			LOG.warn("Expect to allocate {} workers for {} redundant slots. Actually allocate {} workers.",
+				workerNum, slotNum, allocatedWorkerNum);
+		}
+	}
+
+	private int allocateResources(int workerNum) {
+		int allocatedWorkerNum = 0;
+		for (int i = 0; i < workerNum; ++i) {
+			if (allocateResource(defaultSlotResourceProfile).isPresent()) {
Review comment:
Should we break the for-loop early if the condition is not met?
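e.g. a sketch of what I mean (I'm inferring the success branch from the truncated diff above):

```java
private int allocateResources(int workerNum) {
	int allocatedWorkerNum = 0;
	for (int i = 0; i < workerNum; ++i) {
		if (allocateResource(defaultSlotResourceProfile).isPresent()) {
			++allocatedWorkerNum;
		} else {
			// Once a request is rejected (e.g. the max slot limit is reached),
			// the remaining requests in this loop are unlikely to succeed either.
			break;
		}
	}
	return allocatedWorkerNum;
}
```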
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -1225,15 +1251,27 @@ void checkTaskManagerTimeouts() {
 				// ResourceActions.releaseResource call
 				timedOutTaskManagers.add(taskManagerRegistration);
 			}
+			totalNumberFreeSlots += taskManagerRegistration.getNumberFreeSlots();
Review comment:
No need to calculate `totalNumberFreeSlots`. We can get it with
`freeSlots.size()`.
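i.e., simply:

```java
// No need to accumulate free slots per task manager inside the loop.
int totalNumberFreeSlots = freeSlots.size();
```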
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]