Repository: tez
Updated Branches:
  refs/heads/master 1864e4b23 -> cc1d89cba
TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized (rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cc1d89cb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cc1d89cb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cc1d89cb

Branch: refs/heads/master
Commit: cc1d89cba984a9c43e6a966b2c822355b6c85c07
Parents: 1864e4b
Author: Rajesh Balamohan <[email protected]>
Authored: Wed Aug 5 12:49:23 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Wed Aug 5 12:49:23 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 74 ++++++++++++++++++++
 .../vertexmanager/ShuffleVertexManager.java     | 11 ++-
 3 files changed, 83 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/cc1d89cb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26ce333..c1a4e31 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@ INCOMPATIBLE CHANGES
   TEZ-2647. Add input causality dependency for attempts
 
 ALL CHANGES:
+  TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized.
   TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
   TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
   TEZ-2645. Provide standard analyzers for job analysis.
http://git-wip-us.apache.org/repos/asf/tez/blob/cc1d89cb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index c55ea23..3c0dd1e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -47,7 +47,12 @@ import java.util.concurrent.locks.ReentrantLock;
 import com.google.protobuf.ByteString;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.runtime.api.VertexStatistics;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -5575,6 +5580,75 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
+  public void testTez2684() throws AMUserCodeException, IOException {
+    setupPreDagCreation();
+    dagPlan = createSamplerDAGPlan2();
+    setupPostDagCreation();
+
+    VertexImpl vA = vertices.get("A");
+    VertexImpl vB = vertices.get("B");
+    VertexImpl vC = vertices.get("C");
+
+    //vA init & start
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+        VertexEventType.V_START));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, vA.getState());
+    Assert.assertEquals(VertexState.NEW, vB.getState());
+    Assert.assertEquals(VertexState.NEW, vC.getState());
+
+    //vB init
+    dispatcher.getEventHandler().handle(new VertexEvent(vB.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITED, vB.getState());
+    Assert.assertEquals(VertexState.INITED, vC.getState());
+
+    //Send VertexManagerEvent
+    long[] sizes = new long[]{(100 * 1000l * 1000l)};
+    Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "C");
+    TezEvent tezEvent = new TezEvent(vmEvent, null);
+    dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vC.getVertexId(),
+        Lists.newArrayList(tezEvent)));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITED, vC.getState());
+
+    //vB start
+    dispatcher.getEventHandler().handle(new VertexEvent(vB.getVertexId(), VertexEventType.V_START));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, vC.getState());
+
+  }
+
+  VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName)
+      throws IOException {
+    ByteBuffer payload = null;
+    if (sizes != null) {
+      RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes);
+      DataOutputBuffer dout = new DataOutputBuffer();
+      partitionStats.serialize(dout);
+      ByteString
+          partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData());
+      payload =
+          ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
+              .setOutputSize(totalSize)
+              .setPartitionStats(partitionStatsBytes)
+              .build().toByteString()
+              .asReadOnlyByteBuffer();
+    } else {
+      payload =
+          ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
+              .setOutputSize(totalSize)
+              .build().toByteString()
+              .asReadOnlyByteBuffer();
+    }
+    VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName, payload);
+    return vmEvent;
+  }
+
+  @Test(timeout = 5000)
   public void testExceptionFromVM_Initialize() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();

http://git-wip-us.apache.org/repos/asf/tez/blob/cc1d89cb/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 0aef8bd..6c3e3f8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -589,8 +589,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
 
   void updatePendingTasks() {
+    int tasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    if (tasks == pendingTasks.size() || tasks <= 0) {
+      return;
+    }
     pendingTasks.clear();
-    for (int i=0; i<getContext().getVertexNumTasks(getContext().getVertexName()); ++i) {
+    for (int i = 0; i < tasks; ++i) {
       pendingTasks.add(new PendingTaskInfo(i));
     }
     totalTasksToSchedule = pendingTasks.size();
@@ -801,7 +805,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     boolean computedPartitionSizes = false;
     for (PendingTaskInfo taskInfo : pendingTasks) {
       int index = taskInfo.index;
-      if (targetIndexes != null) { //things have been reconfigured.
+      if (targetIndexes != null) { //parallelism has changed.
         Preconditions.checkState(index < targetIndexes.length,
             "index=" + index +", targetIndexes length=" + targetIndexes.length);
         int[] mapping = targetIndexes[index];
@@ -957,7 +961,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         + slowStartMaxSrcCompletionFraction + " auto:" + enableAutoParallelism
         + " desiredTaskIput:" + desiredTaskInputDataSize + " minTasks:"
         + minTaskParallelism);
-
+
+    updatePendingTasks();
     if (enableAutoParallelism) {
       getContext().vertexReconfigurationPlanned();
     }
