[ 
https://issues.apache.org/jira/browse/FLINK-22643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374672#comment-17374672
 ] 

Zhilong Hong edited comment on FLINK-22643 at 7/5/21, 8:45 AM:
---------------------------------------------------------------

Thank you for your reply, [~pnowojski]. 

??1. Does this solution have some drawback???

So far, we haven't found any drawbacks. We've enabled this improvement in our 
internal version of Flink, and it works fine.

??2. Has someone tested if having fewer TCP connections decreases max 
throughput for example???

Fewer TCP connections may affect network performance. I'll set up several 
tests to measure the impact of this improvement and post the results here once 
they are ready.

??3. I presume that the proposed change with the default configuration can not 
cause any regressions, as the code with 
taskmanager.network.max-num-tcp-connections = Integer.MAX_VALUE behaves the 
same way as it does right now???

This improvement makes all tasks on the same TaskManager reuse one TCP 
connection. The number of slots per TaskManager will not exceed 
Integer.MAX_VALUE. Therefore, when 
{{taskmanager.network.max-num-tcp-connections = Integer.MAX_VALUE}}, there will 
be no reuse between different tasks on the same TaskManager, and the code works 
in the same way as it does right now. This improvement only takes effect when 
the value of {{taskmanager.network.max-num-tcp-connections}} is smaller than 
the number of slots per TaskManager.
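
To illustrate the point above, here is a minimal sketch of how a cap on the number of connections could translate into reuse. It is not the Flink implementation: the class {{ConnectionReuseSketch}}, both helper methods, and the modulo mapping from slot index to connection index are assumptions made only for this example.

```java
import java.util.HashSet;
import java.util.Set;

final class ConnectionReuseSketch {

    // Hypothetical helper, not Flink code: picks the index of the TCP connection
    // a slot would use when at most maxNumConnections connections are allowed.
    // The modulo mapping is an assumption made for illustration.
    static int connectionIndex(int slotIndex, int maxNumConnections) {
        return slotIndex % maxNumConnections;
    }

    // Counts how many distinct connections the slots of one TaskManager end up using.
    static int distinctConnections(int slotsPerTaskManager, int maxNumConnections) {
        Set<Integer> used = new HashSet<>();
        for (int slot = 0; slot < slotsPerTaskManager; slot++) {
            used.add(connectionIndex(slot, maxNumConnections));
        }
        return used.size();
    }

    public static void main(String[] args) {
        int slots = 5; // slots per TaskManager, as in the example job of this issue

        // Cap far above the slot count: every slot keeps its own connection (no reuse).
        System.out.println(distinctConnections(slots, Integer.MAX_VALUE));

        // Cap of 1: all slots share a single connection.
        System.out.println(distinctConnections(slots, 1));

        // Cap of 2: the five slots are spread over two connections.
        System.out.println(distinctConnections(slots, 2));
    }
}
```

Under this assumed mapping, any cap at or above the number of slots per TaskManager leaves the number of connections unchanged, which matches the reasoning that the option only matters when it is smaller than the slot count.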


> Too many TCP connections among TaskManagers for large scale jobs
> ----------------------------------------------------------------
>
>                 Key: FLINK-22643
>                 URL: https://issues.apache.org/jira/browse/FLINK-22643
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>    Affects Versions: 1.13.0
>            Reporter: Zhilong Hong
>            Priority: Minor
>              Labels: auto-deprioritized-major
>             Fix For: 1.14.0
>
>
> For large-scale jobs, there will be too many TCP connections among 
> TaskManagers. Let's take an example.
> Consider a streaming job with 20 JobVertices, each with a parallelism of 500. 
> We divide the vertices into 5 slot sharing groups. Each TaskManager has 5 
> slots. Thus there will be 400 TaskManagers in this job. Let's assume that the 
> job runs on a cluster with 20 machines.
> If all the job edges are all-to-all edges, there will be 19 * 20 * 399 * 2 = 
> 303,240 TCP connections for each machine. If we run several jobs on this 
> cluster, the number of TCP connections may exceed the maximum limit of Linux, 
> which is 1,048,576. This will stop the TaskManagers from creating new TCP 
> connections and cause task failovers.
> As we run our production jobs on a K8S cluster, the jobs keep failing over 
> due to network-related exceptions, such as {{Sending the partition request 
> to 'null' failed}}.
> We think that we can decrease the number of connections by letting tasks 
> reuse the same connection. We implemented a POC that makes all tasks on the 
> same TaskManager reuse one TCP connection. For the example job mentioned 
> above, the number of connections will decrease from 303,240 to 15,960. With 
> the POC, the frequency of network-related exceptions in our production jobs 
> drops significantly.
> The POC is illustrated in: 
> https://github.com/wsry/flink/commit/bf1c09e80450f40d018a1d1d4fe3dfd2de777fdc
>  
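
The connection counts quoted in the description can be reproduced with a short calculation. The sketch below is only an illustration: the class and method names are hypothetical, and the reading of the factors (19 job edges between 20 chained vertices, 20 TaskManagers per machine, 399 remote TaskManagers, and a factor of 2 for both directions) is an assumption about how the formula was derived, not something the description states explicitly.

```java
final class ConnectionCountSketch {

    // Connections per machine when each job edge opens its own connection to every
    // other TaskManager: edges * TaskManagers per machine * remote TaskManagers * 2.
    static long withoutReuse(int jobEdges, int tmsPerMachine, int totalTms) {
        return (long) jobEdges * tmsPerMachine * (totalTms - 1) * 2;
    }

    // Connections per machine when all tasks on a TaskManager share one connection
    // per remote TaskManager, so the number of job edges no longer matters.
    static long withReuse(int tmsPerMachine, int totalTms) {
        return (long) tmsPerMachine * (totalTms - 1) * 2;
    }

    public static void main(String[] args) {
        int jobEdges = 19;       // assumed: 20 JobVertices connected in a chain
        int totalTms = 400;      // TaskManagers in the example job
        int machines = 20;       // machines in the example cluster
        int tmsPerMachine = totalTms / machines; // 20

        System.out.println(withoutReuse(jobEdges, tmsPerMachine, totalTms)); // 303240
        System.out.println(withReuse(tmsPerMachine, totalTms));              // 15960
    }
}
```

Under this reading, reuse removes the factor of 19 job edges, which accounts for the drop from 303,240 to 15,960 connections per machine.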



