Updated Branches: refs/heads/TEZ-1 39d725c27 -> c86c279d2
TEZ-110. Port TaskAttempt and Task unit tests from trunk. Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c86c279d Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c86c279d Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c86c279d Branch: refs/heads/TEZ-1 Commit: c86c279d23ca12a5e4d70dcb5ba01faa9ac6b6e8 Parents: 39d725c Author: Siddharth Seth <[email protected]> Authored: Thu May 9 14:11:15 2013 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu May 9 14:11:15 2013 -0700 ---------------------------------------------------------------------- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 21 +- .../org/apache/tez/dag/app/dag/impl/TaskImpl.java | 25 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 1 - .../tez/dag/app/dag/impl/TestTaskAttempt.java | 581 +++++++++++++++ .../apache/tez/dag/app/dag/impl/TestTaskImpl.java | 461 ++++++++++++ 5 files changed, 1062 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86c279d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 6922b44..c907e48 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -31,7 +31,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; @@ -112,7 +111,6 @@ public class TaskAttemptImpl implements TaskAttempt, private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable? protected final TezConfiguration conf; - protected final Path jobFile; protected final int partition; @SuppressWarnings("rawtypes") protected EventHandler eventHandler; @@ -141,11 +139,11 @@ public class TaskAttemptImpl implements TaskAttempt, private TaskAttemptStatus reportedStatus; protected final TaskLocationHint locationHint; - private final Resource taskResource; - private final Map<String, LocalResource> localResources; - private final Map<String, String> environment; - private final String javaOpts; - private final boolean isRescheduled; + protected final Resource taskResource; + protected final Map<String, LocalResource> localResources; + protected final Map<String, String> environment; + protected final String javaOpts; + protected final boolean isRescheduled; private boolean speculatorContainerRequestSent = false; protected String processorName; @@ -255,7 +253,7 @@ public class TaskAttemptImpl implements TaskAttempt, // TODO Remove TaskAttemptListener from the constructor. @SuppressWarnings("rawtypes") public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, - TaskAttemptListener tal, Path jobFile, int partition, + TaskAttemptListener tal, int partition, TezConfiguration conf, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, @@ -269,7 +267,6 @@ public class TaskAttemptImpl implements TaskAttempt, this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber); this.eventHandler = eventHandler; //Reported status - this.jobFile = jobFile; this.partition = partition; this.conf = conf; this.jobToken = jobToken; @@ -787,7 +784,7 @@ public class TaskAttemptImpl implements TaskAttempt, } @SuppressWarnings("unchecked") - private void logJobHistoryAttemptStarted() { + protected void logJobHistoryAttemptStarted() { TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent( attemptId, getTask().getVertex().getName(), launchTime, containerId, containerNodeId); @@ -797,7 +794,7 @@ public class TaskAttemptImpl implements TaskAttempt, } @SuppressWarnings("unchecked") - private void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal state) { + protected void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal state) { //Log finished events only if an attempt started. if (getLaunchTime() == 0) return; @@ -811,7 +808,7 @@ public class TaskAttemptImpl implements TaskAttempt, } @SuppressWarnings("unchecked") - private void logJobHistoryAttemptUnsuccesfulCompletion( + protected void logJobHistoryAttemptUnsuccesfulCompletion( TaskAttemptState state) { TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( attemptId, getTask().getVertex().getName(), http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86c279d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index e4538aa..cf49dbd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -28,12 +28,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.Clock; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -56,8 +55,8 @@ import org.apache.tez.dag.app.dag.TaskStateInternal; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate; -import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; +import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; @@ -85,7 +84,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { private static final Log LOG = LogFactory.getLog(TaskImpl.class); protected final TezConfiguration conf; - protected final Path jobFile; protected final int partition; protected final TaskAttemptListener taskAttemptListener; protected final TaskHeartbeatHandler taskHeartbeatHandler; @@ -106,9 +104,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { protected Token<JobTokenIdentifier> jobToken; protected String processorName; protected TaskLocationHint locationHint; - private Resource taskResource; - private Map<String, LocalResource> localResources; - private Map<String, String> environment; + protected Resource taskResource; + protected Map<String, LocalResource> localResources; + protected Map<String, String> environment; // counts the number of attempts that are either running or in a state where // they will come to be running when they get a Container @@ -256,7 +254,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { private final boolean leafVertex; - private String javaOpts; + protected String javaOpts; @Override public TaskState getState() { @@ -269,7 +267,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } public TaskImpl(TezVertexID vertexId, int partition, - EventHandler eventHandler, Path remoteJobConfFile, TezConfiguration conf, + EventHandler eventHandler, TezConfiguration conf, TaskAttemptListener taskAttemptListener, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, @@ -286,7 +284,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { String javaOpts) { this.conf = conf; this.clock = clock; - this.jobFile = remoteJobConfFile; ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); readLock = readWriteLock.readLock(); writeLock = readWriteLock.writeLock(); @@ -605,7 +602,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { TaskAttemptImpl createAttempt(int attemptNumber) { return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, - taskAttemptListener, null, 0, conf, + taskAttemptListener, 0, conf, jobToken, credentials, clock, taskHeartbeatHandler, appContext, processorName, locationHint, taskResource, localResources, environment, javaOpts, (failedAttempts>0)); @@ -797,14 +794,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { return ""; } - private void logJobHistoryTaskStartedEvent() { + protected void logJobHistoryTaskStartedEvent() { TaskStartedEvent startEvt = new TaskStartedEvent(taskId, getVertex().getName(), scheduledTime, getLaunchTime()); this.eventHandler.handle(new DAGHistoryEvent( taskId.getVertexID().getDAGId(), startEvt)); } - private void logJobHistoryTaskFinishedEvent() { + protected void logJobHistoryTaskFinishedEvent() { // FIXME need to handle getting finish time as this function // is called from within a transition TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId, @@ -813,7 +810,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { taskId.getVertexID().getDAGId(), finishEvt)); } - private void logJobHistoryTaskFailedEvent(TaskState finalState) { + protected void logJobHistoryTaskFailedEvent(TaskState finalState) { TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId, getVertex().getName(), clock.getTime(), finalState); this.eventHandler.handle(new DAGHistoryEvent( http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86c279d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index f74b172..80de958 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -822,7 +822,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, TaskImpl task = new TaskImpl(vertex.getVertexId(), i, vertex.eventHandler, - null, conf, vertex.taskAttemptListener, vertex.jobToken, http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86c279d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java new file mode 100644 index 0000000..12ddcf0 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -0,0 +1,581 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.Clock; +import org.apache.hadoop.yarn.ClusterInfo; +import org.apache.hadoop.yarn.SystemClock; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.tez.common.TezTaskContext; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; +import org.apache.tez.dag.api.records.TaskAttemptState; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.ContainerHeartbeatHandler; +import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskHeartbeatHandler; +import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; +import org.apache.tez.dag.app.dag.event.DAGEvent; +import org.apache.tez.dag.app.dag.event.DAGEventType; +import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; +import org.apache.tez.dag.app.rm.container.AMContainerMap; +import org.apache.tez.engine.common.security.JobTokenIdentifier; +import org.apache.tez.engine.records.TezDAGID; +import org.apache.tez.engine.records.TezTaskAttemptID; +import org.apache.tez.engine.records.TezTaskID; +import org.apache.tez.engine.records.TezVertexID; +import org.junit.Test; + +@SuppressWarnings({ "unchecked", "rawtypes" }) +public class TestTaskAttempt { + + private static final String MAP_PROCESSOR_NAME = + "org.apache.tez.mapreduce.processor.map.MapProcessor"; + + static public class StubbedFS extends RawLocalFileSystem { + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return new FileStatus(1, false, 1, 1, 1, f); + } + } + + // @Test + // // Verifies # tasks, attempts and diagnostics for a failing job. + // // TODO Move to TestTask - to verify # retries + // public void testMRAppHistoryForMap() throws Exception { + // MRApp app = new FailingAttemptsMRApp(1, 0); + // testMRAppHistory(app); + // } + // + // @Test + // // Verifies # tasks, attempts and diagnostics for a failing job. + // // Move to TestTask - to verify # retries + // public void testMRAppHistoryForReduce() throws Exception { + // MRApp app = new FailingAttemptsMRApp(0, 1); + // testMRAppHistory(app); + // } + + // @Test + // // Verifies that the launch request is based on the hosts. + // // TODO Move to the client. + // // TODO Add a test that verifies that the LocationHint is used as it should + // be. + // public void testSingleRackRequest() throws Exception { + // TaskAttemptImpl.ScheduleTaskattemptTransition sta = + // new TaskAttemptImpl.ScheduleTaskattemptTransition(); + // + // EventHandler eventHandler = mock(EventHandler.class); + // String[] hosts = new String[3]; + // hosts[0] = "host1"; + // hosts[1] = "host2"; + // hosts[2] = "host3"; + // TaskSplitMetaInfo splitInfo = new TaskSplitMetaInfo(hosts, 0, + // 128 * 1024 * 1024l); + // + // TaskAttemptImpl mockTaskAttempt = createMapTaskAttemptImpl2ForTest( + // eventHandler, splitInfo); + // TaskAttemptEventSchedule mockTAEvent = + // mock(TaskAttemptEventSchedule.class); + // doReturn(false).when(mockTAEvent).isRescheduled(); + // + // sta.transition(mockTaskAttempt, mockTAEvent); + // + // ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + // verify(eventHandler, times(2)).handle(arg.capture()); + // if (!(arg.getAllValues().get(1) instanceof + // AMSchedulerTALaunchRequestEvent)) { + // Assert.fail("Second Event not of type ContainerRequestEvent"); + // } + // AMSchedulerTALaunchRequestEvent tlrE = (AMSchedulerTALaunchRequestEvent) + // arg + // .getAllValues().get(1); + // String[] requestedRacks = tlrE.getRacks(); + // // Only a single occurrence of /DefaultRack + // assertEquals(1, requestedRacks.length); + // } + + // @Test + // // Tests that an attempt is made to resolve the localized hosts to racks. + // // TODO Move to the client. + // public void testHostResolveAttempt() throws Exception { + // TaskAttemptImpl.ScheduleTaskattemptTransition sta = + // new TaskAttemptImpl.ScheduleTaskattemptTransition(); + // + // EventHandler eventHandler = mock(EventHandler.class); + // String hosts[] = new String[] {"192.168.1.1", "host2", "host3"}; + // String resolved[] = new String[] {"host1", "host2", "host3"}; + // TaskSplitMetaInfo splitInfo = + // new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l); + // + // TaskAttemptImpl mockTaskAttempt = + // createMapTaskAttemptImpl2ForTest(eventHandler, splitInfo); + // TaskAttemptImpl spyTa = spy(mockTaskAttempt); + // when(spyTa.resolveHosts(hosts)).thenReturn(resolved); + // + // TaskAttemptEventSchedule mockTAEvent = + // mock(TaskAttemptEventSchedule.class); + // doReturn(false).when(mockTAEvent).isRescheduled(); + // + // sta.transition(spyTa, mockTAEvent); + // verify(spyTa).resolveHosts(hosts); + // ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + // verify(eventHandler, times(2)).handle(arg.capture()); + // if (!(arg.getAllValues().get(1) instanceof + // AMSchedulerTALaunchRequestEvent)) { + // Assert.fail("Second Event not of type ContainerRequestEvent"); + // } + // Map<String, Boolean> expected = new HashMap<String, Boolean>(); + // expected.put("host1", true); + // expected.put("host2", true); + // expected.put("host3", true); + // AMSchedulerTALaunchRequestEvent cre = + // (AMSchedulerTALaunchRequestEvent) arg.getAllValues().get(1); + // String[] requestedHosts = cre.getHosts(); + // for (String h : requestedHosts) { + // expected.remove(h); + // } + // assertEquals(0, expected.size()); + // } + // + + // @Test + // // Verifies accounting of slot_milli counters. Time spent in running tasks. + // // TODO Fix this test to work without MRApp. + // public void testSlotMillisCounterUpdate() throws Exception { + // verifySlotMillis(2048, 2048, 1024); + // verifySlotMillis(2048, 1024, 1024); + // verifySlotMillis(10240, 1024, 2048); + // } + + // public void verifySlotMillis(int mapMemMb, int reduceMemMb, + // int minContainerSize) throws Exception { + // Clock actualClock = new SystemClock(); + // ControlledClock clock = new ControlledClock(actualClock); + // clock.setTime(10); + // MRApp app = + // new MRApp(1, 1, false, "testSlotMillisCounterUpdate", true, clock); + // Configuration conf = new Configuration(); + // conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb); + // conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb); + // app.setClusterInfo(new ClusterInfo(BuilderUtils + // .newResource(minContainerSize, 1), BuilderUtils.newResource(10240,1))); + // + // Job job = app.submit(conf); + // app.waitForState(job, JobState.RUNNING); + // Map<TaskId, Task> tasks = job.getTasks(); + // Assert.assertEquals("Num tasks is not correct", 2, tasks.size()); + // Iterator<Task> taskIter = tasks.values().iterator(); + // Task mTask = taskIter.next(); + // app.waitForState(mTask, TaskState.RUNNING); + // Task rTask = taskIter.next(); + // app.waitForState(rTask, TaskState.RUNNING); + // Map<TaskAttemptId, TaskAttempt> mAttempts = mTask.getAttempts(); + // Assert.assertEquals("Num attempts is not correct", 1, mAttempts.size()); + // Map<TaskAttemptId, TaskAttempt> rAttempts = rTask.getAttempts(); + // Assert.assertEquals("Num attempts is not correct", 1, rAttempts.size()); + // TaskAttempt mta = mAttempts.values().iterator().next(); + // TaskAttempt rta = rAttempts.values().iterator().next(); + // app.waitForState(mta, TaskAttemptState.RUNNING); + // app.waitForState(rta, TaskAttemptState.RUNNING); + // + // clock.setTime(11); + // app.getContext() + // .getEventHandler() + // .handle(new TaskAttemptEvent(mta.getID(), TaskAttemptEventType.TA_DONE)); + // app.getContext() + // .getEventHandler() + // .handle(new TaskAttemptEvent(rta.getID(), TaskAttemptEventType.TA_DONE)); + // app.waitForState(job, JobState.SUCCEEDED); + // Assert.assertEquals(mta.getFinishTime(), 11); + // Assert.assertEquals(mta.getLaunchTime(), 10); + // Assert.assertEquals(rta.getFinishTime(), 11); + // Assert.assertEquals(rta.getLaunchTime(), 10); + // Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize), + // job.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_MAPS) + // .getValue()); + // Assert.assertEquals( + // (int) Math.ceil((float) reduceMemMb / minContainerSize), job + // .getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES) + // .getValue()); + // } + // + + // private void testMRAppHistory(MRApp app) throws Exception { + // Configuration conf = new Configuration(); + // Job job = app.submit(conf); + // app.waitForState(job, JobState.FAILED); + // Map<TaskId, Task> tasks = job.getTasks(); + // + // Assert.assertEquals("Num tasks is not correct", 1, tasks.size()); + // Task task = tasks.values().iterator().next(); + // Assert.assertEquals("Task state not correct", TaskState.FAILED, task + // .getReport().getTaskState()); + // Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next() + // .getAttempts(); + // Assert.assertEquals("Num attempts is not correct", 4, attempts.size()); + // + // Iterator<TaskAttempt> it = attempts.values().iterator(); + // TaskAttemptReport report = it.next().getReport(); + // Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED, + // report.getTaskAttemptState()); + // Assert.assertEquals("Diagnostic Information is not Correct", + // "Test Diagnostic Event", report.getDiagnosticInfo()); + // report = it.next().getReport(); + // Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED, + // report.getTaskAttemptState()); + // } + + @Test + // Ensure the dag does not go into an error state if a attempt kill is + // received while STARTING + public void testLaunchFailedWhileKilling() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 0); + TezDAGID dagID = new TezDAGID(appId, 1); + TezVertexID vertexID = new TezVertexID(dagID, 1); + TezTaskID taskID = new TezTaskID(vertexID, 1); + TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + TezConfiguration tezConf = new TezConfiguration(); + tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + tezConf.setBoolean("fs.file.impl.disable.cache", true); + + TaskLocationHint locationHint = new TaskLocationHint( + new String[] { "127.0.0.1" }, null); + Resource resource = BuilderUtils.newResource(1024, 1); + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + Map<String, String> environment = new HashMap<String, String>(); + String javaOpts = ""; + + AppContext mockAppContext = mock(AppContext.class); + doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo(); + + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, 1, tezConf, mock(Token.class), new Credentials(), + new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext, + MAP_PROCESSOR_NAME, locationHint, resource, localResources, + environment, javaOpts, false); + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, BuilderUtils + .newPriority(3))); + // At state STARTING. + taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null)); + // At some KILLING state. + taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null)); + // taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID, + // null)); + assertFalse(eventHandler.internalError); + } + + // TODO Add a similar test for TERMINATING. + // Ensure ContainerTerminated is handled correctly by the TaskAttempt + @Test + public void testContainerTerminatedWhileRunning() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 0); + TezDAGID dagID = new TezDAGID(appId, 1); + TezVertexID vertexID = new TezVertexID(dagID, 1); + TezTaskID taskID = new TezTaskID(vertexID, 1); + TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + TezConfiguration tezConf = new TezConfiguration(); + tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + tezConf.setBoolean("fs.file.impl.disable.cache", true); + + TaskLocationHint locationHint = new TaskLocationHint( + new String[] { "127.0.0.1" }, null); + Resource resource = BuilderUtils.newResource(1024, 1); + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + Map<String, String> environment = new HashMap<String, String>(); + String javaOpts = ""; + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AppContext appCtx = mock(AppContext.class); + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + appCtx); + containers.addContainerIfNew(container); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, 1, tezConf, mock(Token.class), new Credentials(), + new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx, + MAP_PROCESSOR_NAME, locationHint, resource, localResources, + environment, javaOpts, false); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null)); + // At state STARTING. + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, + null, -1)); + assertEquals("Task attempt is not in running state", taImpl.getState(), + TaskAttemptState.RUNNING); + taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, null)); + assertFalse( + "InternalError occurred trying to handle TA_CONTAINER_TERMINATED", + eventHandler.internalError); + // TODO Verify diagnostics + } + + @Test + // Ensure ContainerTerminated is handled correctly by the TaskAttempt + public void testContainerTerminatedWhileCommitting() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 0); + TezDAGID dagID = new TezDAGID(appId, 1); + TezVertexID vertexID = new TezVertexID(dagID, 1); + TezTaskID taskID = new TezTaskID(vertexID, 1); + TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + TezConfiguration tezConf = new TezConfiguration(); + tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + tezConf.setBoolean("fs.file.impl.disable.cache", true); + + TaskLocationHint locationHint = new TaskLocationHint( + new String[] { "127.0.0.1" }, null); + Resource resource = BuilderUtils.newResource(1024, 1); + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + Map<String, String> environment = new HashMap<String, String>(); + String javaOpts = ""; + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AppContext appCtx = mock(AppContext.class); + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + appCtx); + containers.addContainerIfNew(container); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, 1, tezConf, mock(Token.class), new Credentials(), + new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx, + MAP_PROCESSOR_NAME, locationHint, resource, localResources, + environment, javaOpts, false); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null)); + // At state STARTING. + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, + null, -1)); + assertEquals("Task attempt is not in running state", taImpl.getState(), + TaskAttemptState.RUNNING); + taImpl.handle(new TaskAttemptEvent(taskAttemptID, + TaskAttemptEventType.TA_COMMIT_PENDING)); + assertEquals("Task attempt is not in commit pending state", + taImpl.getState(), TaskAttemptState.COMMIT_PENDING); + taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, null)); + assertFalse( + "InternalError occurred trying to handle TA_CONTAINER_TERMINATED", + eventHandler.internalError); + // TODO Verify diagnostics + } + + @Test + // Verifies that multiple TooManyFetchFailures are handled correctly by the + // TaskAttempt. + public void testMultipleTooManyFetchFailures() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 0); + TezDAGID dagID = new TezDAGID(appId, 1); + TezVertexID vertexID = new TezVertexID(dagID, 1); + TezTaskID taskID = new TezTaskID(vertexID, 1); + TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + TezConfiguration tezConf = new TezConfiguration(); + tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + tezConf.setBoolean("fs.file.impl.disable.cache", true); + + TaskLocationHint locationHint = new TaskLocationHint( + new String[] { "127.0.0.1" }, null); + Resource resource = BuilderUtils.newResource(1024, 1); + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + Map<String, String> environment = new HashMap<String, String>(); + String javaOpts = ""; + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AppContext appCtx = mock(AppContext.class); + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + appCtx); + containers.addContainerIfNew(container); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, 1, tezConf, mock(Token.class), new Credentials(), + new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx, + MAP_PROCESSOR_NAME, locationHint, resource, localResources, + environment, javaOpts, false); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null)); + // At state STARTING. + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, + null, -1)); + taImpl.handle(new TaskAttemptEvent(taskAttemptID, + TaskAttemptEventType.TA_DONE)); + assertEquals("Task attempt is not in succeeded state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + taImpl.handle(new TaskAttemptEvent(taskAttemptID, + TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES)); + assertEquals("Task attempt is not in FAILED state", taImpl.getState(), + TaskAttemptState.FAILED); + taImpl.handle(new TaskAttemptEvent(taskAttemptID, + TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES)); + assertEquals("Task attempt is not in FAILED state, still", + taImpl.getState(), TaskAttemptState.FAILED); + assertFalse( + "InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURES", + eventHandler.internalError); + } + + public static class MockEventHandler implements EventHandler { + public boolean internalError; + + @Override + public void handle(Event event) { + if (event instanceof DAGEvent) { + DAGEvent je = ((DAGEvent) event); + if (DAGEventType.INTERNAL_ERROR == je.getType()) { + internalError = true; + } + } + } + }; + + private class MockTaskAttemptImpl extends TaskAttemptImpl { + + public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, + EventHandler eventHandler, TaskAttemptListener tal, int partition, + TezConfiguration conf, Token<JobTokenIdentifier> jobToken, + Credentials credentials, Clock clock, + TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, + String processorName, TaskLocationHint locationHint, Resource resource, + Map<String, LocalResource> localResources, + Map<String, String> environment, String javaOpts, boolean isRescheduled) { + super(taskId, attemptNumber, eventHandler, tal, partition, conf, + jobToken, credentials, clock, taskHeartbeatHandler, appContext, + processorName, locationHint, resource, localResources, environment, + javaOpts, isRescheduled); + } + + @Override + protected TezTaskContext createRemoteTask() { + // FIXME + return null; + } + + @Override + protected void logJobHistoryAttemptStarted() { + } + + @Override + protected void logJobHistoryAttemptFinishedEvent( + TaskAttemptStateInternal state) { + + } + + @Override + protected void logJobHistoryAttemptUnsuccesfulCompletion( + TaskAttemptState state) { + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86c279d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java new file mode 100644 index 0000000..13a19b9 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -0,0 +1,461 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.Clock; +import org.apache.hadoop.yarn.SystemClock; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; +import org.apache.tez.dag.api.records.TaskAttemptState; +import org.apache.tez.dag.api.records.TaskState; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskHeartbeatHandler; +import org.apache.tez.dag.app.dag.TaskStateInternal; +import org.apache.tez.dag.app.dag.event.TaskEvent; +import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; +import org.apache.tez.dag.app.dag.event.TaskEventType; +import org.apache.tez.engine.common.security.JobTokenIdentifier; +import org.apache.tez.engine.records.TezDAGID; +import org.apache.tez.engine.records.TezTaskAttemptID; +import org.apache.tez.engine.records.TezTaskID; +import org.apache.tez.engine.records.TezVertexID; +import org.junit.Before; +import org.junit.Test; + +public class TestTaskImpl { + + private static final Log LOG = LogFactory.getLog(TestTaskImpl.class); + + private int taskCounter = 0; + private final int partition = 1; + + private InlineDispatcher dispatcher; + + private TezConfiguration conf; + private TaskAttemptListener taskAttemptListener; + private TaskHeartbeatHandler taskHeartbeatHandler; + private Token<JobTokenIdentifier> jobToken; + private Credentials credentials; + private Clock clock; + private TaskLocationHint locationHint; + + private ApplicationId appId; + private TezDAGID dagId; + private TezVertexID vertexId; + private AppContext appContext; + private Resource taskResource; + private Map<String, LocalResource> localResources; + private Map<String, String> environment; + private String javaOpts; + private boolean leafVertex; + + private MockTaskImpl mockTask; + + @SuppressWarnings("unchecked") + @Before + public void setup() { + dispatcher = new InlineDispatcher(); + conf = new TezConfiguration(); + taskAttemptListener = mock(TaskAttemptListener.class); + taskHeartbeatHandler = mock(TaskHeartbeatHandler.class); + jobToken = (Token<JobTokenIdentifier>) mock(Token.class); + credentials = null; + clock = new SystemClock(); + locationHint = new TaskLocationHint(new String[1], new String[1]); + + appId = BuilderUtils.newApplicationId(System.currentTimeMillis(), 1); + dagId = new TezDAGID(appId, 1); + vertexId = new TezVertexID(dagId, 1); + appContext = mock(AppContext.class); + taskResource = BuilderUtils.newResource(1024, 1); + localResources = new HashMap<String, LocalResource>(); + environment = new HashMap<String, String>(); + javaOpts = ""; + leafVertex = false; + + mockTask = new MockTaskImpl(vertexId, partition, + dispatcher.getEventHandler(), conf, taskAttemptListener, jobToken, + credentials, clock, taskHeartbeatHandler, appContext, + "org.apache.tez.mapreduce.processor.map.MapProcessor", leafVertex, + locationHint, taskResource, localResources, environment, javaOpts); + } + + private TezTaskID getNewTaskID() { + TezTaskID taskID = new TezTaskID(vertexId, ++taskCounter); + return taskID; + } + + private void scheduleTaskAttempt(TezTaskID taskId) { + mockTask.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE)); + assertTaskScheduledState(); + } + + private void killTask(TezTaskID taskId) { + mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL)); + assertTaskKillWaitState(); + } + + private void killScheduledTaskAttempt(TezTaskAttemptID attemptId) { + mockTask.handle(new TaskEventTAUpdate(attemptId, + TaskEventType.T_ATTEMPT_KILLED)); + assertTaskScheduledState(); + } + + private void launchTaskAttempt(TezTaskAttemptID attemptId) { + mockTask.handle(new TaskEventTAUpdate(attemptId, + TaskEventType.T_ATTEMPT_LAUNCHED)); + assertTaskRunningState(); + } + + private void commitTaskAttempt(TezTaskAttemptID attemptId) { + mockTask.handle(new TaskEventTAUpdate(attemptId, + TaskEventType.T_ATTEMPT_COMMIT_PENDING)); + assertTaskRunningState(); + } + + private void updateAttemptProgress(MockTaskAttemptImpl attempt, float p) { + attempt.setProgress(p); + } + + private void updateAttemptState(MockTaskAttemptImpl attempt, + TaskAttemptState s) { + attempt.setState(s); + } + + private void killRunningTaskAttempt(TezTaskAttemptID attemptId) { + mockTask.handle(new TaskEventTAUpdate(attemptId, + TaskEventType.T_ATTEMPT_KILLED)); + assertTaskRunningState(); + } + + private void failRunningTaskAttempt(TezTaskAttemptID attemptId) { + mockTask.handle(new TaskEventTAUpdate(attemptId, + TaskEventType.T_ATTEMPT_FAILED)); + assertTaskRunningState(); + } + + /** + * {@link TaskState#NEW} + */ + private void assertTaskNewState() { + assertEquals(TaskState.NEW, mockTask.getState()); + } + + /** + * {@link TaskState#SCHEDULED} + */ + private void assertTaskScheduledState() { + assertEquals(TaskState.SCHEDULED, mockTask.getState()); + } + + /** + * {@link TaskState#RUNNING} + */ + private void assertTaskRunningState() { + assertEquals(TaskState.RUNNING, mockTask.getState()); + } + + /** + * {@link TaskState#KILL_WAIT} + */ + private void assertTaskKillWaitState() { + assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState()); + } + + /** + * {@link TaskState#SUCCEEDED} + */ + private void assertTaskSucceededState() { + assertEquals(TaskState.SUCCEEDED, mockTask.getState()); + } + + @Test + public void testInit() { + LOG.info("--- START: testInit ---"); + assertTaskNewState(); + assert (mockTask.getAttemptList().size() == 0); + } + + @Test + /** + * {@link TaskState#NEW}->{@link TaskState#SCHEDULED} + */ + public void testScheduleTask() { + LOG.info("--- START: testScheduleTask ---"); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + } + + @Test + /** + * {@link TaskState#SCHEDULED}->{@link TaskState#KILL_WAIT} + */ + public void testKillScheduledTask() { + LOG.info("--- START: testKillScheduledTask ---"); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + killTask(taskId); + } + + @Test + /** + * Kill attempt + * {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED} + */ + public void testKillScheduledTaskAttempt() { + LOG.info("--- START: testKillScheduledTaskAttempt ---"); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + killScheduledTaskAttempt(mockTask.getLastAttempt().getID()); + } + + @Test + /** + * Launch attempt + * {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING} + */ + public void testLaunchTaskAttempt() { + LOG.info("--- START: testLaunchTaskAttempt ---"); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + } + + @Test + /** + * Kill running attempt + * {@link TaskState#RUNNING}->{@link TaskState#RUNNING} + */ + public void testKillRunningTaskAttempt() { + LOG.info("--- START: testKillRunningTaskAttempt ---"); + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + killRunningTaskAttempt(mockTask.getLastAttempt().getID()); + } + + @Test + public void testTaskProgress() { + LOG.info("--- START: testTaskProgress ---"); + + // launch task + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + float progress = 0f; + assert (mockTask.getProgress() == progress); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + + // update attempt1 + progress = 50f; + updateAttemptProgress(mockTask.getLastAttempt(), progress); + assert (mockTask.getProgress() == progress); + progress = 100f; + updateAttemptProgress(mockTask.getLastAttempt(), progress); + assert (mockTask.getProgress() == progress); + + progress = 0f; + // mark first attempt as killed + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.KILLED); + assert (mockTask.getProgress() == progress); + + // kill first attempt + // should trigger a new attempt + // as no successful attempts + killRunningTaskAttempt(mockTask.getLastAttempt().getID()); + assert (mockTask.getAttemptList().size() == 2); + + assert (mockTask.getProgress() == 0f); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + progress = 50f; + updateAttemptProgress(mockTask.getLastAttempt(), progress); + assert (mockTask.getProgress() == progress); + } + + @Test + public void testFailureDuringTaskAttemptCommit() { + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + updateAttemptState(mockTask.getLastAttempt(), + TaskAttemptState.COMMIT_PENDING); + commitTaskAttempt(mockTask.getLastAttempt().getID()); + + // During the task attempt commit there is an exception which causes + // the attempt to fail + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED); + failRunningTaskAttempt(mockTask.getLastAttempt().getID()); + + assertEquals(2, mockTask.getAttemptList().size()); + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED); + commitTaskAttempt(mockTask.getLastAttempt().getID()); + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + + assertFalse("First attempt should not commit", + mockTask.canCommit(mockTask.getAttemptList().get(0).getID())); + assertTrue("Second attempt should commit", + mockTask.canCommit(mockTask.getLastAttempt().getID())); + + assertTaskSucceededState(); + } + + @Test + public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() { + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); + + // Add a speculative task attempt that succeeds + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + commitTaskAttempt(mockTask.getLastAttempt().getID()); + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + + // The task should now have succeeded + assertTaskSucceededState(); + + // Now fail the first task attempt, after the second has succeeded + mockTask.handle(new TaskEventTAUpdate(mockTask.getAttemptList().get(0) + .getID(), TaskEventType.T_ATTEMPT_FAILED)); + + // The task should still be in the succeeded state + assertTaskSucceededState(); + + } + + @SuppressWarnings("rawtypes") + private class MockTaskImpl extends TaskImpl { + + private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>(); + + public MockTaskImpl(TezVertexID vertexId, int partition, + EventHandler eventHandler, TezConfiguration conf, + TaskAttemptListener taskAttemptListener, + Token<JobTokenIdentifier> jobToken, Credentials credentials, + Clock clock, TaskHeartbeatHandler thh, AppContext appContext, + String processorName, boolean leafVertex, + TaskLocationHint locationHint, Resource resource, + Map<String, LocalResource> localResources, + Map<String, String> environment, String javaOpts) { + super(vertexId, partition, eventHandler, conf, taskAttemptListener, + jobToken, credentials, clock, thh, appContext, processorName, + leafVertex, locationHint, resource, localResources, environment, + javaOpts); + } + + @Override + protected TaskAttemptImpl createAttempt(int attemptNumber) { + MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(), + attemptNumber, eventHandler, taskAttemptListener, attemptNumber, + conf, jobToken, credentials, clock, taskHeartbeatHandler, appContext, + processorName, locationHint, taskResource, localResources, + environment, javaOpts, true); + taskAttempts.add(attempt); + return attempt; + } + + @Override + protected void internalError(TaskEventType type) { + super.internalError(type); + fail("Internal error: " + type); + } + + MockTaskAttemptImpl getLastAttempt() { + return taskAttempts.get(taskAttempts.size() - 1); + } + + List<MockTaskAttemptImpl> getAttemptList() { + return taskAttempts; + } + + protected void logJobHistoryTaskStartedEvent() { + } + + protected void logJobHistoryTaskFinishedEvent() { + } + + protected void logJobHistoryTaskFailedEvent(TaskState finalState) { + } + } + + @SuppressWarnings("rawtypes") + public class MockTaskAttemptImpl extends TaskAttemptImpl { + + private float progress = 0; + private TaskAttemptState state = TaskAttemptState.NEW; + + public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, + EventHandler eventHandler, TaskAttemptListener tal, int partition, + TezConfiguration conf, Token<JobTokenIdentifier> jobToken, + Credentials credentials, Clock clock, TaskHeartbeatHandler thh, + AppContext appContext, String processorName, + TaskLocationHint locationHing, Resource resource, + Map<String, LocalResource> localResources, + Map<String, String> environment, String javaOpts, boolean isRescheduled) { + super(taskId, attemptNumber, eventHandler, tal, partition, conf, + jobToken, credentials, clock, thh, appContext, processorName, + locationHing, resource, localResources, environment, javaOpts, + isRescheduled); + } + + @Override + public float getProgress() { + return progress; + } + + public void setProgress(float progress) { + this.progress = progress; + } + + public void setState(TaskAttemptState state) { + this.state = state; + } + + @Override + public TaskAttemptState getState() { + return state; + } + } + +}
