[ https://issues.apache.org/jira/browse/FLINK-2655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14740428#comment-14740428 ]

ASF GitHub Bot commented on FLINK-2655:
---------------------------------------

Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1118#discussion_r39252417
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java ---
    @@ -1525,34 +1525,41 @@ protected final void disposeSortBuffers(boolean releaseMemory) {
                                        final List<MemorySegment> allReadBuffers, final List<MemorySegment> writeBuffers)
                throws IOException
                {
    -                   final double numMerges = Math.ceil(channelIDs.size() / ((double) this.maxFanIn));
    -                   final int channelsToMergePerStep = (int) Math.ceil(channelIDs.size() / numMerges);
    -                   
    +                   // A channel list with length maxFanIn<sup>i</sup> can be merged to maxFanIn files in i-1 rounds where every merge
    +                   // is a full merge with maxFanIn input channels. A partial round includes merges with fewer than maxFanIn
    +                   // inputs. It is most efficient to perform the partial round first.
    +                   final double scale = Math.ceil(Math.log(channelIDs.size()) / Math.log(this.maxFanIn)) - 1;
    +
    +                   final int numStart = channelIDs.size();
    +                   final int numEnd = (int) Math.pow(this.maxFanIn, scale);
    +
    +                   final int numMerges = (int) Math.ceil((numStart - numEnd) / (double) (this.maxFanIn - 1));
    --- End diff --
    
    Ah, thanks for this info :)
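The planning arithmetic in the added lines can be sketched as a standalone Java class. `PartialMergePlan` and its methods are hypothetical names for illustration, not part of `UnilateralSortMerger`. The sketch assumes `numStart > maxFanIn >= 2` (the case in which an intermediate merge is needed) and computes `numEnd` with an integer loop instead of `Math.log`/`Math.pow`, which sidesteps any floating-point rounding of the log ratio when the channel count is an exact power of `maxFanIn`.

```java
// Hypothetical sketch of the partial-round planning in the diff; not Flink code.
// Assumes numStart > maxFanIn >= 2, i.e. an intermediate merge is actually needed.
final class PartialMergePlan {

    // numEnd in the diff: the largest power of maxFanIn that is below numStart.
    // An integer loop replaces Math.pow(maxFanIn, ceil(log(numStart) / log(maxFanIn)) - 1).
    static long targetChannels(long numStart, int maxFanIn) {
        long numEnd = 1;
        while (numEnd * maxFanIn < numStart) {
            numEnd *= maxFanIn;
        }
        return numEnd;
    }

    // numMerges in the diff: ceil((numStart - numEnd) / (maxFanIn - 1)),
    // because a merge of k channels shrinks the channel count by k - 1.
    static long numMerges(long numStart, int maxFanIn) {
        long numEnd = targetChannels(numStart, maxFanIn);
        return (numStart - numEnd + maxFanIn - 2) / (maxFanIn - 1);
    }

    // Input channels read by the partial round; all other channels are left as-is.
    static long channelsMergedInPartialRound(long numStart, int maxFanIn) {
        long numEnd = targetChannels(numStart, maxFanIn);
        return (numStart - numEnd) + numMerges(numStart, maxFanIn);
    }

    public static void main(String[] args) {
        System.out.println(targetChannels(200, 128));               // 128
        System.out.println(numMerges(200, 128));                    // 1
        System.out.println(channelsMergedInPartialRound(200, 128)); // 73
    }
}
```

With 200 channels and a fan-in of 128, a single merge of 73 channels leaves exactly 128 files; the other 127 spilled files are not read until the final merge.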


> Minimize intermediate merging of spilled buffers
> ------------------------------------------------
>
>                 Key: FLINK-2655
>                 URL: https://issues.apache.org/jira/browse/FLINK-2655
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>    Affects Versions: master
>            Reporter: Greg Hogan
>
> If the number of spilled buffers exceeds taskmanager.runtime.max-fan, then the
> number of files must be reduced with an intermediate merge by reading, merging,
> and spilling into a single, larger file.
> The current implementation performs an intermediate merge on all files. An 
> optimal implementation minimizes the amount of merged data by performing 
> partial merges first.
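A rough, hypothetical Java model of the data rewritten by intermediate merges under both schemes. It assumes every spilled channel holds one unit of data and that merging repeats while more than maxFanIn channels remain. The old scheme follows the removed lines of the diff, with each round approximated as producing ceil(count / maxFanIn) files. The final merge of at most maxFanIn inputs is left out of both counts. `IntermediateMergeVolume` is an illustrative name only.

```java
// Hypothetical cost model, not Flink code: every spilled channel is one unit of data.
// Assumes numChannels > maxFanIn >= 2; the final merge is not counted.
final class IntermediateMergeVolume {

    // Old scheme: each intermediate round merges all channels, so every round
    // rewrites the full data volume and leaves about ceil(count / maxFanIn) files.
    static long oldVolume(long numChannels, int maxFanIn) {
        long volume = 0;
        long count = numChannels;
        while (count > maxFanIn) {
            count = (count + maxFanIn - 1) / maxFanIn;
            volume += numChannels;
        }
        return volume;
    }

    // Partial-first scheme: one partial round that reads only enough channels to
    // reach a power of maxFanIn, then full rounds that each rewrite everything.
    static long newVolume(long numChannels, int maxFanIn) {
        long numEnd = 1;
        long fullRounds = -1; // numEnd = maxFanIn^s needs s - 1 full rounds to reach maxFanIn files
        while (numEnd * maxFanIn < numChannels) {
            numEnd *= maxFanIn;
            fullRounds++;
        }
        long numMerges = (numChannels - numEnd + maxFanIn - 2) / (maxFanIn - 1);
        long partialRoundVolume = (numChannels - numEnd) + numMerges;
        return partialRoundVolume + fullRounds * numChannels;
    }

    public static void main(String[] args) {
        System.out.println(oldVolume(1001, 10)); // 3003
        System.out.println(newVolume(1001, 10)); // 2004
    }
}
```

For 1001 channels and a fan-in of 10, the model gives 3003 units for the old scheme and 2004 for the partial-first scheme. The partial round merges only two files to reach 1000 = 10^3, and each of the two full rounds that follow rewrites all 1001 units.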



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)