Repository: tez

Updated Branches:
  refs/heads/branch-0.7 aa767b90e -> 0e155e718
TEZ-2575. Handle KeyValue pairs size which do not fit in a single block in PipelinedSorter (Saikat via rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0e155e71
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0e155e71
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0e155e71

Branch: refs/heads/branch-0.7
Commit: 0e155e7185d1350f64dead488103777295ac76d1
Parents: aa767b9
Author: Rajesh Balamohan <[email protected]>
Authored: Wed Sep 2 19:18:45 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Wed Sep 2 19:18:45 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                    |   1 +
 .../common/sort/impl/PipelinedSorter.java      | 130 +++++++++++++++----
 .../common/sort/impl/TestPipelinedSorter.java  | 111 ++++++++++++++--
 3 files changed, 206 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0e155e71/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e098f7f..63706c9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES

 ALL CHANGES:
+  TEZ-2575. Handle KeyValue pairs size which do not fit in a single block in PipelinedSorter
   TEZ-2198. Fix sorter spill counts
   TEZ-2440. Sorter should check for indexCacheList.size() in flush()
   TEZ-2742. VertexImpl.finished() terminationCause hides member var of the

http://git-wip-us.apache.org/repos/asf/tez/blob/0e155e71/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index ba909ad..d43c3a3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -209,21 +209,18 @@ public class PipelinedSorter extends ExternalSorter {
     SortSpan newSpan = span.next();

     if(newSpan == null) {
-      Stopwatch stopWatch = new Stopwatch();
-      stopWatch.start();
-      // sort in the same thread, do not wait for the thread pool
-      merger.add(span.sort(sorter));
-      spill();
-      stopWatch.stop();
-      LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms");
-      if (pipelinedShuffle) {
-        List<Event> events = Lists.newLinkedList();
-        String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
-        ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext,
-            (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
-            pathComponent);
-        outputContext.sendEvents(events);
-        LOG.info("Adding spill event for spill (final update=false), spillId=" + (numSpills - 1));
+      //avoid sort/spill of empty span
+      if (span.length() > 0) {
+        Stopwatch stopWatch = new Stopwatch();
+        stopWatch.start();
+        // sort in the same thread, do not wait for the thread pool
+        merger.add(span.sort(sorter));
+        spill();
+        stopWatch.stop();
+        LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms");
+        if (pipelinedShuffle) {
+          sendPipelinedShuffleEvents();
+        }
       }
       //safe to reset the iterator
       listIterator = bufferList.listIterator();

@@ -252,6 +249,17 @@ public class PipelinedSorter extends ExternalSorter {
     keySerializer.open(span.out);
   }

+  // if pipelined shuffle is enabled, this method is called to send events for every spill
+  private void sendPipelinedShuffleEvents() throws IOException{
+    List<Event> events = Lists.newLinkedList();
+    String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
+    ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext,
+        (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails,
+        pathComponent);
+    outputContext.sendEvents(events);
+    LOG.info("Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
+  }
+
   @Override
   public void write(Object key, Object value)
       throws IOException {
@@ -280,13 +288,18 @@ public class PipelinedSorter extends ExternalSorter {
       throw new IOException("Illegal partition for " + key + " (" +
           partition + ")");
     }
-    if(span.kvmeta.remaining() < METASIZE) {
+    // TBD:FIX in TEZ-2574
+    if (span.kvmeta.remaining() < METASIZE) {
       this.sort();
+      if (span.length() == 0) {
+        spillSingleRecord(key, value, partition);
+        return;
+      }
     }
     int keystart = span.kvbuffer.position();
     int valstart = -1;
     int valend = -1;
-    try { 
+    try {
       keySerializer.serialize(key);
       valstart = span.kvbuffer.position();
       valSerializer.serialize(value);
@@ -295,13 +308,13 @@ public class PipelinedSorter extends ExternalSorter {
       // restore limit
       span.kvbuffer.position(keystart);
       this.sort();
-
-      bufferOverflowRecursion++;
-      if (bufferOverflowRecursion > bufferList.size()) {
-        throw new MapBufferTooSmallException("Record too large for in-memory buffer. Exceeded "
-            + "buffer overflow limit, bufferOverflowRecursion=" + bufferOverflowRecursion + ", bufferList"
-            + ".size=" + bufferList.size() + ", blockSize=" + blockSize);
+      if (span.length() == 0 || bufferOverflowRecursion > bufferList.size()) {
+        // spill the current key value pair
+        spillSingleRecord(key, value, partition);
+        bufferOverflowRecursion = 0;
+        return;
       }
+      bufferOverflowRecursion++;
       // try again
       this.collect(key, value, partition);
       return;
@@ -343,6 +356,72 @@ public class PipelinedSorter extends ExternalSorter {
     }
   }

+  // it is guaranteed that when spillSingleRecord is called, there are
+  // no merger spans queued in the executor.
+  private void spillSingleRecord(final Object key, final Object value,
+      int partition) throws IOException {
+    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+    // getSpillFileForWrite with size -1 as the serialized size of KV pair is still unknown
+    final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, -1);
+    spillFilePaths.put(numSpills, filename);
+    FSDataOutputStream out = rfs.create(filename, true, 4096);
+
+    try {
+      LOG.info("Spilling to " + filename.toString());
+      for (int i = 0; i < partitions; ++i) {
+        Writer writer = null;
+        try {
+          long segmentStart = out.getPos();
+          writer = new Writer(conf, out, keyClass, valClass, codec,
+              spilledRecordsCounter, null, false);
+          // we need not check for combiner since it's a single record
+          if (i == partition) {
+            final long recordStart = out.getPos();
+            writer.append(key, value);
+            mapOutputRecordCounter.increment(1);
+            mapOutputByteCounter.increment(out.getPos() - recordStart);
+          }
+
+          writer.close();
+          adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
+
+          // record offsets
+          final TezIndexRecord rec =
+              new TezIndexRecord(
+                  segmentStart,
+                  writer.getRawLength(),
+                  writer.getCompressedLength());
+          spillRec.putIndex(rec, i);
+          writer = null;
+        } finally {
+          if (null != writer) {
+            writer.close();
+          }
+        }
+      }
+
+      Path indexFilename =
+          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+      LOG.info("Spill Index filename:" + indexFilename);
+      spillFileIndexPaths.put(numSpills, indexFilename);
+      spillRec.writeToFile(indexFilename, conf);
+      //TODO: honor cache limits
+      indexCacheList.add(spillRec);
+      ++numSpills;
+      if (!isFinalMergeEnabled()) {
+        fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
+        //No final merge. Set the number of files offered via shuffle-handler
+        numShuffleChunks.setValue(numSpills);
+      }
+      if (pipelinedShuffle) {
+        sendPipelinedShuffleEvents();
+      }
+    } finally {
+      out.close();
+    }
+  }
+
   public void spill() throws IOException {
     // create spill file
     final long size = capacity +
@@ -373,7 +452,6 @@ public class PipelinedSorter extends ExternalSorter {
           //close
           writer.close();
           adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
-
           // record offsets
           final TezIndexRecord rec =
               new TezIndexRecord(
@@ -390,12 +468,12 @@ public class PipelinedSorter extends ExternalSorter {
       spillRec.writeToFile(indexFilename, conf);
       //TODO: honor cache limits
       indexCacheList.add(spillRec);
+      ++numSpills;
       if (!isFinalMergeEnabled()) {
         fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
         //No final merge. Set the number of files offered via shuffle-handler
         numShuffleChunks.setValue(numSpills);
       }
-      ++numSpills;
     } catch(InterruptedException ie) {
       // TODO:the combiner has been interrupted
     } finally {
@@ -629,7 +707,7 @@ public class PipelinedSorter extends ExternalSorter {
     ByteBuffer reserved = source.duplicate();
     reserved.mark();
     LOG.info("reserved.remaining() = " + reserved.remaining());
-    LOG.info("reserved.size = "+ metasize);
+    LOG.info("reserved.metasize = "+ metasize);
     reserved.position(metasize);
     kvbuffer = reserved.slice();
     reserved.flip();

http://git-wip-us.apache.org/repos/asf/tez/blob/0e155e71/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 8bf91ce..135dc78 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -138,15 +138,8 @@ public class TestPipelinedSorter {

   @Test
   public void basicTestWithSmallBlockSize() throws IOException {
-    try {
-      //3 MB key & 3 MB value, whereas block size is just 3 MB
-      basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20);
-      fail();
-    } catch (IOException ioe) {
-      Assert.assertTrue(
-          ioe.getMessage().contains("Record too large for in-memory buffer."
-              + " Exceeded buffer overflow limit"));
-    }
+    //3 MB key & 3 MB value, whereas block size is just 3 MB
+    basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20);
   }

   @Test
@@ -157,6 +150,77 @@
+  public void testKVExceedsBuffer() throws IOException {
+    // a single block of 1mb, 2KV pair, key 1mb, value 1mb
+    basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 1<<20);
+  }
+
+  @Test
+  public void testKVExceedsBuffer2() throws IOException {
+    // a list of 4 blocks each 256kb, 2KV pair, key 1mb, value 1mb
+    basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 256<<10);
+  }
+
+  @Test
+  public void testExceedsKVWithMultiplePartitions() throws IOException {
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+    this.numOutputs = 5;
+    this.initialAvailableMem = 1 * 1024 * 1024;
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, 0);
+
+    writeData(sorter, 100, 1<<20);
+    verifyCounters(sorter, outputContext);
+  }
+
+  @Test
+  public void testExceedsKVWithPipelinedShuffle() throws IOException {
+    this.numOutputs = 1;
+    this.initialAvailableMem = 1 *1024 * 1024;
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, 1 << 20);
+
+    writeData(sorter, 5, 1<<20);
+
+    // final merge is disabled. Final output file would not be populated in this case.
+    assertTrue(sorter.finalOutputFile == null);
+    TezCounter numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
+    assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue());
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+  }
+
+  @Test
+  // first write a KV which doesn't fit into span, this will spill to disk
+  // next write smaller keys, which will update the hint
+  public void testWithVariableKVLength1() throws IOException {
+    int numkeys[] = {2, 2};
+    int keylens[] = {32 << 20, 7 << 20};
+    basicTest2(1, numkeys, keylens, 64 << 20, 32 << 20);
+  }
+
+  @Test
+  // first write a kv pair which fits into buffer,
+  // next try to write a kv pair which doesn't fit into remaining buffer
+  public void testWithVariableKVLength() throws IOException {
+    //2 KVpairs of 2X2mb, 2 KV of 2X7mb
+    int numkeys[] = {2, 2};
+    int keylens[] = {2 << 20, 7<<20};
+    basicTest2(1, numkeys, keylens, 64 << 20, 32 << 20);
+  }
+
+  @Test
+  // first write KV which fits into span
+  // then write KV which does not fit in buffer. this will be spilled to disk
+  // all keys should be merged properly
+  public void testWithVariableKVLength2() throws IOException {
+    // 20 KVpairs of 2X10kb, 10 KV of 2X200kb, 20KV of 2X10kb
+    int numkeys[] = {20, 10, 20};
+    int keylens[] = {10<<10, 200<<10, 10<<10};
+    basicTest2(1, numkeys, keylens, (1 * 1024l * 1024l), 1 << 18);
+  }
+
+  @Test
   public void testWithCustomComparator() throws IOException {
     //Test with custom comparator
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, CustomComparator.class.getName());
@@ -177,9 +241,36 @@ public class TestPipelinedSorter {
     //final merge is disabled. Final output file would not be populated in this case.
     assertTrue(sorter.finalOutputFile == null);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     verify(outputContext, times(1)).sendEvents(anyListOf(Event.class));
   }

+  public void basicTest2(int partitions, int[] numkeys, int[] keysize,
+      long initialAvailableMem, int blockSize) throws IOException {
+    this.numOutputs = partitions; // single output
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, blockSize);
+    writeData2(sorter, numkeys, keysize);
+    verifyCounters(sorter, outputContext);
+  }
+
+  private void writeData2(ExternalSorter sorter,
+      int[] numKeys, int[] keyLen) throws IOException {
+    sortedDataMap.clear();
+    int counter = 0;
+    for (int numkey : numKeys) {
+      int curKeyLen = keyLen[counter];
+      for (int i = 0; i < numkey; i++) {
+        Text key = new Text(RandomStringUtils.randomAlphanumeric(curKeyLen));
+        Text value = new Text(RandomStringUtils.randomAlphanumeric(curKeyLen));
+        sorter.write(key, value);
+      }
+      counter++;
+    }
+    sorter.flush();
+    sorter.close();
+  }
+
   public void basicTest(int partitions, int numKeys, int keySize,
       long initialAvailableMem, int blockSize) throws IOException {
     this.numOutputs = partitions; // single output
@@ -188,10 +279,10 @@

     writeData(sorter, numKeys, keySize);

+    verifyCounters(sorter, outputContext);
     Path outputFile = sorter.finalOutputFile;
     FileSystem fs = outputFile.getFileSystem(conf);
-
     IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, false, -1, 4096);
     //Verify dataset
     verifyData(reader);
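----------------------------------------------------------------------

The idea behind the new spillSingleRecord() path, stripped of the partitioning, counter and index-file bookkeeping, can be summarised with a minimal, self-contained sketch. This is not Tez code: the class name, spill-file naming and record framing below are invented for illustration. It only mirrors the decision the patch introduces, namely that a key/value pair too large for even an empty in-memory span is written straight to its own spill file instead of failing with MapBufferTooSmallException.

// Minimal sketch (assumptions: invented class, file naming and framing).
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class SingleRecordSpillSketch {

  private final ByteBuffer buffer;   // stand-in for a SortSpan's kvbuffer
  private int numSpills = 0;         // stand-in for PipelinedSorter.numSpills

  public SingleRecordSpillSketch(int bufferSize) {
    this.buffer = ByteBuffer.allocate(bufferSize);
  }

  public void write(byte[] key, byte[] value) throws IOException {
    int recordSize = key.length + value.length;
    if (recordSize > buffer.capacity()) {
      // The pair can never fit, even in an empty buffer. Before TEZ-2575 this
      // ended in MapBufferTooSmallException; now the pair gets its own spill.
      spillSingleRecord(key, value);
      return;
    }
    if (recordSize > buffer.remaining()) {
      // Not enough room left: flush what is already buffered first
      // (stands in for sorting and spilling the current span).
      spillBuffer();
    }
    buffer.put(key).put(value);
  }

  private void spillSingleRecord(byte[] key, byte[] value) throws IOException {
    try (DataOutputStream out = newSpillFile()) {
      out.writeInt(key.length);
      out.write(key);
      out.writeInt(value.length);
      out.write(value);
    }
  }

  private void spillBuffer() throws IOException {
    buffer.flip();
    byte[] contents = new byte[buffer.remaining()];
    buffer.get(contents);
    try (DataOutputStream out = newSpillFile()) {
      out.write(contents);
    }
    buffer.clear();
  }

  private DataOutputStream newSpillFile() throws IOException {
    String filename = "spill_" + (numSpills++) + ".out";   // hypothetical spill path
    return new DataOutputStream(new FileOutputStream(filename));
  }

  public static void main(String[] args) throws IOException {
    SingleRecordSpillSketch sorter = new SingleRecordSpillSketch(1 << 20); // 1 MB buffer
    sorter.write(new byte[4 << 20], new byte[4 << 20]);  // 8 MB pair -> direct single-record spill
    sorter.write(new byte[16], new byte[16]);            // small pair -> buffered in memory
  }
}

In the actual patch, spillSingleRecord() additionally writes a TezIndexRecord for every partition of the spill and, when pipelined shuffle is enabled, sends a spill event via sendPipelinedShuffleEvents(), as the diff above shows.

----------------------------------------------------------------------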
