Repository: tez
Updated Branches:
  refs/heads/master 727584fd2 -> c3b8b8523
TEZ-3193. Deadlock in AM during task commit request. (Jason Lowe via hitesh)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c3b8b852
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c3b8b852
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c3b8b852

Branch: refs/heads/master
Commit: c3b8b852331c41f9a9b41c7c74995f52f4578d99
Parents: 727584f
Author: Hitesh Shah <[email protected]>
Authored: Mon May 2 11:00:48 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Mon May 2 11:00:48 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt | 3 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 61 ++++++--------------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java | 11 +++-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java | 21 ++-----
 .../tez/dag/app/dag/impl/TestTaskImpl.java | 13 +++--
 5 files changed, 43 insertions(+), 66 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b436b9a..37d47cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.9.0: Unreleased
 INCOMPATIBLE CHANGES
 ALL CHANGES:
+ TEZ-3193. Deadlock in AM during task commit request.
 TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
 TEZ-3207. Add support for fetching multiple partitions from the same source task to UnorderedKVInput.
 TEZ-3232. Disable randomFailingInputs in testFaulttolerance to unblock other tests.
@@ -27,6 +28,7 @@ Release 0.8.4: Unreleased
 INCOMPATIBLE CHANGES
 ALL CHANGES:
+ TEZ-3193. Deadlock in AM during task commit request.
 TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
 TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts.
 TEZ-3224. 
