GitHub user NicoK opened a pull request:
https://github.com/apache/flink/pull/5261
[FLINK-8371][network] always recycle Buffers when releasing SpillableSubpartition
## What is the purpose of the change
There were places where `Buffer` instances were not released upon
`SpillableSubpartition#release()` with a view attached to a non-spilled
subpartition:
1) `SpillableSubpartition#buffers`: `SpillableSubpartition#release()`
delegates the recycling to the view, but `SpillableSubpartitionView` does not
clean up the `buffers` queue (the subpartition only recycled the buffers itself
if there was no view).
2) `SpillableSubpartitionView#nextBuffer`: If this field is populated when
the subpartition is released, the buffer is neither handed out by subsequent
`SpillableSubpartitionView#getNextBuffer()` calls (there was a shortcut
returning `null` here) nor recycled.
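To make the two leaks concrete, here is a minimal, single-threaded Java model of the pre-fix behavior as described above. `Pool`, `Buf`, `LeakySubpartition`, `LeakyView` and `LeakDemo` are hypothetical stand-ins, not Flink classes; the real code also synchronizes on the buffer queue and handles spilling, which this sketch leaves out.

```java
import java.util.ArrayDeque;

// Entry point first so the single-file launcher (`java LeakDemo.java`) picks it up.
final class LeakDemo {
    // Fills a subpartition, attaches a view, releases, and reports unrecycled buffers.
    static int unrecycledAfterRelease() {
        Pool pool = new Pool();
        LeakySubpartition sub = new LeakySubpartition();
        for (int i = 0; i < 3; i++) {
            sub.add(pool.allocate());
        }
        LeakyView view = sub.createReadView(); // prefetches one buffer into nextBuffer
        sub.release();
        if (view.getNextBuffer() != null) {
            throw new IllegalStateException("released view handed out a buffer");
        }
        return pool.outstanding;
    }

    public static void main(String[] args) {
        System.out.println("unrecycled buffers after release: " + unrecycledAfterRelease());
    }
}

// Hypothetical pool that counts buffers handed out but not yet recycled.
final class Pool {
    int outstanding;

    Buf allocate() {
        outstanding++;
        return new Buf(this);
    }
}

// Hypothetical buffer; recycle() is idempotent and returns it to its pool.
final class Buf {
    private final Pool pool;
    private boolean recycled;

    Buf(Pool pool) {
        this.pool = pool;
    }

    void recycle() {
        if (!recycled) {
            recycled = true;
            pool.outstanding--;
        }
    }
}

// Models the pre-fix release() logic.
final class LeakySubpartition {
    final ArrayDeque<Buf> buffers = new ArrayDeque<>();
    boolean released;
    private LeakyView view;

    void add(Buf buffer) {
        buffers.add(buffer);
    }

    LeakyView createReadView() {
        view = new LeakyView(this);
        return view;
    }

    void release() {
        released = true;
        if (view == null) {
            // Only without a view were the queued buffers recycled.
            for (Buf buffer : buffers) {
                buffer.recycle();
            }
            buffers.clear();
        } else {
            // Leak 1: recycling is delegated to the view, which ignores `buffers`.
            view.releaseAllResources();
        }
    }
}

final class LeakyView {
    private final LeakySubpartition parent;
    private Buf nextBuffer; // prefetched buffer for the next getNextBuffer() call

    LeakyView(LeakySubpartition parent) {
        this.parent = parent;
        this.nextBuffer = parent.buffers.poll();
    }

    Buf getNextBuffer() {
        if (parent.released) {
            return null; // Leak 2: shortcut, so nextBuffer is never handed out...
        }
        Buf current = nextBuffer;
        nextBuffer = parent.buffers.poll();
        return current;
    }

    void releaseAllResources() {
        // ...and it is not recycled here either.
    }
}
```

In this model all three buffers stay unrecycled: two are still in the queue and one sits in the view's `nextBuffer`.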
- Please refer to dataArtisans/flink#3.
- This PR is based on #5260.
- It should probably be applied to Flink 1.4 as well.
## Brief change log
- similar to the `PipelinedSubpartition` implementation, make
`SpillableSubpartition#release()` always clean up and recycle the buffers
- recycle `SpillableSubpartitionView#nextBuffer` in
`SpillableSubpartitionView#releaseAllResources()`
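The two changes can be sketched with the same kind of hypothetical stand-ins (`Pool`, `Buf`, `FixedSubpartition`, `FixedView`, `FixedDemo`; not Flink's actual classes): `release()` drains and recycles the queue unconditionally, and the view recycles its prefetched `nextBuffer` when its resources are released. Synchronization and spilling are again omitted, so treat this as an illustration of the idea rather than the patch itself.

```java
import java.util.ArrayDeque;

final class FixedDemo {
    // Returns how many buffers are still unrecycled after release().
    static int unrecycledAfterRelease(boolean attachView) {
        Pool pool = new Pool();
        FixedSubpartition sub = new FixedSubpartition();
        for (int i = 0; i < 3; i++) {
            sub.add(pool.allocate());
        }
        if (attachView) {
            sub.createReadView(); // prefetches one buffer into the view's nextBuffer
        }
        sub.release();
        return pool.outstanding;
    }

    public static void main(String[] args) {
        System.out.println("without view: " + unrecycledAfterRelease(false));
        System.out.println("with view:    " + unrecycledAfterRelease(true));
    }
}

// Hypothetical pool that counts buffers handed out but not yet recycled.
final class Pool {
    int outstanding;

    Buf allocate() {
        outstanding++;
        return new Buf(this);
    }
}

// Hypothetical buffer; recycle() is idempotent and returns it to its pool.
final class Buf {
    private final Pool pool;
    private boolean recycled;

    Buf(Pool pool) {
        this.pool = pool;
    }

    void recycle() {
        if (!recycled) {
            recycled = true;
            pool.outstanding--;
        }
    }
}

final class FixedSubpartition {
    final ArrayDeque<Buf> buffers = new ArrayDeque<>();
    boolean released;
    private FixedView view;

    void add(Buf buffer) {
        buffers.add(buffer);
    }

    FixedView createReadView() {
        view = new FixedView(this);
        return view;
    }

    void release() {
        if (released) {
            return; // releasing twice is a no-op
        }
        released = true;
        // Always recycle the queued buffers, whether or not a view is attached.
        Buf buffer;
        while ((buffer = buffers.poll()) != null) {
            buffer.recycle();
        }
        if (view != null) {
            view.releaseAllResources();
        }
    }
}

final class FixedView {
    private final FixedSubpartition parent;
    private Buf nextBuffer; // prefetched buffer for the next getNextBuffer() call

    FixedView(FixedSubpartition parent) {
        this.parent = parent;
        this.nextBuffer = parent.buffers.poll();
    }

    Buf getNextBuffer() {
        if (parent.released) {
            return null; // safe now: nextBuffer was recycled in releaseAllResources()
        }
        Buf current = nextBuffer;
        nextBuffer = parent.buffers.poll();
        return current;
    }

    void releaseAllResources() {
        // Recycle the prefetched buffer as well, then forget it.
        if (nextBuffer != null) {
            nextBuffer.recycle();
            nextBuffer = null;
        }
    }
}
```

Nulling `nextBuffer` after recycling keeps `releaseAllResources()` safe to call more than once in this model, for example once from the subpartition and once from the consumer.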
## Verifying this change
This change added tests and can be verified as follows:
- added tests for various scenarios of releasing a spillable/spilled or
pipelined subpartition
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/NicoK/flink flink-8371
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5261.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5261
----
commit fada3022186670851a3956a961f490e6d86e2a53
Author: Nico Kruber <nico@...>
Date: 2017-12-13T14:28:08Z
[hotfix][checkstyle] only ignore checkstyle in existing packages under runtime.io.network
- ignore runtime.io.(async|disk)
- ignore runtime.io.network.(api|buffer|netty|partition|serialization|util)
-> everything else will be checked against the ruleset
- fix checkstyle errors in classes directly under runtime.io.network
commit f8dff47a707c4e7572d02e072197927ec2ce2ef7
Author: Nico Kruber <nico@...>
Date: 2017-12-14T16:30:19Z
[FLINK-8252][benchmarks] convert network benchmarks to streaming benchmarks
This allows us to use the output flushing interval as a parameter to
evaluate, too.
commit a4980ae85bac1dca9f8939e0dc3c8839991ed5e8
Author: Zhijiang <wangzhijiang999@...>
Date: 2017-08-17T11:38:45Z
[FLINK-7468][network] Implement sender backlog logic for credit-based
commit 0da032b3e7b90da2cbee5ca6f051667add104ac6
Author: Piotr Nowojski <piotr.nowojski@...>
Date: 2018-01-05T14:28:40Z
[FLINK-8375][network] Remove unnecessary synchronization
Synchronized blocks in ResultPartition could affect only:
1. totalNumberOfBuffers and totalNumberOfBytes counters
2. subpartition add(), finish() and release() calls.
However:
1. counters were not used anywhere - they are removed by this commit
2a. add(), finish() and release() methods for PipelinedSubpartition were
already thread safe
2b. add(), finish() and release() methods for SpillableSubpartition were
made thread safe in this commit, basically by pushing the synchronized
section down one level.
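Pushing the synchronized section down one level can be sketched as follows, assuming each subpartition guards its own state by synchronizing on its buffer queue. `Partition`, `Subpartition` and `SyncDemo` are hypothetical, simplified stand-ins for `ResultPartition` and its subpartitions, with `int` values in place of buffers and without the removed counters.

```java
import java.util.ArrayDeque;

final class SyncDemo {
    // Four writer threads add 1000 values each, two writers per subpartition.
    static int[] concurrentAdds() {
        Partition partition = new Partition(2);
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            final int target = t % 2;
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    partition.add(i, target);
                }
            });
            writers[t].start();
        }
        for (Thread writer : writers) {
            try {
                writer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {partition.subpartition(0).size(), partition.subpartition(1).size()};
    }

    public static void main(String[] args) {
        int[] sizes = concurrentAdds();
        System.out.println("subpartition sizes: " + sizes[0] + ", " + sizes[1]);
    }
}

// Hypothetical partition: no partition-wide lock around subpartition calls.
final class Partition {
    private final Subpartition[] subpartitions;

    Partition(int numSubpartitions) {
        subpartitions = new Subpartition[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) {
            subpartitions[i] = new Subpartition();
        }
    }

    boolean add(int buffer, int subpartitionIndex) {
        // Previously this would have been wrapped in synchronized (this) { ... }.
        return subpartitions[subpartitionIndex].add(buffer);
    }

    Subpartition subpartition(int index) {
        return subpartitions[index];
    }
}

// Hypothetical subpartition that locks on its own queue (ArrayDeque is not thread safe).
final class Subpartition {
    private final ArrayDeque<Integer> buffers = new ArrayDeque<>();
    private boolean finished;
    private boolean released;

    boolean add(int buffer) {
        synchronized (buffers) {
            if (finished || released) {
                return false;
            }
            buffers.add(buffer);
            return true;
        }
    }

    void finish() {
        synchronized (buffers) {
            finished = true;
        }
    }

    void release() {
        synchronized (buffers) {
            released = true;
            buffers.clear();
        }
    }

    int size() {
        synchronized (buffers) {
            return buffers.size();
        }
    }
}
```

In this model, writers targeting different subpartitions never contend on a shared lock; only writers of the same subpartition serialize on its queue.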
commit 2938610f996361e68dedefed6c247c3547cea331
Author: Nico Kruber <nico@...>
Date: 2018-01-05T15:17:13Z
[hotfix][tests] move assertions out of the finally block
There was a potential for them to mask exceptions.
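The masking happens because an exception thrown from a `finally` block replaces the one already propagating from the `try` block. A minimal sketch with hypothetical helpers (`work`, `cleanUp`, `check`, `FinallyDemo`), not the actual test code touched by this commit:

```java
final class FinallyDemo {
    // Hypothetical code under test that fails.
    static void work(boolean fail) {
        if (fail) {
            throw new IllegalStateException("root cause");
        }
    }

    // Hypothetical no-op resource cleanup.
    static void cleanUp() {
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    // Anti-pattern: the failing assertion in finally replaces the exception from work().
    static void assertionInFinally() {
        try {
            work(true);
        } finally {
            cleanUp();
            check(false, "assertion in finally");
        }
    }

    // Assertion moved out of finally: the original exception propagates unchanged.
    static void assertionAfterFinally() {
        try {
            work(true);
        } finally {
            cleanUp(); // only cleanup stays in finally
        }
        check(false, "assertion after finally"); // skipped because work() threw
    }

    // Runs the body and returns the message of whatever propagates out of it.
    static String propagatedMessage(Runnable body) {
        try {
            body.run();
            return null;
        } catch (RuntimeException | AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(propagatedMessage(FinallyDemo::assertionInFinally));    // assertion in finally
        System.out.println(propagatedMessage(FinallyDemo::assertionAfterFinally)); // root cause
    }
}
```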
commit f2dcc6ed3d5b5c136e4d899375ae85c3fe1e0a3e
Author: Nico Kruber <nico@...>
Date: 2018-01-05T17:18:35Z
[FLINK-8371][network] always recycle Buffers when releasing SpillableSubpartition
There were places where Buffer instances were not released upon
SpillableSubpartition#release() with a view attached to a non-spilled
subpartition:
1) SpillableSubpartition#buffers:
SpillableSubpartition#release() delegates the recycling to the view, but
SpillableSubpartitionView does not clean up the 'buffers' queue (the
subpartition only recycled the buffers itself if there was no view).
2) SpillableSubpartitionView#nextBuffer:
If this field is populated when the subpartition is released, the buffer is
neither handed out by subsequent SpillableSubpartitionView#getNextBuffer()
calls (there was a shortcut returning 'null' here) nor recycled.
-> similar to the PipelinedSubpartition implementation, make
SpillableSubpartition#release() always clean up and recycle the buffers
-> recycle SpillableSubpartitionView#nextBuffer in
SpillableSubpartitionView#releaseAllResources()
----