Repository: tez Updated Branches: refs/heads/branch-0.6 c5f57ccf6 -> 8fbd0ba84
TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter. (Contributed by Cyrille Chépélov) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8fbd0ba8 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8fbd0ba8 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8fbd0ba8 Branch: refs/heads/branch-0.6 Commit: 8fbd0ba842293713da27ca8fa39e2376398f9645 Parents: c5f57cc Author: Rajesh Balamohan <[email protected]> Authored: Thu Apr 30 04:48:40 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Thu Apr 30 04:48:40 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../writers/UnorderedPartitionedKVWriter.java | 31 +++++++++++--------- 2 files changed, 18 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8fbd0ba8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9067c86..129f0f7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.6.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter. TEZ-2385. branch-0.6 compile failure caused by TEZ-2226. TEZ-2380. Disable fall back to reading from timeline if timeline disabled. TEZ-2226. Disable writing history to timeline if domain creation fails. http://git-wip-us.apache.org/repos/asf/tez/blob/8fbd0ba8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 1ba00a0..1203a5e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -199,7 +199,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit // Wrap to 4 byte (Int) boundary for metaData int mod = currentBuffer.nextPosition % INT_SIZE; int metaSkip = mod == 0 ? 0 : (INT_SIZE - mod); - if (currentBuffer.availableSize < (META_SIZE + metaSkip)) { + if ((currentBuffer.availableSize < (META_SIZE + metaSkip)) || (currentBuffer.full)) { // Move over to the next buffer. metaSkip = 0; setupNextBuffer(); @@ -208,9 +208,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit int metaStart = currentBuffer.nextPosition; currentBuffer.availableSize -= (META_SIZE + metaSkip); currentBuffer.nextPosition += META_SIZE; - try { - keySerializer.serialize(key); - } catch (BufferTooSmallException e) { + + keySerializer.serialize(key); + + if (currentBuffer.full) { if (metaStart == 0) { // Started writing at the start of the buffer. Write Key to disk. // Key too large for any buffer. Write entire record to disk. currentBuffer.reset(); @@ -224,10 +225,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit return; } } + + int valStart = currentBuffer.nextPosition; - try { - valSerializer.serialize(value); - } catch (BufferTooSmallException e) { + valSerializer.serialize(value); + + if (currentBuffer.full) { // Value too large for current buffer, or K-V too large for entire buffer. if (metaStart == 0) { // Key + Value too large for a single buffer. @@ -647,8 +650,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } public void write(byte[] b, int off, int len) throws IOException { - if (len > currentBuffer.availableSize) { - throw new BufferTooSmallException(); + if (currentBuffer.full) { + /* no longer do anything until reset */ + } else if (len > currentBuffer.availableSize) { + currentBuffer.full = true; /* stop working & signal we hit the end */ } else { System.arraycopy(b, off, currentBuffer.buffer, currentBuffer.nextPosition, len); currentBuffer.nextPosition += len; @@ -674,6 +679,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private int nextPosition = 0; private int availableSize; + private boolean full = false; WrappedBuffer(int numPartitions, int size) { this.partitionPositions = new int[numPartitions]; @@ -699,6 +705,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit nextPosition = 0; skipSize = 0; availableSize = size; + full = false; } void cleanup() { @@ -707,10 +714,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } - private static class BufferTooSmallException extends IOException { - private static final long serialVersionUID = 1L; - } - private class SpillCallback implements FutureCallback<SpillResult> { private final int spillNumber; @@ -790,4 +793,4 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata); return shufflePort; } -} \ No newline at end of file +}
