Repository: tez
Updated Branches:
  refs/heads/master 68f4cf93a -> c852dbecf
TEZ-3972. Tez DAG can hang when a single task fails to fetch (Kuhu Shukla via jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c852dbec Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c852dbec Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c852dbec Branch: refs/heads/master Commit: c852dbecf5690dbf922d427701b0a3e8e7283f69 Parents: 68f4cf9 Author: Jonathan Eagles <[email protected]> Authored: Tue Sep 18 17:14:44 2018 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Tue Sep 18 17:14:44 2018 -0500 ---------------------------------------------------------------------- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 6 +- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 108 ++++++++++++++++++- 2 files changed, 110 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c852dbec/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 6ad41f8..bbec9ea 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -1797,9 +1797,9 @@ public class TaskAttemptImpl implements TaskAttempt, int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000); boolean crossTimeDeadline = readErrorTimespanSec >= MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC; - float failureFraction = ((float) attempt.uniquefailedOutputReports.size()) - / outputFailedEvent.getConsumerTaskNumber(); - + int runningTasks = attempt.appContext.getCurrentDAG().getVertex( + failedDestTaId.getTaskID().getVertexID()).getRunningTasks(); + float failureFraction = 
runningTasks > 0 ? ((float) attempt.uniquefailedOutputReports.size()) / runningTasks : 0; boolean withinFailureFractionLimits = (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION); boolean withinOutputFailureLimits = http://git-wip-us.apache.org/repos/asf/tez/blob/c852dbec/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 503e418..5ab68f7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -1820,6 +1820,7 @@ public class TestTaskAttempt { doReturn(containers).when(appCtx).getAllContainers(); HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class); doReturn(mockHistHandler).when(appCtx).getHistoryHandler(); + DAGImpl mockDAG = mock(DAGImpl.class); TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, @@ -1852,6 +1853,14 @@ public class TestTaskAttempt { EventMetaData mockMeta = mock(EventMetaData.class); TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class); when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1); + TezTaskID destTaskID = mock(TezTaskID.class); + TezVertexID destVertexID = mock(TezVertexID.class); + when(mockDestId1.getTaskID()).thenReturn(destTaskID); + when(destTaskID.getVertexID()).thenReturn(destVertexID); + Vertex destVertex = mock(VertexImpl.class); + when(destVertex.getRunningTasks()).thenReturn(11); + when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex); + when(appCtx.getCurrentDAG()).thenReturn(mockDAG); TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta); taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, 
tzEvent, 11)); @@ -1868,7 +1877,14 @@ public class TestTaskAttempt { // different destination attempt reports error. now threshold crossed TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class); - when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId2); + when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId2); + destTaskID = mock(TezTaskID.class); + destVertexID = mock(TezVertexID.class); + when(mockDestId2.getTaskID()).thenReturn(destTaskID); + when(destTaskID.getVertexID()).thenReturn(destVertexID); + destVertex = mock(VertexImpl.class); + when(destVertex.getRunningTasks()).thenReturn(11); + when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex); taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11)); assertEquals("Task attempt is not in FAILED state", taImpl.getState(), @@ -1923,6 +1939,7 @@ public class TestTaskAttempt { mockReEvent = InputReadErrorEvent.create("", 1, 1); mockMeta = mock(EventMetaData.class); mockDestId1 = mock(TezTaskAttemptID.class); + when(mockDestId1.getTaskID()).thenReturn(destTaskID); when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1); tzEvent = new TezEvent(mockReEvent, mockMeta); //This should fail even when MAX_ALLOWED_OUTPUT_FAILURES_FRACTION is within limits, as @@ -1957,9 +1974,11 @@ public class TestTaskAttempt { mockReEvent = InputReadErrorEvent.create("", 1, 1); mockMeta = mock(EventMetaData.class); mockDestId1 = mock(TezTaskAttemptID.class); + when(mockDestId1.getTaskID()).thenReturn(destTaskID); when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1); tzEvent = new TezEvent(mockReEvent, mockMeta); when(mockClock.getTime()).thenReturn(1000L); + when(destVertex.getRunningTasks()).thenReturn(1000); // time deadline not exceeded for a couple of read error events taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent, 1000)); assertEquals("Task attempt is not in succeeded state", taImpl3.getState(), @@ -1978,6 +1997,93 @@ public class TestTaskAttempt { 
verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID3); } + @Test(timeout = 60000) + public void testTAFailureBasedOnRunningTasks() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + appId, 0); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); + TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); + + MockEventHandler mockEh = new MockEventHandler(); + MockEventHandler eventHandler = spy(mockEh); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); + + Configuration taskConf = new Configuration(); + taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + taskConf.setBoolean("fs.file.impl.disable.cache", true); + + locationHint = TaskLocationHint.createTaskLocationHint( + new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); + Resource resource = Resource.newInstance(1024, 1); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), + new ContainerContextMatcher(), appCtx); + containers.addContainerIfNew(container, 0, 0, 0); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class); + doReturn(mockHistHandler).when(appCtx).getHistoryHandler(); + DAGImpl mockDAG = mock(DAGImpl.class); + + TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); + 
MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, taskConf, new SystemClock(), + mockHeartbeatHandler, appCtx, false, + resource, createFakeContainerContext(), false); + TezTaskAttemptID taskAttemptID = taImpl.getID(); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); + taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId)); + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID)); + verify(mockHeartbeatHandler).register(taskAttemptID); + taImpl.handle(new TaskAttemptEvent(taskAttemptID, + TaskAttemptEventType.TA_DONE)); + assertEquals("Task attempt is not in succeeded state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + verify(mockHeartbeatHandler).unregister(taskAttemptID); + + int expectedEventsTillSucceeded = 8; + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + ArgumentCaptor<DAGHistoryEvent> histArg = ArgumentCaptor.forClass(DAGHistoryEvent.class); + verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture()); + verify(mockHistHandler, times(2)).handle(histArg.capture()); // start and finish + DAGHistoryEvent histEvent = histArg.getValue(); + TaskAttemptFinishedEvent finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent(); + long finishTime = finishEvent.getFinishTime(); + verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2); + + InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 1); + EventMetaData mockMeta = mock(EventMetaData.class); + TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class); + when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1); + TezTaskID destTaskID = mock(TezTaskID.class); + TezVertexID destVertexID = mock(TezVertexID.class); + when(mockDestId1.getTaskID()).thenReturn(destTaskID); + when(destTaskID.getVertexID()).thenReturn(destVertexID); + Vertex destVertex = mock(VertexImpl.class); + when(destVertex.getRunningTasks()).thenReturn(5); + 
when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex); + when(appCtx.getCurrentDAG()).thenReturn(mockDAG); + TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta); + taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11)); + + // failure threshold is met due to running tasks. state is FAILED + assertEquals("Task attempt is not in FAILED state", taImpl.getState(), + TaskAttemptState.FAILED); + } + @SuppressWarnings("deprecation") @Test(timeout = 5000) public void testKilledInNew() throws ServicePluginException {
