Repository: tez Updated Branches: refs/heads/master ea05361f8 -> a47e8fcbe
TEZ-3605. Detect and prune empty partitions for the Ordered case (kshukla) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a47e8fcb Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a47e8fcb Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a47e8fcb Branch: refs/heads/master Commit: a47e8fcbea5eeab5a7cf812271d329524cc02dba Parents: ea05361 Author: Kuhu Shukla <[email protected]> Authored: Wed Jun 28 12:47:37 2017 -0500 Committer: Kuhu Shukla <[email protected]> Committed: Wed Jun 28 12:47:37 2017 -0500 ---------------------------------------------------------------------- .../common/sort/impl/PipelinedSorter.java | 107 ++++++++++------- .../common/sort/impl/dflt/DefaultSorter.java | 113 ++++++++++-------- .../common/shuffle/TestShuffleUtils.java | 2 +- .../common/sort/impl/TestPipelinedSorter.java | 70 ++++++++++- .../sort/impl/dflt/TestDefaultSorter.java | 119 ++++++++++++++++++- 5 files changed, 307 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a47e8fcb/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 3d4d29b..88d10d0 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -490,8 +490,10 @@ public class PipelinedSorter extends ExternalSorter { Writer writer = null; try { long segmentStart = out.getPos(); - writer = new Writer(conf, out, keyClass, valClass, codec, - spilledRecordsCounter, null, false); + if (!sendEmptyPartitionDetails || (i == partition)) { + writer = new Writer(conf, out, keyClass, valClass, codec, + spilledRecordsCounter, null, false); + } // we need not check for combiner since its a single record if (i == partition) { final long recordStart = out.getPos(); @@ -499,16 +501,18 @@ public class PipelinedSorter extends ExternalSorter { mapOutputRecordCounter.increment(1); mapOutputByteCounter.increment(out.getPos() - recordStart); } - - writer.close(); - adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength()); - + long rawLength = 0; + long partLength = 0; + if (writer != null) { + writer.close(); + rawLength = writer.getRawLength(); + partLength = writer.getCompressedLength(); + } + adjustSpillCounters(rawLength, partLength); // record offsets final TezIndexRecord rec = new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); + segmentStart, rawLength, partLength); spillRec.putIndex(rec, i); writer = null; } finally { @@ -569,28 +573,37 @@ public class PipelinedSorter extends ExternalSorter { TezRawKeyValueIterator kvIter = merger.filter(i); //write merged output to disk long segmentStart = out.getPos(); - Writer writer = - new Writer(conf, out, keyClass, valClass, codec, + Writer writer = null; + boolean hasNext = kvIter.next(); + if (hasNext || !sendEmptyPartitionDetails) { + writer = new Writer(conf, out, keyClass, valClass, codec, spilledRecordsCounter, null, merger.needsRLE()); + } if (combiner == null) { - while(kvIter.next()) { + while (hasNext) { writer.append(kvIter.getKey(), kvIter.getValue()); + hasNext = kvIter.next(); } } else { - runCombineProcessor(kvIter, writer); + if (hasNext) { + runCombineProcessor(kvIter, writer); + } } + long rawLength = 0; + long partLength = 0; //close - writer.close(); - adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength()); + if (writer != null) { + writer.close(); + rawLength = writer.getRawLength(); + partLength = writer.getCompressedLength(); + } + adjustSpillCounters(rawLength, partLength); // record offsets - final TezIndexRecord rec = - new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); + final TezIndexRecord rec = + new TezIndexRecord(segmentStart, rawLength, partLength); spillRec.putIndex(rec, i); if (!isFinalMergeEnabled() && reportPartitionStats()) { - partitionStats[i] += writer.getCompressedLength(); + partitionStats[i] += partLength; } } @@ -741,18 +754,21 @@ public class PipelinedSorter extends ExternalSorter { final TezSpillRecord spillRec = new TezSpillRecord(partitions); for (int parts = 0; parts < partitions; parts++) { + boolean shouldWrite = false; //create the segments to be merged List<Segment> segmentList = new ArrayList<Segment>(numSpills); for (int i = 0; i < numSpills; i++) { Path spillFilename = spillFilePaths.get(i); TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); - - DiskSegment s = - new DiskSegment(rfs, spillFilename, indexRecord.getStartOffset(), - indexRecord.getPartLength(), codec, ifileReadAhead, - ifileReadAheadLength, ifileBufferSize, true); - segmentList.add(i, s); + if (indexRecord.hasData() || !sendEmptyPartitionDetails) { + shouldWrite = true; + DiskSegment s = + new DiskSegment(rfs, spillFilename, indexRecord.getStartOffset(), + indexRecord.getPartLength(), codec, ifileReadAhead, + ifileReadAheadLength, ifileBufferSize, true); + segmentList.add(s); + } } int mergeFactor = @@ -771,29 +787,32 @@ public class PipelinedSorter extends ExternalSorter { null, merger.needsRLE()); // Not using any Progress in TezMerger. Should just work. //write merged output to disk long segmentStart = finalOut.getPos(); - Writer writer = - new Writer(conf, finalOut, keyClass, valClass, codec, - spilledRecordsCounter, null, merger.needsRLE()); - if (combiner == null || numSpills < minSpillsForCombine) { - TezMerger.writeFile(kvIter, writer, progressable, - TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); - } else { - runCombineProcessor(kvIter, writer); - } + long rawLength = 0; + long partLength = 0; + if (shouldWrite) { + Writer writer = + new Writer(conf, finalOut, keyClass, valClass, codec, + spilledRecordsCounter, null, merger.needsRLE()); + if (combiner == null || numSpills < minSpillsForCombine) { + TezMerger.writeFile(kvIter, writer, progressable, + TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); + } else { + runCombineProcessor(kvIter, writer); + } - //close - writer.close(); - outputBytesWithOverheadCounter.increment(writer.getRawLength()); + //close + writer.close(); + rawLength = writer.getRawLength(); + partLength = writer.getCompressedLength(); + } + outputBytesWithOverheadCounter.increment(rawLength); // record offsets final TezIndexRecord rec = - new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); + new TezIndexRecord(segmentStart, rawLength, partLength); spillRec.putIndex(rec, parts); if (reportPartitionStats()) { - partitionStats[parts] += writer.getCompressedLength(); + partitionStats[parts] += partLength; } } http://git-wip-us.apache.org/repos/asf/tez/blob/a47e8fcb/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 1528076..268e237 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -901,8 +901,11 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab IFile.Writer writer = null; try { long segmentStart = out.getPos(); - writer = new Writer(conf, out, keyClass, valClass, codec, - spilledRecordsCounter, null, rle); + if (spindex < mend && kvmeta.get(offsetFor(spindex) + PARTITION) == i + || !sendEmptyPartitionDetails) { + writer = new Writer(conf, out, keyClass, valClass, codec, + spilledRecordsCounter, null, rle); + } if (combiner == null) { // spill directly DataInputBuffer key = new DataInputBuffer(); @@ -934,21 +937,22 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab runCombineProcessor(kvIter, writer); } } - + long rawLength = 0; + long partLength = 0; // close the writer - writer.close(); - adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength()); + if (writer != null) { + writer.close(); + rawLength = writer.getRawLength(); + partLength = writer.getCompressedLength(); + } + adjustSpillCounters(rawLength, partLength); // record offsets final TezIndexRecord rec = - new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); + new TezIndexRecord(segmentStart, rawLength, partLength); spillRec.putIndex(rec, i); - if (!isFinalMergeEnabled() && reportPartitionStats()) { - partitionStats[i] += writer.getCompressedLength(); + if (!isFinalMergeEnabled() && reportPartitionStats() && writer != null) { + partitionStats[i] += partLength; } - writer = null; } finally { if (null != writer) writer.close(); @@ -1003,9 +1007,10 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab try { long segmentStart = out.getPos(); // Create a new codec, don't care! - writer = new IFile.Writer(conf, out, keyClass, valClass, codec, - spilledRecordsCounter, null); - + if (!sendEmptyPartitionDetails || (i == partition)) { + writer = new Writer(conf, out, keyClass, valClass, codec, + spilledRecordsCounter, null, false); + } if (i == partition) { final long recordStart = out.getPos(); writer.append(key, value); @@ -1013,16 +1018,17 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab // compression mapOutputByteCounter.increment(out.getPos() - recordStart); } - writer.close(); - - adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength()); + long rawLength =0; + long partLength =0; + if (writer != null) { + writer.close(); + rawLength = writer.getRawLength(); + partLength = writer.getCompressedLength(); + } + adjustSpillCounters(rawLength, partLength); // record offsets - TezIndexRecord rec = - new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); + TezIndexRecord rec = new TezIndexRecord(segmentStart, rawLength, partLength); spillRec.putIndex(rec, i); writer = null; @@ -1265,22 +1271,23 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab if (numSpills == 0) { // TODO Change event generation to say there is no data rather than generating a dummy file //create dummy files - + long rawLength = 0; + long partLength = 0; TezSpillRecord sr = new TezSpillRecord(partitions); try { for (int i = 0; i < partitions; i++) { long segmentStart = finalOut.getPos(); - Writer writer = - new Writer(conf, finalOut, keyClass, valClass, codec, null, null); - writer.close(); - + if (!sendEmptyPartitionDetails) { + Writer writer = + new Writer(conf, finalOut, keyClass, valClass, codec, null, null); + writer.close(); + rawLength = writer.getRawLength(); + partLength = writer.getCompressedLength(); + } TezIndexRecord rec = - new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); + new TezIndexRecord(segmentStart, rawLength, partLength); // Covers the case of multiple spills. - outputBytesWithOverheadCounter.increment(writer.getRawLength()); + outputBytesWithOverheadCounter.increment(rawLength); sr.putIndex(rec, i); } sr.writeToFile(finalIndexFile, conf); @@ -1299,19 +1306,21 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab else { final TezSpillRecord spillRec = new TezSpillRecord(partitions); for (int parts = 0; parts < partitions; parts++) { + boolean shouldWrite = false; //create the segments to be merged List<Segment> segmentList = - new ArrayList<Segment>(numSpills); - for(int i = 0; i < numSpills; i++) { + new ArrayList<Segment>(numSpills); + for (int i = 0; i < numSpills; i++) { outputContext.notifyProgress(); TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); - - DiskSegment s = - new DiskSegment(rfs, filename[i], indexRecord.getStartOffset(), - indexRecord.getPartLength(), codec, ifileReadAhead, - ifileReadAheadLength, ifileBufferSize, true); - segmentList.add(i, s); - + if (indexRecord.hasData() || !sendEmptyPartitionDetails) { + shouldWrite = true; + DiskSegment s = + new DiskSegment(rfs, filename[i], indexRecord.getStartOffset(), + indexRecord.getPartLength(), codec, ifileReadAhead, + ifileReadAheadLength, ifileBufferSize, true); + segmentList.add(s); + } if (LOG.isDebugEnabled()) { LOG.debug(outputContext.getDestinationVertexName() + ": " + "TaskIdentifier=" + taskIdentifier + " Partition=" + parts + @@ -1338,6 +1347,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab //write merged output to disk long segmentStart = finalOut.getPos(); + long rawLength = 0; + long partLength = 0; + if (shouldWrite) { Writer writer = new Writer(conf, finalOut, keyClass, valClass, codec, spilledRecordsCounter, null); @@ -1348,17 +1360,16 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab runCombineProcessor(kvIter, writer); } writer.close(); - outputBytesWithOverheadCounter.increment(writer.getRawLength()); - - // record offsets - final TezIndexRecord rec = - new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); + rawLength = writer.getRawLength(); + partLength = writer.getCompressedLength(); + } + outputBytesWithOverheadCounter.increment(rawLength); + // record offsets + final TezIndexRecord rec = + new TezIndexRecord(segmentStart, rawLength, partLength); spillRec.putIndex(rec, parts); if (reportPartitionStats()) { - partitionStats[parts] += writer.getCompressedLength(); + partitionStats[parts] += partLength; } } numShuffleChunks.setValue(1); //final merge has happened http://git-wip-us.apache.org/repos/asf/tez/blob/a47e8fcb/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index c461ea5..1d2d428 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -142,7 +142,7 @@ public class TestShuffleUtils { for(int i=0;i<numPartitions;i++) { long rawLen = ThreadLocalRandom.current().nextLong(100, 200); if (i % 2 == 0 || allEmptyPartitions) { - rawLen = 6; //indicates empty partition + rawLen = 0; //indicates empty partition, see TEZ-3605 } TezIndexRecord indexRecord = new TezIndexRecord(startOffset, rawLen, partLen); startOffset += partLen; http://git-wip-us.apache.org/repos/asf/tez/blob/a47e8fcb/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 15fae07..f85272b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -176,9 +176,63 @@ public class TestPipelinedSorter { // final merge is disabled. Final output file would not be populated in this case. assertTrue(sorter.finalOutputFile == null); TezCounter numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT); - assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue()); +// assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue()); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); + + } + + @Test + public void testEmptyPartitionsTwoSpillsNoEmptyEvents() throws Exception { + testEmptyPartitionsHelper(2, false); + } + + @Test + public void testEmptyPartitionsTwoSpillsWithEmptyEvents() throws Exception { + testEmptyPartitionsHelper(2, true); + } + + @Test + public void testEmptyPartitionsNoSpillsNoEmptyEvents() throws Exception { + testEmptyPartitionsHelper(0, false); + } + + @Test + public void testEmptyPartitionsNoSpillsWithEmptyEvents() throws Exception { + testEmptyPartitionsHelper(0, true); + } + + public void testEmptyPartitionsHelper(int numKeys, boolean sendEmptyPartitionDetails) throws IOException, InterruptedException { + int partitions = 50; + this.numOutputs = partitions; + this.initialAvailableMem = 1 *1024 * 1024; + Configuration conf = getConf(); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, sendEmptyPartitionDetails); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, partitions, + initialAvailableMem); + writeData(sorter, numKeys, 1000000); + if (numKeys == 0) { + assertTrue(sorter.getNumSpills() == 1); + } else { + assertTrue(sorter.getNumSpills() == numKeys + 1); + } + verifyCounters(sorter, outputContext); + Path indexFile = sorter.getFinalIndexFile(); + TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf); + for (int i = 0; i < partitions; i++) { + TezIndexRecord tezIndexRecord = spillRecord.getIndex(i); + if (tezIndexRecord.hasData()) { + continue; + } + if (sendEmptyPartitionDetails) { + Assert.assertEquals("Unexpected raw length for " + i + "th partition", 0, tezIndexRecord.getRawLength()); + } else { + Assert.assertEquals("Unexpected raw length for " + i + "th partition", 6, tezIndexRecord.getRawLength()); + } + } } @Test @@ -452,10 +506,14 @@ public class TestPipelinedSorter { verifyCounters(sorter, outputContext); Path outputFile = sorter.finalOutputFile; FileSystem fs = outputFile.getFileSystem(conf); - IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, false, -1, 4096); + TezCounter finalOutputBytes = + outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL); + if (finalOutputBytes.getValue() > 0) { + IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, false, -1, 4096); + verifyData(reader); + reader.close(); + } //Verify dataset - verifyData(reader); - reader.close(); verify(outputContext, atLeastOnce()).notifyProgress(); } @@ -486,11 +544,11 @@ public class TestPipelinedSorter { TezCounter finalOutputBytes = context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL); - assertTrue(finalOutputBytes.getValue() > 0); + assertTrue(finalOutputBytes.getValue() >= 0); TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter (TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD); - assertTrue(outputBytesWithOverheadCounter.getValue() > 0); + assertTrue(outputBytesWithOverheadCounter.getValue() >= 0); } http://git-wip-us.apache.org/repos/asf/tez/blob/a47e8fcb/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index b3b16d9..444ebaf 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -18,6 +18,8 @@ package org.apache.tez.runtime.library.common.sort.impl.dflt; +import org.junit.Assert; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -31,6 +33,8 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.BitSet; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -49,6 +53,8 @@ import org.apache.tez.common.TezUtils; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.runtime.api.Event; @@ -62,6 +68,8 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter; +import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; +import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; @@ -267,6 +275,56 @@ public class TestDefaultSorter { } @Test(timeout = 30000) + public void testEmptyCaseFileLengths() throws IOException { + testEmptyCaseFileLengthsHelper(50, 2, 1, 48); + testEmptyCaseFileLengthsHelper(1, 1, 10, 0); + } + + public void testEmptyCaseFileLengthsHelper(int numPartitions, int numKeys, int keyLen, int expectedEmptyPartitions) + throws IOException { + OutputContext context = createTezOutputContext(); + + MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1); + context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), handler); + String auxService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + DefaultSorter sorter = new DefaultSorter(context, conf, numPartitions, handler.getMemoryAssigned()); + try { + writeData(sorter, numKeys, keyLen); + List<Event> events = new ArrayList<Event>(); + String pathComponent = (context.getUniqueIdentifier() + "_" + 0); + ShuffleUtils.generateEventOnSpill(events, true, true, context, 0, + sorter.indexCacheList.get(0), 0, true, pathComponent, sorter.getPartitionStats(), + sorter.reportDetailedPartitionStats(), auxService, TezCommonUtils.newBestCompressionDeflater()); + + CompositeDataMovementEvent compositeDataMovementEvent = + (CompositeDataMovementEvent) events.get(1); + ByteBuffer bb = compositeDataMovementEvent.getUserPayload(); + ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = + ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb)); + + if (shufflePayload.hasEmptyPartitions()) { + byte[] emptyPartitionsBytesString = + TezCommonUtils.decompressByteStringToByteArray( + shufflePayload.getEmptyPartitions()); + BitSet emptyPartitionBitSet = TezUtilsInternal.fromByteArray(emptyPartitionsBytesString); + Assert.assertTrue("Number of empty partitions did not match!", + emptyPartitionBitSet.cardinality() == expectedEmptyPartitions); + } else { + Assert.assertTrue(expectedEmptyPartitions == 0); + } + //4 bytes of header + numKeys* 2 *(keydata.length + keyLength.length) + 2 * 1 byte of EOF_MARKER + 4 bytes of checksum + assertEquals("Unexpected Output File Size!", + localFs.getFileStatus(sorter.getFinalOutputFile()).getLen(), numKeys * (4 + (2 * (2 + keyLen)) + 2 + 4)); + assertTrue(sorter.getNumSpills() == 1); + verifyCounters(sorter, context); + } catch(IOException ioe) { + fail(ioe.getMessage()); + } + } + + @Test public void testWithEmptyData() throws IOException { OutputContext context = createTezOutputContext(); @@ -312,6 +370,63 @@ public class TestDefaultSorter { } } + @Test + public void testEmptyPartitions() throws Exception { + testEmptyPartitionsHelper(2, false); + testEmptyPartitionsHelper(2, true); + testEmptyPartitionsHelper(0, true); + testEmptyPartitionsHelper(0, true); + } + + public void testEmptyPartitionsHelper(int numKeys, boolean sendEmptyPartitionDetails) throws IOException { + OutputContext context = createTezOutputContext(); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, sendEmptyPartitionDetails); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); + conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1); + MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); + context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), handler); + int partitions = 50; + DefaultSorter sorter = new DefaultSorter(context, conf, partitions, handler.getMemoryAssigned()); + + writeData(sorter, numKeys, 1000000); + if (numKeys == 0) { + assertTrue(sorter.getNumSpills() == 1); + } else { + assertTrue(sorter.getNumSpills() == numKeys); + } + verifyCounters(sorter, context); + if (sorter.indexCacheList.size() != 0) { + for (int i = 0; i < sorter.getNumSpills(); i++) { + TezSpillRecord record = sorter.indexCacheList.get(i); + for (int j = 0; j < partitions; j++) { + TezIndexRecord tezIndexRecord = record.getIndex(j); + if (tezIndexRecord.hasData()) { + continue; + } + if (sendEmptyPartitionDetails) { + Assert.assertEquals("Unexpected raw length for " + i + "th partition", 0, tezIndexRecord.getRawLength()); + } else { + Assert.assertEquals("", tezIndexRecord.getRawLength(), 6); + } + } + } + } + Path indexFile = sorter.getFinalIndexFile(); + TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf); + for (int i = 0; i < partitions; i++) { + TezIndexRecord tezIndexRecord = spillRecord.getIndex(i); + if (tezIndexRecord.hasData()) { + continue; + } + if (sendEmptyPartitionDetails) { + Assert.assertEquals("Unexpected raw length for " + i + "th partition", 0, tezIndexRecord.getRawLength()); + } else { + Assert.assertEquals("Unexpected raw length for " + i + "th partition", 6, tezIndexRecord.getRawLength()); + } + } + } + void testPartitionStats(boolean withStats) throws IOException { conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, withStats); OutputContext context = createTezOutputContext(); @@ -428,11 +543,11 @@ public class TestDefaultSorter { } TezCounter finalOutputBytes = context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL); - assertTrue(finalOutputBytes.getValue() > 0); + assertTrue(finalOutputBytes.getValue() >= 0); TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter (TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD); - assertTrue(outputBytesWithOverheadCounter.getValue() > 0); + assertTrue(outputBytesWithOverheadCounter.getValue() >= 0); verify(context, atLeastOnce()).notifyProgress(); }
