Repository: tez
Updated Branches:
  refs/heads/master 8c61a6609 -> a9cfeb914
TEZ-2850. Tez MergeManager OOM for small Map Outputs (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a9cfeb91 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a9cfeb91 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a9cfeb91 Branch: refs/heads/master Commit: a9cfeb914001c381877657ea39b5de5451740050 Parents: 8c61a66 Author: Jonathan Eagles <[email protected]> Authored: Wed Oct 21 13:15:28 2015 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Wed Oct 21 13:16:33 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 3 +++ .../runtime/library/common/sort/impl/IFile.java | 25 ++++++++++++-------- 2 files changed, 18 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a9cfeb91/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2e5bbe7..0a07dde 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.8.2: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2850. Tez MergeManager OOM for small Map Outputs TEZ-1888. Fix javac warnings all over codebase. TEZ-2886. Ability to merge AM credentials with DAG credentials. TEZ-2896. Fix thread names used during Input/Output initialization. @@ -217,6 +218,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES + TEZ-2850. Tez MergeManager OOM for small Map Outputs TEZ-2886. Ability to merge AM credentials with DAG credentials. TEZ-2896. Fix thread names used during Input/Output initialization. TEZ-2866. Tez UI: Newly added columns wont be displayed by default in tables @@ -500,6 +502,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2850. Tez MergeManager OOM for small Map Outputs TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails TEZ-2855. 
Fix a potential NPE while routing VertexManager events. TEZ-2716. DefaultSorter.isRleNeeded not thread safe http://git-wip-us.apache.org/repos/asf/tez/blob/a9cfeb91/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index 8dcbf6d..20f44dd 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -501,10 +501,10 @@ public class IFile { protected int recNo = 1; protected int originalKeyLength; protected int prevKeyLength; - protected int currentKeyLength; - protected int currentValueLength; byte keyBytes[] = new byte[0]; + protected int currentKeyLength; + protected int currentValueLength; long startPos; /** @@ -564,21 +564,26 @@ public class IFile { TezCounter readsCounter, TezCounter bytesReadCounter, boolean readAhead, int readAheadLength, int bufferSize, boolean isCompressed) throws IOException { - checksumIn = new IFileInputStream(in, length, readAhead, readAheadLength/*, isCompressed*/); - if (isCompressed && codec != null) { - decompressor = CodecPool.getDecompressor(codec); - if (decompressor != null) { - this.in = codec.createInputStream(checksumIn, decompressor); + if (in != null) { + checksumIn = new IFileInputStream(in, length, readAhead, + readAheadLength/* , isCompressed */); + if (isCompressed && codec != null) { + decompressor = CodecPool.getDecompressor(codec); + if (decompressor != null) { + this.in = codec.createInputStream(checksumIn, decompressor); + } else { + LOG.warn("Could not obtain decompressor from CodecPool"); + this.in = checksumIn; + } } else { - LOG.warn("Could not obtain 
decompressor from CodecPool"); this.in = checksumIn; } + startPos = checksumIn.getPosition(); } else { - this.in = checksumIn; + this.in = null; } this.dataIn = new DataInputStream(this.in); - startPos = checksumIn.getPosition(); this.readRecordsCounter = readsCounter; this.bytesReadCounter = bytesReadCounter; this.fileLength = length;
