[ https://issues.apache.org/jira/browse/FLINK-8371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316450#comment-16316450 ]

ASF GitHub Bot commented on FLINK-8371:
---------------------------------------

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/5261

    [FLINK-8371][network] always recycle Buffers when releasing SpillableSubpartition

    ## What is the purpose of the change
    
    There were places where `Buffer` instances were not released upon
    `SpillableSubpartition#release()` with a view attached to a non-spilled
    subpartition:
    
    1) `SpillableSubpartition#buffers`: `SpillableSubpartition#release()`
       delegates the recycling to the view, but `SpillableSubpartitionView`
       does not clean up the `buffers` queue (the subpartition only recycled
       the buffers itself if there was no view).
    2) `SpillableSubpartitionView#nextBuffer`: if this field is populated when
       the subpartition is released, it is neither handed out by subsequent
       `SpillableSubpartitionView#getNextBuffer()` calls (there was a short path
       returning `null` here) nor recycled.
    
    - Please refer to dataArtisans/flink#3.
    - This PR is based on #5260.
    - It should probably be applied to Flink-1.4 as well.
    
    ## Brief change log
    
    - similarly to the `PipelinedSubpartition` implementation, make
      `SpillableSubpartition#release()` always clean up and recycle the buffers
    - recycle `SpillableSubpartitionView#nextBuffer` in
      `SpillableSubpartitionView#releaseAllResources()`
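The two changes above can be illustrated with a minimal, self-contained sketch. It is not Flink code: `SketchBuffer`, `SketchSubpartition` and `SketchView` are hypothetical stand-ins, and locking is simplified to plain `synchronized` methods. It shows the subpartition recycling its own queue in `release()` whether or not a view is attached, and the view recycling its look-ahead `nextBuffer` in `releaseAllResources()`.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for a network buffer; it only remembers whether it was recycled. */
class SketchBuffer {
    private boolean recycled;

    synchronized void recycle() { recycled = true; }

    synchronized boolean isRecycled() { return recycled; }
}

/** Hypothetical, heavily simplified subpartition (not Flink's SpillableSubpartition). */
class SketchSubpartition {
    private final ArrayDeque<SketchBuffer> buffers = new ArrayDeque<>();
    private SketchView view;
    private boolean released;

    synchronized void add(SketchBuffer buffer) {
        if (released) {
            buffer.recycle(); // nothing may be queued after release
        } else {
            buffers.add(buffer);
        }
    }

    synchronized SketchView createView() {
        view = new SketchView(this);
        return view;
    }

    synchronized SketchBuffer poll() {
        return buffers.poll();
    }

    void release() {
        SketchView viewToRelease;
        synchronized (this) {
            if (released) {
                return;
            }
            released = true;
            // First part of the fix: recycle every queued buffer here, whether or
            // not a view is attached, instead of delegating this to the view.
            SketchBuffer buffer;
            while ((buffer = buffers.poll()) != null) {
                buffer.recycle();
            }
            viewToRelease = view;
        }
        // The view still releases what it holds itself (its look-ahead buffer).
        if (viewToRelease != null) {
            viewToRelease.releaseAllResources();
        }
    }
}

/** Hypothetical view keeping one buffer of look-ahead, like nextBuffer in the PR. */
class SketchView {
    private final SketchSubpartition parent;
    private SketchBuffer nextBuffer;
    private boolean released;

    SketchView(SketchSubpartition parent) {
        this.parent = parent;
        this.nextBuffer = parent.poll();
    }

    synchronized SketchBuffer getNextBuffer() {
        if (released) {
            return null; // short path: nothing is handed out after release
        }
        SketchBuffer current = nextBuffer != null ? nextBuffer : parent.poll();
        nextBuffer = parent.poll(); // prefetch the next look-ahead buffer
        return current;
    }

    synchronized void releaseAllResources() {
        if (released) {
            return;
        }
        released = true;
        // Second part of the fix: the look-ahead buffer is never handed out
        // after release, so it has to be recycled here.
        if (nextBuffer != null) {
            nextBuffer.recycle();
            nextBuffer = null;
        }
    }
}
```

For example, adding two buffers, creating a view (which takes the first one as its look-ahead), adding a third and then releasing the subpartition leaves all three buffers recycled, and the view returns `null` afterwards.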
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    - added tests for various scenarios of releasing a spillable/spilled or
      pipelined subpartition
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): **no**
      - The public API, i.e., is any changed class annotated with
        `@Public(Evolving)`: **no**
      - The serializers: **no**
      - The runtime per-record code paths (performance sensitive): **no**
      - Anything that affects deployment or recovery: JobManager (and its
        components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
      - The S3 file system connector: **no**
    
    ## Documentation
    
      - Does this pull request introduce a new feature? **no**
      - If yes, how is the feature documented? **not applicable**

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-8371

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5261.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5261
    
----
commit fada3022186670851a3956a961f490e6d86e2a53
Author: Nico Kruber <nico@...>
Date:   2017-12-13T14:28:08Z

    [hotfix][checkstyle] only ignore checkstyle in existing packages under runtime.io.network
    
    - ignore runtime.io.(async|disk)
    - ignore runtime.io.network.(api|buffer|netty|partition|serialization|util)
    -> everything else will be checked against the ruleset
    - fix checkstyle errors in classes directly under runtime.io.network

commit f8dff47a707c4e7572d02e072197927ec2ce2ef7
Author: Nico Kruber <nico@...>
Date:   2017-12-14T16:30:19Z

    [FLINK-8252][benchmarks] convert network benchmarks to streaming benchmarks
    
    This allows us to use the output flushing interval as a parameter to
    evaluate, too.

commit a4980ae85bac1dca9f8939e0dc3c8839991ed5e8
Author: Zhijiang <wangzhijiang999@...>
Date:   2017-08-17T11:38:45Z

    [FLINK-7468][network] Implement sender backlog logic for credit-based

commit 0da032b3e7b90da2cbee5ca6f051667add104ac6
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-01-05T14:28:40Z

    [FLINK-8375][network] Remove unnecessary synchronization
    
    Synchronized blocks in ResultPartition could affect only:
    1. totalNumberOfBuffers and totalNumberOfBytes counters
    2. subpartition add(), finish() and release() calls.
    
    However:
    1. counters were not used anywhere - they are removed by this commit
    2a. add(), finish() and release() methods for PipelinedSubpartition were
    already thread safe
    2b. add(), finish() and release() methods for SpillableSubpartition were
    made thread safe in this commit, by basically pushing the synchronized
    section down one level.
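A rough sketch of what pushing the synchronized section down one level can look like, using hypothetical names (`LockedSubpartition`, `SketchResultPartition`); it is not the actual Flink code. Each subpartition guards its own queue inside `add()`, `finish()` and `release()`, so the partition can delegate without an outer lock and without the unused counters.

```java
import java.util.ArrayDeque;

/** Hypothetical subpartition whose add/finish/release guard their own state. */
class LockedSubpartition {
    private final ArrayDeque<Object> buffers = new ArrayDeque<>();
    private boolean finished;
    private boolean released;

    boolean add(Object buffer) {
        synchronized (buffers) {
            if (finished || released) {
                return false; // rejected; the caller keeps ownership of the buffer
            }
            buffers.add(buffer);
            return true;
        }
    }

    void finish() {
        synchronized (buffers) {
            finished = true;
        }
    }

    void release() {
        synchronized (buffers) {
            released = true;
            buffers.clear(); // a real implementation would recycle each buffer here
        }
    }

    int size() {
        synchronized (buffers) {
            return buffers.size();
        }
    }
}

/** Hypothetical partition: delegates to its subpartitions without taking a lock itself. */
class SketchResultPartition {
    private final LockedSubpartition[] subpartitions;

    SketchResultPartition(int numSubpartitions) {
        subpartitions = new LockedSubpartition[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) {
            subpartitions[i] = new LockedSubpartition();
        }
    }

    // Before the change, calls like this sat inside a synchronized block in the partition.
    boolean add(Object buffer, int subpartitionIndex) {
        return subpartitions[subpartitionIndex].add(buffer);
    }

    LockedSubpartition get(int subpartitionIndex) {
        return subpartitions[subpartitionIndex];
    }
}
```

Several writer threads can then call `SketchResultPartition#add()` concurrently, and each subpartition still counts every accepted buffer exactly once.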

commit 2938610f996361e68dedefed6c247c3547cea331
Author: Nico Kruber <nico@...>
Date:   2018-01-05T15:17:13Z

    [hotfix][tests] move assertions out of the finally block
    
    There was a potential for them to mask exceptions.
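Why assertions in a `finally` block can mask exceptions, shown with a small hypothetical example (`FinallyMasking` is not part of the Flink tests). In a plain `try`/`finally`, an exception thrown from `finally` replaces the one already in flight, and, unlike try-with-resources, Java does not attach the original as a suppressed exception.

```java
/** Hypothetical example: the same check placed inside vs. after a finally block. */
class FinallyMasking {

    /** Stands in for the code under test; it fails with the "real" error. */
    static void codeUnderTest() {
        throw new IllegalStateException("real failure");
    }

    /** Assertion in finally: its AssertionError discards the IllegalStateException. */
    static void assertInFinally(boolean cleanedUp) {
        try {
            codeUnderTest();
        } finally {
            if (!cleanedUp) {
                throw new AssertionError("cleanup check failed");
            }
        }
    }

    /** Assertion after try/finally: it only runs if the code under test succeeded. */
    static void assertAfterFinally(boolean cleanedUp) {
        try {
            codeUnderTest();
        } finally {
            // cleanup only; no assertions here
        }
        if (!cleanedUp) {
            throw new AssertionError("cleanup check failed");
        }
    }

    /** Runs the action and returns whatever it threw, or null if it completed normally. */
    static Throwable thrownBy(Runnable action) {
        try {
            action.run();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }
}
```

Calling `assertInFinally(false)` surfaces only the `AssertionError` and hides the `IllegalStateException` that actually broke the test, whereas `assertAfterFinally(false)` reports the original failure.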

commit f2dcc6ed3d5b5c136e4d899375ae85c3fe1e0a3e
Author: Nico Kruber <nico@...>
Date:   2018-01-05T17:18:35Z

    [FLINK-8371][network] always recycle Buffers when releasing SpillableSubpartition
    
    There were places where Buffer instances were not released upon
    SpillableSubpartition#release() with a view attached to a non-spilled
    subpartition:
    
    1) SpillableSubpartition#buffers:
      SpillableSubpartition#release() delegates the recycling to the view, but
      SpillableSubpartitionView does not clean up the 'buffers' queue (the
      recycling was only done by the subpartition if there was no view).
    2) SpillableSubpartitionView#nextBuffer:
      If this field is populated when the subpartition is released, it is
      neither handed out in subsequent SpillableSubpartitionView#getNextBuffer()
      calls (there was a short path returning 'null' here) nor recycled.
    
    -> similarly to the PipelinedSubpartition implementation, make
       SpillableSubpartition#release() always clean up and recycle the buffers
    -> recycle SpillableSubpartitionView#nextBuffer in
       SpillableSubpartitionView#releaseAllResources()

----


> Buffers are not recycled in a non-spilled SpillableSubpartition upon release
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8371
>                 URL: https://issues.apache.org/jira/browse/FLINK-8371
>             Project: Flink
>          Issue Type: Bug
>          Components: Network
>    Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1, 1.4.0, 1.3.2
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Blocker
>
> {{SpillableSubpartition}} only recycles buffers in its {{buffers}} queue if
> there is no view attached to it yet. If there is a view, it delegates this
> task to the view, but {{SpillableSubpartitionView}} only instructs the
> {{SpilledSubpartitionView}} to clean up in its {{releaseAllResources()}}.
> Similarly to the {{PipelinedSubpartition}} implementation, we should always
> clean up and recycle the buffers in {{SpillableSubpartition#release()}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
