TEZ-2232. Allow setParallelism to be called multiple times before tasks get scheduled (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5e2a55fb Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5e2a55fb Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5e2a55fb Branch: refs/heads/TEZ-2003 Commit: 5e2a55fb12334ad272dc4d6990f6e79e98f1e9b3 Parents: 09a9608 Author: Bikas Saha <[email protected]> Authored: Fri Apr 3 12:36:49 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Fri Apr 3 12:36:49 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 208 +++++++++++-------- .../tez/dag/app/dag/impl/TestVertexImpl.java | 115 ++++++++-- .../dag/app/dag/impl/TestVertexRecovery.java | 62 ++++++ 4 files changed, 277 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5e2a55fb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a669147..8fad569 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,8 @@ INCOMPATIBLE CHANGES TEZ-2176. Move all logging to slf4j. (commons-logging jar no longer part of Tez tar) ALL CHANGES: + TEZ-2232. Allow setParallelism to be called multiple times before tasks get + scheduled TEZ-2265. All inputs/outputs in a task share the same counter object TEZ-2251. Race condition in VertexImpl & Edge causes DAG to hang. TEZ-2264. Remove unused taskUmbilical reference in TezTaskRunner, register as running late. 
http://git-wip-us.apache.org/repos/asf/tez/blob/5e2a55fb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 7f124b4..81e9bb9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -685,8 +685,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private final UserGroupInformation dagUgi; - private boolean parallelismSet = false; - private AtomicBoolean committed = new AtomicBoolean(false); private AtomicBoolean aborted = new AtomicBoolean(false); private boolean commitVertexOutputs = false; @@ -1121,19 +1119,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private void handleParallelismUpdate(int newParallelism, Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, - Map<String, InputSpecUpdate> rootInputSpecUpdates) { - LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks; - Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet() - .iterator(); - int i = 0; - while (iter.hasNext()) { - i++; - iter.next(); - if (i <= newParallelism) { - continue; - } - iter.remove(); - } + Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldParallelism) { + // initial parallelism must have been set by this time + // parallelism update is recorded in history only for change from an initialized value + Preconditions.checkArgument(oldParallelism != -1, getLogIdentifier()); + if (oldParallelism < newParallelism) { + addTasks(newParallelism); + } else if (oldParallelism > newParallelism) { + removeTasks(newParallelism); + } + Preconditions.checkState(this.numTasks == newParallelism, getLogIdentifier()); this.recoveredSourceEdgeManagers = sourceEdgeManagers; 
this.recoveredRootInputSpecUpdates = rootInputSpecUpdates; } @@ -1168,17 +1163,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } return recoveredState; case VERTEX_PARALLELISM_UPDATED: + // TODO TEZ-1019 this should flow through setParallelism method VertexParallelismUpdatedEvent updatedEvent = (VertexParallelismUpdatedEvent) historyEvent; + int oldNumTasks = numTasks; + int newNumTasks = updatedEvent.getNumTasks(); + handleParallelismUpdate(newNumTasks, updatedEvent.getSourceEdgeManagers(), + updatedEvent.getRootInputSpecUpdates(), oldNumTasks); + Preconditions.checkState(this.numTasks == newNumTasks, getLogIdentifier()); if (updatedEvent.getVertexLocationHint() != null) { - setTaskLocationHints(updatedEvent.getVertexLocationHint()); + setVertexLocationHint(updatedEvent.getVertexLocationHint()); } - int oldNumTasks = numTasks; - numTasks = updatedEvent.getNumTasks(); stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); - handleParallelismUpdate(numTasks, updatedEvent.getSourceEdgeManagers(), - updatedEvent.getRootInputSpecUpdates()); if (LOG.isDebugEnabled()) { LOG.debug("Recovered state for vertex after parallelism updated event" + ", vertex=" + logIdentifier @@ -1250,6 +1247,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertexLocationHint.getTaskLocationHints() != null && !vertexLocationHint.getTaskLocationHints().isEmpty()) { List<TaskLocationHint> locHints = vertexLocationHint.getTaskLocationHints(); + // TODO TEZ-2246 hints size must match num tasks taskLocationHints = locHints.toArray(new TaskLocationHint[locHints.size()]); } } @@ -1348,14 +1346,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, writeLock.unlock(); } } - Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. 
Value: " - + parallelism + " for vertex: " + logIdentifier); - setVertexLocationHint(vertexLocationHint); + Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: " + parallelism + + " for vertex: " + logIdentifier); writeLock.lock(); try { - if (parallelismSet == true) { - String msg = "Parallelism can only be set dynamically once per vertex: " + logIdentifier; + // disallow changing things after a vertex has started + if (!tasksNotYetScheduled) { + String msg = "setParallelism cannot be called after scheduling tasks. Vertex: " + + getLogIdentifier(); LOG.info(msg); throw new TezUncheckedException(msg); } @@ -1364,13 +1363,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // vertex is fully defined. setParallelism has been called. VertexManager should have // informed us about this. Otherwise we would have notified listeners that we are fully // defined before we are actually fully defined - Preconditions.checkState(vertexToBeReconfiguredByManager, "Vertex is fully configured but still" - + " the reconfiguration API has been called. VertexManager must notify the framework using " - + " context.vertexReconfigurationPlanned() before re-configuring the vertex."); + Preconditions + .checkState( + vertexToBeReconfiguredByManager, + "Vertex is fully configured but still" + + " the reconfiguration API has been called. VertexManager must notify the framework using " + + " context.vertexReconfigurationPlanned() before re-configuring the vertex."); } - parallelismSet = true; - // Input initializer/Vertex Manager/1-1 split expected to set parallelism. 
if (numTasks == -1) { if (getState() != VertexState.INITIALIZING) { @@ -1414,6 +1414,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); this.createTasks(); + setVertexLocationHint(vertexLocationHint); LOG.info("Vertex " + getLogIdentifier() + " parallelism set to " + parallelism); if (canInitVertex()) { @@ -1427,55 +1428,39 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // for a vertex to start. Preconditions.checkState(rootInputSpecUpdates == null, "Root Input specs can only be updated when the vertex is configured with -1 tasks"); - if (parallelism >= numTasks) { - // not that hard to support perhaps. but checking right now since there - // is no use case for it and checking may catch other bugs. - String msg = "Increasing parallelism is not supported, vertexId=" + logIdentifier; - LOG.warn(msg); - throw new TezUncheckedException(msg); + + int oldNumTasks = numTasks; + + // start buffering incoming events so that we can re-route existing events + for (Edge edge : sourceVertices.values()) { + edge.startEventBuffering(); } + if (parallelism == numTasks) { LOG.info("setParallelism same as current value: " + parallelism + " for vertex: " + logIdentifier); Preconditions.checkArgument(sourceEdgeManagers != null, "Source edge managers or RootInputSpecs must be set when not changing parallelism"); } else { - LOG.info( - "Resetting vertex location hints due to change in parallelism for vertex: " + logIdentifier); + LOG.info("Resetting vertex location hints due to change in parallelism for vertex: " + + logIdentifier); vertexLocationHint = null; - } - - // start buffering incoming events so that we can re-route existing events - for (Edge edge : sourceVertices.values()) { - edge.startEventBuffering(); - } - // assign to local variable of LinkedHashMap to make sure that changing - // type of task causes 
compile error. We depend on LinkedHashMap for order - LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks; - Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet() - .iterator(); - int i = 0; - while (iter.hasNext()) { - i++; - Map.Entry<TezTaskID, Task> entry = iter.next(); - Task task = entry.getValue(); - if (task.getState() != TaskState.NEW) { - String msg = "All tasks must be in initial state when changing parallelism" - + " for vertex: " + getLogIdentifier(); - LOG.warn(msg); - throw new TezUncheckedException(msg); - } - if (i <= parallelism) { - continue; + if (parallelism > numTasks) { + addTasks(parallelism); + } else if (parallelism < numTasks) { + removeTasks(parallelism); } - LOG.info("Removing task: " + entry.getKey()); - iter.remove(); } - LOG.info("Vertex " + logIdentifier + - " parallelism set to " + parallelism + " from " + numTasks); - int oldNumTasks = numTasks; - this.numTasks = parallelism; + + Preconditions.checkState(this.numTasks == parallelism, getLogIdentifier()); + + // set new vertex location hints + setVertexLocationHint(vertexLocationHint); + LOG.info("Vertex " + getLogIdentifier() + " parallelism set to " + parallelism + " from " + + oldNumTasks); + + // notify listeners stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); assert tasks.size() == numTasks; @@ -1495,12 +1480,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } - VertexParallelismUpdatedEvent parallelismUpdatedEvent = - new VertexParallelismUpdatedEvent(vertexId, numTasks, - vertexLocationHint, - sourceEdgeManagers, rootInputSpecUpdates, oldNumTasks); - appContext.getHistoryHandler().handle(new DAGHistoryEvent(getDAGId(), - parallelismUpdatedEvent)); + // update history + VertexParallelismUpdatedEvent parallelismUpdatedEvent = new VertexParallelismUpdatedEvent( + vertexId, numTasks, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, + oldNumTasks); +
appContext.getHistoryHandler().handle( + new DAGHistoryEvent(getDAGId(), parallelismUpdatedEvent)); // stop buffering events for (Edge edge : sourceVertices.values()) { @@ -2028,29 +2013,73 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } + private TaskImpl createTask(int taskIndex) { + ContainerContext conContext = getContainerContext(taskIndex); + return new TaskImpl(this.getVertexId(), taskIndex, + this.eventHandler, + vertexConf, + this.taskAttemptListener, + this.clock, + this.taskHeartbeatHandler, + this.appContext, + (this.targetVertices != null ? + this.targetVertices.isEmpty() : true), + this.taskResource, + conContext, + this.stateChangeNotifier); + } + private void createTasks() { for (int i=0; i < this.numTasks; ++i) { - ContainerContext conContext = getContainerContext(i); - TaskImpl task = - new TaskImpl(this.getVertexId(), i, - this.eventHandler, - vertexConf, - this.taskAttemptListener, - this.clock, - this.taskHeartbeatHandler, - this.appContext, - (this.targetVertices != null ? - this.targetVertices.isEmpty() : true), - this.taskResource, - conContext, - this.stateChangeNotifier); + TaskImpl task = createTask(i); this.addTask(task); if(LOG.isDebugEnabled()) { LOG.debug("Created task for vertex " + logIdentifier + ": " + task.getTaskId()); } } - + } + + private void addTasks(int newNumTasks) { + Preconditions.checkArgument(newNumTasks > this.numTasks, getLogIdentifier()); + int initialNumTasks = this.numTasks; + for (int i = initialNumTasks; i < newNumTasks; ++i) { + TaskImpl task = createTask(i); + this.addTask(task); + this.numTasks++; + if(LOG.isDebugEnabled()) { + LOG.debug("Created task for vertex " + logIdentifier + ": " + + task.getTaskId()); + } + } + } + + private void removeTasks(int newNumTasks) { + Preconditions.checkArgument(newNumTasks < this.numTasks, getLogIdentifier()); + // assign to local variable of LinkedHashMap to make sure that changing + // type of task causes compile error. 
We depend on LinkedHashMap for order + LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks; + Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet() + .iterator(); + // remove tasks from the end to maintain index numbers + int i = 0; + while (iter.hasNext()) { + i++; + Map.Entry<TezTaskID, Task> entry = iter.next(); + Task task = entry.getValue(); + if (task.getState() != TaskState.NEW) { + String msg = "All tasks must be in initial state when changing parallelism" + + " for vertex: " + getLogIdentifier(); + LOG.warn(msg); + throw new TezUncheckedException(msg); + } + if (i <= newNumTasks) { + continue; + } + LOG.info("Removing task: " + entry.getKey()); + iter.remove(); + this.numTasks--; + } } private VertexState setupVertex() { @@ -2709,6 +2738,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } boolean successSetParallelism ; try { + // recovering only edge manager vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true, false); successSetParallelism = true; http://git-wip-us.apache.org/repos/asf/tez/blob/5e2a55fb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 35f35e6..e643a5b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -2468,16 +2468,27 @@ public class TestVertexImpl { Assert.assertTrue(e.getMessage().contains("context.vertexReconfigurationPlanned() cannot be called after initialize()")); } } + + private void checkTasks(Vertex v, int numTasks) { + Assert.assertEquals(numTasks, v.getTotalTasks()); + Map<TezTaskID, Task> tasks = v.getTasks(); + Assert.assertEquals(numTasks, 
tasks.size()); + // check all indices + int i = 0; + // iteration maintains order due to linked hash map + for(Task task : tasks.values()) { + Assert.assertEquals(i, task.getTaskId().getId()); + i++; + } + } @Test(timeout = 5000) - public void testVertexSetParallelism() throws Exception { + public void testVertexSetParallelismDecrease() throws Exception { VertexImpl v3 = vertices.get("vertex3"); v3.vertexReconfigurationPlanned(); initAllVertices(VertexState.INITED); Assert.assertEquals(2, v3.getTotalTasks()); - Map<TezTaskID, Task> tasks = v3.getTasks(); - Assert.assertEquals(2, tasks.size()); - TezTaskID firstTask = tasks.keySet().iterator().next(); + Assert.assertEquals(2, v3.getTasks().size()); VertexImpl v1 = vertices.get("vertex1"); startVertex(vertices.get("vertex2")); @@ -2492,15 +2503,56 @@ public class TestVertexImpl { v3.doneReconfiguringVertex(); assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof EdgeManagerForTest); - Assert.assertEquals(1, v3.getTotalTasks()); - Assert.assertEquals(1, tasks.size()); - // the last one is removed - assertTrue(tasks.keySet().iterator().next().equals(firstTask)); + checkTasks(v3, 1); + } + + @Test(timeout = 5000) + public void testVertexSetParallelismIncrease() throws Exception { + VertexImpl v3 = vertices.get("vertex3"); + v3.vertexReconfigurationPlanned(); + initAllVertices(VertexState.INITED); + Assert.assertEquals(2, v3.getTotalTasks()); + Assert.assertEquals(2, v3.getTasks().size()); + + VertexImpl v1 = vertices.get("vertex1"); + startVertex(vertices.get("vertex2")); + startVertex(v1); + + EdgeManagerPluginDescriptor mockEdgeManagerDescriptor = + EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName()); + + Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors = + Collections.singletonMap( + v1.getName(), mockEdgeManagerDescriptor); + v3.setParallelism(10, null, edgeManagerDescriptors, null, true); + v3.doneReconfiguringVertex(); + 
assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof + EdgeManagerForTest); + checkTasks(v3, 10); + } + + @Test(timeout = 5000) + public void testVertexSetParallelismMultiple() throws Exception { + VertexImpl v3 = vertices.get("vertex3"); + v3.vertexReconfigurationPlanned(); + initAllVertices(VertexState.INITED); + Assert.assertEquals(2, v3.getTotalTasks()); + Map<TezTaskID, Task> tasks = v3.getTasks(); + Assert.assertEquals(2, tasks.size()); + VertexImpl v1 = vertices.get("vertex1"); + startVertex(vertices.get("vertex2")); + startVertex(v1); + v3.setParallelism(10, null, null, null, true); + checkTasks(v3, 10); + + v3.setParallelism(5, null, null, null, true); + checkTasks(v3, 5); + v3.doneReconfiguringVertex(); } @Test(timeout = 5000) - public void testVertexSetParallelismIncreaseException() throws Exception { + public void testVertexSetParallelismMultipleFailAfterDone() throws Exception { VertexImpl v3 = vertices.get("vertex3"); v3.vertexReconfigurationPlanned(); initAllVertices(VertexState.INITED); @@ -2511,19 +2563,43 @@ public class TestVertexImpl { VertexImpl v1 = vertices.get("vertex1"); startVertex(vertices.get("vertex2")); startVertex(v1); + v3.setParallelism(10, null, null, null, true); + checkTasks(v3, 10); + v3.doneReconfiguringVertex(); - // increase not supported try { - v3.setParallelism(100, null, null, null, true); - v3.doneReconfiguringVertex(); + v3.setParallelism(5, null, null, null, true); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("Vertex is fully configured but still")); + } + } + + @Test(timeout = 5000) + public void testVertexSetParallelismMultipleFailAfterSchedule() throws Exception { + VertexImpl v3 = vertices.get("vertex3"); + v3.vertexReconfigurationPlanned(); + initAllVertices(VertexState.INITED); + Assert.assertEquals(2, v3.getTotalTasks()); + Map<TezTaskID, Task> tasks = v3.getTasks(); + Assert.assertEquals(2, tasks.size()); + + VertexImpl v1 = 
vertices.get("vertex1"); + startVertex(vertices.get("vertex2")); + startVertex(v1); + v3.setParallelism(10, null, null, null, true); + checkTasks(v3, 10); + v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null))); + try { + v3.setParallelism(5, null, null, null, true); Assert.fail(); } catch (TezUncheckedException e) { - Assert.assertTrue(e.getMessage().contains("Increasing parallelism is not supported")); + Assert.assertTrue(e.getMessage().contains("setParallelism cannot be called after scheduling")); } } @Test(timeout = 5000) - public void testVertexSetParallelismMultipleException() throws Exception { + public void testVertexSetParallelismFailAfterSchedule() throws Exception { VertexImpl v3 = vertices.get("vertex3"); v3.vertexReconfigurationPlanned(); initAllVertices(VertexState.INITED); @@ -2534,18 +2610,15 @@ public class TestVertexImpl { VertexImpl v1 = vertices.get("vertex1"); startVertex(vertices.get("vertex2")); startVertex(v1); - v3.setParallelism(1, null, null, null, true); - - // multiple invocations not supported + v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null))); try { - v3.setParallelism(1, null, null, null, true); + v3.setParallelism(5, null, null, null, true); Assert.fail(); } catch (TezUncheckedException e) { - Assert.assertTrue(e.getMessage().contains("Parallelism can only be set dynamically once per vertex")); + Assert.assertTrue(e.getMessage().contains("setParallelism cannot be called after scheduling")); } - v3.doneReconfiguringVertex(); } - + @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexPendingTaskEvents() { http://git-wip-us.apache.org/repos/asf/tez/blob/5e2a55fb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java index 3c658d4..24defea 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java @@ -69,6 +69,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source; import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter; import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexInitializedEvent; @@ -362,6 +363,67 @@ public class TestVertexRecovery { } + @Test(timeout = 5000) + public void testRecovery_SetParallelism() { + VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); + int oldNumTasks = 10; + VertexState recoveredState = vertex1 + .restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(), "vertex1", + initRequestedTime, initedTime, oldNumTasks, "", null)); + assertEquals(VertexState.INITED, recoveredState); + recoveredState = vertex1.restoreFromEvent(new VertexParallelismUpdatedEvent(vertex1 + .getVertexId(), 5, null, null, null, oldNumTasks)); + assertEquals(5, vertex1.getTotalTasks()); + vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), + VertexState.SUCCEEDED)); + dispatcher.await(); + assertEquals(VertexState.SUCCEEDED, vertex1.getState()); + assertEquals(vertex1.numTasks, vertex1.succeededTaskCount); + assertEquals(vertex1.numTasks, vertex1.completedTaskCount); + // recover its task + assertTaskRecoveredEventSent(vertex1); + + // vertex3 is still in NEW, when the desiredState is + // Completed State, each vertex recovery by itself, not depend on its parent + VertexImpl vertex3 = (VertexImpl) 
dag.getVertex("vertex3"); + assertEquals(VertexState.NEW, vertex3.getState()); + // no VertexEvent pass to downstream vertex + assertEquals(0, vertexEventHandler.getEvents().size()); + } + + @Test(timeout = 5000) + public void testRecovery_SetParallelismMultiple() { + VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); + int oldNumTasks = 10; + VertexState recoveredState = vertex1 + .restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(), "vertex1", + initRequestedTime, initedTime, oldNumTasks, "", null)); + assertEquals(VertexState.INITED, recoveredState); + recoveredState = vertex1.restoreFromEvent(new VertexParallelismUpdatedEvent(vertex1 + .getVertexId(), 5, null, null, null, oldNumTasks)); + assertEquals(5, vertex1.getTotalTasks()); + recoveredState = vertex1.restoreFromEvent(new VertexParallelismUpdatedEvent(vertex1 + .getVertexId(), 7, null, null, null, 5)); + assertEquals(7, vertex1.getTotalTasks()); + vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), + VertexState.SUCCEEDED)); + dispatcher.await(); + assertEquals(VertexState.SUCCEEDED, vertex1.getState()); + assertEquals(vertex1.numTasks, vertex1.succeededTaskCount); + assertEquals(vertex1.numTasks, vertex1.completedTaskCount); + // recover its task + assertTaskRecoveredEventSent(vertex1); + + // vertex3 is still in NEW, when the desiredState is + // Completed State, each vertex recovery by itself, not depend on its parent + VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); + assertEquals(VertexState.NEW, vertex3.getState()); + // no VertexEvent pass to downstream vertex + assertEquals(0, vertexEventHandler.getEvents().size()); + + } + + /** * vertex1(New) -> StartRecoveryTransition(SUCCEEDED) * @throws IOException
