Repository: tez Updated Branches: refs/heads/master 9ca694789 -> 414258e40
TEZ-808. Handle task attempts that are not making progress (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/414258e4 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/414258e4 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/414258e4 Branch: refs/heads/master Commit: 414258e40ff72961c6d1e6fb287b3d2c9732a7a8 Parents: 9ca6947 Author: Bikas Saha <[email protected]> Authored: Thu Oct 29 22:10:56 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Thu Oct 29 22:10:56 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/TezConfiguration.java | 14 +++ .../tez/runtime/api/ProcessorContext.java | 4 +- .../org/apache/tez/runtime/api/TaskContext.java | 10 ++ .../records/TaskAttemptTerminationCause.java | 1 + .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 25 +++++ .../apache/tez/dag/app/MockDAGAppMaster.java | 3 +- .../dag/app/TestTaskCommunicatorManager1.java | 4 +- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 98 +++++++++++++++++++- .../org/apache/tez/runtime/RuntimeTask.java | 11 +++ .../api/events/TaskStatusUpdateEvent.java | 11 ++- .../api/impl/TezProcessorContextImpl.java | 1 + .../runtime/api/impl/TezTaskContextImpl.java | 5 + .../apache/tez/runtime/task/TaskReporter.java | 4 +- .../TestLogicalIOProcessorRuntimeTask.java | 6 ++ .../runtime/api/impl/TestProcessorContext.java | 8 +- .../tez/runtime/task/TestTaskReporter.java | 10 ++ .../org/apache/tez/test/TestFaultTolerance.java | 30 +++++- 18 files changed, 232 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 96300fa..03ccdeb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-2679. 
Admin forms of launch env settings ALL CHANGES: + TEZ-808. Handle task attempts that are not making progress TEZ-2553. Tez UI: Tez UI Nits TEZ-2814. ATSImportTool has a return statement in a finally block TEZ-2906. Compilation fails with hadoop 2.2.0 http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index ac3dd4a..0ea8999 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -690,6 +690,20 @@ public class TezConfiguration extends Configuration { public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX + "max-events-per-heartbeat"; public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 500; + + /** + * Long value. Interval, in milliseconds, within which any of the task's Input/Processor/Output + * components need to make successive progress notifications. If the progress is not notified + * for this interval then the task will be considered hung and terminated. + * The value for this config should be larger than {@link TezConfiguration#TASK_HEARTBEAT_TIMEOUT_MS} + * and larger than 2 times the value of {@link TezConfiguration#TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS}. + * A config value {@code <= 0} disables this. + */ + @ConfigurationScope(Scope.VERTEX) + @ConfigurationProperty + public static final String TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS = TEZ_TASK_PREFIX + + "progress.stuck.interval-ms"; + public static final long TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT = -1; /** * Whether to generate counters per IO or not.
Enabling this will rename http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java index 2ac6e4c..8b88289 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java @@ -31,7 +31,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; public interface ProcessorContext extends TaskContext { /** - * Set the overall progress of this Task Attempt + * Set the overall progress of this Task Attempt. + * This automatically results in invocation of {@link ProcessorContext#notifyProgress()} + * and so invoking that separately is not required. * @param progress Progress in the range from [0.0 - 1.0f] */ public void setProgress(float progress); http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java index 92d5575..457b0de 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java @@ -120,6 +120,16 @@ public interface TaskContext { * @return {@link ObjectRegistry} */ public ObjectRegistry getObjectRegistry(); + + /** + * Notifies the framework that progress is being made by this component. + * This is used to identify hung components that are not making progress. + * Must be called periodically until processing has completed for this component. 
+ * Care must be taken to call this when real progress has been made. Simply + * calling this continuously from a thread without regard to real work may prevent + * identification of hung components and delay/stall job completion. + */ + public void notifyProgress(); /** * Report a fatal error to the framework. This will cause the entire task to http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java index 7112d9e..a5214fb 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java @@ -34,6 +34,7 @@ public enum TaskAttemptTerminationCause { INPUT_READ_ERROR, // Failed due to error in reading inputs OUTPUT_WRITE_ERROR, // Failed due to error in writing outputs OUTPUT_LOST, // Failed because attempts output were reported lost + NO_PROGRESS, // Failed because no progress was being made TASK_HEARTBEAT_ERROR, // Failed because AM lost connection to the task CONTAINER_LAUNCH_FAILED, // Failed to launch container http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 35a23f9..27eb69b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -191,6 +191,8 @@ public class TaskAttemptImpl 
implements TaskAttempt, private DAGCounter localityCounter; org.apache.tez.runtime.api.impl.TaskStatistics statistics; + + long lastNotifyProgressTimestamp = 0; // Used to store locality information when Set<String> taskHosts = new HashSet<String>(); @@ -1372,6 +1374,29 @@ public class TaskAttemptImpl implements TaskAttempt, ta.reportedStatus.progress = statusEvent.getProgress(); ta.reportedStatus.counters = statusEvent.getCounters(); ta.statistics = statusEvent.getStatistics(); + if (statusEvent.getProgressNotified()) { + ta.lastNotifyProgressTimestamp = ta.clock.getTime(); + } else { + long currTime = ta.clock.getTime(); + long hungIntervalMax = ta.conf.getLong( + TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, + TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT); + if (hungIntervalMax > 0 && + currTime - ta.lastNotifyProgressTimestamp > hungIntervalMax) { + // task is hung + String diagnostics = "Attempt failed because it appears to make no progress for " + + hungIntervalMax + "ms"; + LOG.info(diagnostics + " " + ta.getID()); + // send event that will fail this attempt + ta.sendEvent( + new TaskAttemptEventAttemptFailed(ta.getID(), + TaskAttemptEventType.TA_FAILED, + diagnostics, + TaskAttemptTerminationCause.NO_PROGRESS) + ); + } + } + if (sEvent.getReadErrorReported()) { // if there is a read error then track the next last data event ta.appendNextDataEvent = true; http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 3cab2da..f2130ab 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -414,7 +414,8 @@ public class MockDAGAppMaster extends DAGAppMaster 
{ cData.numUpdates++; float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1; float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f; - events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData( + events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats, false), + new EventMetaData( EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId), MockDAGAppMaster.this.getContext().getClock().getTime())); TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events, http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java index 03b7da9..17fa4d9 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java @@ -235,7 +235,7 @@ public class TestTaskCommunicatorManager1 { @Test (timeout = 5000) public void testTaskEventRouting() throws Exception { List<TezEvent> events = Arrays.asList( - new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null), + new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), null), new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null), new TezEvent(new TaskAttemptCompletedEvent(), null) ); @@ -264,7 +264,7 @@ public class TestTaskCommunicatorManager1 { @Test (timeout = 5000) public void testTaskEventRoutingWithReadError() throws Exception { List<TezEvent> events = Arrays.asList( - new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null), + new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), null), new TezEvent(InputReadErrorEvent.create("", 0, 0), 
null), new TezEvent(new TaskAttemptCompletedEvent(), null) ); http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 3cf3309..17295cd 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; @@ -646,7 +647,7 @@ public class TestTaskAttempt { when(mockTezEvent2.getEventReceivedTime()).thenReturn(ts2); when(mockTezEvent2.getSourceInfo().getTaskAttemptID()).thenReturn(mockId2); TaskAttemptEventStatusUpdate statusEvent = - new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null)); + new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)); assertEquals(0, taImpl.lastDataEvents.size()); taImpl.setLastEventSent(mockTezEvent1); @@ -729,7 +730,7 @@ public class TestTaskAttempt { arg.getAllValues().subList(0, expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1); - taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null))); + taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, "0", 
TaskAttemptTerminationCause.APPLICATION_ERROR)); @@ -769,7 +770,95 @@ public class TestTaskAttempt { arg.getAllValues().subList(expectedEventsAtRunning, expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2); } - + + @Test (timeout = 5000) + public void testNoProgressFail() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + appId, 0); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); + TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); + + MockEventHandler eventHandler = spy(new MockEventHandler()); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); + + Configuration taskConf = new Configuration(); + taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + taskConf.setBoolean("fs.file.impl.disable.cache", true); + taskConf.setLong(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 75); + + locationHint = TaskLocationHint.createTaskLocationHint( + new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); + Resource resource = Resource.newInstance(1024, 1); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), + new ContainerContextMatcher(), appCtx); + containers.addContainerIfNew(container, 0, 0, 0); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + + TaskHeartbeatHandler mockHeartbeatHandler = 
mock(TaskHeartbeatHandler.class); + Clock mockClock = mock(Clock.class); + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, taskConf, mockClock, + mockHeartbeatHandler, appCtx, false, + resource, createFakeContainerContext(), false); + TezTaskAttemptID taskAttemptID = taImpl.getID(); + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); + // At state STARTING. + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, + null)); + assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), + TaskAttemptState.RUNNING); + verify(mockHeartbeatHandler).register(taskAttemptID); + + when(mockClock.getTime()).thenReturn(100l); + taImpl.handle(new TaskAttemptEventStatusUpdate( + taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, true))); + // invocations and time updated + assertEquals(100l, taImpl.lastNotifyProgressTimestamp); + when(mockClock.getTime()).thenReturn(150l); + taImpl.handle(new TaskAttemptEventStatusUpdate( + taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, true))); + // invocations and time updated + assertEquals(150l, taImpl.lastNotifyProgressTimestamp); + when(mockClock.getTime()).thenReturn(200l); + taImpl.handle(new TaskAttemptEventStatusUpdate( + taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); + // invocations and time not updated + assertEquals(150l, taImpl.lastNotifyProgressTimestamp); + when(mockClock.getTime()).thenReturn(250l); + taImpl.handle(new TaskAttemptEventStatusUpdate( + taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); + // invocations and time not updated + assertEquals(150l, taImpl.lastNotifyProgressTimestamp); + // failed event sent to self + verify(eventHandler, atLeast(1)).handle(arg.capture()); + TaskAttemptEventAttemptFailed fEvent = (TaskAttemptEventAttemptFailed) arg.getValue(); + assertEquals(taImpl.getID(), 
fEvent.getTaskAttemptID()); + assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, fEvent.getTerminationCause()); + taImpl.handle(fEvent); + + assertEquals("Task attempt is not in the FAIL_IN_PROGRESS state", taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_IN_PROGRESS); + verify(mockHeartbeatHandler).unregister(taskAttemptID); + assertEquals(1, taImpl.getDiagnostics().size()); + assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, taImpl.getTerminationCause()); + } + @Test(timeout = 5000) public void testEventSerializingHash() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2); @@ -858,7 +947,8 @@ public class TestTaskAttempt { arg.getAllValues().subList(0, expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1); - taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null))); + taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, + new TaskStatusUpdateEvent(null, 0.1f, null, false))); taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index c9c6ba1..23e57b1 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -54,6 +54,7 @@ public abstract class RuntimeTask { private final AtomicBoolean taskDone; private final TaskCounterUpdater counterUpdater; private final TaskStatistics statistics; + private volatile boolean progressNotified; protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf, TezUmbilical tezUmbilical, String pid, boolean 
setupSysCounterUpdater) { @@ -104,6 +105,16 @@ public abstract class RuntimeTask { this.fatalErrorMessage = message; } + public void notifyProgressInvocation() { + progressNotified = true; + } + + public boolean getAndClearProgressNotification() { + boolean retVal = progressNotified; + progressNotified = false; + return retVal; + } + public Throwable getFatalError() { return this.fatalError.get(); } http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java index 6465bed..518cbf4 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java @@ -31,15 +31,18 @@ public class TaskStatusUpdateEvent extends Event implements Writable { private TezCounters tezCounters; private float progress; + boolean progressNotified; private TaskStatistics statistics; public TaskStatusUpdateEvent() { } - public TaskStatusUpdateEvent(TezCounters tezCounters, float progress, TaskStatistics statistics) { + public TaskStatusUpdateEvent(TezCounters tezCounters, float progress, TaskStatistics statistics, + boolean progressNotified) { this.tezCounters = tezCounters; this.progress = progress; this.statistics = statistics; + this.progressNotified = progressNotified; } public TezCounters getCounters() { @@ -53,10 +56,15 @@ public class TaskStatusUpdateEvent extends Event implements Writable { public TaskStatistics getStatistics() { return statistics; } + + public boolean getProgressNotified() { + return progressNotified; + } @Override public void write(DataOutput out) throws 
IOException { out.writeFloat(progress); + out.writeBoolean(progressNotified); if (tezCounters != null) { out.writeBoolean(true); tezCounters.write(out); @@ -74,6 +82,7 @@ public class TaskStatusUpdateEvent extends Event implements Writable { @Override public void readFields(DataInput in) throws IOException { progress = in.readFloat(); + progressNotified = in.readBoolean(); if (in.readBoolean()) { tezCounters = new TezCounters(); tezCounters.readFields(in); http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index 6dc30ff..0c3283d 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -94,6 +94,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce @Override public void setProgress(float progress) { runtimeTask.setProgress(progress); + notifyProgress(); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java index 5f04c80..211f9d7 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java @@ 
-174,6 +174,11 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable { } @Override + public void notifyProgress() { + runtimeTask.notifyProgressInvocation(); + } + + @Override public ByteBuffer getServiceConsumerMetaData(String serviceName) { return (ByteBuffer) serviceConsumerMetadata.get(serviceName) .asReadOnlyBuffer().rewind(); http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index 263300e..30a1b9c 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -347,15 +347,17 @@ public class TaskReporter implements TaskReporterInterface { TezCounters counters = null; TaskStatistics stats = null; float progress = 0; + boolean progressNotified = false; if (task.hasInitialized()) { progress = task.getProgress(); + progressNotified = task.getAndClearProgressNotification(); if (sendCounters) { // send these potentially large objects at longer intervals to avoid overloading the AM counters = task.getCounters(); stats = task.getTaskStatistics(); } } - return new TaskStatusUpdateEvent(counters, progress, stats); + return new TaskStatusUpdateEvent(counters, progress, stats, progressNotified); } /** http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index 0fc3919..12fec7e 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -93,6 +93,9 @@ public class TestLogicalIOProcessorRuntimeTask { assertEquals(1, TestProcessor.runCount); assertEquals(1, TestInput.startCount); assertEquals(0, TestOutput.startCount); + // test that invocations of progress are counted correctly + assertEquals(true, lio1.getAndClearProgressNotification()); + assertEquals(false, lio1.getAndClearProgressNotification()); // cleared after getting assertEquals(30, TestInput.vertexParallelism); assertEquals(0, TestOutput.vertexParallelism); assertEquals(30, lio1.getProcessorContext().getVertexParallelism()); @@ -237,6 +240,7 @@ public class TestLogicalIOProcessorRuntimeTask { public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { runCount++; + getContext().notifyProgress(); } @Override @@ -271,6 +275,7 @@ public class TestLogicalIOProcessorRuntimeTask { public void start() throws Exception { startCount++; this.vertexParallelism = getContext().getVertexParallelism(); + getContext().notifyProgress(); } @Override @@ -310,6 +315,7 @@ public class TestLogicalIOProcessorRuntimeTask { System.err.println("Out started"); startCount++; this.vertexParallelism = getContext().getVertexParallelism(); + getContext().notifyProgress(); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java index e28df3a..ff94e7f 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java @@ -17,8 +17,7 @@ package org.apache.tez.runtime.api.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.*; import java.nio.ByteBuffer; import java.util.Arrays; @@ -97,6 +96,9 @@ public class TestProcessorContext { assertEquals(vertexName, procContext.getTaskVertexName()); assertEquals(vertexId.getId(), procContext.getTaskVertexIndex()); assertTrue(Arrays.equals(localDirs, procContext.getWorkDirs())); - + + // test auto call of notifyProgress + procContext.setProgress(0.1f); + verify(runtimeTask, times(1)).notifyProgressInvocation(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java index b44c9f8..e137d50 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java @@ -116,6 +116,8 @@ public class TestTaskReporter { LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class); doReturn("vertexName").when(mockTask).getVertexName(); doReturn(mockTaskAttemptId).when(mockTask).getTaskAttemptID(); + boolean progressNotified = false; + 
doReturn(progressNotified).when(mockTask).getAndClearProgressNotification(); TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class); float progress = 0.5f; @@ -136,9 +138,11 @@ public class TestTaskReporter { TaskStatusUpdateEvent event = heartbeatCallable.getStatusUpdateEvent(true); verify(mockTask, times(1)).hasInitialized(); verify(mockTask, times(0)).getProgress(); + verify(mockTask, times(0)).getAndClearProgressNotification(); verify(mockTask, times(0)).getTaskStatistics(); verify(mockTask, times(0)).getCounters(); Assert.assertEquals(0, event.getProgress(), 0); + Assert.assertEquals(false, event.getProgressNotified()); Assert.assertNull(event.getCounters()); Assert.assertNull(event.getStatistics()); @@ -147,20 +151,26 @@ public class TestTaskReporter { event = heartbeatCallable.getStatusUpdateEvent(false); verify(mockTask, times(2)).hasInitialized(); verify(mockTask, times(1)).getProgress(); + verify(mockTask, times(1)).getAndClearProgressNotification(); verify(mockTask, times(0)).getTaskStatistics(); verify(mockTask, times(0)).getCounters(); Assert.assertEquals(progress, event.getProgress(), 0); + Assert.assertEquals(progressNotified, event.getProgressNotified()); Assert.assertNull(event.getCounters()); Assert.assertNull(event.getStatistics()); // task is initialized - progress obtained and also counters since flag is true + progressNotified = true; + doReturn(progressNotified).when(mockTask).getAndClearProgressNotification(); doReturn(true).when(mockTask).hasInitialized(); event = heartbeatCallable.getStatusUpdateEvent(true); verify(mockTask, times(3)).hasInitialized(); verify(mockTask, times(2)).getProgress(); + verify(mockTask, times(2)).getAndClearProgressNotification(); verify(mockTask, times(1)).getTaskStatistics(); verify(mockTask, times(1)).getCounters(); Assert.assertEquals(progress, event.getProgress(), 0); + Assert.assertEquals(progressNotified, event.getProgressNotified()); Assert.assertEquals(counters, 
event.getCounters()); Assert.assertEquals(stats, event.getStatistics()); http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java index ec89c4b..011e91d 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java @@ -23,6 +23,9 @@ import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -90,6 +93,7 @@ public class TestFaultTolerance { tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); tezConf.setDouble(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, 0.4); tezConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, 3); + tezConf.setInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS, 100); tezSession = TezClient.create("TestFaultTolerance", tezConf, true); tezSession.start(); @@ -117,6 +121,11 @@ public class TestFaultTolerance { } void runDAGAndVerify(DAG dag, DAGStatus.State finalState, int checkFailedAttempts) throws Exception { + runDAGAndVerify(dag, finalState, checkFailedAttempts, null); + } + + void runDAGAndVerify(DAG dag, DAGStatus.State finalState, int checkFailedAttempts, + String diagnostics) throws Exception { tezSession.waitTillReady(); DAGClient dagClient = tezSession.submitDAG(dag); DAGStatus dagStatus = dagClient.getDAGStatus(null); @@ -129,12 +138,17 @@ public class TestFaultTolerance { dagStatus = dagClient.getDAGStatus(null); } + Assert.assertEquals(finalState, dagStatus.getState()); + if (checkFailedAttempts > 0) { 
Assert.assertEquals(checkFailedAttempts, dagStatus.getDAGProgress().getFailedTaskAttemptCount()); } - Assert.assertEquals(finalState, dagStatus.getState()); + if (diagnostics != null) { + Assert.assertNotNull(dagStatus.getDiagnostics()); + Assert.assertTrue(Joiner.on(":").join(dagStatus.getDiagnostics()).contains(diagnostics)); + } } @Test (timeout=60000) @@ -749,4 +763,18 @@ public class TestFaultTolerance { runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED); } + @Test (timeout=240000) + public void testNoProgress() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(TestProcessor.TEZ_FAILING_PROCESSOR_SLEEP_MS, 1000*100); // long sleep + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + DAG dag = SimpleTestDAG.createDAG(testConf); + Vertex hung = dag.getVertex("v1"); + hung.setConf(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, Long.toString(1000)); + hung.setConf(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, Integer.toString(2)); + + // dag will fail with 2 attempts failing from vertex v1 + runDAGAndVerify(dag, DAGStatus.State.FAILED, 2, "no progress"); + } + }
