Repository: tez Updated Branches: refs/heads/master 68fe02338 -> 4ed4a5693
TEZ-3673. Allocate smaller buffers in UnorderedPartitionedKVWriter. (harishjp) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ed4a569 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ed4a569 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ed4a569 Branch: refs/heads/master Commit: 4ed4a56939d8aa330485e4c400af0adfc309242d Parents: 68fe023 Author: Harish JP <[email protected]> Authored: Fri May 5 15:42:42 2017 +0530 Committer: Harish JP <[email protected]> Committed: Fri May 5 15:42:42 2017 +0530 ---------------------------------------------------------------------- .../library/api/TezRuntimeConfiguration.java | 11 ++ .../writers/UnorderedPartitionedKVWriter.java | 191 ++++++++++++++----- .../TestUnorderedPartitionedKVWriter.java | 94 +++++++-- 3 files changed, 223 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4ed4a569/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index 4d24bfb..2eec276 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -173,6 +173,16 @@ public class TezRuntimeConfiguration { public static final int TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT = 2; /** + * Integer value. Percentage of the output buffers that must be filled before they are spilled + * to disk together. Default value is 0, which spills each buffer as soon as it is full.
+ */ + @ConfigurationProperty(type="int") + public static final String TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT = + TEZ_RUNTIME_PREFIX + "unordered-partitioned-kvwriter.buffer-merge-percent"; + public static final int TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT_DEFAULT = + 0; + + /** * Report partition statistics (e.g better scheduling in ShuffleVertexManager). TEZ-2496 * This can be enabled/disabled at vertex level. * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats} @@ -590,6 +600,7 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT); tezRuntimeKeys.add(TEZ_RUNTIME_SORTER_CLASS); tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); + tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT); defaultConf.addResource("core-default.xml"); defaultConf.addResource("core-site.xml"); http://git-wip-us.apache.org/repos/asf/tez/blob/4ed4a569/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index ea49118..b01d00f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -135,7 +135,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit @VisibleForTesting int sizePerBuffer; @VisibleForTesting + int lastBufferSize; + @VisibleForTesting int numInitializedBuffers; + @VisibleForTesting + int spillLimit; private 
Throwable spillException; private AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -161,9 +165,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private final long indexFileSizeEstimate; + private List<WrappedBuffer> filledBuffers = new ArrayList<>(); + public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf, int numOutputs, long availableMemoryBytes) throws IOException { super(outputContext, conf, numOutputs); + Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes"); this.deflater = TezCommonUtils.newBestCompressionDeflater(); @@ -187,7 +194,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit // Allow unit tests to control the buffer sizes. int maxSingleBufferSizeBytes = conf.getInt( - TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, Integer.MAX_VALUE); + TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, + Integer.MAX_VALUE); computeNumBuffersAndSize(maxSingleBufferSizeBytes); availableBuffers = new LinkedBlockingQueue<WrappedBuffer>(); @@ -251,11 +259,48 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit + ", reportPartitionStats=" + reportPartitionStats); } + private static final int ALLOC_OVERHEAD = 64; private void computeNumBuffersAndSize(int bufferLimit) { - numBuffers = Math.max(2, (int) (availableMemory / bufferLimit) - + ((availableMemory % bufferLimit) == 0 ? 
0 : 1)); - sizePerBuffer = (int) (availableMemory / numBuffers); + numBuffers = (int)(availableMemory / bufferLimit); + + if (numBuffers >= 2) { + sizePerBuffer = bufferLimit - ALLOC_OVERHEAD; + lastBufferSize = (int)(availableMemory % bufferLimit); + // Use the leftover memory as an extra last buffer only if it exceeds 50% of bufferLimit + if (lastBufferSize > bufferLimit / 2) { + numBuffers += 1; + } else { + if (lastBufferSize > 0) { + LOG.warn("Underallocating memory. Unused memory size: {}.", lastBufferSize); + } + lastBufferSize = sizePerBuffer; + } + } else { + // Always use a minimum of 2 buffers. + numBuffers = 2; + if (availableMemory / numBuffers > Integer.MAX_VALUE) { + sizePerBuffer = Integer.MAX_VALUE; + } else { + sizePerBuffer = (int)(availableMemory / numBuffers); + } + // 2 equal-sized buffers. + lastBufferSize = sizePerBuffer; + } + // Ensure allocation size is multiple of INT_SIZE, truncate down. sizePerBuffer = sizePerBuffer - (sizePerBuffer % INT_SIZE); + lastBufferSize = lastBufferSize - (lastBufferSize % INT_SIZE); + + int mergePercent = conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT, + TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT_DEFAULT); + spillLimit = numBuffers * mergePercent / 100; + // Keep within limits.
+ if (spillLimit < 1) { + spillLimit = 1; + } + if (spillLimit > numBuffers) { + spillLimit = numBuffers; + } } @Override @@ -375,18 +420,22 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit currentBuffer.reset(); } else { // Update overall stats - LOG.info(destNameTrimmed + ": " + "Moving to next buffer and triggering spill"); + LOG.info(destNameTrimmed + ": " + "Moving to next buffer"); updateGlobalStats(currentBuffer); - pendingSpillCount.incrementAndGet(); - - int spillNumber = numSpills.getAndIncrement(); - ListenableFuture<SpillResult> future = spillExecutor.submit( - new SpillCallable(currentBuffer, codec, spilledRecordsCounter, spillNumber)); - Futures.addCallback(future, new SpillCallback(spillNumber)); - // Update once per buffer (instead of every record) - updateTezCountersAndNotify(); - + filledBuffers.add(currentBuffer); + if (filledBuffers.size() >= spillLimit) { + LOG.info(destNameTrimmed + ": triggering spill"); + pendingSpillCount.incrementAndGet(); + int spillNumber = numSpills.getAndIncrement(); + ListenableFuture<SpillResult> future = spillExecutor.submit(new SpillCallable( + new ArrayList<WrappedBuffer>(filledBuffers), codec, spilledRecordsCounter, + spillNumber)); + filledBuffers.clear(); + Futures.addCallback(future, new SpillCallback(spillNumber)); + // Update once per spill (instead of every record) + updateTezCountersAndNotify(); + } WrappedBuffer wb = getNextAvailableBuffer(); currentBuffer = wb; } @@ -408,7 +457,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private WrappedBuffer getNextAvailableBuffer() throws IOException { if (availableBuffers.peek() == null) { if (numInitializedBuffers < numBuffers) { - buffers[numInitializedBuffers] = new WrappedBuffer(numPartitions, sizePerBuffer); + buffers[numInitializedBuffers] = new WrappedBuffer(numPartitions, + numInitializedBuffers == numBuffers - 1 ?
lastBufferSize : sizePerBuffer); numInitializedBuffers++; return buffers[numInitializedBuffers - 1]; } else { @@ -428,16 +478,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit // All spills using compression for now. private class SpillCallable extends CallableWithNdc<SpillResult> { - private final WrappedBuffer wrappedBuffer; + private final List<WrappedBuffer> filledBuffers; private final CompressionCodec codec; private final TezCounter numRecordsCounter; private int spillIndex; private SpillPathDetails spillPathDetails; private int spillNumber; - public SpillCallable(WrappedBuffer wrappedBuffer, CompressionCodec codec, + public SpillCallable(List<WrappedBuffer> filledBuffers, CompressionCodec codec, TezCounter numRecordsCounter, SpillPathDetails spillPathDetails) { - this.wrappedBuffer = wrappedBuffer; + this.filledBuffers = filledBuffers; this.codec = codec; this.numRecordsCounter = numRecordsCounter; this.spillIndex = spillPathDetails.spillIndex; @@ -446,9 +496,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit this.spillPathDetails = spillPathDetails; } - public SpillCallable(WrappedBuffer wrappedBuffer, CompressionCodec codec, + public SpillCallable(List<WrappedBuffer> filledBuffers, CompressionCodec codec, TezCounter numRecordsCounter, int spillNumber) throws IOException { - this.wrappedBuffer = wrappedBuffer; + this.filledBuffers = filledBuffers; this.codec = codec; this.numRecordsCounter = numRecordsCounter; this.spillNumber = spillNumber; @@ -474,33 +524,43 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit IFile.Writer writer = null; try { long segmentStart = out.getPos(); - if (wrappedBuffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) { - // Skip empty partition. 
- continue; + long numRecords = 0; + for (WrappedBuffer buffer : filledBuffers) { + outputContext.notifyProgress(); + if (buffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) { + // Skip empty partition. + continue; + } + if (writer == null) { + writer = new Writer(conf, out, keyClass, valClass, codec, null, null); + } + numRecords += writePartition(buffer.partitionPositions[i], buffer, writer, key, val); } - writer = new Writer(conf, out, keyClass, valClass, codec, null, null); - long numRecords = writePartition(wrappedBuffer.partitionPositions[i], wrappedBuffer, - writer, key, val); - if (numRecordsCounter != null) { - // TezCounter is not threadsafe; Since numRecordsCounter would be updated from - // multiple threads, it is good to synchronize it when incrementing it for correctness. - synchronized (numRecordsCounter) { - numRecordsCounter.increment(numRecords); + if (writer != null) { + if (numRecordsCounter != null) { + // TezCounter is not threadsafe; Since numRecordsCounter would be updated from + // multiple threads, it is good to synchronize it when incrementing it for correctness. 
+ synchronized (numRecordsCounter) { + numRecordsCounter.increment(numRecords); + } } + writer.close(); + compressedLength += writer.getCompressedLength(); + TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(), + writer.getCompressedLength()); + spillRecord.putIndex(indexRecord, i); + writer = null; } - writer.close(); - compressedLength += writer.getCompressedLength(); - TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(), - writer.getCompressedLength()); - spillRecord.putIndex(indexRecord, i); - writer = null; } finally { if (writer != null) { writer.close(); } } } - spillResult = new SpillResult(compressedLength, this.wrappedBuffer); + key.close(); + val.close(); + + spillResult = new SpillResult(compressedLength, this.filledBuffers); handleSpillIndex(spillPathDetails, spillRecord); LOG.info(destNameTrimmed + ": " + "Finished spill " + spillIndex); @@ -717,10 +777,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit return false; } else { updateGlobalStats(currentBuffer); + filledBuffers.add(currentBuffer); //setup output file and index file SpillPathDetails spillPathDetails = getSpillPathDetails(true, -1); - SpillCallable spillCallable = new SpillCallable(currentBuffer, codec, null, spillPathDetails); + SpillCallable spillCallable = new SpillCallable(filledBuffers, codec, null, spillPathDetails); try { SpillResult spillResult = spillCallable.call(); @@ -901,7 +962,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit sizePerPartition[i] += writer.getRawLength(); } writer.close(); - additionalSpillBytesWritternCounter.increment(writer.getCompressedLength()); + synchronized (additionalSpillBytesWritternCounter) { + additionalSpillBytesWritternCounter.increment(writer.getCompressedLength()); + } TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(), writer.getCompressedLength()); 
spillRecord.putIndex(indexRecord, i); @@ -1072,25 +1135,47 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit @Override public void onSuccess(SpillResult result) { - spilledSize += result.spillSize; + synchronized (UnorderedPartitionedKVWriter.this) { + spilledSize += result.spillSize; + } - sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition, - result.wrappedBuffer.sizePerPartition, spillNumber, false); + int recordsPerPartition[] = null; + long sizePerPartition[] = null; + if (result.filledBuffers.size() == 1) { + recordsPerPartition = result.filledBuffers.get(0).recordsPerPartition; + sizePerPartition = result.filledBuffers.get(0).sizePerPartition; + } else { + recordsPerPartition = new int[numPartitions]; + sizePerPartition = new long[numPartitions]; + for (WrappedBuffer buffer : result.filledBuffers) { + for (int i = 0; i < numPartitions; ++i) { + recordsPerPartition[i] += buffer.recordsPerPartition[i]; + sizePerPartition[i] += buffer.sizePerPartition[i]; + } + } + } - try { - result.wrappedBuffer.reset(); - availableBuffers.add(result.wrappedBuffer); + sendPipelinedEventForSpill(recordsPerPartition, sizePerPartition, spillNumber, false); + try { + for (WrappedBuffer buffer : result.filledBuffers) { + buffer.reset(); + availableBuffers.add(buffer); + } } catch (Throwable e) { - LOG.error(destNameTrimmed + ": " + "Failure while attempting to reset buffer after spill", e); + LOG.error(destNameTrimmed + ": Failure while attempting to reset buffer after spill", e); outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Failure while attempting to reset buffer after spill"); } if (!pipelinedShuffle) { - additionalSpillBytesWritternCounter.increment(result.spillSize); + synchronized(additionalSpillBytesWritternCounter) { + additionalSpillBytesWritternCounter.increment(result.spillSize); + } } else { - fileOutputBytesCounter.increment(indexFileSizeEstimate); - fileOutputBytesCounter.increment(result.spillSize); + 
synchronized(fileOutputBytesCounter) { + fileOutputBytesCounter.increment(indexFileSizeEstimate); + fileOutputBytesCounter.increment(result.spillSize); + } } spillLock.lock(); @@ -1121,11 +1206,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private static class SpillResult { final long spillSize; - final WrappedBuffer wrappedBuffer; + final List<WrappedBuffer> filledBuffers; - SpillResult(long size, WrappedBuffer wrappedBuffer) { + SpillResult(long size, List<WrappedBuffer> filledBuffers) { this.spillSize = size; - this.wrappedBuffer = wrappedBuffer; + this.filledBuffers = filledBuffers; } } http://git-wip-us.apache.org/repos/asf/tez/blob/4ed4a569/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 41b2b97..d970b95 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -93,7 +93,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.Multimap; @@ -125,7 +124,7 @@ public class TestUnorderedPartitionedKVWriter { } @SuppressWarnings("deprecation") - @Parameterized.Parameters(name = "test[{0}, {1}, {2}]") + @Parameterized.Parameters(name = "test[{0}, {1}]") public static Collection<Object[]> data() { Object[][] data = new Object[][] { { 
false, ReportPartitionStats.DISABLED }, @@ -162,7 +161,8 @@ public class TestUnorderedPartitionedKVWriter { String uniqueId = UUID.randomUUID().toString(); OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId); - int maxSingleBufferSizeBytes = 2047; + final int maxSingleBufferSizeBytes = 2047; + final long sizePerBuffer = maxSingleBufferSizeBytes - 64 - maxSingleBufferSizeBytes % 4; Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, false, maxSingleBufferSizeBytes); @@ -170,57 +170,106 @@ public class TestUnorderedPartitionedKVWriter { UnorderedPartitionedKVWriter kvWriter = null; + // Not enough memory so divide into 2 buffers. kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 2048); assertEquals(2, kvWriter.numBuffers); assertEquals(1024, kvWriter.sizePerBuffer); + assertEquals(1024, kvWriter.lastBufferSize); assertEquals(1, kvWriter.numInitializedBuffers); + assertEquals(1, kvWriter.spillLimit); + // allocate exact kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, maxSingleBufferSizeBytes * 3); assertEquals(3, kvWriter.numBuffers); - assertEquals(maxSingleBufferSizeBytes - maxSingleBufferSizeBytes % 4, kvWriter.sizePerBuffer); + assertEquals(sizePerBuffer, kvWriter.sizePerBuffer); + assertEquals(sizePerBuffer, kvWriter.lastBufferSize); assertEquals(1, kvWriter.numInitializedBuffers); + assertEquals(1, kvWriter.spillLimit); + // under allocate kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, - maxSingleBufferSizeBytes * 2 + 1); + maxSingleBufferSizeBytes * 2 + maxSingleBufferSizeBytes / 2); + assertEquals(2, kvWriter.numBuffers); + assertEquals(sizePerBuffer, kvWriter.sizePerBuffer); + assertEquals(sizePerBuffer, kvWriter.lastBufferSize); + assertEquals(1, kvWriter.numInitializedBuffers); + assertEquals(1, kvWriter.spillLimit); + + // over allocate + kvWriter = new 
UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, + maxSingleBufferSizeBytes * 2 + maxSingleBufferSizeBytes / 2 + 1); assertEquals(3, kvWriter.numBuffers); - assertEquals(1364, kvWriter.sizePerBuffer); + assertEquals(sizePerBuffer, kvWriter.sizePerBuffer); + assertEquals(maxSingleBufferSizeBytes / 2 + 1, kvWriter.lastBufferSize); + assertEquals(1, kvWriter.numInitializedBuffers); + assertEquals(1, kvWriter.spillLimit); + + // spill limit 1. + kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, + 4 * maxSingleBufferSizeBytes + 1); + assertEquals(4, kvWriter.numBuffers); + assertEquals(sizePerBuffer, kvWriter.sizePerBuffer); + assertEquals(sizePerBuffer, kvWriter.lastBufferSize); + assertEquals(1, kvWriter.numInitializedBuffers); + assertEquals(1, kvWriter.spillLimit); + + // spill limit 2. + conf.setInt( + TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT, + 50); + kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, + 4 * maxSingleBufferSizeBytes + 1); + assertEquals(4, kvWriter.numBuffers); + assertEquals(sizePerBuffer, kvWriter.sizePerBuffer); + assertEquals(sizePerBuffer, kvWriter.lastBufferSize); assertEquals(1, kvWriter.numInitializedBuffers); + assertEquals(2, kvWriter.spillLimit); - kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 10240); - assertEquals(6, kvWriter.numBuffers); - assertEquals(1704, kvWriter.sizePerBuffer); + // Available memory is less than buffer size. 
+ conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES); + kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, + 2048); + assertEquals(2, kvWriter.numBuffers); + assertEquals(1024, kvWriter.sizePerBuffer); + assertEquals(1024, kvWriter.lastBufferSize); assertEquals(1, kvWriter.numInitializedBuffers); + assertEquals(1, kvWriter.spillLimit); } @Test(timeout = 10000) public void testNoSpill() throws IOException, InterruptedException { - baseTest(10, 10, null, shouldCompress); + baseTest(10, 10, null, shouldCompress, -1, 0); } @Test(timeout = 10000) public void testSingleSpill() throws IOException, InterruptedException { - baseTest(50, 10, null, shouldCompress); + baseTest(50, 10, null, shouldCompress, -1, 0); } @Test(timeout = 10000) public void testMultipleSpills() throws IOException, InterruptedException { - baseTest(200, 10, null, shouldCompress); + baseTest(200, 10, null, shouldCompress, -1, 0); + } + + @Test(timeout = 10000) + public void testMergeBuffersAndSpill() throws IOException, InterruptedException { + baseTest(200, 10, null, shouldCompress, 2048, 10); } @Test(timeout = 10000) public void testNoRecords() throws IOException, InterruptedException { - baseTest(0, 10, null, shouldCompress); + baseTest(0, 10, null, shouldCompress, -1, 0); } @Test(timeout = 10000) public void testSkippedPartitions() throws IOException, InterruptedException { - baseTest(200, 10, Sets.newHashSet(2, 5), shouldCompress); + baseTest(200, 10, Sets.newHashSet(2, 5), shouldCompress, -1, 0); } @Test(timeout = 10000) public void testNoSpill_SinglePartition() throws IOException, InterruptedException { - baseTest(10, 1, null, shouldCompress); + baseTest(10, 1, null, shouldCompress, -1, 0); } @@ -703,7 +752,8 @@ public class TestUnorderedPartitionedKVWriter { private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions, - boolean shouldCompress) throws IOException, InterruptedException { + boolean 
shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent) + throws IOException, InterruptedException { PartitionerForTest partitioner = new PartitionerForTest(); ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); @@ -711,7 +761,11 @@ public class TestUnorderedPartitionedKVWriter { OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId); Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, - shouldCompress, -1); + shouldCompress, maxSingleBufferSizeBytes); + conf.setInt( + TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT, + bufferMergePercent); + CompressionCodec codec = null; if (shouldCompress) { codec = new DefaultCodec(); @@ -752,7 +806,7 @@ public class TestUnorderedPartitionedKVWriter { List<Event> events = kvWriter.close(); int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead; - int numExpectedSpills = numRecordsWritten / recordsPerBuffer; + int numExpectedSpills = numRecordsWritten / recordsPerBuffer / kvWriter.spillLimit; verify(outputContext, never()).reportFailure(any(TaskFailureType.class), any(Throwable.class), any(String.class)); @@ -801,7 +855,7 @@ public class TestUnorderedPartitionedKVWriter { assertTrue(additionalSpillBytesRead > (recordsPerBuffer * numExpectedSpills * sizePerRecord)); } } - assertTrue(additionalSpillBytesWritten == additionalSpillBytesRead); + assertEquals(additionalSpillBytesWritten, additionalSpillBytesRead); assertEquals(numExpectedSpills, numAdditionalSpillsCounter.getValue()); BitSet emptyPartitionBits = null; @@ -889,7 +943,7 @@ public class TestUnorderedPartitionedKVWriter { } private static String createRandomString(int size) { - StringBuilder sb = new StringBuilder(); + StringBuilder sb = new StringBuilder(size); Random random = new Random(); for (int i = 0; i < size; i++) { int r = Math.abs(random.nextInt()) % 26;
