Repository: tez Updated Branches: refs/heads/branch-0.6 3a8bf0e0a -> f36eeb673
TEZ-2850. Tez MergeManager OOM for small Map Outputs (jeagles) (cherry picked from commit a9cfeb914001c381877657ea39b5de5451740050) (cherry picked from commit 35bd5ab0b58fa4bd2b9162c14baa7e94e22ba00c) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f36eeb67 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f36eeb67 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f36eeb67 Branch: refs/heads/branch-0.6 Commit: f36eeb6736e71009d3d3e57f000c1d135bdc7119 Parents: 3a8bf0e Author: Jonathan Eagles <[email protected]> Authored: Wed Oct 21 13:15:28 2015 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Wed Oct 21 13:28:07 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../runtime/library/common/sort/impl/IFile.java | 26 +++++++++++--------- 2 files changed, 16 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f36eeb67/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 62a4929..0356b75 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2850. Tez MergeManager OOM for small Map Outputs TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails TEZ-2855. Fix a potential NPE while routing VertexManager events. TEZ-2716. DefaultSorter.isRleNeeded not thread safe. http://git-wip-us.apache.org/repos/asf/tez/blob/f36eeb67/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index f075772..1e1c547 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -506,10 +506,10 @@ public class IFile { protected int recNo = 1; protected int originalKeyLength; protected int prevKeyLength; - protected int currentKeyLength; - protected int currentValueLength; byte keyBytes[] = new byte[0]; + protected int currentKeyLength; + protected int currentValueLength; long startPos; protected boolean isCompressed = false; @@ -570,22 +570,26 @@ public class IFile { TezCounter readsCounter, TezCounter bytesReadCounter, boolean readAhead, int readAheadLength, int bufferSize, boolean isCompressed) throws IOException { - this.isCompressed = isCompressed; - checksumIn = new IFileInputStream(in, length, readAhead, readAheadLength/*, isCompressed*/); - if (isCompressed && codec != null) { - decompressor = CodecPool.getDecompressor(codec); - if (decompressor != null) { - this.in = codec.createInputStream(checksumIn, decompressor); + if (in != null) { + checksumIn = new IFileInputStream(in, length, readAhead, + readAheadLength/* , isCompressed */); + if (isCompressed && codec != null) { + decompressor = CodecPool.getDecompressor(codec); + if (decompressor != null) { + this.in = codec.createInputStream(checksumIn, decompressor); + } else { + LOG.warn("Could not obtain decompressor from CodecPool"); + this.in = checksumIn; + } } else { - LOG.warn("Could not obtain decompressor from CodecPool"); this.in = checksumIn; } + startPos = checksumIn.getPosition(); } else { - this.in = checksumIn; + this.in = null; } this.dataIn = new DataInputStream(this.in); - startPos = checksumIn.getPosition(); this.readRecordsCounter = readsCounter; this.bytesReadCounter = bytesReadCounter; this.fileLength = length;
