azagrebin commented on a change in pull request #11615:
URL: https://github.com/apache/flink/pull/11615#discussion_r415579887
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
##########
@@ -1301,16 +1305,15 @@ private SlotRequest createSlotRequest(JobID jobId,
ResourceProfile resourceProfi
}
private SlotManagerImpl createSlotManager(ResourceManagerId
resourceManagerId, ResourceActions resourceManagerActions) {
- return createSlotManager(resourceManagerId,
resourceManagerActions, 1);
+ return createSlotManager(resourceManagerId,
resourceManagerActions, new Configuration());
}
- private SlotManagerImpl createSlotManager(ResourceManagerId
resourceManagerId, ResourceActions resourceManagerActions, int
numSlotsPerWorker) {
- SlotManagerImpl slotManager = SlotManagerBuilder.newBuilder()
+ private SlotManagerImpl createSlotManager(ResourceManagerId
resourceManagerId, ResourceActions resourceManagerActions, Configuration
configuration) {
+ return SlotManagerBuilder.newBuilder()
.setDefaultWorkerResourceSpec(WORKER_RESOURCE_SPEC)
- .setNumSlotsPerWorker(numSlotsPerWorker)
- .build();
- slotManager.start(resourceManagerId,
Executors.directExecutor(), resourceManagerActions);
- return slotManager;
+
.setNumSlotsPerWorker(configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
+
.setMaxSlotNum(configuration.getInteger(ResourceManagerOptions.MAX_SLOT_NUM))
+ .buildWithDirectExec(resourceManagerId,
Executors.directExecutor(), resourceManagerActions);
Review comment:
```suggestion
.buildAndStartWithDirectExec(resourceManagerId,
Executors.directExecutor(), resourceManagerActions);
```
`buildWithDirectExec` already has `WithDirectExec` in its name so I would
not pass `Executors.directExecutor()` to it but already have it inside
`buildWithDirectExec`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -808,16 +850,31 @@ private void
fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ
return Optional.empty();
}
- private boolean isFulfillableByRegisteredSlots(ResourceProfile
resourceProfile) {
+ private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile
resourceProfile) {
for (TaskManagerSlot slot : slots.values()) {
if
(slot.getResourceProfile().isMatching(resourceProfile)) {
return true;
}
}
+
+ for (PendingTaskManagerSlot slot : pendingSlots.values()) {
+ if
(slot.getResourceProfile().isMatching(resourceProfile)) {
+ return true;
+ }
+ }
+
return false;
}
private Optional<PendingTaskManagerSlot>
allocateResource(ResourceProfile requestedSlotResourceProfile) {
+ final int numRegisteredSlots = getNumberRegisteredSlots();
+ final int numPendingSlots = getNumberPendingTaskManagerSlots();
+ if (numPendingSlots + numRegisteredSlots + numSlotsPerWorker >
maxSlotNum) {
+ LOG.warn("Could not allocate {} more slots. The number
of registered and pending slots is {}, while the maximum is {}.",
+ numSlotsPerWorker, numPendingSlots +
numRegisteredSlots, maxSlotNum);
+ return Optional.empty();
+ }
Review comment:
nit: potentially this can be deduplicated with
`isMaxSlotNumExceededAfterRegistration` if we make it something like:
`isMaxSlotNumExceededAfterRegistration(int maxNewSlotsNumber,
Supplier<Integer> newSlotsNumberExact)`
but maybe too complicated.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -804,16 +850,31 @@ private void
fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ
return Optional.empty();
}
- private boolean isFulfillableByRegisteredSlots(ResourceProfile
resourceProfile) {
+ private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile
resourceProfile) {
for (TaskManagerSlot slot : slots.values()) {
if
(slot.getResourceProfile().isMatching(resourceProfile)) {
return true;
}
}
+
+ for (PendingTaskManagerSlot slot : pendingSlots.values()) {
Review comment:
alright, makes sense
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
##########
@@ -1301,13 +1305,14 @@ private SlotRequest createSlotRequest(JobID jobId,
ResourceProfile resourceProfi
}
private SlotManagerImpl createSlotManager(ResourceManagerId
resourceManagerId, ResourceActions resourceManagerActions) {
- return createSlotManager(resourceManagerId,
resourceManagerActions, 1);
+ return createSlotManager(resourceManagerId,
resourceManagerActions, new Configuration());
}
- private SlotManagerImpl createSlotManager(ResourceManagerId
resourceManagerId, ResourceActions resourceManagerActions, int
numSlotsPerWorker) {
+ private SlotManagerImpl createSlotManager(ResourceManagerId
resourceManagerId, ResourceActions resourceManagerActions, Configuration
configuration) {
Review comment:
You could have a method to return a common builder for this test suite
`SlotManagerBuilder.newBuilder().setDefaultWorkerResourceSpec(WORKER_RESOURCE_SPEC)`.
I would then use explicitly other setters and `buildWithDirectExec`.
The reason we use builders is to improve readability to avoid methods with a
lot of parameters which are not named in java. Setters basically make them
named for the price of some duplication and verbosity.
I would also agree to keep the existing `createSlotManager` with 3 args
(`NUM_TASK_SLOTS`) but adding a 4th arg for `MAX_SLOT_NUM` may be too much
indeed.
imo, passing `Configuration` makes some implicit contract which is not
obvious that it is used only for `NUM_TASK_SLOTS` and `MAX_SLOT_NUM`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]