Repository: tez Updated Branches: refs/heads/master 8131896b3 -> 91279010b
TEZ-3303. Have ShuffleVertexManager consume more precise partition stats. Contributed by Tsuyoshi Ozawa. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/91279010 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/91279010 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/91279010 Branch: refs/heads/master Commit: 91279010b6b72afa2ab9ca357dcf07356bd90ac6 Parents: 8131896 Author: Siddharth Seth <[email protected]> Authored: Tue Jul 12 22:19:08 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue Jul 12 22:19:08 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../vertexmanager/ShuffleVertexManager.java | 13 ++++ .../vertexmanager/TestShuffleVertexManager.java | 65 +++++++++++++++++--- 3 files changed, 69 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/91279010/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7c1f0cf..9aab833 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3303. Have ShuffleVertexManager consume more precise partition stats. TEZ-1248. Reduce slow-start should special case 1 reducer runs. TEZ-3327. ATS Parser: Populate config details available in dag. TEZ-3325. Flaky test in TestDAGImpl.testCounterLimits. http://git-wip-us.apache.org/repos/asf/tez/blob/91279010/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 7d9822c..c8a1f30 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -555,6 +555,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } @VisibleForTesting + void parseDetailedPartitionStats(List<Integer> partitionStats) { + Preconditions.checkState(stats != null, "Stats should be initialized"); + for (int i = 0; i< partitionStats.size(); i++) { + stats[i] += partitionStats.get(i); + } + } + + @VisibleForTesting void parsePartitionStats(RoaringBitmap partitionStats) { Preconditions.checkState(stats != null, "Stats should be initialized"); Iterator<Integer> it = partitionStats.iterator(); @@ -618,10 +626,15 @@ public class ShuffleVertexManager extends VertexManagerPlugin { partitionStats.deserialize(new DataInputStream(bin)); parsePartitionStats(partitionStats); + } catch (IOException e) { throw new TezUncheckedException(e); } + } else if (proto.hasDetailedPartitionStats()) { + List<Integer> detailedPartitionStats = proto.getDetailedPartitionStats().getSizeInMbList(); + parseDetailedPartitionStats(detailedPartitionStats); } + srcInfo.numVMEventsReceived++; srcInfo.outputSize += sourceTaskOutputSize; completedSourceTasksOutputSize += sourceTaskOutputSize; http://git-wip-us.apache.org/repos/asf/tez/blob/91279010/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 2566c94..a5a6581 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -330,9 +330,9 @@ public class TestShuffleVertexManager { */ scheduledTasks.clear(); //{5,9,12,18} in bitmap - long[] sizes = new long[]{(0l), (1000l * 1000l), - (1010 * 1000l * 1000l), (50 * 1000l * 1000l)}; - vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex"); + final long MB = 1024l * 1024l; + long[] sizes = new long[]{(0l), (1 * MB), (964 * MB), (48 * MB)}; + vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex", false); manager = createManager(conf, mockContext, 0.01f, 0.75f); manager.onVertexStarted(emptyCompletions); @@ -362,6 +362,37 @@ public class TestShuffleVertexManager { Assert.assertEquals(100, manager.stats[2]); //100 MB bucket Assert.assertEquals(10, manager.stats[3]); //10 MB bucket + // Testing for detailed partition stats + vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex", true); + + manager = createManager(conf, mockContext, 0.01f, 0.75f); + manager.onVertexStarted(emptyCompletions); + Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled + Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); + + taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0"); + vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1)); + manager.onVertexManagerEventReceived(vmEvent); + Assert.assertEquals(1, manager.numVertexManagerEventsReceived); + + Assert.assertEquals(4, manager.stats.length); + Assert.assertEquals(0, manager.stats[0]); + Assert.assertEquals(1, manager.stats[1]); + Assert.assertEquals(964, manager.stats[2]); + Assert.assertEquals(48, manager.stats[3]); + + // sending again from a different version of the same task has not impact + taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1"); + vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2)); + manager.onVertexManagerEventReceived(vmEvent); + Assert.assertEquals(1, manager.numVertexManagerEventsReceived); + + Assert.assertEquals(4, manager.stats.length); + Assert.assertEquals(0, manager.stats[0]); + Assert.assertEquals(1, manager.stats[1]); + Assert.assertEquals(964, manager.stats[2]); + Assert.assertEquals(48, manager.stats[3]); + /** * Test for TEZ-978 * Delay determining parallelism until enough data has been received. @@ -1017,7 +1048,11 @@ public class TestShuffleVertexManager { Assert.assertTrue(scheduledTasks.size() == 3); } - VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName) + VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName) throws IOException { + return getVertexManagerEvent(sizes, totalSize, vertexName, false); + } + + VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName, boolean reportDetailedStats) throws IOException { ByteBuffer payload = null; if (sizes != null) { @@ -1026,12 +1061,22 @@ public class TestShuffleVertexManager { partitionStats.serialize(dout); ByteString partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData()); - payload = - VertexManagerEventPayloadProto.newBuilder() - .setOutputSize(totalSize) - .setPartitionStats(partitionStatsBytes) - .build().toByteString() - .asReadOnlyByteBuffer(); + if (reportDetailedStats) { + payload = + VertexManagerEventPayloadProto.newBuilder() + .setOutputSize(totalSize) + .setDetailedPartitionStats(ShuffleUtils.getDetailedPartitionStatsForPhysicalOutput(sizes)) + .build().toByteString() + .asReadOnlyByteBuffer(); + } else { + payload = + VertexManagerEventPayloadProto.newBuilder() + .setOutputSize(totalSize) + .setPartitionStats(partitionStatsBytes) + .build().toByteString() + .asReadOnlyByteBuffer(); + } + } else { payload = VertexManagerEventPayloadProto.newBuilder()
