Repository: tez
Updated Branches:
  refs/heads/master cc68f7b3f -> 2ecef2527
TEZ-3206. Have unordered partitioned KV output send partition stats via VertexManagerEvent. Contributed by Ming Ma. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2ecef252 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2ecef252 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2ecef252 Branch: refs/heads/master Commit: 2ecef25276f17c4969ab031929684e81ef3beee3 Parents: cc68f7b Author: Siddharth Seth <[email protected]> Authored: Mon May 23 14:31:36 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Mon May 23 14:31:36 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../vertexmanager/ShuffleVertexManager.java | 2 +- .../writers/UnorderedPartitionedKVWriter.java | 108 +++++++++++-- .../TestUnorderedPartitionedKVWriter.java | 150 ++++++++++++++----- .../output/TestOnFileUnorderedKVOutput.java | 6 +- 5 files changed, 211 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2ecef252/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ba7a04e..c3499ac 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3206. Have unordered partitioned KV output send partition stats via VertexManagerEvent. TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer TEZ-3240. Improvements to tez.lib.uris to allow for multiple tarballs and mixing tarballs and jars. TEZ-3246. 
Improve diagnostics when DAG killed by user http://git-wip-us.apache.org/repos/asf/tez/blob/2ecef252/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index aee8b6f..b83c64e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -585,7 +585,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { private void handleVertexManagerEvent(VertexManagerEvent vmEvent) { // currently events from multiple attempts of the same task can be ignored because - // their output will be the same. However, with pipelined events that may not hold. + // their output will be the same. 
TaskIdentifier producerTask = vmEvent.getProducerAttemptIdentifier().getTaskIdentifier(); if (!taskWithVmEvents.add(producerTask)) { LOG.info("Ignoring vertex manager event from: " + producerTask); http://git-wip-us.apache.org/repos/asf/tez/blob/2ecef252/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index c7e3059..76075bb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.TezCommonUtils; @@ -55,6 +56,7 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.Constants; @@ -63,7 +65,9 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; import 
org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; +import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,6 +115,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private final ListeningExecutorService spillExecutor; private final int[] numRecordsPerPartition; + // uncompressed size for each partition + private final long[] sizePerPartition; private volatile long spilledSize = 0; /** @@ -197,10 +203,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit .setDaemon(true) .setNameFormat( "UnorderedOutSpiller {" - + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "}") + + TezUtilsInternal.cleanVertexName( + outputContext.getDestinationVertexName()) + "}") .build()); spillExecutor = MoreExecutors.listeningDecorator(executor); numRecordsPerPartition = new int[numPartitions]; + sizePerPartition = new long[numPartitions]; outputLargeRecordsCounter = outputContext.getCounters().findCounter( TaskCounter.OUTPUT_LARGE_RECORDS); @@ -250,7 +258,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } if (skipBuffers) { //special case, where we have only one partition and pipelining is disabled. - writer.append(key, value); // ???? Why is outputrecordscounter not updated here? + // The reason outputRecordsCounter isn't updated here: + // For skipBuffers case, IFile writer has the reference to + // outputRecordsCounter and during its close method call, + // it will update the outputRecordsCounter. 
+ writer.append(key, value); outputContext.notifyProgress(); } else { int partition = partitioner.getPartition(key, value, numPartitions); @@ -272,7 +284,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit int metaStart = currentBuffer.nextPosition; currentBuffer.availableSize -= (META_SIZE + metaSkip); currentBuffer.nextPosition += META_SIZE; - + keySerializer.serialize(key); if (currentBuffer.full) { @@ -293,7 +305,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit int valStart = currentBuffer.nextPosition; valSerializer.serialize(value); - + if (currentBuffer.full) { // Value too large for current buffer, or K-V too large for entire buffer. if (metaStart == 0) { @@ -324,6 +336,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit outputContext.notifyProgress(); currentBuffer.partitionPositions[partition] = metaStart; currentBuffer.recordsPerPartition[partition]++; + currentBuffer.sizePerPartition[partition] += + currentBuffer.nextPosition - (metaStart + META_SIZE); currentBuffer.numRecords++; } @@ -353,6 +367,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private void updateGlobalStats(WrappedBuffer buffer) { for (int i = 0; i < numPartitions; i++) { numRecordsPerPartition[i] += buffer.recordsPerPartition[i]; + sizePerPartition[i] += buffer.sizePerPartition[i]; } } @@ -472,6 +487,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit @Override public List<Event> close() throws IOException, InterruptedException { + List<Event> eventList = Lists.newLinkedList(); isShutdown.set(true); spillLock.lock(); try { @@ -513,12 +529,15 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit if (outputRecordsCounter.getValue() == 0) { emptyPartitions.set(0); } + sizePerPartition[0] = rawLen; cleanupCurrentBuffer(); outputBytesWithOverheadCounter.increment(rawLen); 
fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate); - return Collections.singletonList(generateDMEvent(false, -1, false, outputContext + eventList.add(generateVMEvent()); + eventList.add(generateDMEvent(false, -1, false, outputContext .getUniqueIdentifier(), emptyPartitions)); + return eventList; } //Regular code path. @@ -528,12 +547,17 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit finalSpill(); } cleanupCurrentBuffer(); - return Collections.singletonList(generateDMEvent()); + eventList.add(generateVMEvent()); + eventList.add(generateDMEvent()); + return eventList; } //For pipelined case, send out an event in case finalspill generated a spill file. if (finalSpill()) { - sendPipelinedEventForSpill(currentBuffer.recordsPerPartition, numSpills.get() - 1, true); + // VertexManagerEvent is only sent at the end and thus sizePerPartition is used + // for the sum of all spills. + sendPipelinedEventForSpill(currentBuffer.recordsPerPartition, + sizePerPartition, numSpills.get() - 1, true); } cleanupCurrentBuffer(); return events; @@ -551,6 +575,39 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit return emptyPartitions; } + private Event generateVMEvent() throws IOException { + return generateVMEvent(this.sizePerPartition); + } + + private Event generateVMEvent(long[] sizePerPartition) throws IOException { + ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder = + ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder(); + + long outputSize = outputContext.getCounters(). + findCounter(TaskCounter.OUTPUT_BYTES).getValue(); + + // Set this information only when required. In pipelined shuffle, + // multiple events would end up adding up to final output size. + // This is needed for auto-reduce parallelism to work properly. 
+ vmBuilder.setOutputSize(outputSize); + + //set partition stats + if (sizePerPartition != null && sizePerPartition.length > 0) { + RoaringBitmap stats = ShuffleUtils.getPartitionStatsForPhysicalOutput( + sizePerPartition); + DataOutputBuffer dout = new DataOutputBuffer(); + stats.serialize(dout); + ByteString partitionStatsBytes = + TezCommonUtils.compressByteArrayToByteString(dout.getData()); + vmBuilder.setPartitionStats(partitionStatsBytes); + } + + VertexManagerEvent vmEvent = VertexManagerEvent.create( + outputContext.getDestinationVertexName(), + vmBuilder.build().toByteString().asReadOnlyByteBuffer()); + return vmEvent; + } + private Event generateDMEvent() throws IOException { BitSet emptyPartitions = getEmptyPartitions(numRecordsPerPartition); return generateDMEvent(false, -1, false, outputContext.getUniqueIdentifier(), emptyPartitions); @@ -609,12 +666,14 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private boolean finalSpill() throws IOException { if (currentBuffer.nextPosition == 0) { if (pipelinedShuffle) { + List<Event> eventList = Lists.newLinkedList(); + eventList.add(generateVMEvent(new long[numPartitions])); //Send final event with all empty partitions and null path component. 
BitSet emptyPartitions = new BitSet(numPartitions); emptyPartitions.flip(0, numPartitions); - - outputContext.sendEvents( - Collections.singletonList(generateDMEvent(true, numSpills.get(), true, null, emptyPartitions))); + eventList.add(generateDMEvent(true, numSpills.get(), true, + null, emptyPartitions)); + outputContext.sendEvents(eventList); } return false; } else { @@ -785,6 +844,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit writer.append(key, value); outputLargeRecordsCounter.increment(1); numRecordsPerPartition[i]++; + sizePerPartition[i] += writer.getRawLength(); writer.close(); additionalSpillBytesWritternCounter.increment(writer.getCompressedLength()); TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(), @@ -805,7 +865,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } handleSpillIndex(spillPathDetails, spillRecord); - sendPipelinedEventForSpill(emptyPartitions, spillIndex, false); + sendPipelinedEventForSpill(emptyPartitions, sizePerPartition, + spillIndex, false); LOG.info(destNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " + spillIndex); if (LOG.isDebugEnabled()) { @@ -862,6 +923,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private final int[] partitionPositions; private final int[] recordsPerPartition; + // uncompressed size for each partition + private final long[] sizePerPartition; private final int numPartitions; private final int size; @@ -878,10 +941,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit WrappedBuffer(int numPartitions, int size) { this.partitionPositions = new int[numPartitions]; this.recordsPerPartition = new int[numPartitions]; + this.sizePerPartition = new long[numPartitions]; this.numPartitions = numPartitions; for (int i = 0; i < numPartitions; i++) { this.partitionPositions[i] = 
PARTITION_ABSENT_POSITION; this.recordsPerPartition[i] = 0; + this.sizePerPartition[i] = 0; } size = size - (size % INT_SIZE); this.size = size; @@ -894,6 +959,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit for (int i = 0; i < numPartitions; i++) { this.partitionPositions[i] = PARTITION_ABSENT_POSITION; this.recordsPerPartition[i] = 0; + this.sizePerPartition[i] = 0; } numRecords = 0; nextPosition = 0; @@ -908,27 +974,36 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } - private void sendPipelinedEventForSpill(BitSet emptyPartitions, int spillNumber, boolean isFinalUpdate) { + private void sendPipelinedEventForSpill( + BitSet emptyPartitions, long[] sizePerPartition, int spillNumber, + boolean isFinalUpdate) { + List<Event> eventList = Lists.newLinkedList(); if (!pipelinedShuffle) { return; } //Send out an event for consuming. try { String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber); + if (isFinalUpdate) { + eventList.add(generateVMEvent(sizePerPartition)); + } Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate, pathComponent, emptyPartitions); + eventList.add(compEvent); LOG.info(destNameTrimmed + ": " + "Adding spill event for spill (final update=" + isFinalUpdate + "), spillId=" + spillNumber); - outputContext.sendEvents(Collections.singletonList(compEvent)); + outputContext.sendEvents(eventList); } catch (IOException e) { LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", e); outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Error in sending pipelined events"); } } - private void sendPipelinedEventForSpill(int[] recordsPerPartition, int spillNumber, boolean isFinalUpdate) { + private void sendPipelinedEventForSpill(int[] recordsPerPartition, + long[] sizePerPartition, int spillNumber, boolean isFinalUpdate) { BitSet emptyPartitions = getEmptyPartitions(recordsPerPartition); - 
sendPipelinedEventForSpill(emptyPartitions, spillNumber, isFinalUpdate); + sendPipelinedEventForSpill(emptyPartitions, sizePerPartition, spillNumber, + isFinalUpdate); } private class SpillCallback implements FutureCallback<SpillResult> { @@ -943,7 +1018,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit public void onSuccess(SpillResult result) { spilledSize += result.spillSize; - sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition, spillNumber, false); + sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition, + result.wrappedBuffer.sizePerPartition, spillNumber, false); try { result.wrappedBuffer.reset(); http://git-wip-us.apache.org/repos/asf/tez/blob/2ecef252/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 8c935eb..3b82690 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -32,12 +32,15 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; @@ -46,6 +49,10 @@ import java.util.UUID; import 
com.google.protobuf.ByteString; import org.apache.tez.runtime.api.TaskFailureType; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB; +import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configurable; @@ -197,7 +204,7 @@ public class TestUnorderedPartitionedKVWriter { } @Test(timeout = 10000) - public void testNoSpill_SinglPartition() throws IOException, InterruptedException { + public void testNoSpill_SinglePartition() throws IOException, InterruptedException { baseTest(10, 1, null, shouldCompress); } @@ -336,20 +343,24 @@ public class TestUnorderedPartitionedKVWriter { return; } - // Validate the event - assertEquals(1, events.size()); - assertTrue(events.get(0) instanceof CompositeDataMovementEvent); - CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0); + // Validate the events + assertEquals(2, events.size()); + + assertTrue(events.get(0) instanceof VertexManagerEvent); + VertexManagerEvent vme = (VertexManagerEvent) events.get(0); + verifyPartitionStats(vme, partitionsWithData); + + assertTrue(events.get(1) instanceof CompositeDataMovementEvent); + CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(1); assertEquals(0, cdme.getSourceIndexStart()); assertEquals(numPartitions, cdme.getCount()); DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom( - ByteString.copyFrom(cdme - .getUserPayload())); + ByteString.copyFrom(cdme.getUserPayload())); BitSet emptyPartitionBits = null; if (partitionsWithData.cardinality() != numPartitions) { assertTrue(eventProto.hasEmptyPartitions()); - byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto - .getEmptyPartitions()); + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray( + 
eventProto.getEmptyPartitions()); emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions); assertEquals(numPartitions - partitionsWithData.cardinality(), emptyPartitionBits.cardinality()); @@ -404,6 +415,43 @@ public class TestUnorderedPartitionedKVWriter { assertEquals(0, expectedValues.size()); } + private long[] getPartitionStats( + VertexManagerEvent vme) throws IOException { + RoaringBitmap partitionStats = new RoaringBitmap(); + ShuffleUserPayloads.VertexManagerEventPayloadProto + payload = ShuffleUserPayloads.VertexManagerEventPayloadProto + .parseFrom(ByteString.copyFrom(vme.getUserPayload())); + assertTrue(payload.hasPartitionStats()); + ByteString compressedPartitionStats = payload.getPartitionStats(); + byte[] rawData = TezCommonUtils.decompressByteStringToByteArray( + compressedPartitionStats); + ByteArrayInputStream bin = new ByteArrayInputStream(rawData); + partitionStats.deserialize(new DataInputStream(bin)); + long[] stats = new long[partitionStats.getCardinality()]; + Iterator<Integer> it = partitionStats.iterator(); + final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values(); + final int RANGE_LEN = RANGES.length; + while (it.hasNext()) { + int pos = it.next(); + int index = ((pos) / RANGE_LEN); + int rangeIndex = ((pos) % RANGE_LEN); + if (RANGES[rangeIndex].getSizeInMB() > 0) { + stats[index] += RANGES[rangeIndex].getSizeInMB(); + } + } + return stats; + } + + private void verifyPartitionStats(VertexManagerEvent vme, + BitSet expectedPartitionsWithData) throws IOException { + long[] stats = getPartitionStats(vme); + for (int i = 0; i < stats.length; i++) { + // The stats should be greater than zero if and only if + // the partition has data + assertTrue(expectedPartitionsWithData.get(i) == (stats[i] > 0)); + } + } + @Test(timeout = 10000) public void testNoSpill_WithPipelinedShuffle() throws IOException, InterruptedException { baseTestWithPipelinedTransfer(10, 10, null, shouldCompress); @@ -469,6 +517,7 @@ public class 
TestUnorderedPartitionedKVWriter { int sizePerRecord = 4 + 8; // IntW + LongW int sizePerRecordWithOverhead = sizePerRecord + 12; // Record + META_OVERHEAD + BitSet partitionsWithData = new BitSet(numPartitions); IntWritable intWritable = new IntWritable(); LongWritable longWritable = new LongWritable(); for (int i = 0; i < numRecords; i++) { @@ -478,6 +527,7 @@ public class TestUnorderedPartitionedKVWriter { if (skippedPartitions != null && skippedPartitions.contains(partition)) { continue; } + partitionsWithData.set(partition); kvWriter.write(intWritable, longWritable); numRecordsWritten++; } @@ -486,15 +536,29 @@ public class TestUnorderedPartitionedKVWriter { int numExpectedSpills = numRecordsWritten / recordsPerBuffer; ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class); - List<Event> events = kvWriter.close(); - assertTrue(events.size() == 0); //no events are sent to kvWriter upon close with pipelining - + List<Event> lastEvents = kvWriter.close(); + //no events are sent to kvWriter upon close with pipelining + assertTrue(lastEvents.size() == 0); verify(outputContext, atLeast(numExpectedSpills)).sendEvents(eventCaptor.capture()); - events = eventCaptor.getValue(); - - assertTrue(events.size() == 1); //the last event which was sent out + int numOfCapturedEvents = eventCaptor.getAllValues().size(); + lastEvents = eventCaptor.getAllValues().get(numOfCapturedEvents - 1); + VertexManagerEvent VMEvent = (VertexManagerEvent)lastEvents.get(0); + + for (int i=0; i<numOfCapturedEvents; i++) { + List<Event> events = eventCaptor.getAllValues().get(i); + if (i < numOfCapturedEvents - 1) { + assertTrue(events.size() == 1); + assertTrue(events.get(0) instanceof CompositeDataMovementEvent); + } else { + assertTrue(events.size() == 2); + assertTrue(events.get(0) instanceof VertexManagerEvent); + assertTrue(events.get(1) instanceof CompositeDataMovementEvent); + } + } + verifyPartitionStats(VMEvent, partitionsWithData); - verify(outputContext, 
never()).reportFailure(any(TaskFailureType.class), any(Throwable.class), any(String.class)); + verify(outputContext, never()).reportFailure(any(TaskFailureType.class), + any(Throwable.class), any(String.class)); // Verify the status of the buffers if (numExpectedSpills == 0) { @@ -506,19 +570,24 @@ public class TestUnorderedPartitionedKVWriter { assertEquals(0, kvWriter.availableBuffers.size()); // Verify the counters - TezCounter outputRecordBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES); - TezCounter outputRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_RECORDS); - TezCounter outputBytesWithOverheadCounter = counters - .findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD); - TezCounter fileOutputBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL); - TezCounter spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS); + TezCounter outputRecordBytesCounter = + counters.findCounter(TaskCounter.OUTPUT_BYTES); + TezCounter outputRecordsCounter = + counters.findCounter(TaskCounter.OUTPUT_RECORDS); + TezCounter outputBytesWithOverheadCounter = + counters.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD); + TezCounter fileOutputBytesCounter = + counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL); + TezCounter spilledRecordsCounter = + counters.findCounter(TaskCounter.SPILLED_RECORDS); TezCounter additionalSpillBytesWritternCounter = counters .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN); TezCounter additionalSpillBytesReadCounter = counters .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ); TezCounter numAdditionalSpillsCounter = counters .findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT); - assertEquals(numRecordsWritten * sizePerRecord, outputRecordBytesCounter.getValue()); + assertEquals(numRecordsWritten * sizePerRecord, + outputRecordBytesCounter.getValue()); assertEquals(numRecordsWritten, outputRecordsCounter.getValue()); assertEquals(numRecordsWritten * sizePerRecordWithOverhead, 
outputBytesWithOverheadCounter.getValue()); @@ -531,8 +600,10 @@ public class TestUnorderedPartitionedKVWriter { } else { assertEquals(0, fileOutputBytes); } - assertEquals(recordsPerBuffer * numExpectedSpills, spilledRecordsCounter.getValue()); - long additionalSpillBytesWritten = additionalSpillBytesWritternCounter.getValue(); + assertEquals(recordsPerBuffer * numExpectedSpills, + spilledRecordsCounter.getValue()); + long additionalSpillBytesWritten = + additionalSpillBytesWritternCounter.getValue(); long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue(); //No additional spill bytes written when final merge is disabled. @@ -545,20 +616,22 @@ public class TestUnorderedPartitionedKVWriter { assertEquals(numAdditionalSpillsCounter.getValue(), 0); BitSet emptyPartitionBits = null; - assertTrue(events.size() > 0); + assertTrue(lastEvents.size() > 0); //Get the last event - int index = events.size() - 1; - assertTrue(events.get(index) instanceof CompositeDataMovementEvent); - CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(index); + int index = lastEvents.size() - 1; + assertTrue(lastEvents.get(index) instanceof CompositeDataMovementEvent); + CompositeDataMovementEvent cdme = + (CompositeDataMovementEvent)lastEvents.get(index); assertEquals(0, cdme.getSourceIndexStart()); assertEquals(numOutputs, cdme.getCount()); DataMovementEventPayloadProto eventProto = - DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload())); + DataMovementEventPayloadProto.parseFrom( + ByteString.copyFrom(cdme.getUserPayload())); //Ensure that this is the last event assertTrue(eventProto.getLastEvent()); if (eventProto.hasEmptyPartitions()) { - byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto - .getEmptyPartitions()); + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray( + eventProto.getEmptyPartitions()); emptyPartitionBits = 
TezUtilsInternal.fromByteArray(emptyPartitions); if (numRecordsWritten == 0) { assertEquals(numPartitions, emptyPartitionBits.cardinality()); @@ -636,6 +709,7 @@ public class TestUnorderedPartitionedKVWriter { IntWritable intWritable = new IntWritable(); LongWritable longWritable = new LongWritable(); + BitSet partitionsWithData = new BitSet(numPartitions); for (int i = 0; i < numRecords; i++) { intWritable.set(i); longWritable.set(i); @@ -643,6 +717,7 @@ public class TestUnorderedPartitionedKVWriter { if (skippedPartitions != null && skippedPartitions.contains(partition)) { continue; } + partitionsWithData.set(partition); expectedValues.get(partition).put(intWritable.get(), longWritable.get()); kvWriter.write(intWritable, longWritable); numRecordsWritten++; @@ -709,10 +784,13 @@ public class TestUnorderedPartitionedKVWriter { assertEquals(numExpectedSpills, numAdditionalSpillsCounter.getValue()); BitSet emptyPartitionBits = null; - // Verify the event returned - assertEquals(1, events.size()); - assertTrue(events.get(0) instanceof CompositeDataMovementEvent); - CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0); + // Verify the events returned + assertEquals(2, events.size()); + assertTrue(events.get(0) instanceof VertexManagerEvent); + VertexManagerEvent vme = (VertexManagerEvent) events.get(0); + verifyPartitionStats(vme, partitionsWithData); + assertTrue(events.get(1) instanceof CompositeDataMovementEvent); + CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(1); assertEquals(0, cdme.getSourceIndexStart()); assertEquals(numOutputs, cdme.getCount()); DataMovementEventPayloadProto eventProto = http://git-wip-us.apache.org/repos/asf/tez/blob/2ecef252/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index 884f0e6..38a60a2 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -143,8 +143,8 @@ public class TestOnFileUnorderedKVOutput { events = kvOutput.close(); assertEquals(45, task.getTaskStatistics().getIOStatistics().values().iterator().next().getDataSize()); assertEquals(5, task.getTaskStatistics().getIOStatistics().values().iterator().next().getItemsProcessed()); - assertTrue(events != null && events.size() == 1); - CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0); + assertTrue(events != null && events.size() == 2); + CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(1); assertEquals("Invalid source index", 0, dmEvent.getSourceIndexStart()); @@ -191,7 +191,7 @@ public class TestOnFileUnorderedKVOutput { events = eventsCaptor.getValue(); - CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0); + CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(1); assertEquals("Invalid source index", 0, dmEvent.getSourceIndexStart()); DataMovementEventPayloadProto shufflePayload = DataMovementEventPayloadProto
