Repository: tez Updated Branches: refs/heads/master 3e6fc355c -> 8386cca03
TEZ-2310. Deadlock caused by StateChangeNotifier sending notifications on thread holding locks (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8386cca0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8386cca0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8386cca0 Branch: refs/heads/master Commit: 8386cca03ab89248765b2ac87dd78a50ca84ff3c Parents: 3e6fc35 Author: Bikas Saha <[email protected]> Authored: Fri Apr 17 01:59:41 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Fri Apr 17 01:59:41 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../java/org/apache/tez/dag/app/dag/DAG.java | 4 + .../tez/dag/app/dag/StateChangeNotifier.java | 99 ++++++++++++++++++-- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 6 +- .../tez/dag/app/dag/impl/VertexManager.java | 4 +- .../dag/app/rm/TaskSchedulerEventHandler.java | 2 - .../dag/app/dag/TestStateChangeNotifier.java | 57 +++++++++-- .../tez/dag/app/dag/impl/TestDAGImpl.java | 8 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 8 +- 9 files changed, 164 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1a7609f..b0ce3cf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2310. Deadlock caused by StateChangeNotifier sending notifications on + thread holding locks TEZ-1969. Stop the DAGAppMaster when a local mode client is stopped TEZ-714. OutputCommitters should not run in the main AM dispatcher thread TEZ-2323. Fix TestOrderedWordcount to use MR memory configs. http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index 1b64754..4c3426a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGStatusBuilder; @@ -55,6 +56,9 @@ public interface DAG { * @return job-counters and aggregate task-counters */ TezCounters getAllCounters(); + + @SuppressWarnings("rawtypes") + EventHandler getEventHandler(); /** * Get Vertex by vertex name http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java index d2b298b..260cbf3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java @@ -22,19 +22,27 @@ package org.apache.tez.dag.app.dag; import java.util.EnumSet; import java.util.List; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Multimaps; import com.google.common.collect.SetMultimap; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.app.dag.event.DAGEvent; +import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Tracks status updates from various components, and informs registered components about updates. @@ -42,18 +50,88 @@ import org.apache.tez.dag.records.TezVertexID; @InterfaceAudience.Private public class StateChangeNotifier { + private static final Logger LOG = LoggerFactory.getLogger(StateChangeNotifier.class); + private final DAG dag; private final SetMultimap<TezVertexID, ListenerContainer> vertexListeners; private final ListMultimap<TezVertexID, VertexStateUpdate> lastKnowStatesMap; private final ReentrantReadWriteLock listenersLock = new ReentrantReadWriteLock(); - private final ReentrantReadWriteLock.ReadLock readLock = listenersLock.readLock(); private final ReentrantReadWriteLock.WriteLock writeLock = listenersLock.writeLock(); + BlockingQueue<NotificationEvent> eventQueue = new LinkedBlockingQueue<NotificationEvent>(); + private Thread eventHandlingThread; + private volatile boolean stopEventHandling = false; + + private static class NotificationEvent { + final VertexStateUpdate update; + final VertexStateUpdateListener listener; + + public NotificationEvent(VertexStateUpdate update, VertexStateUpdateListener listener) { + this.update = update; + this.listener = listener; + } + + void sentUpdate() { + listener.onStateUpdated(update); + } + + @Override + public String toString() { + return "[ VertexState:(" + update + ") Listener:" + listener + " ]"; + } + } + public StateChangeNotifier(DAG dag) { this.dag = dag; this.vertexListeners = Multimaps.synchronizedSetMultimap( HashMultimap.<TezVertexID, ListenerContainer>create()); this.lastKnowStatesMap = LinkedListMultimap.create(); + startThread(); + } + + private void startThread() { + this.eventHandlingThread = new Thread("State Change Notifier DAG: " + dag.getID()) { + @SuppressWarnings("unchecked") + @Override + public void run() { + while (!stopEventHandling && !Thread.currentThread().isInterrupted()) { + NotificationEvent event; + try { + event = eventQueue.take(); + } catch (InterruptedException e) { + if(!stopEventHandling) { + LOG.warn("Continuing after interrupt : ", e); + } + continue; + } + try { + event.sentUpdate(); + processedEventFromQueue(); + } catch (Exception e) { + // TODO send user code exception - TEZ-2332 + LOG.error("Error in state update notification for " + event, e); + dag.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR)); + return; + } + } + } + }; + this.eventHandlingThread.setDaemon(true); // dont block exit on this + this.eventHandlingThread.start(); + } + + @VisibleForTesting + protected void processedEventFromQueue() { + } + + @VisibleForTesting + protected void addedEventToQueue() { + } + + public void stop() { + this.stopEventHandling = true; + if (eventHandlingThread != null) + eventHandlingThread.interrupt(); } // -------------- VERTEX STATE CHANGE SECTION --------------- @@ -99,14 +177,14 @@ public class StateChangeNotifier { } public void stateChanged(TezVertexID vertexId, VertexStateUpdate vertexStateUpdate) { - readLock.lock(); + writeLock.lock(); try { lastKnowStatesMap.put(vertexId, vertexStateUpdate); if (vertexListeners.containsKey(vertexId)) { sendStateUpdate(vertexId, vertexStateUpdate); } } finally { - readLock.unlock(); + writeLock.unlock(); } } @@ -115,11 +193,18 @@ public class StateChangeNotifier { for (ListenerContainer listenerContainer : vertexListeners.get(vertexId)) { listenerContainer.sendStateUpdate(event); } - } - - private static final class ListenerContainer { + private void enqueueNotification(NotificationEvent event) { + try { + eventQueue.put(event); + addedEventToQueue(); + } catch (InterruptedException e) { + LOG.error("Failed to put event", e); + } + } + + private final class ListenerContainer { final VertexStateUpdateListener listener; final Set<org.apache.tez.dag.api.event.VertexState> states; @@ -135,7 +220,7 @@ public class StateChangeNotifier { private void sendStateUpdate(VertexStateUpdate stateUpdate) { if (states.contains(stateUpdate.getVertexState())) { - listener.onStateUpdated(stateUpdate); + enqueueNotification(new NotificationEvent(stateUpdate, listener)); } } http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 1c93dc6..9e55088 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -180,7 +180,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final AppContext appContext; private final UserGroupInformation dagUGI; private final ACLManager aclManager; - private final StateChangeNotifier entityUpdateTracker; + @VisibleForTesting + StateChangeNotifier entityUpdateTracker; volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>(); @VisibleForTesting @@ -586,7 +587,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, return jobPlan; } - EventHandler getEventHandler() { + @Override + public EventHandler getEventHandler() { return this.eventHandler; } http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index 0be0aaa..bcea22c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -340,7 +340,9 @@ public class VertexManager { } @Override - public synchronized void onStateUpdated(VertexStateUpdate event) { + public void onStateUpdated(VertexStateUpdate event) { + // this is not called by the vertex manager plugin. + // no need to synchronize this. similar to other external notification methods enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStateUpdate(event)); } http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 0985d58..250446d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -19,8 +19,6 @@ package org.apache.tez.dag.app.rm; import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java index 6a505ef..e6d1c31 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java @@ -30,18 +30,60 @@ import static org.mockito.Mockito.verify; import java.util.EnumSet; import java.util.Iterator; import java.util.List; - +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Lists; + import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; public class TestStateChangeNotifier { + + // uses the thread based notification code path but effectively blocks update + // events till listeners have been notified + public static class StateChangeNotifierForTest extends StateChangeNotifier { + AtomicInteger count = new AtomicInteger(0); + AtomicInteger totalCount = new AtomicInteger(0); + + public StateChangeNotifierForTest(DAG dag) { + super(dag); + } + + public void reset() { + count.set(0); + totalCount.set(0); + } + + @Override + protected void processedEventFromQueue() { + synchronized (count) { + if (count.decrementAndGet() == 0) { + count.notifyAll(); + } + } + } + + @Override + protected void addedEventToQueue() { + totalCount.incrementAndGet(); + synchronized (count) { + // processing may finish by the time we get here + if (count.incrementAndGet() > 0) { + try { + count.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + } @Test(timeout = 5000) public void testEventsOnRegistration() { @@ -51,7 +93,7 @@ public class TestStateChangeNotifier { Vertex v3 = createMockVertex(dagId, 3); DAG dag = createMockDag(dagId, v1, v2, v3); - StateChangeNotifier tracker = new StateChangeNotifier(dag); + StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag); // Vertex has sent one event notifyTracker(tracker, v1, VertexState.RUNNING); @@ -72,7 +114,6 @@ public class TestStateChangeNotifier { VertexState.SUCCEEDED), mockListener14); ArgumentCaptor<VertexStateUpdate> argumentCaptor = ArgumentCaptor.forClass(VertexStateUpdate.class); - verify(mockListener11, times(1)).onStateUpdated(argumentCaptor.capture()); assertEquals(VertexState.RUNNING, argumentCaptor.getValue().getVertexState()); @@ -85,8 +126,10 @@ public class TestStateChangeNotifier { verify(mockListener14, never()).onStateUpdated(any(VertexStateUpdate.class)); // Vertex has not notified of state + tracker.reset(); VertexStateUpdateListener mockListener2 = mock(VertexStateUpdateListener.class); tracker.registerForVertexUpdates(v2.getName(), null, mockListener2); + Assert.assertEquals(0, tracker.totalCount.get()); // there should no be any event sent out verify(mockListener2, never()).onStateUpdated(any(VertexStateUpdate.class)); // Vertex has notified about parallelism update only @@ -104,7 +147,7 @@ public class TestStateChangeNotifier { Vertex v1 = createMockVertex(dagId, 1); DAG dag = createMockDag(dagId, v1); - StateChangeNotifier tracker = new StateChangeNotifier(dag); + StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag); VertexStateUpdateListener mockListener = mock(VertexStateUpdateListener.class); tracker.registerForVertexUpdates(v1.getName(), null, mockListener); @@ -139,7 +182,7 @@ public class TestStateChangeNotifier { Vertex v1 = createMockVertex(dagId, 1); DAG dag = createMockDag(dagId, v1); - StateChangeNotifier tracker = new StateChangeNotifier(dag); + StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag); VertexStateUpdateListener mockListener = mock(VertexStateUpdateListener.class); tracker.registerForVertexUpdates(v1.getName(), null, mockListener); @@ -157,7 +200,7 @@ public class TestStateChangeNotifier { Vertex v1 = createMockVertex(dagId, 1); DAG dag = createMockDag(dagId, v1); - StateChangeNotifier tracker = new StateChangeNotifier(dag); + StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag); VertexStateUpdateListener mockListener = mock(VertexStateUpdateListener.class); tracker.registerForVertexUpdates(v1.getName(), EnumSet.of( @@ -199,7 +242,7 @@ public class TestStateChangeNotifier { Vertex v1 = createMockVertex(dagId, 1); DAG dag = createMockDag(dagId, v1); - StateChangeNotifier tracker = new StateChangeNotifier(dag); + StateChangeNotifierForTest tracker = new StateChangeNotifierForTest(dag); VertexStateUpdateListener mockListener = mock(VertexStateUpdateListener.class); tracker.registerForVertexUpdates(v1.getName(), null, mockListener); http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 7e944ef..228d6b8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -33,9 +33,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -46,7 +44,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -97,6 +94,7 @@ import org.apache.tez.dag.app.dag.DAGTerminationCause; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; +import org.apache.tez.dag.app.dag.TestStateChangeNotifier.StateChangeNotifierForTest; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.app.dag.VertexTerminationCause; @@ -786,6 +784,7 @@ public class TestDAGImpl { dag = new DAGImpl(dagId, conf, dagPlan, dispatcher.getEventHandler(), taskAttemptListener, fsTokens, clock, "user", thh, appContext); + dag.entityUpdateTracker = new StateChangeNotifierForTest(dag); doReturn(dag).when(appContext).getCurrentDAG(); mrrAppContext = mock(AppContext.class); doReturn(aclManager).when(mrrAppContext).getAMACLManager(); @@ -796,6 +795,7 @@ public class TestDAGImpl { dispatcher.getEventHandler(), taskAttemptListener, fsTokens, clock, "user", thh, mrrAppContext); + mrrDag.entityUpdateTracker = new StateChangeNotifierForTest(mrrDag); doReturn(conf).when(mrrAppContext).getAMConf(); doReturn(mrrDag).when(mrrAppContext).getCurrentDAG(); doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId(); @@ -810,6 +810,7 @@ public class TestDAGImpl { dispatcher.getEventHandler(), taskAttemptListener, fsTokens, clock, "user", thh, groupAppContext); + groupDag.entityUpdateTracker = new StateChangeNotifierForTest(groupDag); doReturn(conf).when(groupAppContext).getAMConf(); doReturn(groupDag).when(groupAppContext).getCurrentDAG(); doReturn(appAttemptId).when(groupAppContext).getApplicationAttemptId(); @@ -858,6 +859,7 @@ public class TestDAGImpl { dagWithCustomEdge = new DAGImpl(dagWithCustomEdgeId, conf, dagPlanWithCustomEdge, dispatcher.getEventHandler(), taskAttemptListener, fsTokens, clock, "user", thh, dagWithCustomEdgeAppContext); + dagWithCustomEdge.entityUpdateTracker = new StateChangeNotifierForTest(dagWithCustomEdge); doReturn(conf).when(dagWithCustomEdgeAppContext).getAMConf(); doReturn(execService).when(dagWithCustomEdgeAppContext).getExecService(); doReturn(dagWithCustomEdge).when(dagWithCustomEdgeAppContext).getCurrentDAG(); http://git-wip-us.apache.org/repos/asf/tez/blob/8386cca0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 891da23..c752965 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -114,6 +114,7 @@ import org.apache.tez.dag.app.dag.RootInputInitializerManager; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; +import org.apache.tez.dag.app.dag.TestStateChangeNotifier.StateChangeNotifierForTest; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.app.dag.VertexStateUpdateListener; @@ -221,7 +222,7 @@ public class TestVertexImpl { private VertexEventDispatcher vertexEventDispatcher; private DagEventDispatcher dagEventDispatcher; private HistoryEventHandler historyEventHandler; - private StateChangeNotifier updateTracker; + private StateChangeNotifierForTest updateTracker; private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption; public static class CountingOutputCommitter extends OutputCommitter { @@ -2174,7 +2175,7 @@ public class TestVertexImpl { for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) { vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo)); } - updateTracker = new StateChangeNotifier(appContext.getCurrentDAG()); + updateTracker = new StateChangeNotifierForTest(appContext.getCurrentDAG()); setupVertices(); when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() { @Override @@ -2243,6 +2244,7 @@ public class TestVertexImpl { @After public void teardown() { + updateTracker.stop(); if (dispatcher.isInState(STATE.STARTED)) { dispatcher.await(); dispatcher.stop(); @@ -3734,7 +3736,6 @@ public class TestVertexImpl { v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED)); } dispatcher.await(); - Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); // At this point, 3 events should have been received - since the dispatcher is complete. @@ -4108,7 +4109,6 @@ public class TestVertexImpl { while (v3.getState() != VertexState.RUNNING) { Thread.sleep(10); } - Assert.assertEquals(VertexState.RUNNING, v3.getState()); // Events should have been cleared from the vertex. Assert.assertEquals(0, v3.pendingInitializerEvents.size());
