Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/5931
I looked into the problem @GJL reported and I think it is not caused by
killed TMs which don't get the chance to tell the JM about their slot
allocations (even though this can theoretically happen). In such a case, the
protocol currently relies on the slot request timeout on the JM side to resend
new requests.
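Just to illustrate that fallback, here is a minimal, self-contained sketch (this is not Flink's actual `SlotPool` code; `requestSlotFromResourceManager`, the timeout value and the retry count are made up for the example). The idea is simply that whenever the pending slot request runs into the timeout, the JM sends a new one:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SlotRequestRetry {

    // Hypothetical stand-in for the RPC that asks for a slot. It never completes
    // here, simulating a TM that was killed before it could answer.
    static CompletableFuture<String> requestSlotFromResourceManager() {
        return new CompletableFuture<>();
    }

    // Resend the slot request whenever the pending one runs into the timeout.
    static CompletableFuture<String> requestSlotWithTimeout(long timeoutMs, int retriesLeft) {
        return requestSlotFromResourceManager()
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .exceptionallyCompose(t -> {
                    Throwable cause =
                            (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
                    if (cause instanceof TimeoutException && retriesLeft > 0) {
                        System.out.println("Slot request timed out, resending...");
                        return requestSlotWithTimeout(timeoutMs, retriesLeft - 1);
                    }
                    return CompletableFuture.failedFuture(cause);
                });
    }

    public static void main(String[] args) {
        requestSlotWithTimeout(100, 3)
                .exceptionally(t -> "gave up after retries: " + t)
                .thenAccept(System.out::println)
                .join();
    }
}
```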
I think the actual problem is related to #5980. The problem there is that
the `ExecutionGraph` does not wait until all of its slots have been properly
returned to the `SlotPool` before it is restarted in case of a recovery. Due to
this, it might happen that some of the old tasks occupy slots which are
actually needed for the new tasks. If this happens, the task-to-slot
assignment might be suboptimal, meaning that the tasks are spread across more
slots than needed. For example, assume that we have two slots, each running a
mapper and a sink task:
`S1: M1_old, S1_old`
`S2: M2_old, S2_old`
Now a failover happens and the system restarts the `ExecutionGraph`. When
this happens, `M1_old` and `S2_old` have not been properly released yet:
`S1: M1_old`
`S2: S2_old`
Now we try to schedule the new tasks, which suddenly need 3 slots:
`S1: M1_old, S1_new`
`S2: M2_new, S2_old`
`S3: M1_new, S2_new`
After the old tasks have been released, it would look like this:
`S1: S1_new`
`S2: M2_new`
`S3: M1_new, S2_new`
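To make the spread concrete, here is a small, self-contained model of the slot-sharing constraint from the example above (plain Java, not Flink code; a slot can host at most one mapper and one sink). Depending on the scheduling order the exact placement may differ from the listing above, but as long as the old tasks still occupy S1 and S2, a third slot is needed either way:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SlotSpreadExample {

    /** A slot can host at most one mapper (M*) and one sink (S*) at a time. */
    static class Slot {
        final List<String> tasks = new ArrayList<>();

        boolean canHost(String task) {
            char kind = task.charAt(0); // 'M' or 'S'
            return tasks.stream().noneMatch(t -> t.charAt(0) == kind);
        }
    }

    public static void main(String[] args) {
        Map<String, Slot> slots = new LinkedHashMap<>();
        // State right after the failover: the old tasks were not released yet.
        slots.put("S1", new Slot());
        slots.get("S1").tasks.add("M1_old");
        slots.put("S2", new Slot());
        slots.get("S2").tasks.add("S2_old");

        // Schedule the restarted tasks into the first slot that can host them,
        // allocating a fresh slot if no existing one fits.
        int nextSlot = 3;
        for (String task : List.of("M1_new", "M2_new", "S1_new", "S2_new")) {
            Slot target = slots.values().stream()
                    .filter(s -> s.canHost(task))
                    .findFirst()
                    .orElse(null);
            if (target == null) {
                target = new Slot();
                slots.put("S" + nextSlot++, target);
            }
            target.tasks.add(task);
        }

        // Prints three occupied slots even though the job only needs two
        // once the old tasks are gone.
        slots.forEach((name, slot) -> System.out.println(name + ": " + slot.tasks));
    }
}
```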
With @GJL's tests we had the situation that we could only allocate one
additional container due to resource limitations. Thus, if we actually needed 2
additional containers, the program could not start.
By properly waiting for the slot release, such a situation should no longer
happen.
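As a minimal sketch of the idea behind #5980 (not the actual `ExecutionGraph` code; `taskTerminationFutures` and `restartExecutionGraph` are placeholders for the real termination futures and restart logic), the restart is only triggered once all old tasks have handed their slots back:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class RestartAfterSlotRelease {

    // Placeholder futures that complete once the corresponding old task has
    // released its slot back to the SlotPool.
    static List<CompletableFuture<Void>> taskTerminationFutures() {
        return List.of(
                CompletableFuture.completedFuture(null),  // M1_old released
                CompletableFuture.completedFuture(null)); // S2_old released
    }

    // Placeholder for rescheduling the new tasks.
    static void restartExecutionGraph() {
        System.out.println("All slots returned, restarting the ExecutionGraph");
    }

    public static void main(String[] args) {
        // Only restart once *all* old tasks have returned their slots,
        // so the new tasks can reuse S1 and S2 instead of spilling into S3.
        CompletableFuture
                .allOf(taskTerminationFutures().toArray(new CompletableFuture[0]))
                .thenRun(RestartAfterSlotRelease::restartExecutionGraph)
                .join();
    }
}
```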
However, #5980 does not solve the problem of early-killed TMs which could
not communicate with the JM. At the moment, we would have to rely on the slot
request timeouts to resolve this situation.
---