Repository: tez
Updated Branches:
  refs/heads/master a5179d649 -> cadf31b5e
TEZ-3680. Optimizations to UnorderedPartitionedKVWriter (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cadf31b5 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cadf31b5 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cadf31b5 Branch: refs/heads/master Commit: cadf31b5e21fd824c53ccfa74ac97eb8aff508b9 Parents: a5179d6 Author: Rajesh Balamohan <[email protected]> Authored: Thu Apr 20 14:49:20 2017 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Thu Apr 20 14:49:20 2017 +0530 ---------------------------------------------------------------------- .../writers/UnorderedPartitionedKVWriter.java | 110 ++++++++++++++----- 1 file changed, 84 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/cadf31b5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 0f38a29..d8cedac 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -28,8 +28,10 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; @@ -45,7 +47,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.TezCommonUtils; @@ -114,6 +115,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private final ListeningExecutorService spillExecutor; private final int[] numRecordsPerPartition; + private long localOutputRecordBytesCounter; + private long localOutputBytesWithOverheadCounter; + private long localOutputRecordsCounter; + // notify after x records + private static final int NOTIFY_THRESHOLD = 1000; // uncompressed size for each partition private final long[] sizePerPartition; private volatile long spilledSize = 0; @@ -200,15 +206,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit valSerializer.open(dos); rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw(); - ExecutorService executor = Executors.newFixedThreadPool( - 1, + ExecutorService executor = new ThreadPoolExecutor(1, Math.max(2, numBuffers/2), + 60L, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat( - "UnorderedOutSpiller {" - + TezUtilsInternal.cleanVertexName( - outputContext.getDestinationVertexName()) + "}") - .build()); + "UnorderedOutSpiller {" + TezUtilsInternal.cleanVertexName( + outputContext.getDestinationVertexName()) + "}") + .build() + ); spillExecutor = MoreExecutors.listeningDecorator(executor); numRecordsPerPartition = new int[numPartitions]; reportPartitionStats = ReportPartitionStats.fromString( @@ -338,10 +345,12 @@ public class UnorderedPartitionedKVWriter 
extends BaseUnorderedPartitionedKVWrit currentBuffer.metaBuffer.put(metaIndex + INDEX_NEXT, indexNext); currentBuffer.skipSize += metaSkip; // For size estimation // Update stats on number of records - outputRecordBytesCounter.increment(currentBuffer.nextPosition - (metaStart + META_SIZE)); - outputBytesWithOverheadCounter.increment((currentBuffer.nextPosition - metaStart) + metaSkip); - outputRecordsCounter.increment(1); - outputContext.notifyProgress(); + localOutputRecordBytesCounter += (currentBuffer.nextPosition - (metaStart + META_SIZE)); + localOutputBytesWithOverheadCounter += ((currentBuffer.nextPosition - metaStart) + metaSkip); + localOutputRecordsCounter++; + if (localOutputRecordBytesCounter % NOTIFY_THRESHOLD == 0) { + updateTezCountersAndNotify(); + } currentBuffer.partitionPositions[partition] = metaStart; currentBuffer.recordsPerPartition[partition]++; currentBuffer.sizePerPartition[partition] += @@ -350,6 +359,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } + private void updateTezCountersAndNotify() { + outputRecordBytesCounter.increment(localOutputRecordBytesCounter); + outputBytesWithOverheadCounter.increment(localOutputBytesWithOverheadCounter); + outputRecordsCounter.increment(localOutputRecordsCounter); + outputContext.notifyProgress(); + localOutputRecordBytesCounter = 0; + localOutputBytesWithOverheadCounter = 0; + localOutputRecordsCounter = 0; + } + private void setupNextBuffer() throws IOException { if (currentBuffer.numRecords == 0) { @@ -361,11 +380,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit pendingSpillCount.incrementAndGet(); - SpillPathDetails spillPathDetails = getSpillPathDetails(false, -1); - ListenableFuture<SpillResult> future = spillExecutor.submit( - new SpillCallable(currentBuffer, codec, spilledRecordsCounter, spillPathDetails)); - Futures.addCallback(future, new SpillCallback(spillPathDetails.spillIndex)); + new 
SpillCallable(currentBuffer, codec, spilledRecordsCounter, numSpills.getAndIncrement())); + Futures.addCallback(future, new SpillCallback(numSpills.get())); + // Update once per buffer (instead of every record) + updateTezCountersAndNotify(); WrappedBuffer wb = getNextAvailableBuffer(); currentBuffer = wb; @@ -411,8 +430,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private final WrappedBuffer wrappedBuffer; private final CompressionCodec codec; private final TezCounter numRecordsCounter; - private final int spillIndex; - private final SpillPathDetails spillPathDetails; + private int spillIndex; + private SpillPathDetails spillPathDetails; + private int spillNumber; public SpillCallable(WrappedBuffer wrappedBuffer, CompressionCodec codec, TezCounter numRecordsCounter, SpillPathDetails spillPathDetails) { @@ -425,6 +445,14 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit this.spillPathDetails = spillPathDetails; } + public SpillCallable(WrappedBuffer wrappedBuffer, CompressionCodec codec, + TezCounter numRecordsCounter, int spillNumber) throws IOException { + this.wrappedBuffer = wrappedBuffer; + this.codec = codec; + this.numRecordsCounter = numRecordsCounter; + this.spillNumber = spillNumber; + } + @Override protected SpillResult callInternal() throws IOException { // This should not be called with an empty buffer. Check before invoking. @@ -432,6 +460,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit // Number of parallel spills determined by number of threads. // Last spill synchronization handled separately. 
SpillResult spillResult = null; + if (spillPathDetails == null) { + this.spillPathDetails = getSpillPathDetails(false, -1, spillNumber); + this.spillIndex = spillPathDetails.spillIndex; + } FSDataOutputStream out = rfs.create(spillPathDetails.outputFilePath); TezSpillRecord spillRecord = new TezSpillRecord(numPartitions); DataInputBuffer key = new DataInputBuffer(); @@ -439,15 +471,22 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit long compressedLength = 0; for (int i = 0; i < numPartitions; i++) { IFile.Writer writer = null; - outputContext.notifyProgress(); try { long segmentStart = out.getPos(); if (wrappedBuffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) { // Skip empty partition. continue; } - writer = new Writer(conf, out, keyClass, valClass, codec, numRecordsCounter, null); - writePartition(wrappedBuffer.partitionPositions[i], wrappedBuffer, writer, key, val); + writer = new Writer(conf, out, keyClass, valClass, codec, null, null); + long numRecords = writePartition(wrappedBuffer.partitionPositions[i], wrappedBuffer, + writer, key, val); + if (numRecordsCounter != null) { + // TezCounter is not threadsafe; Since numRecordsCounter would be updated from + // multiple threads, it is good to synchronize it when incrementing it for correctness. 
+ synchronized (numRecordsCounter) { + numRecordsCounter.increment(numRecords); + } + } writer.close(); compressedLength += writer.getCompressedLength(); TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(), @@ -473,8 +512,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } - private void writePartition(int pos, WrappedBuffer wrappedBuffer, Writer writer, + private long writePartition(int pos, WrappedBuffer wrappedBuffer, Writer writer, DataInputBuffer keyBuffer, DataInputBuffer valBuffer) throws IOException { + long numRecords = 0; while (pos != WrappedBuffer.PARTITION_ABSENT_POSITION) { int metaIndex = pos / INT_SIZE; int keyLength = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_KEYLEN); @@ -483,8 +523,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit valBuffer.reset(wrappedBuffer.buffer, pos + META_SIZE + keyLength, valLength); writer.append(keyBuffer, valBuffer); + numRecords++; pos = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_NEXT); } + return numRecords; } public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) { @@ -562,6 +604,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } else { finalSpill(); } + updateTezCountersAndNotify(); cleanupCurrentBuffer(); eventList.add(generateVMEvent()); eventList.add(generateDMEvent()); @@ -575,6 +618,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit sendPipelinedEventForSpill(currentBuffer.recordsPerPartition, sizePerPartition, numSpills.get() - 1, true); } + updateTezCountersAndNotify(); cleanupCurrentBuffer(); return events; } @@ -697,9 +741,23 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit * @return SpillPathDetails * @throws IOException */ - private SpillPathDetails getSpillPathDetails(boolean isFinalSpill, long expectedSpillSize) throws - IOException { + private 
SpillPathDetails getSpillPathDetails(boolean isFinalSpill, long expectedSpillSize) + throws IOException { int spillNumber = numSpills.getAndIncrement(); + return getSpillPathDetails(isFinalSpill, expectedSpillSize, spillNumber); + } + + /** + * Set up spill output file, index file details. + * + * @param isFinalSpill + * @param expectedSpillSize + * @param spillNumber + * @return SpillPathDetails + * @throws IOException + */ + private SpillPathDetails getSpillPathDetails(boolean isFinalSpill, long expectedSpillSize, + int spillNumber) throws IOException { long spillSize = (expectedSpillSize < 0) ? (currentBuffer.nextPosition + numPartitions * APPROX_HEADER_LENGTH) : expectedSpillSize; @@ -768,7 +826,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } synchronized (spillInfoList) { for (SpillInfo spillInfo : spillInfoList) { - outputContext.notifyProgress(); TezIndexRecord indexRecord = spillInfo.spillRecord.getIndex(i); if (indexRecord.getPartLength() == 0) { // Skip empty partitions within a spill @@ -795,6 +852,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit writer.getCompressedLength()); writer = null; finalSpillRecord.putIndex(indexRecord, i); + outputContext.notifyProgress(); } finally { if (writer != null) { writer.close(); @@ -973,10 +1031,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private void sendPipelinedEventForSpill( BitSet emptyPartitions, long[] sizePerPartition, int spillNumber, boolean isFinalUpdate) { - List<Event> eventList = Lists.newLinkedList(); if (!pipelinedShuffle) { return; } + List<Event> eventList = Lists.newLinkedList(); //Send out an event for consuming. try { String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber);
