Repository: tez
Updated Branches:
  refs/heads/master 4d6690ae1 -> 3b2933f01
TEZ-3847. AM web controller task counters are empty sometimes. Contributed by Jonathan Eagles

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b2933f0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b2933f0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b2933f0

Branch: refs/heads/master
Commit: 3b2933f01d9cf3a431f8230d4a3a7f83e7099788
Parents: 4d6690a
Author: Jason Lowe <[email protected]>
Authored: Mon Oct 2 17:12:52 2017 -0500
Committer: Jason Lowe <[email protected]>
Committed: Mon Oct 2 17:12:52 2017 -0500

----------------------------------------------------------------------
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  8 ++-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 66 ++++++++++++++++++++
 2 files changed, 72 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/3b2933f0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1fe65a9..1218543 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1575,8 +1575,12 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskStatusUpdateEvent statusEvent = sEvent.getStatusEvent();
       ta.reportedStatus.state = ta.getState();
       ta.reportedStatus.progress = statusEvent.getProgress();
-      ta.reportedStatus.counters = statusEvent.getCounters();
-      ta.statistics = statusEvent.getStatistics();
+      if (statusEvent.getCounters() != null) {
+        ta.reportedStatus.counters = statusEvent.getCounters();
+      }
+      if (statusEvent.getStatistics() != null) {
+        ta.statistics = statusEvent.getStatistics();
+      }
       if (statusEvent.getProgressNotified()) {
         ta.lastNotifyProgressTimestamp = ta.clock.getTime();
       } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/3b2933f0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 7709bc0..2bad2ef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.tez.common.MockDNSToSwitchMapping;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
 import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
@@ -1072,6 +1073,71 @@ public class TestTaskAttempt {
     Assert.assertTrue("This should have been an attempt failed event!", arg.getValue() instanceof TaskAttemptEventAttemptFailed);
   }
 
+  @Test
+  public void testStatusUpdateWithNullCounters() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container, 0, 0, 0);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), TaskAttemptState.RUNNING);
+    verify(mockHeartbeatHandler).register(taskAttemptID);
+
+    TezCounters counters = new TezCounters();
+    counters.findCounter("group", "counter").increment(1);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(counters, 0.1f, null, false)));
+    assertEquals(1, taImpl.getCounters().findCounter("group", "counter").getValue());
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    assertEquals(1, taImpl.getCounters().findCounter("group", "counter").getValue());
+    counters.findCounter("group", "counter").increment(1);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(counters, 0.1f, null, false)));
+    assertEquals(2, taImpl.getCounters().findCounter("group", "counter").getValue());
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    assertEquals(2, taImpl.getCounters().findCounter("group", "counter").getValue());
+  }
+
   @Test (timeout = 5000)
   public void testNoProgressFail() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
