Repository: tez Updated Branches: refs/heads/master d5bf28bfa -> bdc0ee9c9
TEZ-3637. TezMerger logs too much at INFO level. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bdc0ee9c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bdc0ee9c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bdc0ee9c Branch: refs/heads/master Commit: bdc0ee9c9ffcc9c199e0ca4245d7084f6df943c4 Parents: d5bf28b Author: Siddharth Seth <[email protected]> Authored: Mon Feb 27 19:26:31 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Mon Feb 27 19:26:31 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../shuffle/orderedgrouped/MergeManager.java | 213 +++++++++++++++---- .../library/common/sort/impl/TezMerger.java | 56 ++--- 3 files changed, 197 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/bdc0ee9c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c9ac898..88b0b98 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3637. TezMerger logs too much at INFO level TEZ-3638. VertexImpl logs too much at info when removing tasks after auto-reduce parallelism TEZ-3634. reduce the default buffer sizes in PipelinedSorter by a small amount. TEZ-3627. Use queue name available in RegisterApplicationMasterResponse for publishing to ATS. 
http://git-wip-us.apache.org/repos/asf/tez/blob/bdc0ee9c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 26bdca7..9f0e73c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -35,6 +35,7 @@ import org.apache.hadoop.io.FileChunk; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.Time; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; @@ -146,6 +147,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { private final int ifileReadAheadLength; private final int ifileBufferSize; + // Variables for stats and logging + private long lastInMemSegmentLogTime = -1L; + private final SegmentStatsTracker statsInMemTotal = new SegmentStatsTracker(); + private final SegmentStatsTracker statsInMemLastLog = new SegmentStatsTracker(); + + private AtomicInteger mergeFileSequenceId = new AtomicInteger(0); private final boolean cleanup; @@ -465,13 +472,11 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { unreserve(size); } + @Override public synchronized void closeInMemoryFile(MapOutput mapOutput) { inMemoryMapOutputs.add(mapOutput); - LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() - + ", inMemoryMapOutputs.size() -> " + 
inMemoryMapOutputs.size() - + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory + ", mapOutput=" + - mapOutput); + trackAndLogCloseInMemoryFile(mapOutput); commitMemory+= mapOutput.getSize(); @@ -490,6 +495,44 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { } } + private void trackAndLogCloseInMemoryFile(MapOutput mapOutput) { + statsInMemTotal.updateStats(mapOutput.getSize()); + + if (LOG.isDebugEnabled()) { + LOG.debug("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() + + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size() + + ", commitMemory -> " + commitMemory + ", usedMemory ->" + + usedMemory + ", mapOutput=" + + mapOutput); + } else { + statsInMemLastLog.updateStats(mapOutput.getSize()); + long now = Time.monotonicNow(); + if (now > lastInMemSegmentLogTime + 30 * 1000L) { + LOG.info( + "CloseInMemoryFile. Current state: inMemoryMapOutputs.size={}," + + " commitMemory={}," + + " usedMemory={}. Since last log:" + + " count={}," + + " min={}," + + " max={}," + + " total={}," + + " avg={}", + inMemoryMapOutputs.size(), + commitMemory, + usedMemory, + statsInMemLastLog.count, + statsInMemLastLog.minSize, + statsInMemLastLog.maxSize, + statsInMemLastLog.size, + (statsInMemLastLog.count == 0 ? 
"nan" : + (statsInMemLastLog.size / (double) statsInMemLastLog.count)) + ); + statsInMemLastLog.reset(); + lastInMemSegmentLogTime = now; + } + } + } + private void startMemToDiskMerge() { synchronized (inMemoryMerger) { if (!inMemoryMerger.isInProgress()) { @@ -505,9 +548,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) { inMemoryMergedMapOutputs.add(mapOutput); - LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + - ", inMemoryMergedMapOutputs.size() -> " + - inMemoryMergedMapOutputs.size()); + if (LOG.isDebugEnabled()) { + // This log could be moved to INFO level for a while, after mem-to-mem + // merge is production ready. + LOG.debug("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + + ", inMemoryMergedMapOutputs.size() -> " + + inMemoryMergedMapOutputs.size()); + } commitMemory += mapOutput.getSize(); @@ -535,9 +582,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { } onDiskMapOutputs.add(file); - if (LOG.isDebugEnabled()) { - LOG.debug("close onDiskFile=" + file.getPath() + ", len=" + file.getLength()); - } + logCloseOnDiskFile(file); synchronized (onDiskMerger) { if (!onDiskMerger.isInProgress() && @@ -547,6 +592,23 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { } } + private long lastOnDiskSegmentLogTime = -1L; + private void logCloseOnDiskFile(FileChunk file) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "close onDiskFile=" + file.getPath() + ", len=" + file.getLength() + + ", onDisMapOutputs=" + onDiskMapOutputs.size()); + } else { + long now = Time.monotonicNow(); + if (now > lastOnDiskSegmentLogTime + 30 * 1000L) { + LOG.info( + "close onDiskFile. State: NumOnDiskFiles={}. 
Current: path={}, len={}", + onDiskMapOutputs.size(), file.getPath(), file.getLength()); + lastOnDiskSegmentLogTime = now; + } + } + } + /** * Should <b>only</b> be used after the Shuffle phaze is complete, otherwise can * return an invalid state since a merge may not be in progress dur to @@ -576,6 +638,14 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs); onDiskMapOutputs.clear(); + if (statsInMemTotal.count > 0) { + LOG.info( + "TotalInMemFetchStats: count={}, totalSize={}, min={}, max={}, avg={}", + statsInMemTotal.count, statsInMemTotal.size, + statsInMemTotal.minSize, statsInMemTotal.maxSize, + (statsInMemTotal.size / (float) statsInMemTotal.count)); + } + // Don't attempt a final merge if close is invoked as a result of a previous // shuffle exception / error. if (tryFinalMerge) { @@ -1069,21 +1139,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { List<MapOutput> inMemoryMapOutputs, List<FileChunk> onDiskMapOutputs ) throws IOException, InterruptedException { - LOG.info("finalMerge called with " + - inMemoryMapOutputs.size() + " in-memory map-outputs and " + - onDiskMapOutputs.size() + " on-disk map-outputs"); - if (LOG.isDebugEnabled()) { - for (MapOutput inMemoryMapOutput : inMemoryMapOutputs) { - LOG.debug("inMemoryOutput=" + inMemoryMapOutput + ", size=" + inMemoryMapOutput - .getSize()); - } - - for (FileChunk onDiskMapOutput : onDiskMapOutputs) { - LOG.debug("onDiskMapOutput=" + onDiskMapOutput.getPath() + ", size=" + onDiskMapOutput - .getLength()); - } - } + logFinalMergeStart(inMemoryMapOutputs, onDiskMapOutputs); + StringBuilder finalMergeLog = new StringBuilder(); inputContext.notifyProgress(); @@ -1148,15 +1206,25 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { // add to list of final disk outputs. 
onDiskMapOutputs.add(new FileChunk(outputPath, 0, fStatus.getLen())); - LOG.info("Merged " + numMemDiskSegments + " segments, " + - inMemToDiskBytes + " bytes to disk to satisfy " + - "reduce memory limit. outputPath=" + outputPath); + if (LOG.isInfoEnabled()) { + finalMergeLog.append("MemMerged: " + numMemDiskSegments + ", " + inMemToDiskBytes); + if (LOG.isDebugEnabled()) { + LOG.debug("Merged " + numMemDiskSegments + " segments, size=" + + inMemToDiskBytes + " to " + outputPath); + } + } + inMemToDiskBytes = 0; memDiskSegments.clear(); } else if (inMemToDiskBytes != 0) { - LOG.info("Keeping " + numMemDiskSegments + " segments, " + - inMemToDiskBytes + " bytes in memory for " + - "intermediate, on-disk merge"); + if (LOG.isInfoEnabled()) { + finalMergeLog.append("DelayedMemMerge: " + numMemDiskSegments + ", " + inMemToDiskBytes); + if (LOG.isDebugEnabled()) { + LOG.debug("Keeping " + numMemDiskSegments + " segments, " + + inMemToDiskBytes + " bytes in memory for " + + "intermediate, on-disk merge"); + } + } } } @@ -1167,8 +1235,11 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { for (FileChunk fileChunk : onDisk) { final long fileLength = fileChunk.getLength(); onDiskBytes += fileLength; - LOG.info("Disk file=" + fileChunk.getPath() + ", len=" + fileLength + ", isLocal=" + - fileChunk.isLocalFile()); + if (LOG.isDebugEnabled()) { + LOG.debug("Disk file=" + fileChunk.getPath() + ", len=" + fileLength + + ", isLocal=" + + fileChunk.isLocalFile()); + } final Path file = fileChunk.getPath(); TezCounter counter = @@ -1179,8 +1250,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { diskSegments.add(new DiskSegment(fs, file, fileOffset, fileLength, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize, preserve, counter)); } - LOG.info("Merging " + onDisk.length + " files, " + - onDiskBytes + " bytes from disk"); + if (LOG.isInfoEnabled()) { + finalMergeLog.append(". 
DiskSeg: " + onDisk.length + ", " + onDiskBytes); + if (LOG.isDebugEnabled()) { + LOG.debug("Merging " + onDisk.length + " files, " + + onDiskBytes + " bytes from disk"); + } + } Collections.sort(diskSegments, new Comparator<Segment>() { public int compare(Segment o1, Segment o2) { if (o1.getLength() == o2.getLength()) { @@ -1194,8 +1270,14 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { List<Segment> finalSegments = new ArrayList<Segment>(); long inMemBytes = createInMemorySegments(inMemoryMapOutputs, finalSegments, 0); - LOG.info("Merging " + finalSegments.size() + " segments, " + - inMemBytes + " bytes from memory into reduce"); + if (LOG.isInfoEnabled()) { + finalMergeLog.append(". MemSeg: " + finalSegments.size() + ", " + inMemBytes); + if (LOG.isDebugEnabled()) { + LOG.debug("Merging " + finalSegments.size() + " segments, " + + inMemBytes + " bytes from memory into reduce"); + } + } + if (0 != onDiskBytes) { final int numInMemSegments = memDiskSegments.size(); diskSegments.addAll(0, memDiskSegments); @@ -1211,6 +1293,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { finalSegments.add(new Segment( new RawKVIteratorReader(diskMerge, onDiskBytes), null)); } + if (LOG.isInfoEnabled()) { + LOG.info(finalMergeLog.toString()); + } // This is doing nothing but creating an iterator over the segments. 
return TezMerger.merge(job, fs, keyClass, valueClass, finalSegments, finalSegments.size(), tmpDir, @@ -1218,6 +1303,35 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { additionalBytesRead, null); } + + private void logFinalMergeStart(List<MapOutput> inMemoryMapOutputs, + List<FileChunk> onDiskMapOutputs) { + long inMemSegmentSize = 0; + for (MapOutput inMemoryMapOutput : inMemoryMapOutputs) { + inMemSegmentSize += inMemoryMapOutput.getSize(); + + if (LOG.isDebugEnabled()) { + LOG.debug("finalMerge: inMemoryOutput=" + inMemoryMapOutput + ", size=" + + inMemoryMapOutput.getSize()); + } + } + long onDiskSegmentSize = 0; + for (FileChunk onDiskMapOutput : onDiskMapOutputs) { + onDiskSegmentSize += onDiskMapOutput.getLength(); + + if (LOG.isDebugEnabled()) { + LOG.debug("finalMerge: onDiskMapOutput=" + onDiskMapOutput.getPath() + + ", size=" + onDiskMapOutput.getLength()); + } + } + + LOG.info( + "finalMerge with #inMemoryOutputs={}, size={} and #onDiskOutputs={}, size={}", + inMemoryMapOutputs.size(), inMemSegmentSize, onDiskMapOutputs.size(), + onDiskSegmentSize); + + } + @VisibleForTesting long getCommitMemory() { return commitMemory; @@ -1232,4 +1346,31 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { void waitForMemToMemMerge() throws InterruptedException { memToMemMerger.waitForMerge(); } + + + + private static class SegmentStatsTracker { + private long size; + private int count; + private long minSize; + private long maxSize; + + SegmentStatsTracker() { + reset(); + } + + void updateStats(long segSize) { + size += segSize; + count++; + minSize = (segSize < minSize ? segSize : minSize); + maxSize = (segSize > maxSize ? 
segSize : maxSize); + } + + void reset() { + size = 0L; + count = 0; + minSize = Long.MAX_VALUE; + maxSize = Long.MIN_VALUE; + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/bdc0ee9c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java index 17e0fe2..8f3e84a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java @@ -90,32 +90,6 @@ public class TezMerger { mergePhase); } - public static - TezRawKeyValueIterator merge(Configuration conf, FileSystem fs, - Class keyClass, Class valueClass, - CompressionCodec codec, boolean ifileReadAhead, - int ifileReadAheadLength, int ifileBufferSize, - Path[] inputs, boolean deleteInputs, - int mergeFactor, Path tmpDir, - RawComparator comparator, - Progressable reporter, - TezCounter readsCounter, - TezCounter writesCounter, - TezCounter mergedMapOutputsCounter, - TezCounter bytesReadCounter, - Progress mergePhase) - throws IOException, InterruptedException { - return - new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead, - ifileReadAheadLength, ifileBufferSize, false, comparator, - reporter, mergedMapOutputsCounter).merge( - keyClass, valueClass, - mergeFactor, tmpDir, - readsCounter, writesCounter, - bytesReadCounter, - mergePhase); - } - // Used by the in-memory merger. 
public static TezRawKeyValueIterator merge(Configuration conf, FileSystem fs, @@ -225,8 +199,8 @@ public class TezMerger { } } } - if ((count > 0) && LOG.isDebugEnabled()) { - LOG.debug("writeFile SAME_KEY count=" + count); + if ((count > 0) && LOG.isTraceEnabled()) { + LOG.trace("writeFile SAME_KEY count=" + count); } } @@ -510,7 +484,9 @@ public class TezMerger { this.considerFinalMergeForProgress = considerFinalMergeForProgress; for (Path file : inputs) { - LOG.debug("MergeQ: adding: " + file); + if (LOG.isTraceEnabled()) { + LOG.trace("MergeQ: adding: " + file); + } segments.add(new DiskSegment(fs, file, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize, !deleteInputs, @@ -702,11 +678,13 @@ public class TezMerger { TezCounter bytesReadCounter, Progress mergePhase) throws IOException, InterruptedException { - LOG.info("Merging " + segments.size() + " sorted segments"); if (segments.size() == 0) { LOG.info("Nothing to merge. Returning an empty iterator"); return new EmptyIterator(); } + if (LOG.isDebugEnabled()) { + LOG.debug("Merging " + segments.size() + " sorted segments"); + } /* * If there are inMemory segments, then they come first in the segments @@ -806,19 +784,23 @@ public class TezMerger { mergeProgress.set(totalBytesProcessed * progPerByte); else mergeProgress.set(1.0f); // Last pass and no segments left - we're done - - LOG.info("Down to the last merge-pass, with " + numSegments + - " segments left of total size: " + - (totalBytes - totalBytesProcessed) + " bytes"); + + if (LOG.isDebugEnabled()) { + LOG.debug("Down to the last merge-pass, with " + numSegments + + " segments left of total size: " + + (totalBytes - totalBytesProcessed) + " bytes"); + } // At this point, Factor Segments have not been physically // materialized. The merge will be done dynamically. Some of them may // be in-memory segments, other on-disk semgnets. Decision to be made // by a finalMerge is that is required. 
return this; } else { - LOG.info("Merging " + segmentsToMerge.size() + - " intermediate segments out of a total of " + - (segments.size()+segmentsToMerge.size())); + if (LOG.isDebugEnabled()) { + LOG.debug("Merging " + segmentsToMerge.size() + + " intermediate segments out of a total of " + + (segments.size() + segmentsToMerge.size())); + } long bytesProcessedInPrevMerges = totalBytesProcessed; totalBytesProcessed += startBytes;
