[
https://issues.apache.org/jira/browse/FLINK-24005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406577#comment-17406577
]
Zhu Zhu edited comment on FLINK-24005 at 8/30/21, 6:20 AM:
-----------------------------------------------------------
Thanks for the explanation! [~trohrmann] [~chesnay]
I now understand why resource requirements had to be decreased for
disconnected TMs. But I think it is more like a workaround.
Conceptually, resource requirements should come from the job/scheduler rather
than from the acquired slots, i.e.
- Resource requirements should be increased when the job requires slots,
including reserving available
slots ({{DeclarativeSlotPoolBridge#reserveFreeSlotForResource()}}) or requesting
new slots ({{DeclarativeSlotPoolBridge#internalRequestNewAllocatedSlot()}}).
- Resource requirements should be decreased when the job retracts its
requirements, including canceling pending
requests ({{DeclarativeSlotPoolBridge#cancelPendingRequests()}} &
{{DeclarativeSlotPoolBridge#releaseSlot()}}) and freeing reserved
slots ({{DeclarativeSlotPoolBridge#releaseSlot()}}).
Therefore, I think what Till proposed might be a cleaner solution. However, it
would need more work to maintain a mapping between {{AllocationID}} and
{{ResourceProfile}}. The main obstacle I can see is that the
{{ResourceProfile}} of {{PendingRequest}} may not be correct in all cases,
considering {{DefaultDeclarativeSlotPool#adjustRequirements()}} may have been
invoked.
So at the moment, I'm also fine with the fix proposed by [~chesnay], which
looks simpler.
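
To make the job-driven accounting described above concrete, here is a minimal
sketch. It is not Flink code: the class {{JobDrivenRequirements}}, its method
names, and the use of plain strings in place of {{ResourceProfile}} are all
assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch, not Flink code. Resource profiles are plain strings here. */
class JobDrivenRequirements {
    private final Map<String, Integer> required = new HashMap<>();

    /** Called when the job reserves a free slot or requests a new one. */
    void onSlotRequested(String profile) {
        required.merge(profile, 1, Integer::sum);
    }

    /** Called when the job cancels a pending request or frees a reserved slot. */
    void onSlotReleased(String profile) {
        Integer current = required.get(profile);
        if (current == null) {
            throw new IllegalStateException("No outstanding requirement for " + profile);
        }
        if (current == 1) {
            required.remove(profile);
        } else {
            required.put(profile, current - 1);
        }
    }

    /** A TaskManager loss does not change what the job needs, so nothing is decreased. */
    void onTaskManagerDisconnected() {
        // Intentionally a no-op in this model.
    }

    int requiredCount(String profile) {
        return required.getOrDefault(profile, 0);
    }
}
```

In this model only the job's own actions move the counter, so losing a
TaskManager that holds a free slot cannot push the declaration below what the
job still needs.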
> Resource requirements declaration may be incorrect if JobMaster disconnects
> with a TaskManager with available slots in the SlotPool
> -----------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-24005
> URL: https://issues.apache.org/jira/browse/FLINK-24005
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.14.0, 1.12.5, 1.13.2
> Reporter: Zhu Zhu
> Assignee: Chesnay Schepler
> Priority: Critical
> Fix For: 1.14.0, 1.12.6, 1.13.3
>
> Attachments: decrease_resource_requirements.log
>
>
> When a TaskManager disconnects from the JobMaster, this triggers
> `DeclarativeSlotPoolService#decreaseResourceRequirementsBy()` for all the
> slots that the TaskManager has registered with the JobMaster. If those slots
> are still available, i.e. not assigned to any task,
> `decreaseResourceRequirementsBy` may lead to an incorrect resource
> requirements declaration.
> For example, consider a job with only 3 source tasks. It requires 3 slots
> and declares 3 slots. Initially all the tasks are running. Then one task
> fails and waits for some delay before restarting. Its previous slot is
> returned to the SlotPool, so the job now requires 2 slots and declares 2
> slots. At this moment, the TaskManager of the returned slot is lost. After
> the triggered `decreaseResourceRequirementsBy`, the job declares only 1
> slot. Finally, when the failed task is rescheduled, the job declares 2
> slots while it actually needs 3.
> The attached log of a real job and logs of the added test in
> https://github.com/zhuzhurk/flink/commit/59ca0ac5fa9c77b97c6e8a43dcc53ca8a0ad6c37
> can demonstrate this case.
> Note that the real job is configured with a large
> "restart-strategy.fixed-delay.delay" and a large "slot.idle.timeout", so
> this is possibly a rare case in production.
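
The slot arithmetic of the example in the description can be replayed with a
toy model. This is only a sketch of the counters, not Flink code: the class
`DeclaredSlotsScenario` and the `decreaseOnDisconnect` flag are hypothetical
names introduced for illustration.

```java
/** Toy model of the declared-slot counter from the report; not Flink code. */
class DeclaredSlotsScenario {

    /** Replays the reported sequence and returns the final declared slot count. */
    static int replay(boolean decreaseOnDisconnect) {
        int declared = 3;      // job with 3 source tasks declares 3 slots
        declared -= 1;         // one task fails, its slot is returned: declares 2
        if (decreaseOnDisconnect) {
            declared -= 1;     // TaskManager holding the free slot is lost: declares 1
        }
        declared += 1;         // failed task is rescheduled and asks for a slot again
        return declared;
    }

    public static void main(String[] args) {
        System.out.println("decrease on disconnect: " + replay(true));
        System.out.println("no decrease on disconnect: " + replay(false));
    }
}
```

With the decrease applied on disconnect the model ends at 2 declared slots
although the job needs 3; without it the declaration returns to 3.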