Repository: tez Updated Branches: refs/heads/master a2ba95043 -> d5e65e207
TEZ-3701. UnorderedPartitionedKVWriter - issues with parallel Deflater usage, synchronousqueue in threadpool (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d5e65e20 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d5e65e20 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d5e65e20 Branch: refs/heads/master Commit: d5e65e207492ee1663a737e2411dbb75e07b891f Parents: a2ba950 Author: Rajesh Balamohan <[email protected]> Authored: Fri Jun 2 07:28:58 2017 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Fri Jun 2 07:28:58 2017 +0530 ---------------------------------------------------------------------- .../writers/UnorderedPartitionedKVWriter.java | 106 +++++++++++++++---- .../TestUnorderedPartitionedKVWriter.java | 23 +++- 2 files changed, 102 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d5e65e20/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 6c65ca2..bcc9cf9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -124,7 +125,23 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit // uncompressed size for each partition private final long[] sizePerPartition; private volatile long spilledSize = 0; - private final Deflater deflater; + + static final ThreadLocal<Deflater> deflater = new ThreadLocal<Deflater>() { + + @Override + public Deflater initialValue() { + return TezCommonUtils.newBestCompressionDeflater(); + } + + @Override + public Deflater get() { + Deflater deflater = super.get(); + deflater.reset(); + return deflater; + } + }; + + private final Semaphore availableSlots; /** * Represents final number of records written (spills are not counted) @@ -174,7 +191,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes"); - this.deflater = TezCommonUtils.newBestCompressionDeflater(); this.destNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()); //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in // this case. Add it later if needed. @@ -215,16 +231,22 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit valSerializer.open(dos); rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw(); - ExecutorService executor = new ThreadPoolExecutor(1, Math.max(2, numBuffers/2), + int maxThreads = Math.max(2, numBuffers/2); + //TODO: Make use of TezSharedExecutor later + ExecutorService executor = new ThreadPoolExecutor(1, maxThreads, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat( "UnorderedOutSpiller {" + TezUtilsInternal.cleanVertexName( - outputContext.getDestinationVertexName()) + "}") + outputContext.getDestinationVertexName()) + "} #%d") .build() ); + // to restrict submission of more tasks than threads (e.g numBuffers > numThreads) + // This is maxThreads - 1, to avoid race between callback thread releasing semaphore and the + // thread calling tryAcquire. + availableSlots = new Semaphore(maxThreads - 1, true); spillExecutor = MoreExecutors.listeningDecorator(executor); numRecordsPerPartition = new int[numPartitions]; reportPartitionStats = ReportPartitionStats.fromString( @@ -428,23 +450,56 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit updateGlobalStats(currentBuffer); filledBuffers.add(currentBuffer); - if (filledBuffers.size() >= spillLimit) { - if (LOG.isDebugEnabled() || (filledBufferCount % 10) == 0) { - LOG.info(destNameTrimmed + ": triggering spill"); + mayBeSpill(false); + + currentBuffer = getNextAvailableBuffer(); + + // in case spill threads are free, check if spilling is needed + mayBeSpill(false); + } + } + + private void mayBeSpill(boolean shouldBlock) throws IOException { + if (filledBuffers.size() >= spillLimit) { + // Do not block; possible that there are more buffers + scheduleSpill(shouldBlock); + } + } + + private boolean scheduleSpill(boolean block) throws IOException { + if (filledBuffers.isEmpty()) { + return false; + } + + try { + if (block) { + availableSlots.acquire(); + } else { + if (!availableSlots.tryAcquire()) { + // Data in filledBuffers would be spilled in subsequent iteration. + return false; } - pendingSpillCount.incrementAndGet(); - int spillNumber = numSpills.getAndIncrement(); - ListenableFuture<SpillResult> future = spillExecutor.submit(new SpillCallable( - new ArrayList<WrappedBuffer>(filledBuffers), codec, spilledRecordsCounter, - spillNumber)); - filledBuffers.clear(); - Futures.addCallback(future, new SpillCallback(spillNumber)); - // Update once per buffer (instead of every record) - updateTezCountersAndNotify(); } - WrappedBuffer wb = getNextAvailableBuffer(); - currentBuffer = wb; + + final int filledBufferCount = filledBuffers.size(); + if (LOG.isDebugEnabled() || (filledBufferCount % 10) == 0) { + LOG.info(destNameTrimmed + ": triggering spill. filledBuffers.size=" + filledBufferCount); + } + pendingSpillCount.incrementAndGet(); + int spillNumber = numSpills.getAndIncrement(); + + ListenableFuture<SpillResult> future = spillExecutor.submit(new SpillCallable( + new ArrayList<WrappedBuffer>(filledBuffers), codec, spilledRecordsCounter, + spillNumber)); + filledBuffers.clear(); + Futures.addCallback(future, new SpillCallback(spillNumber)); + // Update once per buffer (instead of every record) + updateTezCountersAndNotify(); + return true; + } catch(InterruptedException ie) { + Thread.currentThread().interrupt(); // reset interrupt status } + return false; } private boolean reportPartitionStats() { @@ -470,6 +525,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } else { // All buffers initialized, and none available right now. Wait try { + // Ensure that spills are triggered so that buffers can be released. + mayBeSpill(true); return availableBuffers.take(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -610,6 +667,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit @Override public List<Event> close() throws IOException, InterruptedException { + // In case there are buffers to be spilled, schedule spilling + scheduleSpill(true); List<Event> eventList = Lists.newLinkedList(); isShutdown.set(true); spillLock.lock(); @@ -708,7 +767,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private Event generateVMEvent() throws IOException { return ShuffleUtils.generateVMEvent(outputContext, this.sizePerPartition, - this.reportDetailedPartitionStats(), deflater); + this.reportDetailedPartitionStats(), deflater.get()); } private Event generateDMEvent() throws IOException { @@ -728,7 +787,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit if (emptyPartitions.cardinality() != 0) { // Empty partitions exist ByteString emptyPartitionsByteString = - TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions), deflater); + TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray + (emptyPartitions), deflater.get()); payloadBuilder.setEmptyPartitions(emptyPartitionsByteString); } @@ -772,7 +832,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit List<Event> eventList = Lists.newLinkedList(); eventList.add(ShuffleUtils.generateVMEvent(outputContext, reportPartitionStats() ? new long[numPartitions] : null, - reportDetailedPartitionStats(), deflater)); + reportDetailedPartitionStats(), deflater.get())); //Send final event with all empty partitions and null path component. BitSet emptyPartitions = new BitSet(numPartitions); emptyPartitions.flip(0, numPartitions); @@ -1110,7 +1170,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber); if (isFinalUpdate) { eventList.add(ShuffleUtils.generateVMEvent(outputContext, - sizePerPartition, reportDetailedPartitionStats(), deflater)); + sizePerPartition, reportDetailedPartitionStats(), deflater.get())); } Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate, pathComponent, emptyPartitions); @@ -1191,6 +1251,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } finally { spillLock.unlock(); + availableSlots.release(); } } @@ -1206,6 +1267,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit spillInProgress.signal(); } finally { spillLock.unlock(); + availableSlots.release(); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/d5e65e20/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 27e7992..1a0bbf0 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -259,6 +259,12 @@ public class TestUnorderedPartitionedKVWriter { } @Test(timeout = 10000) + public void testMultipleSpillsWithSmallBuffer() throws IOException, InterruptedException { + // numBuffers is much higher than available threads. + baseTest(200, 10, null, shouldCompress, 512, 0, 9600); + } + + @Test(timeout = 10000) public void testMergeBuffersAndSpill() throws IOException, InterruptedException { baseTest(200, 10, null, shouldCompress, 2048, 10); } @@ -702,8 +708,8 @@ public class TestUnorderedPartitionedKVWriter { } else { assertEquals(0, fileOutputBytes); } - assertEquals(recordsPerBuffer * numExpectedSpills, - spilledRecordsCounter.getValue()); + // due to multiple threads, buffers could be merged in chunks in scheduleSpill. + assertTrue(recordsPerBuffer * numExpectedSpills >= spilledRecordsCounter.getValue()); long additionalSpillBytesWritten = additionalSpillBytesWritternCounter.getValue(); long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue(); @@ -776,9 +782,16 @@ public class TestUnorderedPartitionedKVWriter { } } - private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions, boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent) + throws IOException, InterruptedException { + baseTest(numRecords, numPartitions, skippedPartitions, shouldCompress, + maxSingleBufferSizeBytes, bufferMergePercent, 2048); + } + + private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions, + boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent, int + availableMemory) throws IOException, InterruptedException { PartitionerForTest partitioner = new PartitionerForTest(); ApplicationId appId = ApplicationId.newInstance(10000000, 1); @@ -802,7 +815,6 @@ public class TestUnorderedPartitionedKVWriter { } int numOutputs = numPartitions; - long availableMemory = 2048; int numRecordsWritten = 0; Map<Integer, Multimap<Integer, Long>> expectedValues = new HashMap<Integer, Multimap<Integer, Long>>(); @@ -885,7 +897,8 @@ public class TestUnorderedPartitionedKVWriter { } } assertEquals(additionalSpillBytesWritten, additionalSpillBytesRead); - assertEquals(numExpectedSpills, numAdditionalSpillsCounter.getValue()); + // due to multiple threads, buffers could be merged in chunks in scheduleSpill. + assertTrue(numExpectedSpills >= numAdditionalSpillsCounter.getValue()); BitSet emptyPartitionBits = null; // Verify the events returned
