[ https://issues.apache.org/jira/browse/FLINK-2655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14740428#comment-14740428 ]
ASF GitHub Bot commented on FLINK-2655:
---------------------------------------

Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1118#discussion_r39252417

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java ---
    @@ -1525,34 +1525,41 @@ protected final void disposeSortBuffers(boolean releaseMemory) {
         final List<MemorySegment> allReadBuffers, final List<MemorySegment> writeBuffers) throws IOException {
    -    final double numMerges = Math.ceil(channelIDs.size() / ((double) this.maxFanIn));
    -    final int channelsToMergePerStep = (int) Math.ceil(channelIDs.size() / numMerges);
    -
    +    // A channel list with length maxFanIn<sup>i</sup> can be merged to maxFanIn files in i-1 rounds where every merge
    +    // is a full merge with maxFanIn input channels. A partial round includes merges with fewer than maxFanIn
    +    // inputs. It is most efficient to perform the partial round first.
    +    final double scale = Math.ceil(Math.log(channelIDs.size()) / Math.log(this.maxFanIn)) - 1;
    +
    +    final int numStart = channelIDs.size();
    +    final int numEnd = (int) Math.pow(this.maxFanIn, scale);
    +
    +    final int numMerges = (int) Math.ceil((numStart - numEnd) / (double) (this.maxFanIn - 1));
    --- End diff --

    Ah, thanks for this info :)

> Minimize intermediate merging of spilled buffers
> ------------------------------------------------
>
>                 Key: FLINK-2655
>                 URL: https://issues.apache.org/jira/browse/FLINK-2655
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>    Affects Versions: master
>            Reporter: Greg Hogan
>
> If the number of spilled buffers exceeds taskmanager.runtime.max-fan then the
> number of files must be reduced with an intermediate merge by reading, merging,
> and spilling into a single, larger file.
> The current implementation performs an intermediate merge on all files. An
> optimal implementation minimizes the amount of merged data by performing
> partial merges first.
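
The arithmetic in the diff above can be sketched in isolation as follows. This is only an illustration, not the code from the pull request: the class and method names (PartialMergePlan, targetChannelCount, numPartialMerges) are hypothetical, and the sketch swaps the Math.log / Math.pow computation for an integer loop, on the assumption that avoiding floating-point rounding is acceptable for an example. It computes the largest power of maxFanIn strictly below the current channel count (numEnd in the diff) and the number of merges needed to shrink the channel list to that size (numMerges in the diff).

```java
// Hypothetical helper for illustration only; not part of Flink.
final class PartialMergePlan {

    /** Largest power of maxFanIn that is strictly smaller than numChannels (numEnd in the diff). */
    static int targetChannelCount(int numChannels, int maxFanIn) {
        if (numChannels < 1 || maxFanIn < 2) {
            throw new IllegalArgumentException("need numChannels >= 1 and maxFanIn >= 2");
        }
        long target = 1;
        // Integer loop instead of log/pow, to sidestep floating-point rounding (an assumption of this sketch).
        while (target * maxFanIn < numChannels) {
            target *= maxFanIn;
        }
        return (int) target;
    }

    /** Number of merges needed to go from numChannels down to targetChannelCount (numMerges in the diff). */
    static int numPartialMerges(int numChannels, int maxFanIn) {
        int numEnd = targetChannelCount(numChannels, maxFanIn);
        // A merge with maxFanIn inputs replaces maxFanIn channels by one,
        // so it removes at most maxFanIn - 1 channels. Integer ceiling division:
        return (numChannels - numEnd + maxFanIn - 2) / (maxFanIn - 1);
    }

    public static void main(String[] args) {
        int numChannels = 10;
        int maxFanIn = 4;
        System.out.println("target channels: " + targetChannelCount(numChannels, maxFanIn));
        System.out.println("partial merges:  " + numPartialMerges(numChannels, maxFanIn));
    }
}
```

With 10 spilled channels and a fan-in of 4, for example, the target is 4 channels, reached with two merges of four inputs each; the remaining 4 channels then fit into a single final merge.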
-- This message was sent by Atlassian JIRA (v6.3.4#6332)