xintongsong commented on a change in pull request #14560:
URL: https://github.com/apache/flink/pull/14560#discussion_r552509039
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -463,7 +473,7 @@ public boolean isAllocated(int index, JobID jobId,
AllocationID allocationId) {
TaskSlot<T> taskSlot = taskSlots.get(index);
if (taskSlot != null) {
return taskSlot.isAllocated(jobId, allocationId);
- } else if (index < 0) {
+ } else if (index >= numberSlots) {
Review comment:
If we also insert dynamic slots into `taskSlots`, we won't need this
`else-if` branch anymore.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -95,6 +96,9 @@
/** The table state. */
private volatile State state;
+ /** Current index for dynamic slot, should always not less than
numberSlots */
+ private AtomicInteger dynamicSlotIndex;
Review comment:
I think `TaskSlotTableImpl` is not designed to be thread-safe, and
should always be accessed from the rpc main thread. So we should not need
`AtomicInteger` here.
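A minimal sketch of the plain-field alternative, assuming the table is only ever touched from a single thread. `DynamicSlotIndexCounter` is a hypothetical stand-in for illustration, not the actual `TaskSlotTableImpl`; only `numberSlots`, `dynamicSlotIndex` and `nextDynamicSlotIndex` are names taken from the PR.

```java
/** Hypothetical stand-in for the dynamic-index bookkeeping; not the real class. */
final class DynamicSlotIndexCounter {
    /** Next index to hand out for a dynamic slot; never less than numberSlots. */
    private int dynamicSlotIndex;

    DynamicSlotIndexCounter(int numberSlots) {
        // Static slots occupy [0, numberSlots), so dynamic indexes start right after.
        this.dynamicSlotIndex = numberSlots;
    }

    /** Not thread-safe on purpose: assumed to be called from one thread only. */
    int nextDynamicSlotIndex() {
        return dynamicSlotIndex++;
    }
}
```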
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -321,6 +325,12 @@ public boolean allocateSlot(
return false;
}
+ // The negative index indicate that the SlotManger allocate a dynamic
slot, we transfer the
+ // index to an increasing number not less than the numberSlots.
+ if (index < 0) {
+ index = nextDynamicSlotIndex();
+ }
Review comment:
Overwriting the method argument in the middle of the method body is
quite implicit.
I would suggest converting `index` into an `effectiveIndex` at the
beginning of this method (or renaming the argument to `requestedIndex` and
converting it to `index`), then using the effective index for the rest of the
method.
That also means all the `index < 0` checks should be replaced with `index >=
numberSlots`. Maybe introduce a util method `isDynamicIndex`.
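Roughly what I have in mind, as a sketch only. `SlotIndexSketch` and `toEffectiveIndex` are hypothetical names, and the class leaves out everything else `allocateSlot` does; it only shows the conversion happening once, up front, and the `isDynamicIndex` helper.

```java
/** Hypothetical sketch of the index conversion; not the real TaskSlotTableImpl. */
final class SlotIndexSketch {
    private final int numberSlots;
    private int dynamicSlotIndex;

    SlotIndexSketch(int numberSlots) {
        this.numberSlots = numberSlots;
        this.dynamicSlotIndex = numberSlots;
    }

    /** A negative requested index means "dynamic slot"; map it to a fresh index. */
    int toEffectiveIndex(int requestedIndex) {
        return requestedIndex < 0 ? dynamicSlotIndex++ : requestedIndex;
    }

    /** Effective indexes at or beyond numberSlots belong to dynamic slots. */
    boolean isDynamicIndex(int effectiveIndex) {
        return effectiveIndex >= numberSlots;
    }
}
```

The caller would compute `effectiveIndex` in the first line of `allocateSlot` and never touch the raw argument again.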
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -288,6 +288,11 @@ public boolean allocateSlot(
TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
if (taskSlot != null) {
+ if (index < 0 && taskSlot.isAllocated(jobId, allocationId)) {
+ // If the slot is a dynamic slot with expected jobId and
allocationId, it should be
+ // treated as duplicate allocate request.
+ return true;
+ }
Review comment:
These boolean expressions in the `if` and `return` statements have
become quite hard to understand.
Maybe we can wrap them into separate methods with meaningful names.
Something like:
```
if (isAllocationIdExist()) {
return isDuplicateSlot();
} else if (isSlotIndexTaken()) {
return false;
}
```
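To make the shape concrete, here is a self-contained sketch with the helpers filled in. It is heavily simplified and rests on assumptions: IDs are plain strings, `Slot` is a hypothetical record rather than `TaskSlot`, dynamic indexes are ignored, and "duplicate" is taken to mean same job and same index, which may not match the real criterion.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the named-predicate structure; not the real class. */
final class AllocationCheckSketch {
    /** Simplified slot record; the real TaskSlot carries much more state. */
    private static final class Slot {
        final int index;
        final String jobId;

        Slot(int index, String jobId) {
            this.index = index;
            this.jobId = jobId;
        }
    }

    private final Map<Integer, Slot> taskSlots = new HashMap<>();
    private final Map<String, Slot> allocatedSlots = new HashMap<>();

    boolean allocateSlot(int index, String jobId, String allocationId) {
        if (isAllocationIdExist(allocationId)) {
            return isDuplicateSlot(index, jobId, allocationId);
        } else if (isSlotIndexTaken(index)) {
            return false;
        }
        Slot slot = new Slot(index, jobId);
        taskSlots.put(index, slot);
        allocatedSlots.put(allocationId, slot);
        return true;
    }

    private boolean isAllocationIdExist(String allocationId) {
        return allocatedSlots.containsKey(allocationId);
    }

    /** Assumption: a repeated request counts as a duplicate if job and index match. */
    private boolean isDuplicateSlot(int index, String jobId, String allocationId) {
        Slot existing = allocatedSlots.get(allocationId);
        return existing.index == index && existing.jobId.equals(jobId);
    }

    private boolean isSlotIndexTaken(int index) {
        return taskSlots.containsKey(index);
    }
}
```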
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -288,6 +288,11 @@ public boolean allocateSlot(
TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
if (taskSlot != null) {
+ if (index < 0 && taskSlot.isAllocated(jobId, allocationId)) {
+ // If the slot is a dynamic slot with expected jobId and
allocationId, it should be
+ // treated as duplicate allocate request.
+ return true;
+ }
Review comment:
I think this is a reported issue, FLINK-15660.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -329,7 +339,7 @@ public boolean allocateSlot(
jobId,
allocationId,
memoryVerificationExecutor);
- if (index >= 0) {
+ if (index < numberSlots) {
Review comment:
Now that the dynamic slots also have unique indexes, we can insert them
into `taskSlots` as well.
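A sketch of the idea, under simplifying assumptions: `UnifiedSlotTableSketch` is hypothetical, the map stores only an allocation ID string per index instead of a `TaskSlot`, and conflict checks are omitted. The point is that once every slot lives in one map, a lookup such as `isAllocated` no longer needs a special case for dynamic slots.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: static and dynamic slots share one index-keyed map. */
final class UnifiedSlotTableSketch {
    private int nextDynamicIndex;
    /** Effective slot index to allocation ID, for both kinds of slot. */
    private final Map<Integer, String> taskSlots = new HashMap<>();

    UnifiedSlotTableSketch(int numberSlots) {
        this.nextDynamicIndex = numberSlots;
    }

    /** Stores the slot and returns the effective index it was stored under. */
    int allocate(int requestedIndex, String allocationId) {
        int index = requestedIndex < 0 ? nextDynamicIndex++ : requestedIndex;
        taskSlots.put(index, allocationId); // no "index < numberSlots" guard needed
        return index;
    }

    /** A single lookup covers static and dynamic slots alike. */
    boolean isAllocated(int index, String allocationId) {
        return allocationId.equals(taskSlots.get(index));
    }
}
```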