GitHub user NicoK opened a pull request:
https://github.com/apache/flink/pull/3480
[FLINK-4545] use size-restricted LocalBufferPool instances for network communication
Note: this PR is based on #3467 and is PR 2 of 3 in a series to get rid of the
network buffer parameter.
With this PR, the number of buffers a `LocalBufferPool` has to offer will
be limited to `2 * <number of channels> + 8` for both input and output
connections. This reduces buffer bloat in our network stack without
restricting specific jobs and their connections too much: the total number of
network buffers can now be arbitrarily large again without increasing, for
example, the delays that checkpoint barriers experience while travelling
through all TMs.
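The effect of such a bound can be illustrated with a minimal, self-contained sketch. `BoundedLocalPool` is a hypothetical class, not Flink's `LocalBufferPool`: it never allocates more than `2 * <number of channels> + 8` buffers and returns `null` once that cap is reached and nothing has been recycled, which is roughly the point where a real pool would make the writer wait.

```java
import java.util.ArrayDeque;

/**
 * Hypothetical, simplified stand-in for a size-restricted local buffer pool
 * (not Flink's LocalBufferPool). It never owns more than maxNumberOfBuffers
 * buffers, no matter how many more could be allocated.
 */
final class BoundedLocalPool {
    private final int maxNumberOfBuffers;
    private final int bufferSize;
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    private int numAllocated = 0; // buffers created by this pool so far

    BoundedLocalPool(int numberOfChannels, int bufferSize) {
        // Upper bound from the PR description: 2 * <number of channels> + 8
        this.maxNumberOfBuffers = 2 * numberOfChannels + 8;
        this.bufferSize = bufferSize;
    }

    /** Returns a recycled or new buffer, or null if the upper bound is reached. */
    byte[] requestBuffer() {
        byte[] buffer = available.poll();
        if (buffer != null) {
            return buffer;
        }
        if (numAllocated < maxNumberOfBuffers) {
            numAllocated++;
            return new byte[bufferSize];
        }
        // A real pool would typically block the writer here (back pressure).
        return null;
    }

    /** Hands a buffer back so that it can be reused by the next request. */
    void recycle(byte[] buffer) {
        available.push(buffer);
    }

    int getMaxNumberOfBuffers() {
        return maxNumberOfBuffers;
    }
}
```

With 2 channels, for example, this pool hands out at most 12 buffers; a 13th request only succeeds after one of them has been recycled.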
Eventually, this will lead to the removal of the network buffer parameter
(which was the initial goal), but even now, in a simple scenario like the
following with a parallelism of 2, and thus running on 6 TMs, we were able to
reduce the 75th percentile of checkpoint delays by roughly 60%, from 38ms to
16ms (the median is 7ms in both cases).
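```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

// getStreamExecutionEnvironment(params), LongSource and IdentityMapFunction
// are helpers of the test job that are not shown here.
final StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);
// run every operator as its own task
env.disableOperatorChaining();
// trigger a checkpoint every second
env.enableCheckpointing(1_000L);

DataStreamSource<Tuple2<Long, Long>> source1 = env.addSource(new LongSource());
// one slot sharing group per operator: source, map, sink
source1.slotSharingGroup("source")
    .keyBy(1)
    .map(new IdentityMapFunction<Tuple2<Long, Long>>())
    .slotSharingGroup("map")
    .keyBy(1)
    .addSink(new DiscardingSink<Tuple2<Long, Long>>())
    .slotSharingGroup("sink");
```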
After adding random delays (0-1ms every 1000 keys) to the
`IdentityMapFunction`, the median checkpoint delay even improves from 5026ms
to 293ms.
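Such random delays could look roughly like the following sketch. The class `DelayingIdentity` is hypothetical, and it assumes that "every 1000 keys" means every 1000 records processed. In a Flink job, this logic would typically live in a class implementing Flink's `MapFunction<T, T>`; that interface is left out here so the sketch runs without Flink on the classpath.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical identity map with artificial delays: after every
 * recordsBetweenDelays records, it sleeps for a random 0 or 1 ms.
 */
final class DelayingIdentity<T> {
    private final int recordsBetweenDelays;
    private long count = 0;

    DelayingIdentity(int recordsBetweenDelays) {
        this.recordsBetweenDelays = recordsBetweenDelays;
    }

    /** Returns its input unchanged, occasionally delaying the caller. */
    T map(T value) {
        count++;
        if (count % recordsBetweenDelays == 0) {
            try {
                // nextLong(0, 2) yields 0 or 1 (upper bound exclusive)
                Thread.sleep(ThreadLocalRandom.current().nextLong(0, 2));
            } catch (InterruptedException e) {
                // restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
            }
        }
        return value;
    }

    long getCount() {
        return count;
    }
}
```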
In both scenarios, the change does not affect the program's throughput. For
real programs, however, the reduction in delay may differ, since actual state
may need to be stored there and other components take part as well ;)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/NicoK/flink flink-4545
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3480.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3480
----
commit dfea1bac97dbbf30a2e049618cc41fdca53ea6d3
Author: Nico Kruber <[email protected]>
Date: 2017-02-10T13:36:37Z
[FLINK-4545] remove (unused) persistent partition type
commit 11557c004450bcbbe680f1575f0e41d164424eae
Author: Nico Kruber <[email protected]>
Date: 2017-02-10T15:11:08Z
[docs] improve some documentation around network buffers
commit cd999061d04ae803c79473241ac1f9b39c1f2731
Author: Nico Kruber <[email protected]>
Date: 2017-02-10T15:12:19Z
[hotfix][network] add some assertions documenting on which locks we rely
commit 8f529bb3f42916c816c5091228569952917ad9b5
Author: Nico Kruber <[email protected]>
Date: 2017-03-01T13:33:44Z
[FLINK-4545] remove fixed-size BufferPool instances
These were unused except for unit tests and will be replaced with bounded
BufferPool instances.
commit 91cea2917e9453f9de5c02472d99d4fc0d090dda
Author: Nico Kruber <[email protected]>
Date: 2017-03-06T11:36:02Z
[FLINK-4545] remove JobVertex#connectNewDataSetAsInput variant without partition type
This removes
JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern)
and requires the developer to call
JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType)
instead and to think about which partition type to use.
commit 83d1404b106b558679e4c9ef16123fbc6b5eac72
Author: Nico Kruber <[email protected]>
Date: 2017-03-06T11:37:56Z
[FLINK-4545] remove unused IntermediateDataSet constructors
These implied a default result partition type, which we want the developer
to actively decide upon.
commit e9d41b6b613a7bac5c489102977e16e4c6c4bb86
Author: Nico Kruber <[email protected]>
Date: 2017-02-10T13:53:09Z
[FLINK-4545] add a bounded result partition type
This can be used to limit the number of network buffers used for this partition.
(Borrows the appropriate parts of a commit previously sketched for FLINK-5088
to implement bounded network queue lengths.)
commit b57f0652a768645a5712d376d0e4b438f35cfa6c
Author: Nico Kruber <[email protected]>
Date: 2017-02-10T17:22:55Z
[FLINK-4545] allow LocalBufferPool to use a limited number of buffers
commit d1d8b18bba967c6fb8f3934aa4cf1cfc8a2c1106
Author: Nico Kruber <[email protected]>
Date: 2017-02-20T16:12:54Z
[FLINK-4545] also make the ResultPartitionType available at the InputGate
This way, we know what kind of result partition is consumed by the input
gate.
commit d23fdf9d80dea5d46bfe2f7597f4d5e1295cae7b
Author: Nico Kruber <[email protected]>
Date: 2017-02-13T18:24:45Z
[FLINK-4545] try to set an upper bound on the LocalBufferPool if restricted
Use "2 * <number of channels> + 8" based on the following considerations:
* 1 buffer per channel for in-flight data in the subpartition/input channel
* 1 buffer per channel for parallel serialization
* 8 extra buffers
Also re-introduce some tests for bounded buffer pools, similar to the former
fixed-size buffer pool tests.
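The sizing rule can be sketched as follows, assuming that only the bounded (pipelined) partition type gets a capped pool while all other types stay unrestricted. `PartitionKind`, `BufferBounds` and `maxPoolSize` are hypothetical names for illustration, not Flink's API. Since the InputGate now also knows the `ResultPartitionType`, the same rule can be applied on the input side.

```java
/** Hypothetical partition kinds; only the bounded one restricts its pool. */
enum PartitionKind {
    BLOCKING(false),
    PIPELINED(false),
    PIPELINED_BOUNDED(true);

    final boolean isBounded;

    PartitionKind(boolean isBounded) {
        this.isBounded = isBounded;
    }
}

final class BufferBounds {
    // 1 buffer for in-flight data + 1 buffer for parallel serialization
    static final int BUFFERS_PER_CHANNEL = 2;
    // extra buffers on top of the per-channel ones
    static final int EXTRA_BUFFERS = 8;

    /** Upper bound for a local pool, or Integer.MAX_VALUE if unrestricted. */
    static int maxPoolSize(int numberOfChannels, PartitionKind kind) {
        if (!kind.isBounded) {
            return Integer.MAX_VALUE;
        }
        return BUFFERS_PER_CHANNEL * numberOfChannels + EXTRA_BUFFERS;
    }
}
```

For 4 channels, a bounded pool is capped at 2 * 4 + 8 = 16 buffers.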
commit 37eed7bc59b6899e3d7bdd4b1a3dac87e5f04406
Author: Nico Kruber <[email protected]>
Date: 2017-02-24T12:41:11Z
[FLINK-4545] re-implement NetworkBufferPool#redistributeBuffers
This version also takes the bounded network buffers into account.
The distribution is no longer strictly uniform, though:
* for every buffer pool, we determine the maximum number of buffers it can take
  from the available ones - let's call this its 'capacity'
* then, each of them gets roughly available * capacity / totalCapacity
  buffers on top of its required number of buffers
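The capacity-weighted redistribution can be sketched as follows. `Redistribution.redistribute` is a hypothetical helper that works on plain arrays instead of real buffer pools. Capping the distributed amount at the total capacity and rounding cumulative shares (so that no buffers are lost to rounding) are choices of this sketch, not details stated in the commit.

```java
/**
 * Hypothetical sketch: each pool keeps its required buffers and receives a
 * share of the available (excess) buffers proportional to its capacity.
 */
final class Redistribution {

    /** maxSizes may contain Integer.MAX_VALUE for unbounded pools. */
    static int[] redistribute(int totalBuffers, int[] required, int[] maxSizes) {
        int n = required.length;
        long totalRequired = 0;
        for (int r : required) {
            totalRequired += r;
        }
        long available = totalBuffers - totalRequired;
        if (available < 0) {
            throw new IllegalArgumentException("not enough buffers for the required sizes");
        }

        // capacity = how many of the available buffers a pool could take
        long[] capacity = new long[n];
        long totalCapacity = 0;
        for (int i = 0; i < n; i++) {
            capacity[i] = Math.max(0, Math.min((long) maxSizes[i] - required[i], available));
            totalCapacity += capacity[i];
        }

        int[] sizes = required.clone();
        if (totalCapacity == 0) {
            return sizes;
        }

        // never hand out more than the pools can take in total
        long toDistribute = Math.min(available, totalCapacity);
        long cumulativeCapacity = 0;
        long distributed = 0;
        for (int i = 0; i < n; i++) {
            cumulativeCapacity += capacity[i];
            // share of pools 0..i, rounded down; the difference is pool i's share
            long target = Math.multiplyExact(toDistribute, cumulativeCapacity) / totalCapacity;
            sizes[i] += (int) (target - distributed);
            distributed = target;
        }
        return sizes;
    }
}
```

For example, with 100 buffers in total, required sizes {10, 10, 10} and maximum sizes {20, 40, unbounded}, the 70 available buffers are split by capacities 10, 30 and 70, giving pool sizes {16, 29, 55}.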
----