Zhilong Hong created FLINK-24295:
------------------------------------
Summary: Too many requestPartitionState may jam the JobManager
during task deployment
Key: FLINK-24295
URL: https://issues.apache.org/jira/browse/FLINK-24295
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.14.0
Reporter: Zhilong Hong
After the phase 2 optimization we did in FLINK-21110, task deployment has become much faster. However, we found that during task deployment there may be so many {{requestPartitionState}} RPC calls from TaskManagers that they jam the JobManager.
Why are there so many {{requestPartitionState}} RPC calls? After the optimization, the JobManager can submit tasks to TaskManagers very quickly. If the JobManager calls {{submitTask}} faster than the TaskManagers can process these calls, some TaskManagers may end up deploying their tasks faster than others.
When a downstream task is deployed, it tries to request partitions from its upstream tasks, which may be located on remote TaskManagers. If an upstream task is not deployed yet, the downstream task requests the partition state from the JobManager instead. In the worst case, the computation and memory cost of these requests grows as O(N^2).
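As a rough illustration of that fallback path (the class and method names below are simplified stand-ins, not the real Flink types; only the {{requestPartitionState}} call corresponds to the actual RPC): a freshly deployed consumer that cannot find an upstream partition falls back to one RPC per missing producer, so N consumers with N undeployed producers can issue up to N × N such calls.
{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, condensed stand-in for the JobManager-side RPC gateway. */
interface JobManagerGateway {
    /** One RPC per (consumer, missing partition) pair -- this is the call that piles up. */
    CompletableFuture<String> requestPartitionState(String resultPartitionId);
}

/** Hypothetical consumer-side logic: what a freshly deployed downstream task does. */
class DownstreamTask {
    private final JobManagerGateway jobManager;

    DownstreamTask(JobManagerGateway jobManager) {
        this.jobManager = jobManager;
    }

    void requestUpstreamPartitions(List<String> upstreamPartitionIds) {
        for (String partitionId : upstreamPartitionIds) {
            if (!tryRequestPartitionFromProducer(partitionId)) {
                // Producer not deployed yet: ask the JobManager for the producer's state.
                // With all-to-all edges and N undeployed producers per consumer, the
                // N consumers issue up to N * N of these RPCs in the worst case.
                jobManager
                        .requestPartitionState(partitionId)
                        .thenAccept(state -> retryOrFail(partitionId, state));
            }
        }
    }

    private boolean tryRequestPartitionFromProducer(String partitionId) {
        // Placeholder: in reality this is a network request to the remote TaskManager
        // that fails if the producer's partition is not available yet.
        return false;
    }

    private void retryOrFail(String partitionId, String producerState) {
        // Placeholder: re-trigger the partition request or fail the task,
        // depending on the producer's reported execution state.
    }

    public static void main(String[] args) {
        // Fake gateway that just logs each RPC, to show the per-partition fan-out.
        JobManagerGateway fakeGateway = partitionId -> {
            System.out.println("requestPartitionState(" + partitionId + ")");
            return CompletableFuture.completedFuture("DEPLOYING");
        };
        new DownstreamTask(fakeGateway).requestUpstreamPartitions(List.of("p-0", "p-1", "p-2"));
    }
}
{code}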
In our test with a streaming job that has two vertices with a parallelism of 8,000, connected by all-to-all edges, there will be 32,000,000 {{requestPartitionState}} RPC calls arriving at the JobManager in the worst case. Each RPC call takes about 1 KiB of the JobManager's heap memory, so the overall space cost of {{requestPartitionState}} is about 32 GiB, which is a heavy burden for the GC.
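Written out as a back-of-the-envelope calculation (same figures as above):
{noformat}
worst-case requestPartitionState calls:  ~32,000,000
heap space per call:                     ~1 KiB
short-lived heap garbage:                32,000,000 x 1 KiB ≈ 32 GiB
{noformat}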
In our test, the heap size of the JobManager is 8 GiB. During task deployment the JobManager runs into frequent full GCs. It effectively gets stuck, because it spends its time in full GCs and cannot keep up with the incoming RPC calls. The log is attached below.
What makes it worse is that nothing is logged for this RPC call. When users notice that the JobManager is getting slower or stuck, they have no way to find out why.
Why did this case rarely happen before? Before the optimization, it took a long time to calculate the {{TaskDeploymentDescriptor}}s and send them to the TaskManagers. In most cases the JobManager called {{submitTask}} more slowly than the TaskManagers could process it. Since tasks are deployed in topological order, the upstream tasks were usually deployed before the downstream tasks, and this situation rarely occurred.
In my opinion, the solution to this issue needs more discussion. According to the discussion in the pull request ([https://github.com/apache/flink/pull/6680]), it is not safe to simply remove this RPC call, because we cannot guarantee that the assumption that an upstream task failure always fails the downstream consumers holds in all cases.