User payload is not initialized before creating vertex manager plugin. @@ -465,6 +467,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-3193. Deadlock in AM during task commit request. TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks TEZ-3224. User payload is not initialized before creating vertex manager plugin. TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 6d9247e..e39315b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -30,7 +30,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; @@ -71,7 +70,6 @@ import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.RecoveryParser.TaskAttemptRecoveryData; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskHeartbeatHandler; -import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; import org.apache.tez.dag.app.dag.Vertex; @@ -107,7 +105,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; 
import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto; -import org.apache.tez.dag.utils.TezBuilderUtils; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; @@ -190,8 +187,9 @@ public class TaskAttemptImpl implements TaskAttempt, private String nodeHttpAddress; private String nodeRackName; - private final Task task; private final Vertex vertex; + private final TaskLocationHint locationHint; + private final TaskSpec taskSpec; @VisibleForTesting boolean appendNextDataEvent = true; @@ -465,22 +463,25 @@ public class TaskAttemptImpl implements TaskAttempt, .installTopology(); @SuppressWarnings("rawtypes") - public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, + public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, boolean isRescheduled, Resource resource, ContainerContext containerContext, boolean leafVertex, - Task task) { - this(taskId, attemptNumber, eventHandler, taskCommunicatorManagerInterface, conf, clock, + Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec) { + this(attemptId, eventHandler, taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex, - task, null); + vertex, locationHint, taskSpec, null); } - public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, + + @SuppressWarnings("rawtypes") + public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock, TaskHeartbeatHandler 
taskHeartbeatHandler, AppContext appContext, boolean isRescheduled, Resource resource, ContainerContext containerContext, boolean leafVertex, - Task task, TezTaskAttemptID schedulingCausalTA) { + Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec, + TezTaskAttemptID schedulingCausalTA) { MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration @@ -496,15 +497,16 @@ public class TaskAttemptImpl implements TaskAttempt, ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); - this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber); + this.attemptId = attemptId; this.eventHandler = eventHandler; //Reported status this.conf = conf; this.clock = clock; this.taskHeartbeatHandler = taskHeartbeatHandler; this.appContext = appContext; - this.task = task; - this.vertex = this.task.getVertex(); + this.vertex = vertex; + this.locationHint = locationHint; + this.taskSpec = taskSpec; this.creationCausalTA = schedulingCausalTA; this.creationTime = clock.getTime(); @@ -548,14 +550,6 @@ public class TaskAttemptImpl implements TaskAttempt, return creationCausalTA; } - TaskSpec createRemoteTaskSpec() throws AMUserCodeException { - TaskSpec baseTaskSpec = task.getBaseTaskSpec(); - return new TaskSpec(getID(), - baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(), - baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(), - baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs()); - } - @Override public TaskAttemptReport getReport() { TaskAttemptReport result = Records.newRecord(TaskAttemptReport.class); @@ -1036,7 +1030,7 @@ public class TaskAttemptImpl implements TaskAttempt, } private TaskLocationHint getTaskLocationHint() { - return task.getTaskLocationHint(); + return locationHint; } protected String[] resolveHosts(String[] src) { @@ -1226,22 +1220,6 @@ public class 
TaskAttemptImpl implements TaskAttempt, TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event; ta.scheduledTime = ta.clock.getTime(); - // Create the remote task. - TaskSpec remoteTaskSpec; - try { - remoteTaskSpec = ta.createRemoteTaskSpec(); - if (LOG.isDebugEnabled()) { - LOG.debug("remoteTaskSpec:" + remoteTaskSpec); - } - } catch (AMUserCodeException e) { - String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta; - LOG.error(msg, e); - String diag = msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause()); - new TerminateTransition(FAILED_HELPER).transition(ta, - new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, diag, - TaskAttemptTerminationCause.APPLICATION_ERROR)); - return TaskAttemptStateInternal.FAILED; - } // Create startTaskRequest String[] requestHosts = new String[0]; @@ -1271,10 +1249,7 @@ public class TaskAttemptImpl implements TaskAttempt, locationHint = null; } - if (LOG.isDebugEnabled()) { - LOG.debug("Asking for container launch with taskAttemptContext: " - + remoteTaskSpec); - } + LOG.debug("Asking for container launch with taskAttemptContext: {}", ta.taskSpec); // Send out a launch request to the scheduler. int priority; @@ -1288,7 +1263,7 @@ public class TaskAttemptImpl implements TaskAttempt, // TODO Jira post TEZ-2003 getVertex implementation is very inefficient. This should be via references, instead of locked table lookups. 
Vertex vertex = ta.getVertex(); AMSchedulerEventTALaunchRequest launchRequestEvent = new AMSchedulerEventTALaunchRequest( - ta.attemptId, ta.taskResource, remoteTaskSpec, ta, locationHint, + ta.attemptId, ta.taskResource, ta.taskSpec, ta, locationHint, priority, ta.containerContext, vertex.getTaskSchedulerIdentifier(), vertex.getContainerLauncherIdentifier(), http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 26ba004..28a1c5e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -93,6 +93,7 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.dag.utils.TezBuilderUtils; import org.apache.tez.runtime.api.OutputCommitter; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TaskStatistics; @@ -718,9 +719,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) { - return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, + TezTaskAttemptID attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber); + TaskSpec taskSpec = new TaskSpec(attemptId, + baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(), + baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(), + baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs()); + return new TaskAttemptImpl(attemptId, eventHandler, 
taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext, - (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA); + (failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(), + locationHint, taskSpec, schedulingCausalTA); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index e4cd956..a50ca49 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -78,7 +78,6 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TaskCommunicatorWrapper; import org.apache.tez.dag.app.TaskHeartbeatHandler; -import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.DAGEvent; @@ -112,10 +111,10 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.dag.utils.TezBuilderUtils; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventMetaData; -import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.serviceplugins.api.ServicePluginException; import org.junit.Assert; @@ -139,7 +138,6 @@ public class TestTaskAttempt { } 
AppContext appCtx; - Task mockTask; TaskLocationHint locationHint; Vertex mockVertex; ServicePluginInfo servicePluginInfo = new ServicePluginInfo() @@ -156,9 +154,7 @@ public class TestTaskAttempt { when(appCtx.getContainerLauncherName(anyInt())).thenReturn( TezConstants.getTezYarnServicePluginName()); - mockTask = mock(Task.class); mockVertex = mock(Vertex.class); - when(mockTask.getVertex()).thenReturn(mockVertex); when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo); HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class); @@ -196,7 +192,6 @@ public class TestTaskAttempt { + AMSchedulerEventTALaunchRequest.class.getName()); } - verify(mockTask, times(1)).getTaskLocationHint(); // TODO Move the Rack request check to the client after TEZ-125 is fixed. Set<String> requestedRacks = taImpl.taskRacks; assertEquals(1, requestedRacks.size()); @@ -1742,12 +1737,12 @@ public class TestTaskAttempt { TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, boolean isRescheduled, Resource resource, ContainerContext containerContext, boolean leafVertex) { - super(taskId, attemptNumber, eventHandler, tal, conf, + super(TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber), + eventHandler, tal, conf, clock, taskHeartbeatHandler, appContext, - isRescheduled, resource, containerContext, leafVertex, mockTask, null); - when(mockTask.getTaskLocationHint()).thenReturn(locationHint); + isRescheduled, resource, containerContext, leafVertex, mockVertex, + locationHint, null, null); } - boolean inputFailedReported = false; @@ -1757,12 +1752,6 @@ public class TestTaskAttempt { } @Override - protected TaskSpec createRemoteTaskSpec() { - // FIXME - return null; - } - - @Override protected void logJobHistoryAttemptStarted() { taskAttemptStartedEventLogged++; super.logJobHistoryAttemptStarted(); http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index f88ab7c..fb2f543 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -82,6 +82,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.dag.utils.TezBuilderUtils; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -948,8 +949,9 @@ public class TestTaskImpl { @Override protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) { - MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(), - attemptNumber, eventHandler, taskCommunicatorManagerInterface, + MockTaskAttemptImpl attempt = new MockTaskAttemptImpl( + TezBuilderUtils.newTaskAttemptId(getTaskId(), attemptNumber), + eventHandler, taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext, true, taskResource, containerContext, schedCausalTA); taskAttempts.add(attempt); @@ -995,13 +997,14 @@ public class TestTaskImpl { private float progress = 0; private TaskAttemptState state = TaskAttemptState.NEW; - public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, + public MockTaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler, TaskCommunicatorManagerInterface tal, Configuration conf, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, boolean isRescheduled, Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) { - super(taskId, attemptNumber, eventHandler, tal, conf, clock, 
thh, - appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class), schedCausalTA); + super(attemptId, eventHandler, tal, conf, clock, thh, + appContext, isRescheduled, resource, containerContext, false, null, + locationHint, mockTaskSpec, schedCausalTA); } @Override
