Repository: tez
Updated Branches:
  refs/heads/master 4ed4a5693 -> d9f542f4c
TEZ-3697. Adding #output_record in vertex manager event payload (zhiyuany)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d9f542f4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d9f542f4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d9f542f4

Branch: refs/heads/master
Commit: d9f542f4ca2168ac7485b1b6d816a9458465e66d
Parents: 4ed4a56
Author: Zhiyuan Yang <[email protected]>
Authored: Mon May 8 00:18:14 2017 -0700
Committer: Zhiyuan Yang <[email protected]>
Committed: Mon May 8 00:18:14 2017 -0700

----------------------------------------------------------------------
 .../library/common/shuffle/ShuffleUtils.java    |  2 ++
 .../src/main/proto/ShufflePayloads.proto        |  1 +
 .../TestUnorderedPartitionedKVWriter.java       | 20 ++++++++++++++++++--
 3 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/d9f542f4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index caddbc8..efcba70 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -461,6 +461,8 @@ public class ShuffleUtils {
 // multiple events would end up adding up to final output size.
 // This is needed for auto-reduce parallelism to work properly.
vmBuilder.setOutputSize(outputSize); + vmBuilder.setNumRecord(context.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS).getValue() + + context.getCounters().findCounter(TaskCounter.OUTPUT_LARGE_RECORDS).getValue()); //set partition stats if (sizePerPartition != null && sizePerPartition.length > 0) { http://git-wip-us.apache.org/repos/asf/tez/blob/d9f542f4/tez-runtime-library/src/main/proto/ShufflePayloads.proto ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto index f78cbac..0a4f4a6 100644 --- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto +++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto @@ -51,6 +51,7 @@ message VertexManagerEventPayloadProto { optional int64 output_size = 1; optional bytes partition_stats = 2; optional DetailedPartitionStatsProto detailed_partition_stats = 3; + optional int64 num_record = 4; } message ShuffleEdgeManagerConfigPayloadProto { http://git-wip-us.apache.org/repos/asf/tez/blob/d9f542f4/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index d970b95..5cda126 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.common.writers; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import 
static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -51,6 +52,7 @@ import com.google.protobuf.ByteString; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto; import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; @@ -399,6 +401,20 @@ public class TestUnorderedPartitionedKVWriter { List<Event> events = kvWriter.close(); verify(outputContext, never()).reportFailure(any(TaskFailureType.class), any(Throwable.class), any(String.class)); + if (!pipeliningEnabled) { + VertexManagerEvent vmEvent = null; + for (Event event : events) { + if (event instanceof VertexManagerEvent) { + assertNull(vmEvent); + vmEvent = (VertexManagerEvent) event; + } + } + VertexManagerEventPayloadProto vmEventPayload = + VertexManagerEventPayloadProto.parseFrom( + ByteString.copyFrom(vmEvent.getUserPayload().asReadOnlyBuffer())); + assertEquals(numRecordsWritten, vmEventPayload.getNumRecord()); + } + TezCounter outputLargeRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_LARGE_RECORDS); assertEquals(numLargeKeys + numLargevalues + numLargeKvPairs, outputLargeRecordsCounter.getValue()); @@ -481,8 +497,8 @@ public class TestUnorderedPartitionedKVWriter { private int[] getPartitionStats(VertexManagerEvent vme) throws IOException { RoaringBitmap partitionStats = new RoaringBitmap(); - ShuffleUserPayloads.VertexManagerEventPayloadProto - payload = ShuffleUserPayloads.VertexManagerEventPayloadProto + VertexManagerEventPayloadProto + payload = VertexManagerEventPayloadProto .parseFrom(ByteString.copyFrom(vme.getUserPayload())); if (!reportPartitionStats.isEnabled()) { 
assertFalse(payload.hasPartitionStats());
