Repository: tez Updated Branches: refs/heads/master 24475acc7 -> b3e20c7c6
TEZ-3769. Unordered: Fix wrong stats being sent out in the last event, when final merge is disabled (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b3e20c7c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b3e20c7c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b3e20c7c Branch: refs/heads/master Commit: b3e20c7c69125645797674d20c01802e9a409558 Parents: 24475ac Author: Rajesh Balamohan <[email protected]> Authored: Wed Jun 28 13:37:09 2017 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Wed Jun 28 13:37:09 2017 +0530 ---------------------------------------------------------------------- .../writers/UnorderedPartitionedKVWriter.java | 134 +++++---- .../TestUnorderedPartitionedKVWriter.java | 273 ++++++++++++++++++- 2 files changed, 352 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b3e20c7c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 70c577c..6ea0385 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -172,7 +172,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit //for single partition cases (e.g UnorderedKVOutput) private final IFile.Writer writer; - private final boolean skipBuffers; + @VisibleForTesting + final boolean skipBuffers; private final ReentrantLock spillLock = new ReentrantLock(); private final Condition spillInProgress = spillLock.newCondition(); @@ -285,6 +286,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit + "numBuffers=" + numBuffers + ", sizePerBuffer=" + sizePerBuffer + ", skipBuffers=" + skipBuffers + + ", numPartitions=" + numPartitions + + ", availableMemory=" + availableMemory + + ", maxSingleBufferSizeBytes=" + maxSingleBufferSizeBytes + ", pipelinedShuffle=" + pipelinedShuffle + ", isFinalMergeEnabled=" + isFinalMergeEnabled + ", numPartitions=" + numPartitions @@ -558,17 +562,14 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit public SpillCallable(List<WrappedBuffer> filledBuffers, CompressionCodec codec, TezCounter numRecordsCounter, SpillPathDetails spillPathDetails) { - this.filledBuffers = filledBuffers; - this.codec = codec; - this.numRecordsCounter = numRecordsCounter; - this.spillIndex = spillPathDetails.spillIndex; + this(filledBuffers, codec, numRecordsCounter, spillPathDetails.spillIndex); Preconditions.checkArgument(spillPathDetails.outputFilePath != null, "Spill output file " + "path can not be null"); this.spillPathDetails = spillPathDetails; } public SpillCallable(List<WrappedBuffer> filledBuffers, CompressionCodec codec, - TezCounter numRecordsCounter, int spillNumber) throws IOException { + TezCounter numRecordsCounter, int spillNumber) { this.filledBuffers = filledBuffers; this.codec = codec; this.numRecordsCounter = numRecordsCounter; @@ -720,47 +721,66 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit emptyPartitions.set(0); } if (reportPartitionStats()) { - sizePerPartition[0] = rawLen; + if (outputRecordsCounter.getValue() > 0) { + sizePerPartition[0] = rawLen; + } } cleanupCurrentBuffer(); - outputBytesWithOverheadCounter.increment(rawLen); - fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate); + if (outputRecordsCounter.getValue() > 0) { + outputBytesWithOverheadCounter.increment(rawLen); + fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate); + } eventList.add(generateVMEvent()); eventList.add(generateDMEvent(false, -1, false, outputContext .getUniqueIdentifier(), emptyPartitions)); return eventList; } - //Regular code path. - boolean updatedCounters = false; - if (numSpills.get() > 0 && isFinalMergeEnabled) { - mergeAll(); - } else { - if (finalSpill() && !isFinalMergeEnabled) { - //final spill generated some data. Add it to final events - updateTezCountersAndNotify(); - updatedCounters = true; - finalEvents.add(generateVMEvent()); - finalEvents.add(generateDMEvent()); + /* + 1. Final merge enabled + - When lots of spills are there, mergeAll, generate events and return + - If there are no existing spills, check for final spill and generate events + 2. Final merge disabled + - If finalSpill generated data, generate events and return + - If finalSpill did not generate data, it would automatically populate events + */ + if (isFinalMergeEnabled) { + if (numSpills.get() > 0) { + mergeAll(); + } else { + finalSpill(); } - } - if (!updatedCounters) { updateTezCountersAndNotify(); - } - cleanupCurrentBuffer(); - if (isFinalMergeEnabled) { eventList.add(generateVMEvent()); eventList.add(generateDMEvent()); } else { - //all events to be sent out are added in finalEvents. + // if no data is generated, finalSpill would create VMEvent & add to finalEvents + SpillResult result = finalSpill(); + if (result != null) { + updateTezCountersAndNotify(); + // Generate vm event + finalEvents.add(generateVMEvent()); + + // compute empty partitions based on spill result and generate DME + int spillNum = numSpills.get() - 1; + SpillCallback callback = new SpillCallback(spillNum); + callback.computePartitionStats(result); + BitSet emptyPartitions = getEmptyPartitions(callback.getRecordsPerPartition()); + String pathComponent = generatePathComponent(outputContext.getUniqueIdentifier(), spillNum); + Event finalEvent = generateDMEvent(true, spillNum, + true, pathComponent, emptyPartitions); + finalEvents.add(finalEvent); + } + //all events to be sent out are in finalEvents. eventList.addAll(finalEvents); } + cleanupCurrentBuffer(); return eventList; } //For pipelined case, send out an event in case finalspill generated a spill file. - if (finalSpill()) { + if (finalSpill() != null) { // VertexManagerEvent is only sent at the end and thus sizePerPartition is used // for the sum of all spills. mayBeSendEventsForSpill(currentBuffer.recordsPerPartition, @@ -848,41 +868,44 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit availableBuffers.clear(); } - private boolean finalSpill() throws IOException { + private SpillResult finalSpill() throws IOException { if (currentBuffer.nextPosition == 0) { if (pipelinedShuffle || !isFinalMergeEnabled) { List<Event> eventList = Lists.newLinkedList(); eventList.add(ShuffleUtils.generateVMEvent(outputContext, reportPartitionStats() ? new long[numPartitions] : null, - reportDetailedPartitionStats(), deflater.get())); - //Send final event with all empty partitions and null path component. - BitSet emptyPartitions = new BitSet(numPartitions); - emptyPartitions.flip(0, numPartitions); - eventList.add(generateDMEvent(true, numSpills.get(), true, - null, emptyPartitions)); + reportDetailedPartitionStats(), deflater.get())); + if (localOutputRecordsCounter == 0 && outputLargeRecordsCounter.getValue() == 0) { + // Should send this event (all empty partitions) only when no records are written out. + BitSet emptyPartitions = new BitSet(numPartitions); + emptyPartitions.flip(0, numPartitions); + eventList.add(generateDMEvent(true, numSpills.get(), true, + null, emptyPartitions)); + } if (pipelinedShuffle) { outputContext.sendEvents(eventList); } else if (!isFinalMergeEnabled) { - finalEvents.addAll(eventList); + finalEvents.addAll(0, eventList); } } - return false; + return null; } else { updateGlobalStats(currentBuffer); filledBuffers.add(currentBuffer); //setup output file and index file SpillPathDetails spillPathDetails = getSpillPathDetails(true, -1); - SpillCallable spillCallable = new SpillCallable(filledBuffers, codec, null, spillPathDetails); + SpillCallable spillCallable = new SpillCallable(filledBuffers, + codec, null, spillPathDetails); try { SpillResult spillResult = spillCallable.call(); fileOutputBytesCounter.increment(spillResult.spillSize); fileOutputBytesCounter.increment(indexFileSizeEstimate); + return spillResult; } catch (Exception ex) { throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex); } - return true; } } @@ -934,8 +957,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit indexFilePath = outputFileHandler.getSpillIndexFileForWrite(spillNumber, indexFileSizeEstimate); } - SpillPathDetails spillDetails = new SpillPathDetails(outputFilePath, indexFilePath, spillNumber); - return spillDetails; + return new SpillPathDetails(outputFilePath, indexFilePath, spillNumber); } private void mergeAll() throws IOException { @@ -1184,12 +1206,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } + private String generatePathComponent(String uniqueId, int spillNumber) { + return (uniqueId + "_" + spillNumber); + } + private List<Event> generateEventForSpill(BitSet emptyPartitions, long[] sizePerPartition, int spillNumber, boolean isFinalUpdate) throws IOException { List<Event> eventList = Lists.newLinkedList(); //Send out an event for consuming. - String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber); + String pathComponent = generatePathComponent(outputContext.getUniqueIdentifier(), spillNumber); if (isFinalUpdate) { eventList.add(ShuffleUtils.generateVMEvent(outputContext, sizePerPartition, reportDetailedPartitionStats(), deflater.get())); @@ -1237,19 +1263,14 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private class SpillCallback implements FutureCallback<SpillResult> { private final int spillNumber; + private int recordsPerPartition[]; + private long sizePerPartition[]; SpillCallback(int spillNumber) { this.spillNumber = spillNumber; } - @Override - public void onSuccess(SpillResult result) { - synchronized (UnorderedPartitionedKVWriter.this) { - spilledSize += result.spillSize; - } - - int recordsPerPartition[] = null; - long sizePerPartition[] = null; + void computePartitionStats(SpillResult result) { if (result.filledBuffers.size() == 1) { recordsPerPartition = result.filledBuffers.get(0).recordsPerPartition; sizePerPartition = result.filledBuffers.get(0).sizePerPartition; @@ -1263,6 +1284,19 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } } + } + + int[] getRecordsPerPartition() { + return recordsPerPartition; + } + + @Override + public void onSuccess(SpillResult result) { + synchronized (UnorderedPartitionedKVWriter.this) { + spilledSize += result.spillSize; + } + + computePartitionStats(result); mayBeSendEventsForSpill(recordsPerPartition, sizePerPartition, spillNumber, false); @@ -1276,7 +1310,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Failure while attempting to reset buffer after spill"); } - if (!pipelinedShuffle) { + if (!pipelinedShuffle && isFinalMergeEnabled) { synchronized(additionalSpillBytesWritternCounter) { additionalSpillBytesWritternCounter.increment(result.spillSize); } http://git-wip-us.apache.org/repos/asf/tez/blob/b3e20c7c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 71bd240..f1cea7e 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -19,13 +19,13 @@ package org.apache.tez.runtime.library.common.writers; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -47,12 +47,13 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import com.google.protobuf.ByteString; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.events.VertexManagerEvent; -import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto; import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB; import org.roaringbitmap.RoaringBitmap; @@ -89,7 +90,6 @@ import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput; import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles; import org.apache.tez.runtime.library.partitioner.HashPartitioner; -import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; import org.junit.After; import org.junit.Before; @@ -275,6 +275,12 @@ public class TestUnorderedPartitionedKVWriter { } @Test(timeout = 10000) + public void testNoRecords_SinglePartition() throws IOException, InterruptedException { + // skipBuffers + baseTest(0, 1, null, shouldCompress, -1, 0); + } + + @Test(timeout = 10000) public void testSkippedPartitions() throws IOException, InterruptedException { baseTest(200, 10, Sets.newHashSet(2, 5), shouldCompress, -1, 0); } @@ -614,6 +620,12 @@ public class TestUnorderedPartitionedKVWriter { } @Test(timeout = 10000) + public void testNoRecords_SinglePartition_WithPipelinedShuffle() throws IOException, InterruptedException { + // skipBuffers + baseTestWithPipelinedTransfer(0, 1, null, shouldCompress); + } + + @Test(timeout = 10000) public void testSkippedPartitions_WithPipelinedShuffle() throws IOException, InterruptedException { baseTestWithPipelinedTransfer(200, 10, Sets.newHashSet(2, 5), shouldCompress); } @@ -681,6 +693,11 @@ public class TestUnorderedPartitionedKVWriter { ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class); List<Event> lastEvents = kvWriter.close(); + + if (numPartitions == 1) { + assertEquals(false, kvWriter.skipBuffers); + } + //no events are sent to kvWriter upon close with pipelining assertTrue(lastEvents.size() == 0); verify(outputContext, atLeast(numExpectedSpills)).sendEvents(eventCaptor.capture()); @@ -753,7 +770,6 @@ public class TestUnorderedPartitionedKVWriter { //No additional spills when final merge is disabled. assertEquals(numAdditionalSpillsCounter.getValue(), 0); - BitSet emptyPartitionBits = null; assertTrue(lastEvents.size() > 0); //Get the last event int index = lastEvents.size() - 1; @@ -767,10 +783,31 @@ public class TestUnorderedPartitionedKVWriter { ByteString.copyFrom(cdme.getUserPayload())); //Ensure that this is the last event assertTrue(eventProto.getLastEvent()); + verifyEmptyPartitions(eventProto, numRecordsWritten, numPartitions, skippedPartitions); + + verify(outputContext, atLeast(1)).notifyProgress(); + + // Verify if all spill files are available. + TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId); + + if (numRecordsWritten > 0) { + int numSpills = kvWriter.numSpills.get(); + for (int i = 0; i < numSpills; i++) { + assertTrue(localFs.exists(taskOutput.getSpillFileForWrite(i, 10))); + assertTrue(localFs.exists(taskOutput.getSpillIndexFileForWrite(i, 10))); + } + } else { + return; + } + } + + private void verifyEmptyPartitions(DataMovementEventPayloadProto eventProto, + int numRecordsWritten, int numPartitions, Set<Integer> skippedPartitions) + throws IOException { if (eventProto.hasEmptyPartitions()) { byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray( eventProto.getEmptyPartitions()); - emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions); + BitSet emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions); if (numRecordsWritten == 0) { assertEquals(numPartitions, emptyPartitionBits.cardinality()); } else { @@ -795,6 +832,228 @@ public class TestUnorderedPartitionedKVWriter { assertEquals(SHUFFLE_PORT, eventProto.getPort()); assertTrue(eventProto.hasPathComponent()); } + } + + @Test(timeout = 10000) + public void testNoSpill_WithFinalMergeDisabled() throws IOException, InterruptedException { + baseTestWithFinalMergeDisabled(10, 10, null, shouldCompress); + } + + @Test(timeout = 10000) + public void testSingleSpill_WithFinalMergeDisabled() throws IOException, InterruptedException { + baseTestWithFinalMergeDisabled(50, 10, null, shouldCompress); + } + + @Test(timeout = 10000) + public void testSinglePartition_WithFinalMergeDisabled() throws IOException, InterruptedException { + baseTestWithFinalMergeDisabled(0, 1, null, shouldCompress); + } + + @Test(timeout = 10000) + public void testMultipleSpills_WithFinalMergeDisabled() throws IOException, InterruptedException { + baseTestWithFinalMergeDisabled(200, 10, null, shouldCompress); + } + + @Test(timeout = 10000) + public void testNoRecords_WithFinalMergeDisabled() throws IOException, InterruptedException { + baseTestWithFinalMergeDisabled(0, 10, null, shouldCompress); + } + + @Test(timeout = 10000) + public void testNoRecords_SinglePartition_WithFinalMergeDisabled() throws IOException, InterruptedException { + baseTestWithFinalMergeDisabled(0, 1, null, shouldCompress); + } + + @Test(timeout = 10000) + public void testSkippedPartitions_WithFinalMergeDisabled() throws IOException, InterruptedException { + baseTestWithFinalMergeDisabled(200, 10, Sets.newHashSet(2, 5), shouldCompress); + } + + @SuppressWarnings("unchecked") + private void baseTestWithFinalMergeDisabled(int numRecords, int numPartitions, + Set<Integer> skippedPartitions, boolean shouldCompress) throws IOException, InterruptedException { + + PartitionerForTest partitioner = new PartitionerForTest(); + ApplicationId appId = ApplicationId.newInstance(10000000, 1); + TezCounters counters = new TezCounters(); + String uniqueId = UUID.randomUUID().toString(); + int dagId = 1; + String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService); + + Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, + shouldCompress, -1); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, false); + + CompressionCodec codec = null; + if (shouldCompress) { + codec = new DefaultCodec(); + ((Configurable) codec).setConf(conf); + } + + int numOutputs = numPartitions; + long availableMemory = 2048; + int numRecordsWritten = 0; + + UnorderedPartitionedKVWriter kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, + conf, numOutputs, availableMemory); + + int sizePerBuffer = kvWriter.sizePerBuffer; + int sizePerRecord = 4 + 8; // IntW + LongW + int sizePerRecordWithOverhead = sizePerRecord + 12; // Record + META_OVERHEAD + + BitSet partitionsWithData = new BitSet(numPartitions); + IntWritable intWritable = new IntWritable(); + LongWritable longWritable = new LongWritable(); + for (int i = 0; i < numRecords; i++) { + intWritable.set(i); + longWritable.set(i); + int partition = partitioner.getPartition(intWritable, longWritable, numOutputs); + if (skippedPartitions != null && skippedPartitions.contains(partition)) { + continue; + } + partitionsWithData.set(partition); + kvWriter.write(intWritable, longWritable); + numRecordsWritten++; + } + + int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead; + int numExpectedSpills = numRecordsWritten / recordsPerBuffer; + + ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class); + List<Event> lastEvents = kvWriter.close(); + + if (numPartitions == 1) { + assertEquals(true, kvWriter.skipBuffers); + } + + // max events sent are spills + one VM event. If there are no spills, atleast empty + // partitions would be sent out finally. + int spills = Math.max(1, kvWriter.numSpills.get()); + assertEquals((spills + 1), lastEvents.size()); //spills + VMEvent + verify(outputContext, atMost(0)).sendEvents(eventCaptor.capture()); + + for (int i=0; i<lastEvents.size(); i++) { + Event event =lastEvents.get(i); + if (event instanceof VertexManagerEvent) { + //when there are no records, empty IFile with 6 bytes would be created which would add up + // to stats. + if (numRecordsWritten > 0) { + verifyPartitionStats(((VertexManagerEvent) event), partitionsWithData); + } + } + } + + verify(outputContext, never()).reportFailure(any(TaskFailureType.class), + any(Throwable.class), any(String.class)); + + assertNull(kvWriter.currentBuffer); + assertEquals(0, kvWriter.availableBuffers.size()); + + // Verify the counters + TezCounter outputRecordBytesCounter = + counters.findCounter(TaskCounter.OUTPUT_BYTES); + TezCounter outputRecordsCounter = + counters.findCounter(TaskCounter.OUTPUT_RECORDS); + TezCounter outputBytesWithOverheadCounter = + counters.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD); + TezCounter fileOutputBytesCounter = + counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL); + TezCounter spilledRecordsCounter = + counters.findCounter(TaskCounter.SPILLED_RECORDS); + TezCounter additionalSpillBytesWritternCounter = counters + .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN); + TezCounter additionalSpillBytesReadCounter = counters + .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ); + TezCounter numAdditionalSpillsCounter = counters + .findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT); + assertEquals(numRecordsWritten * sizePerRecord, + outputRecordBytesCounter.getValue()); + assertEquals(numRecordsWritten, outputRecordsCounter.getValue()); + if (outputRecordsCounter.getValue() > 0) { + assertEquals(numRecordsWritten * sizePerRecordWithOverhead, + outputBytesWithOverheadCounter.getValue()); + } else { + assertEquals(0, outputBytesWithOverheadCounter.getValue()); + } + long fileOutputBytes = fileOutputBytesCounter.getValue(); + if (numRecordsWritten > 0) { + assertTrue(fileOutputBytes > 0); + if (!shouldCompress) { + assertTrue("fileOutputBytes=" + fileOutputBytes + ", outputRecordBytes=" + +outputRecordBytesCounter.getValue(), + fileOutputBytes > outputRecordBytesCounter.getValue()); + } + } else { + assertEquals(0, fileOutputBytes); + } + // due to multiple threads, buffers could be merged in chunks in scheduleSpill. + assertTrue(recordsPerBuffer * numExpectedSpills >= spilledRecordsCounter.getValue()); + long additionalSpillBytesWritten = additionalSpillBytesWritternCounter.getValue(); + long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue(); + + //No additional spill bytes written when final merge is disabled. + assertEquals(additionalSpillBytesWritten, 0); + + //No additional spills when final merge is disabled. + assertTrue(additionalSpillBytesWritten == additionalSpillBytesRead); + + //No additional spills when final merge is disabled. + assertEquals(numAdditionalSpillsCounter.getValue(), 0); + + assertTrue(lastEvents.size() > 0); + //Get the last event + int index = lastEvents.size() - 1; + assertTrue(lastEvents.get(index) instanceof CompositeDataMovementEvent); + CompositeDataMovementEvent cdme = + (CompositeDataMovementEvent)lastEvents.get(index); + assertEquals(0, cdme.getSourceIndexStart()); + assertEquals(numOutputs, cdme.getCount()); + DataMovementEventPayloadProto eventProto = + DataMovementEventPayloadProto.parseFrom( + ByteString.copyFrom(cdme.getUserPayload())); + + verifyEmptyPartitions(eventProto, numRecordsWritten, numPartitions, skippedPartitions); + + if (outputRecordsCounter.getValue() > 0) { + //Ensure that this is the last event + assertTrue(eventProto.getLastEvent()); + } + + // Verify if all path components have spillIds when final merge is disabled + Pattern mergePathComponentPattern = Pattern.compile("(.*)(_\\d+)"); + for(Event event : lastEvents) { + if (!(event instanceof CompositeDataMovementEvent)) { + continue; + } + cdme = (CompositeDataMovementEvent)event; + eventProto = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload())); + + assertEquals(false, eventProto.getPipelined()); + if (eventProto.hasPathComponent()) { + //for final merge disabled cases, it should have _spillId + Matcher matcher = mergePathComponentPattern.matcher(eventProto.getPathComponent()); + assertTrue("spill id should be present in path component " + eventProto.getPathComponent(), matcher.matches()); + assertEquals(2, matcher.groupCount()); + assertEquals(uniqueId, matcher.group(1)); + assertTrue("spill id should be present in path component", matcher.group(2) != null); + } else { + assertEquals(0, eventProto.getSpillId()); + if (outputRecordsCounter.getValue() > 0) { + assertEquals(true, eventProto.getLastEvent()); + } else { + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto + .getEmptyPartitions()); + BitSet emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions); + assertEquals(numPartitions, emptyPartitionBits.cardinality()); + } + } + } + verify(outputContext, atLeast(1)).notifyProgress(); @@ -876,6 +1135,10 @@ public class TestUnorderedPartitionedKVWriter { } List<Event> events = kvWriter.close(); + if (numPartitions == 1) { + assertEquals(true, kvWriter.skipBuffers); + } + int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead; int numExpectedSpills = numRecordsWritten / recordsPerBuffer / kvWriter.spillLimit;
