Repository: tez Updated Branches: refs/heads/master 6b6834e82 -> 1f6b474cb
TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter. (Contributed by Cyrille Chépélov) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1f6b474c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1f6b474c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1f6b474c Branch: refs/heads/master Commit: 1f6b474cb70e0c0781a0a8951f872812fc264bbb Parents: 6b6834e Author: Rajesh Balamohan <[email protected]> Authored: Thu Apr 30 04:42:50 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Thu Apr 30 04:42:50 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../writers/UnorderedPartitionedKVWriter.java | 29 +++++++++++--------- 2 files changed, 17 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1f6b474c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6e7eb7b..940d257 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter TEZ-2084. Tez UI: Stacktrace format info is lost in diagnostics TEZ-2374. Fix build break against hadoop-2.2 due to TEZ-2325. TEZ-2314. Tez task attempt failures due to bad event serialization http://git-wip-us.apache.org/repos/asf/tez/blob/1f6b474c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 7aac6f9..37d8be6 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -250,7 +250,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit // Wrap to 4 byte (Int) boundary for metaData int mod = currentBuffer.nextPosition % INT_SIZE; int metaSkip = mod == 0 ? 0 : (INT_SIZE - mod); - if (currentBuffer.availableSize < (META_SIZE + metaSkip)) { + if ((currentBuffer.availableSize < (META_SIZE + metaSkip)) || (currentBuffer.full)) { // Move over to the next buffer. metaSkip = 0; setupNextBuffer(); @@ -259,9 +259,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit int metaStart = currentBuffer.nextPosition; currentBuffer.availableSize -= (META_SIZE + metaSkip); currentBuffer.nextPosition += META_SIZE; - try { - keySerializer.serialize(key); - } catch (BufferTooSmallException e) { + + keySerializer.serialize(key); + + if (currentBuffer.full) { if (metaStart == 0) { // Started writing at the start of the buffer. Write Key to disk. // Key too large for any buffer. Write entire record to disk. currentBuffer.reset(); @@ -275,10 +276,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit return; } } + + int valStart = currentBuffer.nextPosition; - try { - valSerializer.serialize(value); - } catch (BufferTooSmallException e) { + valSerializer.serialize(value); + + if (currentBuffer.full) { // Value too large for current buffer, or K-V too large for entire buffer. if (metaStart == 0) { // Key + Value too large for a single buffer. @@ -824,8 +827,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } public void write(byte[] b, int off, int len) throws IOException { - if (len > currentBuffer.availableSize) { - throw new BufferTooSmallException(); + if (currentBuffer.full) { + /* no longer do anything until reset */ + } else if (len > currentBuffer.availableSize) { + currentBuffer.full = true; /* stop working & signal we hit the end */ } else { System.arraycopy(b, off, currentBuffer.buffer, currentBuffer.nextPosition, len); currentBuffer.nextPosition += len; @@ -851,6 +856,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private int nextPosition = 0; private int availableSize; + private boolean full = false; WrappedBuffer(int numPartitions, int size) { this.partitionPositions = new int[numPartitions]; @@ -876,6 +882,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit nextPosition = 0; skipSize = 0; availableSize = size; + full = false; } void cleanup() { @@ -884,10 +891,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } - private static class BufferTooSmallException extends IOException { - private static final long serialVersionUID = 1L; - } - private void sendPipelinedEventForSpill(BitSet emptyPartitions, int spillNumber, boolean isFinalUpdate) { if (!pipelinedShuffle) { return;
