Repository: tez
Updated Branches:
  refs/heads/branch-0.9 2b3f7ac62 -> 5c8ac9516
TEZ-3932. TaskSchedulerManager can throw NullPointerException during DAGAppMaster container cleanup race (Jonathan Eagles via jlowe) (cherry picked from commit 72c458a431545565d38681862a883065f4efa44e) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5c8ac951 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5c8ac951 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5c8ac951 Branch: refs/heads/branch-0.9 Commit: 5c8ac9516fc779c629543694ad010016643e78ca Parents: 2b3f7ac Author: Jason Lowe <[email protected]> Authored: Wed May 9 10:25:32 2018 -0500 Committer: Jason Lowe <[email protected]> Committed: Wed May 9 10:28:26 2018 -0500 ---------------------------------------------------------------------- .../tez/dag/app/rm/TaskSchedulerManager.java | 41 ++++--- .../dag/app/rm/TestTaskSchedulerManager.java | 108 ++++++++++++++++++- 2 files changed, 136 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5c8ac951/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index 5777a2a..61e3702 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -420,10 +420,14 @@ public class TaskSchedulerManager extends AbstractService implements // Inform the Node - the task has asked to be STOPPED / has already // stopped. // AMNodeImpl blacklisting logic does not account for KILLED attempts. - sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers(). 
- get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(), - attemptContainerId, - attempt.getID(), event.getState() == TaskAttemptState.FAILED)); + AMContainer amContainer = appContext.getAllContainers().get(attemptContainerId); + // DAG can be shutting down so protect against container cleanup race + if (amContainer != null) { + Container container = amContainer.getContainer(); + sendEvent(new AMNodeEventTaskAttemptEnded(container.getNodeId(), event.getSchedulerId(), + attemptContainerId, + attempt.getID(), event.getState() == TaskAttemptState.FAILED)); + } } } @@ -436,9 +440,14 @@ public class TaskSchedulerManager extends AbstractService implements if (event.getUsedContainerId() != null) { sendEvent(new AMContainerEventTASucceeded(usedContainerId, event.getAttemptID())); - sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers(). - get(usedContainerId).getContainer().getNodeId(), event.getSchedulerId(), usedContainerId, - event.getAttemptID())); + AMContainer amContainer = appContext.getAllContainers().get(usedContainerId); + // DAG can be shutting down so protect against container cleanup race + if (amContainer != null) { + Container container = amContainer.getContainer(); + sendEvent(new AMNodeEventTaskAttemptSucceeded(container.getNodeId(), event.getSchedulerId(), + usedContainerId, + event.getAttemptID())); + } } boolean wasContainerAllocated = false; @@ -742,10 +751,15 @@ public class TaskSchedulerManager extends AbstractService implements // because the deallocateTask downcall may have raced with the // taskAllocated() upcall assert task.equals(taskAttempt); - - if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) { - sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(), - event.getContainerContext(), event.getLauncherId(), event.getTaskCommId())); + + AMContainer amContainer = appContext.getAllContainers().get(containerId); + // Even though we just 
added this container, + // DAG can be shutting down so protect against container cleanup race + if (amContainer != null) { + if (amContainer.getState() == AMContainerState.ALLOCATED) { + sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(), + event.getContainerContext(), event.getLauncherId(), event.getTaskCommId())); + } } sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(), event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event @@ -951,7 +965,10 @@ public class TaskSchedulerManager extends AbstractService implements // An AMContainer instance should already exist if an attempt is being made to preempt it AMContainer amContainer = appContext.getAllContainers().get(containerId); try { - taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId); + // DAG can be shutting down so protect against container cleanup race + if (amContainer != null) { + taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId); + } } catch (Exception e) { String msg = "Error in TaskScheduler when preempting container" + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(amContainer.getTaskSchedulerIdentifier(), appContext) http://git-wip-us.apache.org/repos/asf/tez/blob/5c8ac951/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java index 5df25de..dcf9a5d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java @@ -70,6 +70,7 @@ import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; import 
org.apache.tez.dag.api.client.DAGClientServer; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; @@ -84,6 +85,7 @@ import org.apache.tez.dag.app.dag.impl.VertexImpl; import org.apache.tez.dag.app.rm.container.AMContainer; import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA; import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted; +import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest; import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.AMContainerState; @@ -217,6 +219,79 @@ public class TestTaskSchedulerManager { assertEquals(mockAttemptId, assignEvent.getTaskAttemptId()); } + @Test(timeout = 5000) + public void testTASucceededAfterContainerCleanup() throws Exception { + Configuration conf = new Configuration(false); + schedulerHandler.init(conf); + schedulerHandler.start(); + + TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class); + TezTaskAttemptID mockAttemptId = mock(TezTaskAttemptID.class); + when(mockAttemptId.getId()).thenReturn(0); + when(mockTaskAttempt.getID()).thenReturn(mockAttemptId); + Resource resource = Resource.newInstance(1024, 1); + ContainerContext containerContext = + new ContainerContext(new HashMap<String, LocalResource>(), new Credentials(), + new HashMap<String, String>(), ""); + int priority = 10; + TaskLocationHint locHint = TaskLocationHint.createTaskLocationHint(new HashSet<String>(), null); + + ContainerId mockCId = mock(ContainerId.class); + Container container = mock(Container.class); + when(container.getId()).thenReturn(mockCId); + + AMContainer mockAMContainer = mock(AMContainer.class); + when(mockAMContainer.getContainerId()).thenReturn(mockCId); + 
when(mockAMContainer.getState()).thenReturn(AMContainerState.IDLE); + + // Returning null container will replicate container cleanup scenario + when(mockAMContainerMap.get(mockCId)).thenReturn(null); + + AMSchedulerEventTALaunchRequest lr = + new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint, + priority, containerContext, 0, 0, 0); + schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container); + assertEquals(1, mockEventHandler.events.size()); + assertTrue(mockEventHandler.events.get(0) instanceof AMContainerEventAssignTA); + AMContainerEventAssignTA assignEvent = + (AMContainerEventAssignTA) mockEventHandler.events.get(0); + assertEquals(priority, assignEvent.getPriority()); + assertEquals(mockAttemptId, assignEvent.getTaskAttemptId()); + } + + @Test(timeout = 5000) + public void testTAUnsuccessfulAfterContainerCleanup() throws Exception { + Configuration conf = new Configuration(false); + schedulerHandler.init(conf); + schedulerHandler.start(); + + TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class); + TezTaskAttemptID mockAttemptId = mock(TezTaskAttemptID.class); + when(mockAttemptId.getId()).thenReturn(0); + when(mockTaskAttempt.getID()).thenReturn(mockAttemptId); + + ContainerId mockCId = mock(ContainerId.class); + Container container = mock(Container.class); + when(container.getId()).thenReturn(mockCId); + + AMContainer mockAMContainer = mock(AMContainer.class); + when(mockAMContainer.getContainerId()).thenReturn(mockCId); + when(mockAMContainer.getState()).thenReturn(AMContainerState.IDLE); + when(mockTaskAttempt.getAssignedContainerID()).thenReturn(mockCId); + + // Returning null container will replicate container cleanup scenario + when(mockAMContainerMap.get(mockCId)).thenReturn(null); + + schedulerHandler.handleEvent( + new AMSchedulerEventTAEnded( + mockTaskAttempt, mockCId, TaskAttemptState.KILLED, null, null, 0)); + assertEquals(1, mockEventHandler.events.size()); + 
assertTrue(mockEventHandler.events.get(0) instanceof AMContainerEventStopRequest); + AMContainerEventStopRequest stopEvent = + (AMContainerEventStopRequest) mockEventHandler.events.get(0); + assertEquals(mockCId, stopEvent.getContainerId()); + } + @Test (timeout = 5000) public void testTaskBasedAffinity() throws Exception { Configuration conf = new Configuration(false); @@ -288,7 +363,7 @@ public class TestTaskSchedulerManager { schedulerHandler.stop(); schedulerHandler.close(); } - + @Test (timeout = 5000) public void testContainerInternalPreempted() throws IOException, ServicePluginException { Configuration conf = new Configuration(false); @@ -318,6 +393,37 @@ public class TestTaskSchedulerManager { schedulerHandler.stop(); schedulerHandler.close(); } + + @Test(timeout = 5000) + public void testContainerInternalPreemptedAfterContainerCleanup() throws IOException, ServicePluginException { + Configuration conf = new Configuration(false); + schedulerHandler.init(conf); + schedulerHandler.start(); + + AMContainer mockAmContainer = mock(AMContainer.class); + when(mockAmContainer.getTaskSchedulerIdentifier()).thenReturn(0); + when(mockAmContainer.getContainerLauncherIdentifier()).thenReturn(0); + when(mockAmContainer.getTaskCommunicatorIdentifier()).thenReturn(0); + ContainerId mockCId = mock(ContainerId.class); + verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId) any()); + // Returning null container will replicate container cleanup scenario + when(mockAMContainerMap.get(mockCId)).thenReturn(null); + schedulerHandler.preemptContainer(0, mockCId); + verify(mockTaskScheduler, times(0)).deallocateContainer(mockCId); + assertEquals(1, mockEventHandler.events.size()); + Event event = mockEventHandler.events.get(0); + assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); + AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event; + assertEquals(mockCId, completedEvent.getContainerId()); + assertEquals("Container preempted 
internally", completedEvent.getDiagnostics()); + assertTrue(completedEvent.isPreempted()); + Assert.assertFalse(completedEvent.isDiskFailed()); + assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION, + completedEvent.getTerminationCause()); + + schedulerHandler.stop(); + schedulerHandler.close(); + } @Test (timeout = 5000) public void testContainerDiskFailed() throws IOException {
