TEZ-2242. Refactor ShuffleVertexManager code (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/505febd6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/505febd6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/505febd6 Branch: refs/heads/TEZ-2003 Commit: 505febd63e6c2fdaf882540d5c55f75fb30b7190 Parents: 17d2388 Author: Bikas Saha <[email protected]> Authored: Fri Mar 27 11:46:00 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Fri Mar 27 11:46:00 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../vertexmanager/ShuffleVertexManager.java | 68 ++++++++++++++------ .../vertexmanager/TestShuffleVertexManager.java | 27 ++++---- 3 files changed, 65 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/505febd6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 91653bf..3c859c6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -91,6 +91,7 @@ Release 0.6.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2242. Refactor ShuffleVertexManager code TEZ-2205. Tez still tries to post to ATS when yarn.timeline-service.enabled=false. TEZ-2047. Build fails against hadoop-2.2 post TEZ-2018 TEZ-2064. 
SessionNotRunning Exception not thrown is all cases http://git-wip-us.apache.org/repos/asf/tez/blob/505febd6/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index f923319..b6d69dc 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -141,6 +141,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { @VisibleForTesting int bipartiteSources = 0; long completedSourceTasksOutputSize = 0; + List<VertexStateUpdate> pendingStateUpdates = Lists.newArrayList(); static class SourceVertexInfo { EdgeProperty edgeProperty; @@ -326,7 +327,26 @@ public class ShuffleVertexManager extends VertexManagerPlugin { @Override - public void onVertexStarted(Map<String, List<Integer>> completions) { + public synchronized void onVertexStarted(Map<String, List<Integer>> completions) { + // examine edges after vertex started because until then these may not have been defined + Map<String, EdgeProperty> inputs = getContext().getInputVertexEdgeProperties(); + for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) { + srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue())); + // TODO what if derived class has already called this + getContext().registerForVertexStateUpdates(entry.getKey(), + EnumSet.of(VertexState.CONFIGURED)); + if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) { + bipartiteSources++; + } + } + if(bipartiteSources == 0) { + throw new TezUncheckedException("Atleast 1 bipartite source should exist"); + } + for 
(VertexStateUpdate stateUpdate : pendingStateUpdates) { + handleVertexStateUpdate(stateUpdate); + } + pendingStateUpdates.clear(); + // track the tasks in this vertex updatePendingTasks(); updateSourceTaskCount(); @@ -348,7 +368,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId) { + public synchronized void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId) { updateSourceTaskCount(); SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName); @@ -368,7 +388,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } @Override - public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { + public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { // TODO handle duplicates from retries if (enableAutoParallelism) { // save output size @@ -678,19 +698,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin { + " desiredTaskIput:" + desiredTaskInputDataSize + " minTasks:" + minTaskParallelism); - Map<String, EdgeProperty> inputs = getContext().getInputVertexEdgeProperties(); - for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) { - srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue())); - getContext().registerForVertexStateUpdates(entry.getKey(), - EnumSet.of(VertexState.CONFIGURED)); - if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) { - bipartiteSources++; - } - } - if(bipartiteSources == 0) { - throw new TezUncheckedException("Atleast 1 bipartite source should exist"); - } - if (enableAutoParallelism) { getContext().vertexReconfigurationPlanned(); } @@ -699,8 +706,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } - @Override - public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { + private void handleVertexStateUpdate(VertexStateUpdate stateUpdate) { 
Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED, "Received incorrect state notification : " + stateUpdate.getVertexState() + " for vertex: " + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName()); @@ -716,7 +722,31 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } @Override - public void onRootVertexInitialized(String inputName, + public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) { + if (stateUpdate.getVertexState() == VertexState.CONFIGURED) { + // we will not register for updates until our vertex starts. + // derived classes can make other update requests for other states that we should + // ignore. However that will not be allowed until the state change notifier supports + // multiple registers for the same vertex + if (onVertexStartedDone.get()) { + // normally this if check will always be true because we register after vertex + // start. + handleVertexStateUpdate(stateUpdate); + } else { + // normally this code will not trigger since we are the ones who register for + // the configured states updates and that will happen after vertex starts. + // So this code will only trigger if a derived class also registers for updates + // for the same vertices but multiple registers to the same vertex is currently + // not supported by the state change notifier code. This is just future proofing + // when that is supported + // vertex not started yet. So edge info may not have been defined correctly yet. + pendingStateUpdates.add(stateUpdate); + } + } + } + + @Override + public synchronized void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) { // Not allowing this for now. Nothing to do.
} http://git-wip-us.apache.org/repos/asf/tez/blob/505febd6/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 76c0aa6..4d9302e 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -212,17 +212,16 @@ public class TestShuffleVertexManager { // check initialization manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig - verify(mockContext, times(2)).vertexReconfigurationPlanned(); - Assert.assertTrue(manager.bipartiteSources == 2); - - // source vertices have 0 tasks. when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0); when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0); when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1); - // check waiting for notification before scheduling manager.onVertexStarted(null); + verify(mockContext, times(2)).vertexReconfigurationPlanned(); + Assert.assertTrue(manager.bipartiteSources == 2); + + // check waiting for notification before scheduling Assert.assertFalse(manager.pendingTasks.isEmpty()); // source vertices have 0 tasks. so only 1 notification needed. 
triggers scheduling manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); @@ -235,12 +234,14 @@ public class TestShuffleVertexManager { // check scheduling only after onVertexStarted manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig verify(mockContext, times(3)).vertexReconfigurationPlanned(); - Assert.assertTrue(manager.bipartiteSources == 2); // source vertices have 0 tasks. so only 1 notification needed. does not trigger scheduling + // normally this event will not come before onVertexStarted() is called manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done + verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled + // trigger start and processing of pending notification events manager.onVertexStarted(null); + Assert.assertTrue(manager.bipartiteSources == 2); verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled @@ -500,6 +501,7 @@ public class TestShuffleVertexManager { mockInputVertices.put(mockSrcVertexId3, eProp3); try { manager = createManager(conf, mockContext, 0.1f, 0.1f); + manager.onVertexStarted(null); Assert.assertFalse(true); } catch (TezUncheckedException e) { Assert.assertTrue(e.getMessage().contains( @@ -511,6 +513,7 @@ public class TestShuffleVertexManager { // check initialization manager = createManager(conf, mockContext, 0.1f, 0.1f); + manager.onVertexStarted(null); Assert.assertTrue(manager.bipartiteSources == 2); final HashSet<Integer> scheduledTasks = new HashSet<Integer>(); @@ -791,7 +794,6 @@ public class TestShuffleVertexManager { // check initialization manager = createManager(conf, mockContext_R2, 
0.001f, 0.001f); - Assert.assertTrue(manager.bipartiteSources == 3); final HashSet<Integer> scheduledTasks = new HashSet<Integer>(); doAnswer(new Answer() { @@ -806,6 +808,7 @@ public class TestShuffleVertexManager { }}).when(mockContext_R2).scheduleVertexTasks(anyList()); manager.onVertexStarted(null); + Assert.assertTrue(manager.bipartiteSources == 3); manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); @@ -915,10 +918,6 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(m2)).thenReturn(3); when(mockContext.getVertexNumTasks(m3)).thenReturn(3); - // check initialization - manager = createManager(conf, mockContext, 0.001f, 0.001f); - Assert.assertTrue(manager.bipartiteSources == 1); - final HashSet<Integer> scheduledTasks = new HashSet<Integer>(); doAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { @@ -931,7 +930,11 @@ public class TestShuffleVertexManager { return null; }}).when(mockContext).scheduleVertexTasks(anyList()); + // check initialization + manager = createManager(conf, mockContext, 0.001f, 0.001f); manager.onVertexStarted(null); + Assert.assertTrue(manager.bipartiteSources == 1); + manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
