Repository: tez Updated Branches: refs/heads/master 19464785a -> c97414920
TEZ-2987. TestVertexImpl.testTez2684 fails (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c9741492 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c9741492 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c9741492 Branch: refs/heads/master Commit: c9741492056758e3cac1a3d34cd1c7cbd7d9ddd4 Parents: 1946478 Author: Bikas Saha <[email protected]> Authored: Sun Dec 13 19:48:17 2015 -0800 Committer: Bikas Saha <[email protected]> Committed: Sun Dec 13 19:48:17 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/dag/app/dag/impl/TestVertexImpl.java | 4 ++-- .../vertexmanager/ShuffleVertexManager.java | 17 ++++++++++++++++ .../vertexmanager/TestShuffleVertexManager.java | 21 +++++++++++++++++++- 4 files changed, 40 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c9741492/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2f63a13..42e20e1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES TEZ-604. Revert temporary changes made in TEZ-603 to kill the provided tez session, if running a MapReduce job. ALL CHANGES: + TEZ-2987. TestVertexImpl.testTez2684 fails TEZ-2995. Timeline primary filter should only be on callerId and not type. TEZ-2994. LocalProgress in tez-runtime-library missing Apache header, rat check warnings from the new licenses after TEZ-2592 merge. TEZ-2977. Make HadoopShim selection be overridable for distro-specific implementations. http://git-wip-us.apache.org/repos/asf/tez/blob/c9741492/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 9453df8..1aaf588 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -5870,11 +5870,11 @@ public class TestVertexImpl { //Send VertexManagerEvent long[] sizes = new long[]{(100 * 1000l * 1000l)}; - Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "C"); + Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "B"); TezTaskAttemptID taId = TezTaskAttemptID.getInstance( TezTaskID.getInstance(vC.getVertexId(), 1), 1); - EventMetaData sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT, "C", "C", taId); + EventMetaData sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT, "B", "C", taId); TezEvent tezEvent = new TezEvent(vmEvent, sourceInfo); dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vC.getVertexId(), Lists.newArrayList(tezEvent))); http://git-wip-us.apache.org/repos/asf/tez/blob/c9741492/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index c88c7a2..410ad73 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -145,6 +145,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { int numBipartiteSourceTasksCompleted = 0; int numVertexManagerEventsReceived = 0; List<PendingTaskInfo> pendingTasks = Lists.newLinkedList(); + List<VertexManagerEvent> pendingVMEvents = Lists.newLinkedList(); int totalTasksToSchedule = 0; private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); @@ -501,10 +502,16 @@ public class ShuffleVertexManager extends VertexManagerPlugin { if(bipartiteSources == 0) { throw new TezUncheckedException("Atleast 1 bipartite source should exist"); } + for (VertexStateUpdate stateUpdate : pendingStateUpdates) { handleVertexStateUpdate(stateUpdate); } pendingStateUpdates.clear(); + + for (VertexManagerEvent vmEvent : pendingVMEvents) { + handleVertexManagerEvent(vmEvent); + } + pendingVMEvents.clear(); // track the tasks in this vertex updatePendingTasks(); @@ -567,6 +574,16 @@ public class ShuffleVertexManager extends VertexManagerPlugin { @Override public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { + if (onVertexStartedDone.get()) { + // internal data structures have been initialized - so handle the events directly + handleVertexManagerEvent(vmEvent); + } else { + // save this event for processing after vertex starts + pendingVMEvents.add(vmEvent); + } + } + + private void handleVertexManagerEvent(VertexManagerEvent vmEvent) { // currently events from multiple attempts of the same task can be ignored because // their output will be the same. However, with pipelined events that may not hold. TaskIdentifier producerTask = vmEvent.getProducerAttemptIdentifier().getTaskIdentifier(); http://git-wip-us.apache.org/repos/asf/tez/blob/c9741492/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 9d53ebc..9c21aed 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -307,6 +307,25 @@ public class TestShuffleVertexManager { Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize); /** + * Test vmEvent and vertexStatusUpdate before started + */ + scheduledTasks.clear(); + //{5,9,12,18} in bitmap + vmEvent = getVertexManagerEvent(null, 1L, "Vertex"); + + manager = createManager(conf, mockContext, 0.01f, 0.75f); + Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled + Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); + + TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0"); + vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); + manager.onVertexManagerEventReceived(vmEvent); + Assert.assertEquals(0, manager.numVertexManagerEventsReceived); // nothing happens + manager.onVertexStarted(emptyCompletions); // now the processing happens + Assert.assertEquals(1, manager.numVertexManagerEventsReceived); + + /** * Test partition stats */ scheduledTasks.clear(); @@ -320,7 +339,7 @@ public class TestShuffleVertexManager { Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0"); + taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0"); vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1)); manager.onVertexManagerEventReceived(vmEvent); Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
