[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897795#comment-15897795
 ] 

ASF GitHub Bot commented on FLINK-4545:
---------------------------------------

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/3480

    [FLINK-4545] use size-restricted LocalBufferPool instances for network 
communication

    Note: this PR is based on #3467 and PR 2 of 3 in a series to get rid of the 
network buffer parameter.
    
    With this PR, the number of buffers a `LocalBufferPool` may offer is 
limited to `2 * <number of channels> + 8` for both input and output 
connections. This reduces buffer bloat in our network stack without overly 
restricting specific jobs and their connections, since the total number of 
network buffers can now be arbitrarily large again without affecting, for 
example, the delays checkpoint barriers experience while travelling through 
all TMs.
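    The per-pool bound is simple arithmetic; as a back-of-the-envelope sketch 
(class and method names here are illustrative, not Flink API):

    ```java
    public class BufferPoolBound {
        // Mirrors the bound described above: 2 buffers per channel
        // (one for in-flight data, one for parallel serialization)
        // plus a small fixed headroom of 8 extra buffers.
        static int maxBuffers(int numberOfChannels) {
            return 2 * numberOfChannels + 8;
        }

        public static void main(String[] args) {
            // e.g. a gate with 100 channels is capped at 208 buffers
            System.out.println(maxBuffers(100)); // prints 208
        }
    }
    ```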
    
    Eventually, this will lead to the network buffer parameter being removed 
(which was the initial goal). In a simple scenario like the following, with a 
parallelism of 2 and thus running on 6 TMs, we were able to reduce the 75th 
percentile of checkpoint delays by 60%, from 38ms to 16ms (median at 7ms for 
both).
    
    ```java
    final StreamExecutionEnvironment env = 
getStreamExecutionEnvironment(params);
    env.disableOperatorChaining();
    
    env.enableCheckpointing(1_000L);
    
    DataStreamSource<Tuple2<Long, Long>> source1 = env.addSource(new 
LongSource());
    
    source1.slotSharingGroup("source")
                .keyBy(1)
                .map(new IdentityMapFunction<Tuple2<Long, Long>>())
                .slotSharingGroup("map")
                .keyBy(1)
                .addSink(new DiscardingSink<Tuple2<Long, Long>>())
                .slotSharingGroup("sink");
    ```
    
    By adding random delays (0-1ms every 1,000 keys) to the 
`IdentityMapFunction`, the median even improves from 5026ms to 293ms.
    
    Neither scenario influences the throughput of the program, but for real 
programs the reduction in delay may differ, since actual state may need to be 
stored and other components take part as well ;)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-4545

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3480
    
----
commit dfea1bac97dbbf30a2e049618cc41fdca53ea6d3
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-10T13:36:37Z

    [FLINK-4545] remove (unused) persistent partition type

commit 11557c004450bcbbe680f1575f0e41d164424eae
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-10T15:11:08Z

    [docs] improve some documentation around network buffers

commit cd999061d04ae803c79473241ac1f9b39c1f2731
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-10T15:12:19Z

    [hotfix][network] add some assertions documenting on which locks we rely

commit 8f529bb3f42916c816c5091228569952917ad9b5
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-03-01T13:33:44Z

    [FLINK-4545] remove fixed-size BufferPool instances
    
    These were unused except for unit tests and will be replaced with bounded
    BufferPool instances.

commit 91cea2917e9453f9de5c02472d99d4fc0d090dda
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-03-06T11:36:02Z

    [FLINK-4545] remove JobVertex#connectNewDataSetAsInput variant without 
partition type
    
    This removes
    JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern 
distPattern)
    and requires the developer to call
    JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern 
distPattern, ResultPartitionType partitionType)
    instead, explicitly choosing the partition type.

commit 83d1404b106b558679e4c9ef16123fbc6b5eac72
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-03-06T11:37:56Z

    [FLINK-4545] remove unused IntermediateDataSet constructors
    
    These implied a default result partition type, which we instead want the 
developer
    to actively decide upon.

commit e9d41b6b613a7bac5c489102977e16e4c6c4bb86
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-10T13:53:09Z

    [FLINK-4545] add a bounded result partition type
    
    This can be used to limit the number of network buffers used for this 
partition.
    
    (borrows the appropriate parts of a commit previously sketched for
    FLINK-5088 to implement bounded network queue lengths)

commit b57f0652a768645a5712d376d0e4b438f35cfa6c
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-10T17:22:55Z

    [FLINK-4545] allow LocalBufferPool to use a limited number of buffers

commit d1d8b18bba967c6fb8f3934aa4cf1cfc8a2c1106
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-20T16:12:54Z

    [FLINK-4545] also make the ResultPartitionType available at the InputGate
    
    This way, we know what kind of result partition is consumed by the input 
gate.

commit d23fdf9d80dea5d46bfe2f7597f4d5e1295cae7b
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-13T18:24:45Z

    [FLINK-4545] try to set an upper bound on the LocalBufferPool if restricted
    
    Use "2 * <number of channels> + 8" from the following considerations:
    * 1 buffer for in-flight data in the subpartition/input channel
    * 1 buffer for parallel serialization
    * + some extra buffers
    
    Also re-introduce some tests for bounded buffer pools similar to the 
fixed-size
    buffer pool tests before.

commit 37eed7bc59b6899e3d7bdd4b1a3dac87e5f04406
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-24T12:41:11Z

    [FLINK-4545] re-implement NetworkBufferPool#redistributeBuffers
    
    This version also takes the bounded network buffers into account.
    The distribution is not strictly uniform anymore though:
    * for every buffer pool, we determine the maximum number of buffers it can 
take
      from the available number - let's call this its 'capacity'
    * then, each of them will get roughly available * capacity / totalCapacity
      buffers on top of the required number of buffers

----
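The proportional redistribution described in the last commit can be sketched 
as follows (a minimal illustration of the "available * capacity / 
totalCapacity" rule from the commit message; the class and method names are 
hypothetical, not the actual NetworkBufferPool code):

```java
public class RedistributeSketch {
    // Distributes 'available' spare buffers across pools in proportion
    // to each pool's remaining capacity, i.e. roughly
    // available * capacity / totalCapacity per pool (integer division,
    // so rounding remainders are ignored in this sketch).
    static int[] extraBuffers(int available, int[] capacities) {
        long totalCapacity = 0;
        for (int c : capacities) {
            totalCapacity += c;
        }
        int[] extra = new int[capacities.length];
        for (int i = 0; i < capacities.length; i++) {
            extra[i] = (int) ((long) available * capacities[i] / totalCapacity);
        }
        return extra;
    }

    public static void main(String[] args) {
        // 100 spare buffers over pools with capacities 10, 30 and 60
        int[] extra = extraBuffers(100, new int[]{10, 30, 60});
        for (int e : extra) {
            System.out.println(e); // prints 10, 30, 60
        }
    }
}
```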


> Flink automatically manages TM network buffer
> ---------------------------------------------
>
>                 Key: FLINK-4545
>                 URL: https://issues.apache.org/jira/browse/FLINK-4545
>             Project: Flink
>          Issue Type: Wish
>          Components: Network
>            Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured and 
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers 
> config. In a job DAG with a shuffle phase, this number can grow very high 
> depending on the TM cluster size. The formula for calculating the buffer count 
> is documented here 
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers):
>   
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster 
> size dynamically and then leverage the upcoming Flink feature to support 
> scaling job parallelism/rescaling at runtime. 
> If the buffer count config is static at runtime and cannot be changed without 
> restarting the task manager process, this may add latency and complexity to 
> the scaling process. I am wondering whether there is already any discussion 
> around whether the network buffers should be automatically managed by Flink, 
> or at least whether some API could be exposed to allow reconfiguration. Let me 
> know if there is any existing JIRA that I should follow.
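The static sizing formula quoted above can be sanity-checked with a quick 
sketch (the numbers and the helper method are illustrative, not Flink API):

```java
public class StaticBufferFormula {
    // The documented static sizing: #slots-per-TM^2 * #TMs * 4
    static int requiredBuffers(int slotsPerTm, int numTms) {
        return slotsPerTm * slotsPerTm * numTms * 4;
    }

    public static void main(String[] args) {
        // e.g. 8 slots per TM on a 20-TM cluster:
        System.out.println(requiredBuffers(8, 20)); // prints 5120
    }
}
```

The quadratic dependence on slots per TM is what makes this number blow up on 
larger clusters, motivating the bounded per-pool sizing in the PR above.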



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
