fanrui created FLINK-25417:
------------------------------
Summary: Too many connections for TM
Key: FLINK-25417
URL: https://issues.apache.org/jira/browse/FLINK-25417
Project: Flink
Issue Type: Improvement
Components: Runtime / Network
Affects Versions: 1.14.2, 1.13.5, 1.15.0
Reporter: fanrui
Attachments: image-2021-12-22-19-17-59-486.png,
image-2021-12-22-19-18-23-138.png
Hi masters, when the number of task exceeds 10, some TM has more than 4000 TCP
connections.
!image-2021-12-22-19-17-59-486.png|width=1388,height=307!
h2. Reason:
When the task is initialized, the downstream InputChannel will connect to the
upstream ResultPartition.
In PartitionRequestClientFactory#createPartitionRequestClient, there is a
clients({_}ConcurrentMap<ConnectionID,
CompletableFuture{_}{_}<NettyPartitionRequestClient>{_}{_}> clients{_}). It's a
cache to avoid repeated tcp connections. But the ConnectionID has a field is
connectionIndex.
The connectionIndex comes from IntermediateResult, which is a random number.
When multiple Tasks are running in a TM, other TMs need to establish multiple
connections to this TM, and each Task has a NettyPartitionRequestClient.
Assume that the parallelism of the flink job is 100, each TM has 20 Tasks, and
the Partition strategy between tasks is rebalance or hash. Then the number of
connections for a single TM is (20-1) * 100 * 2 = 3800. If multiple such TMs
are running on a single node, there is a risk.
I want to know whether it is risky to change the cache key to
connectionID.address? That is: a tcp connection is shared between all Tasks of
TM.
I guess it is feasible because:
# I have tested it and the task can run normally.
# The Message contains the InputChannelID, which is used to distinguish which
channel the NettyMessage belongs to.
!image-2021-12-22-19-18-23-138.png|width=2953,height=686!
--
This message was sent by Atlassian Jira
(v8.20.1#820001)