Repository: tez Updated Branches: refs/heads/branch-0.8 5a6139940 -> 2e483ef2c
TEZ-3709. TezMerger is slow for high number of segments (jeagles) (cherry picked from commit 4d100b2bfb880927932ff095f2ba02780d5df01a) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2e483ef2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2e483ef2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2e483ef2 Branch: refs/heads/branch-0.8 Commit: 2e483ef2c9302615bb345cfc576e39f44005879b Parents: 5a61399 Author: Jonathan Eagles <[email protected]> Authored: Tue May 16 10:16:13 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Tue May 16 10:16:59 2017 -0500 ---------------------------------------------------------------------- .../shuffle/orderedgrouped/MergeManager.java | 5 +- .../library/common/sort/impl/TezMerger.java | 77 ++++++++++++++------ 2 files changed, 60 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2e483ef2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 26bdca7..7321397 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -1005,8 +1005,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { for (MapOutput mo : inMemoryMapOutputs) { fullSize += mo.getMemory().length; } + int inMemoryMapOutputsOffset = 0; while((fullSize > leaveBytes) && !Thread.currentThread().isInterrupted()) { - MapOutput mo = inMemoryMapOutputs.remove(0); + MapOutput mo = inMemoryMapOutputs.get(inMemoryMapOutputsOffset++); byte[] data = mo.getMemory(); long size = data.length; totalSize += size; @@ -1018,6 +1019,8 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { (mo.isPrimaryMapOutput() ? mergedMapOutputsCounter : null))); } + // Bulk remove removed in-memory map outputs efficiently + inMemoryMapOutputs.subList(0, inMemoryMapOutputsOffset).clear(); return totalSize; } http://git-wip-us.apache.org/repos/asf/tez/blob/2e483ef2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java index 17e0fe2..23f2946 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.common.sort.impl; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -721,7 +722,7 @@ public class TezMerger { mergeProgress = mergePhase; } - long totalBytes = computeBytesInMerges(factor, inMem); + long totalBytes = computeBytesInMerges(segments, factor, inMem, considerFinalMergeForProgress); if (totalBytes != 0) { progPerByte = 1.0f / (float)totalBytes; } @@ -891,7 +892,7 @@ public class TezMerger { * number of segments - 1 to be divisible by the factor - 1 (each pass * takes X segments and produces 1) to minimize the number of merges. */ - private int getPassFactor(int factor, int passNo, int numSegments) { + private static int getPassFactor(int factor, int passNo, int numSegments) { // passNo > 1 in the OR list - is that correct ? if (passNo > 1 || numSegments <= factor || factor == 1) return factor; @@ -910,14 +911,12 @@ public class TezMerger { segments.clear(); return subList; } - - List<Segment> subList = - new ArrayList<Segment>(segments.subList(0, numDescriptors)); - // TODO Replace this with a batch operation - for (int i=0; i < numDescriptors; ++i) { - segments.remove(0); - } - return subList; + + // Efficiently bulk remove segments + List<Segment> subList = segments.subList(0, numDescriptors); + List<Segment> subListCopy = new ArrayList<>(subList); + subList.clear(); + return subListCopy; } /** @@ -925,12 +924,14 @@ public class TezMerger { * calculating mergeProgress. This simulates the above merge() method and * tries to obtain the number of bytes that are going to be merged in all * merges(assuming that there is no combiner called while merging). + * @param segments segments to compute merge bytes * @param factor mapreduce.task.io.sort.factor * @param inMem number of segments in memory to be merged + * @param considerFinalMergeForProgress whether to consider for final merge */ - long computeBytesInMerges(int factor, int inMem) { + static long computeBytesInMerges(List<Segment> segments, int factor, int inMem, boolean considerFinalMergeForProgress) { int numSegments = segments.size(); - List<Long> segmentSizes = new ArrayList<Long>(numSegments); + long[] segmentSizes = new long[numSegments]; long totalBytes = 0; int n = numSegments - inMem; // factor for 1st pass @@ -940,33 +941,67 @@ public class TezMerger { for (int i = 0; i < numSegments; i++) { // Not handling empty segments here assuming that it would not affect // much in calculation of mergeProgress. - segmentSizes.add(segments.get(i).getLength()); + segmentSizes[i] = segments.get(i).getLength(); } // If includeFinalMerge is true, allow the following while loop iterate // for 1 more iteration. This is to include final merge as part of the // computation of expected input bytes of merges boolean considerFinalMerge = considerFinalMergeForProgress; - + + int offset = 0; while (n > f || considerFinalMerge) { - if (n <=f ) { + if (n <= f) { considerFinalMerge = false; } long mergedSize = 0; - f = Math.min(f, segmentSizes.size()); + f = Math.min(f, n); for (int j = 0; j < f; j++) { - mergedSize += segmentSizes.remove(0); + mergedSize += segmentSizes[offset + j]; } totalBytes += mergedSize; // insert new size into the sorted list - int pos = Collections.binarySearch(segmentSizes, mergedSize); + int pos = Arrays.binarySearch(segmentSizes, offset, offset + n, mergedSize); if (pos < 0) { pos = -pos-1; } - segmentSizes.add(pos, mergedSize); - - n -= (f-1); + if (pos < offset + f) { + // Insert at the beginning + offset += f - 1; + segmentSizes[offset] = mergedSize; + } else if (pos < offset + n) { + // Insert in the middle + if (offset + n < segmentSizes.length) { + // Shift right after insertion point into unused capacity + System.arraycopy(segmentSizes, pos, segmentSizes, pos + 1, offset + n - pos); + // Insert into insertion point + segmentSizes[pos] = mergedSize; + offset += f; + } else { + // Full left shift before insertion point + System.arraycopy(segmentSizes, offset + f, segmentSizes, 0, pos - (offset + f)); + // Insert in the middle + segmentSizes[pos - (offset + f)] = mergedSize; + // Full left shift after insertion point + System.arraycopy(segmentSizes, pos, segmentSizes, pos - (offset + f) + 1, offset + n - pos); + offset = 0; + } + } else { + // Insert at the end + if (pos < segmentSizes.length) { + // Append into unused capacity + segmentSizes[pos] = mergedSize; + offset += f; + } else { + // Full left shift + // Append at the end + System.arraycopy(segmentSizes, offset + f, segmentSizes, 0, n - f); + segmentSizes[n - f] = mergedSize; + offset = 0; + } + } + n -= f - 1; f = factor; }
