[ https://issues.apache.org/jira/browse/FLINK-2655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739949#comment-14739949 ]
ASF GitHub Bot commented on FLINK-2655:
---------------------------------------
Github user HuangWHWHW commented on a diff in the pull request:
https://github.com/apache/flink/pull/1118#discussion_r39232369
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java ---
@@ -1525,34 +1525,41 @@ protected final void disposeSortBuffers(boolean releaseMemory) {
final List<MemorySegment> allReadBuffers, final List<MemorySegment> writeBuffers)
throws IOException
{
- final double numMerges = Math.ceil(channelIDs.size() / ((double) this.maxFanIn));
- final int channelsToMergePerStep = (int) Math.ceil(channelIDs.size() / numMerges);
-
+ // A channel list with length maxFanIn<sup>i</sup> can be merged to maxFanIn files in i-1 rounds where every merge
+ // is a full merge with maxFanIn input channels. A partial round includes merges with fewer than maxFanIn
+ // inputs. It is most efficient to perform the partial round first.
+ final double scale = Math.ceil(Math.log(channelIDs.size()) / Math.log(this.maxFanIn)) - 1;
+
+ final int numStart = channelIDs.size();
+ final int numEnd = (int) Math.pow(this.maxFanIn, scale);
+
+ final int numMerges = (int) Math.ceil((numStart - numEnd) / (double) (this.maxFanIn - 1));
--- End diff --
Are you sure `this.maxFanIn` can never equal 1? Otherwise `(this.maxFanIn - 1)` is 0 and the expression divides by zero.
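The following is a minimal Java sketch of the planning arithmetic in the diff. It assumes the method is only used with at least 2 channels. `MergePlanSketch`, `Plan` and `plan` are hypothetical names, not Flink APIs. `numNotMerged` and `numToMerge` are derived here and do not appear in the diff excerpt.

The sketch makes two changes to the diff's approach:

- It rejects `maxFanIn < 2` up front, which is one way to address the question above. With `maxFanIn == 1`, both `Math.log(this.maxFanIn)` and `(this.maxFanIn - 1)` are 0. The diff's `double` arithmetic would not throw in that case; it would quietly produce `Infinity` or `NaN`.
- It replaces `Math.log`/`Math.pow` with an integer loop. The floating-point log ratio can round the wrong way near exact powers of `maxFanIn`.

```java
/**
 * Hypothetical sketch (not Flink code) of the partial-merge-first planning
 * arithmetic from the diff, using integer math instead of Math.log/Math.pow.
 */
public final class MergePlanSketch {

    /** Hypothetical result type for one intermediate merge round. */
    record Plan(int numEnd, int numMerges, int numNotMerged, int numToMerge) {}

    static Plan plan(int numStart, int maxFanIn) {
        // With maxFanIn == 1 both Math.log(maxFanIn) and (maxFanIn - 1) are 0,
        // and no merge can reduce the number of channels, so reject it early.
        if (maxFanIn < 2) {
            throw new IllegalArgumentException("maxFanIn must be at least 2, was " + maxFanIn);
        }
        if (numStart < 2) {
            throw new IllegalArgumentException("need at least 2 channels, was " + numStart);
        }
        // Smallest power of maxFanIn (at least maxFanIn itself) that is >= numStart.
        long power = maxFanIn;
        while (power < numStart) {
            power *= maxFanIn;
        }
        // numEnd = maxFanIn^(i-1), where maxFanIn^(i-1) < numStart <= maxFanIn^i.
        int numEnd = (int) (power / maxFanIn);
        // Each merge of k inputs removes k - 1 channels; ceiling division gives the
        // fewest merges that bring numStart down to numEnd.
        int numMerges = (numStart - numEnd + maxFanIn - 2) / (maxFanIn - 1);
        // Channels copied through unchanged, and channels read and rewritten.
        int numNotMerged = numEnd - numMerges;
        int numToMerge = numStart - numNotMerged;
        return new Plan(numEnd, numMerges, numNotMerged, numToMerge);
    }

    public static void main(String[] args) {
        System.out.println(plan(130, 128));
        System.out.println(plan(1000, 10));
    }
}
```

Two worked cases:

- With 130 channels and maxFanIn = 128, the sketch plans a single merge of 3 channels and copies the other 127 unchanged. That leaves 128 channels for one final full merge.
- With 1000 channels and maxFanIn = 10, every channel is merged, because 1000 is already a power of 10.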
> Minimize intermediate merging of spilled buffers
> ------------------------------------------------
>
> Key: FLINK-2655
> URL: https://issues.apache.org/jira/browse/FLINK-2655
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Runtime
> Affects Versions: master
> Reporter: Greg Hogan
>
> If the number of spilled buffers exceeds taskmanager.runtime.max-fan, then the
> number of files must be reduced with an intermediate merge that reads and merges
> files and spills the result into a single, larger file.
> The current implementation performs an intermediate merge on all files. An
> optimal implementation minimizes the amount of merged data by performing
> partial merges first.
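A small cost model can check the claim that performing partial merges first minimizes the amount of merged data. This is a hypothetical Java sketch, not Flink code. It makes the following assumptions:

- Every spill file is 1 unit in size.
- The cost is the number of units read and rewritten by intermediate merges until at most maxFanIn files remain.
- The old behaviour follows the removed lines in the diff: ceil(n / maxFanIn) merges of ceil(n / numMerges) files each.
- The partial-first strategy merges the smallest files first and spreads inputs evenly across merges. This is an assumption of the sketch only; the diff excerpt does not show which files are chosen.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical cost model (not Flink code): total size of data read and rewritten
 * by intermediate merges until at most maxFanIn files remain.
 */
public final class MergeCostSketch {

    private static void checkFanIn(int maxFanIn) {
        // With maxFanIn < 2 no merge can reduce the number of files.
        if (maxFanIn < 2) {
            throw new IllegalArgumentException("maxFanIn must be at least 2, was " + maxFanIn);
        }
    }

    /** Sums the sizes in [from, to). */
    private static long sum(List<Long> sizes, int from, int to) {
        long total = 0;
        for (long s : sizes.subList(from, to)) {
            total += s;
        }
        return total;
    }

    /** Old behaviour: merge all files in ceil(n / maxFanIn) groups of ceil(n / numMerges). */
    static long mergeAll(List<Long> sizes, int maxFanIn) {
        checkFanIn(maxFanIn);
        List<Long> files = new ArrayList<>(sizes);
        long rewritten = 0;
        while (files.size() > maxFanIn) {
            int n = files.size();
            int numMerges = (n + maxFanIn - 1) / maxFanIn;  // ceil(n / maxFanIn)
            int perStep = (n + numMerges - 1) / numMerges;   // ceil(n / numMerges)
            List<Long> next = new ArrayList<>();
            for (int from = 0; from < n; from += perStep) {
                long merged = sum(files, from, Math.min(n, from + perStep));
                rewritten += merged;
                next.add(merged);
            }
            files = next;
        }
        return rewritten;
    }

    /** Partial-first: shrink to a power of maxFanIn, merging the smallest files. */
    static long partialFirst(List<Long> sizes, int maxFanIn) {
        checkFanIn(maxFanIn);
        List<Long> files = new ArrayList<>(sizes);
        long rewritten = 0;
        while (files.size() > maxFanIn) {
            int n = files.size();
            long power = maxFanIn;
            while (power < n) {
                power *= maxFanIn;
            }
            int numEnd = (int) (power / maxFanIn);
            int numMerges = (n - numEnd + maxFanIn - 2) / (maxFanIn - 1);
            int numToMerge = n - (numEnd - numMerges);
            Collections.sort(files);  // assumption: merge the smallest files first
            List<Long> next = new ArrayList<>(files.subList(numToMerge, n));  // untouched
            int from = 0;
            for (int i = 0; i < numMerges; i++) {
                // Spread numToMerge inputs as evenly as possible over numMerges merges.
                int count = numToMerge / numMerges + (i < numToMerge % numMerges ? 1 : 0);
                long merged = sum(files, from, from + count);
                from += count;
                rewritten += merged;
                next.add(merged);
            }
            files = next;
        }
        return rewritten;
    }

    public static void main(String[] args) {
        int[][] cases = {{130, 128}, {200, 10}, {1000, 10}};
        for (int[] c : cases) {
            List<Long> unitFiles = Collections.nCopies(c[0], 1L);
            System.out.printf("n=%d maxFanIn=%d: merge-all=%d, partial-first=%d%n",
                    c[0], c[1], mergeAll(unitFiles, c[1]), partialFirst(unitFiles, c[1]));
        }
    }
}
```

Under these assumptions the model gives:

| Files | maxFanIn | Merge-all (units) | Partial-first (units) |
|---|---|---|---|
| 130 | 128 | 130 | 3 |
| 200 | 10 | 400 | 312 |
| 1000 | 10 | 2000 | 2000 |

At an exact power, such as 1000 files with maxFanIn = 10, both strategies cost the same because every round is a full merge.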
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)