[
https://issues.apache.org/jira/browse/FLINK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17458167#comment-17458167
]
Zhilong Hong commented on FLINK-24295:
--------------------------------------
Thank you for your replies, [~trohrmann] and [~huntercc]. I'm wondering what
the default value of the retry count should be.
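For context, here is a minimal sketch of what such a bounded retry could look
like, assuming the retry under discussion is a local retry with backoff before
falling back to asking the JobManager. This is not Flink's actual API; the
names and default values (e.g. {{DEFAULT_MAX_RETRIES}}) are made up for
illustration.
{code:java}
import java.time.Duration;
import java.util.concurrent.Callable;

/** Illustrative only: retries an action with exponential backoff before giving up. */
public final class BoundedRetrySketch {

    /** Hypothetical default under discussion: how many retries before giving up locally. */
    static final int DEFAULT_MAX_RETRIES = 3;
    static final Duration INITIAL_BACKOFF = Duration.ofMillis(100);

    /** Runs the action once, then retries it up to maxRetries times with exponential backoff. */
    static <T> T callWithRetry(Callable<T> action, int maxRetries) throws Exception {
        Duration backoff = INITIAL_BACKOFF;
        for (int attempt = 0; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    // All retries exhausted; the caller would now fall back to
                    // asking the JobManager for the partition state.
                    throw e;
                }
                Thread.sleep(backoff.toMillis());   // back off before the next attempt
                backoff = backoff.multipliedBy(2);  // double the wait each time
            }
        }
    }
}
{code}
Presumably the trade-off for the default is that more retries delay failure
detection, while fewer retries mean more fallback requests hitting the
JobManager.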
> Too many requestPartitionState may jam the JobManager during task deployment
> ----------------------------------------------------------------------------
>
> Key: FLINK-24295
> URL: https://issues.apache.org/jira/browse/FLINK-24295
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.14.0, 1.15.0
> Reporter: Zhilong Hong
> Priority: Major
>
> After the phase 2 optimization we did in FLINK-21110, task deployment has
> become much faster. However, we find that during task deployment there can be
> so many {{requestPartitionState}} RPC calls from TaskManagers that they jam
> the JobManager.
> Why would there be so many {{requestPartitionState}} RPC calls? After the
> optimization, the JobManager can submit tasks to TaskManagers very quickly. If
> the JobManager issues {{submitTask}} calls faster than the TaskManagers can
> process them, some TaskManagers may finish deploying their tasks well before
> others do.
> When a downstream task is deployed, it tries to request partitions from its
> upstream tasks, which may be located on a remote TaskManager. If an upstream
> task is not deployed yet, the downstream task asks the JobManager for the
> partition state. In the worst case, since each of the N downstream tasks may
> issue one request per not-yet-deployed upstream task, the computation and
> memory complexity is O(N^2).
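> To make the O(N^2) worst case concrete, here is a small self-contained
> simulation of the race (plain Java, not Flink code; class and variable names
> are made up): every consumer is deployed while none of its producers are, so
> each consumer triggers one partition-state request per producer.
> {code:java}
> import java.util.HashSet;
> import java.util.Set;
>
> /** Standalone illustration of the request storm; not Flink code. */
> public class RequestStormSketch {
>     public static void main(String[] args) {
>         int parallelism = 1_000;  // kept small here; the test described below uses 8,000
>         // The consumers' TaskManagers have raced ahead: no producer is deployed yet.
>         Set<Integer> deployedProducers = new HashSet<>();
>
>         long partitionStateRequests = 0;
>         for (int consumer = 0; consumer < parallelism; consumer++) {
>             for (int producer = 0; producer < parallelism; producer++) {
>                 if (!deployedProducers.contains(producer)) {
>                     // Producer not deployed yet: the consumer asks the JobManager for
>                     // the partition state, i.e. one requestPartitionState RPC per
>                     // missing producer partition.
>                     partitionStateRequests++;
>                 }
>             }
>         }
>         // Grows as parallelism^2: 1,000,000 requests here, and tens of millions
>         // at the parallelism of 8,000 used in the test described below.
>         System.out.println("worst-case partition-state requests: " + partitionStateRequests);
>     }
> }
> {code}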
> In our test with a streaming job that has two vertices with a parallelism of
> 8,000, connected by an all-to-all edge, there will be 32,000,000
> {{requestPartitionState}} RPC calls arriving at the JobManager in the worst
> case. Each RPC call takes about 1 KiB of the JobManager's heap memory, so the
> overall space cost of {{requestPartitionState}} is roughly 32 GiB, which is a
> heavy burden for the GC.
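> As a quick sanity check of that back-of-envelope estimate, simply taking the
> figures above at face value (this is not measured from Flink):
> {code:java}
> /** Back-of-envelope check of the heap estimate; figures taken from the issue text. */
> public class HeapEstimateSketch {
>     public static void main(String[] args) {
>         long rpcCalls = 32_000_000L;   // worst-case requestPartitionState calls (figure above)
>         long bytesPerCall = 1024L;     // ~1 KiB of JobManager heap per call (figure above)
>         double gib = rpcCalls * bytesPerCall / (1024.0 * 1024.0 * 1024.0);
>         // Prints roughly 30.5 GiB, i.e. the ~32 GiB order of magnitude cited above.
>         System.out.printf("estimated transient heap: %.1f GiB%n", gib);
>     }
> }
> {code}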
> In our test, the JobManager's heap size is 8 GiB. During task deployment the
> JobManager experiences frequent full GCs. It effectively gets stuck, because
> it spends most of its time in full GC and has no time left to handle the
> incoming RPC calls.
> The worst part is that no log is written for this RPC call. When users find
> that the JobManager is getting slower or stuck, they have no way to find out
> why.
> Why did this case rarely happen before? Before the optimization, it took a
> long time to calculate the TaskDeploymentDescriptors and send them to the
> TaskManagers, so in most cases the JobManager issued {{submitTask}} calls more
> slowly than the TaskManagers could process them. Since task deployment is
> topologically sorted, upstream tasks were deployed before their downstream
> consumers, and this case rarely occurred.
> In my opinion, the solution to this issue needs more discussion. According to
> the discussion in the pull request
> ([https://github.com/apache/flink/pull/6680]), it's not safe to simply remove
> this RPC call, because we cannot guarantee that an upstream task failure will
> always fail the downstream consumers.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)