Repository: tez Updated Branches: refs/heads/branch-0.7 9968e569b -> d3d4b051d
TEZ-2421. Deadlock in AM because attempt and vertex locking each other out (zjffdu) (cherry picked from commit ed7f1abbce54093f56f33c35c8ac92d9e433760f) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d3d4b051 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d3d4b051 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d3d4b051 Branch: refs/heads/branch-0.7 Commit: d3d4b051d3c896b601a0c492cd6d5a9b27f04cff Parents: 9968e56 Author: Jeff Zhang <[email protected]> Authored: Mon May 11 13:51:59 2015 +0800 Committer: Jeff Zhang <[email protected]> Committed: Mon May 11 13:55:22 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/dag/app/dag/Task.java | 7 +- .../app/dag/event/TaskEventScheduleTask.java | 42 ++++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 25 ++- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 29 ++- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 206 +++++++++++-------- .../tez/dag/app/dag/impl/TestDAGImpl.java | 14 +- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 79 +++---- .../tez/dag/app/dag/impl/TestTaskImpl.java | 26 +-- .../tez/dag/app/dag/impl/TestVertexImpl.java | 27 +++ .../apache/tez/runtime/api/impl/TaskSpec.java | 29 +++ 11 files changed, 327 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d3d4b051/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 24b5d8f..75411b5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES Default max limit increased. Should not affect existing users. ALL CHANGES: + TEZ-2421. Deadlock in AM because attempt and vertex locking each other out TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts. TEZ-2412. Should kill vertex in DAGImpl#VertexRerunWhileCommitting TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly http://git-wip-us.apache.org/repos/asf/tez/blob/d3d4b051/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java index b798fce..177ee8a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java @@ -22,11 +22,13 @@ import java.util.List; import java.util.Map; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.oldrecords.TaskReport; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; /** @@ -65,5 +67,8 @@ public interface Task { TaskState restoreFromEvent(HistoryEvent historyEvent); public void registerTezEvent(TezEvent tezEvent); - + + public TaskSpec getBaseTaskSpec(); + + public TaskLocationHint getTaskLocationHint(); } http://git-wip-us.apache.org/repos/asf/tez/blob/d3d4b051/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java new file mode 100644 index 0000000..696602a --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java @@ -0,0 +1,42 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app.dag.event; + +import org.apache.tez.dag.api.TaskLocationHint; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.runtime.api.impl.TaskSpec; + +public class TaskEventScheduleTask extends TaskEvent { + private final TaskSpec baseTaskSpec; + private final TaskLocationHint locationHint; + + public TaskEventScheduleTask(TezTaskID taskId, TaskSpec baseTaskSpec, TaskLocationHint locationHint) { + super(taskId, TaskEventType.T_SCHEDULE); + this.baseTaskSpec = baseTaskSpec; + this.locationHint = locationHint; + } + + public TaskSpec getBaseTaskSpec() { + return baseTaskSpec; + } + + public TaskLocationHint getTaskLocationHint() { + return locationHint; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/d3d4b051/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index b1c0acc..036022e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.Records; import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.TaskLocationHint; @@ -456,14 +455,19 @@ public class TaskAttemptImpl implements TaskAttempt, } TaskSpec createRemoteTaskSpec() throws AMUserCodeException { - Vertex vertex = getVertex(); - ProcessorDescriptor procDesc = vertex.getProcessorDescriptor(); - int taskId = getTaskID().getId(); + TaskSpec baseTaskSpec = task.getBaseTaskSpec(); + if (baseTaskSpec == null) { + // since recovery does not follow normal transitions, TaskEventScheduleTask + // is not being honored by the recovery code path. Using this to workaround + // until recovery is fixed. Calling the non-locking internal method of the vertex + // to get the taskSpec directly. Since everything happens on the central dispatcher + // during recovery this is deadlock free for now. TEZ-1019 should remove the need for this. + baseTaskSpec = ((VertexImpl) vertex).createRemoteTaskSpec(getID().getTaskID().getId()); + } return new TaskSpec(getID(), - vertex.getDAG().getName(), - vertex.getName(), vertex.getTotalTasks(), procDesc, - vertex.getInputSpecList(taskId), vertex.getOutputSpecList(taskId), - vertex.getGroupInputSpecList(taskId)); + baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(), + baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(), + baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs()); } @Override @@ -935,9 +939,8 @@ public class TaskAttemptImpl implements TaskAttempt, // sendEvent(new TaskCleanupEvent(this.attemptId, this.committer, taContext)); } - @VisibleForTesting - protected TaskLocationHint getTaskLocationHint() { - return getVertex().getTaskLocationHint(getTaskID()); + private TaskLocationHint getTaskLocationHint() { + return task.getTaskLocationHint(); } protected String[] resolveHosts(String[] src) { http://git-wip-us.apache.org/repos/asf/tez/blob/d3d4b051/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 2e884e7..de5ab2a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -37,7 +37,6 @@ import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; @@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; @@ -73,6 +73,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; +import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; import org.apache.tez.dag.app.dag.event.TaskEventTermination; import org.apache.tez.dag.app.dag.event.TaskEventType; @@ -92,6 +93,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.OutputCommitter; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; @@ -128,6 +130,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { //private final MRAppMetrics metrics; protected final AppContext appContext; private final Resource taskResource; + private TaskSpec baseTaskSpec; + private TaskLocationHint locationHint; private final ContainerContext containerContext; @VisibleForTesting long scheduledTime; @@ -516,6 +520,26 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { readLock.unlock(); } } + + @Override + public TaskSpec getBaseTaskSpec() { + readLock.lock(); + try { + return baseTaskSpec; + } finally { + readLock.unlock(); + } + } + + @Override + public TaskLocationHint getTaskLocationHint() { + readLock.lock(); + try { + return locationHint; + } finally { + readLock.unlock(); + } + } @Override public List<String> getDiagnostics() { @@ -1021,6 +1045,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { @Override public void transition(TaskImpl task, TaskEvent event) { + TaskEventScheduleTask scheduleEvent = (TaskEventScheduleTask) event; + task.locationHint = scheduleEvent.getTaskLocationHint(); + task.baseTaskSpec = scheduleEvent.getBaseTaskSpec(); task.addAndScheduleAttempt(); task.scheduledTime = task.clock.getTime(); task.logJobHistoryTaskStartedEvent(); http://git-wip-us.apache.org/repos/asf/tez/blob/d3d4b051/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 6b208b0..80a0358 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -114,6 +114,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; +import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; import org.apache.tez.dag.app.dag.event.TaskEventTermination; import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEvent; @@ -171,6 +172,7 @@ import org.apache.tez.runtime.api.impl.EventType; import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; @@ -1417,66 +1419,96 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } } - void setupEdgeRouting() throws AMUserCodeException { + boolean setupEdgeRouting() throws AMUserCodeException { + boolean doOnDemand = useOnDemandRouting; for (Edge e : sourceVertices.values()) { boolean edgeDoingOnDemand = e.routingToBegin(); - if (useOnDemandRouting && !edgeDoingOnDemand) { - useOnDemandRouting = false; + if (doOnDemand && !edgeDoingOnDemand) { + doOnDemand = false; LOG.info("Not using ondemand routing because of edge between " + e.getSourceVertexName() + " and " + getLogIdentifier()); } } + return doOnDemand; } private void unsetTasksNotYetScheduled() throws AMUserCodeException { if (tasksNotYetScheduled) { - setupEdgeRouting(); - tasksNotYetScheduled = false; - // only now can we be sure of the edge manager type. so until now - // we will accumulate pending tasks in case legacy routing gets used. - // this is only needed to support mixed mode routing. Else for - // on demand routing events can be directly added to taskEvents when - // they arrive in handleRoutedEvents instead of first caching them in - // pendingTaskEvents. When legacy routing is removed then pendingTaskEvents - // can be removed. - if (!pendingTaskEvents.isEmpty()) { - LOG.info("Routing pending task events for vertex: " + logIdentifier); - try { - handleRoutedTezEvents(pendingTaskEvents, false, true); - } catch (AMUserCodeException e) { - String msg = "Exception in " + e.getSource() + ", vertex=" + logIdentifier; - LOG.error(msg, e); - addDiagnostic(msg + ", " + e.getMessage() + ", " - + ExceptionUtils.getStackTrace(e.getCause())); - eventHandler.handle(new VertexEventTermination(vertexId, - VertexTerminationCause.AM_USERCODE_FAILURE)); - return; + boolean doOnDemand = setupEdgeRouting(); + // change state under lock + writeLock.lock(); + try { + useOnDemandRouting = doOnDemand; + tasksNotYetScheduled = false; + // only now can we be sure of the edge manager type. so until now + // we will accumulate pending tasks in case legacy routing gets used. + // this is only needed to support mixed mode routing. Else for + // on demand routing events can be directly added to taskEvents when + // they arrive in handleRoutedEvents instead of first caching them in + // pendingTaskEvents. When legacy routing is removed then pendingTaskEvents + // can be removed. + if (!pendingTaskEvents.isEmpty()) { + LOG.info("Routing pending task events for vertex: " + logIdentifier); + try { + handleRoutedTezEvents(pendingTaskEvents, false, true); + } catch (AMUserCodeException e) { + String msg = "Exception in " + e.getSource() + ", vertex=" + logIdentifier; + LOG.error(msg, e); + addDiagnostic(msg + ", " + e.getMessage() + ", " + + ExceptionUtils.getStackTrace(e.getCause())); + eventHandler.handle(new VertexEventTermination(vertexId, + VertexTerminationCause.AM_USERCODE_FAILURE)); + return; + } + pendingTaskEvents.clear(); } - pendingTaskEvents.clear(); + } finally { + writeLock.unlock(); } } } + TaskSpec createRemoteTaskSpec(int taskIndex) throws AMUserCodeException { + return TaskSpec.createBaseTaskSpec(getDAG().getName(), + getName(), getTotalTasks(), getProcessorDescriptor(), + getInputSpecList(taskIndex), getOutputSpecList(taskIndex), + getGroupInputSpecList(taskIndex)); + } + @Override public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) { - writeLock.lock(); try { unsetTasksNotYetScheduled(); - for (TaskWithLocationHint task : tasksToSchedule) { - if (numTasks <= task.getTaskIndex().intValue()) { - throw new TezUncheckedException( - "Invalid taskId: " + task.getTaskIndex() + " for vertex: " + logIdentifier); - } - TaskLocationHint locationHint = task.getTaskLocationHint(); - if (locationHint != null) { - if (taskLocationHints == null) { - taskLocationHints = new TaskLocationHint[numTasks]; + // update state under write lock + writeLock.lock(); + try { + for (TaskWithLocationHint task : tasksToSchedule) { + if (numTasks <= task.getTaskIndex().intValue()) { + throw new TezUncheckedException( + "Invalid taskId: " + task.getTaskIndex() + " for vertex: " + logIdentifier); } - taskLocationHints[task.getTaskIndex().intValue()] = locationHint; + TaskLocationHint locationHint = task.getTaskLocationHint(); + if (locationHint != null) { + if (taskLocationHints == null) { + taskLocationHints = new TaskLocationHint[numTasks]; + } + taskLocationHints[task.getTaskIndex().intValue()] = locationHint; + } + } + } finally { + writeLock.unlock(); + } + + readLock.lock(); + try { + for (TaskWithLocationHint task : tasksToSchedule) { + TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex().intValue()); + TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId()); + eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec, + getTaskLocationHint(taskId))); } - eventHandler.handle(new TaskEvent( - TezTaskID.getInstance(vertexId, task.getTaskIndex().intValue()), - TaskEventType.T_SCHEDULE)); + } finally { + readLock.unlock(); } } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier(); @@ -1485,8 +1517,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl eventHandler.handle(new VertexEventManagerUserCodeError(getVertexId(), e)); // throw an unchecked exception to stop the vertex manager that invoked this. throw new TezUncheckedException(e); - } finally { - writeLock.unlock(); } } @@ -4632,50 +4662,58 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return taskLocationHints; } - // TODO Eventually remove synchronization. @Override - public synchronized List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException { - List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount() - + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size())); - if (rootInputDescriptors != null) { - for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> - rootInputDescriptorEntry : rootInputDescriptors.entrySet()) { - inputSpecList.add(new InputSpec(rootInputDescriptorEntry.getKey(), - rootInputDescriptorEntry.getValue().getIODescriptor(), rootInputSpecs.get( - rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex))); + public List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException { + readLock.lock(); + try { + List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount() + + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size())); + if (rootInputDescriptors != null) { + for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> + rootInputDescriptorEntry : rootInputDescriptors.entrySet()) { + inputSpecList.add(new InputSpec(rootInputDescriptorEntry.getKey(), + rootInputDescriptorEntry.getValue().getIODescriptor(), rootInputSpecs.get( + rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex))); + } } + for(Vertex vertex : getInputVertices().keySet()) { + /** + * It is possible that setParallelism is in the middle of processing in target vertex with + * its write lock. So we need to get inputspec by acquiring read lock in target vertex to + * get consistent view. + * Refer TEZ-2251 + */ + InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex); + // TODO DAGAM This should be based on the edge type. + inputSpecList.add(inputSpec); + } + return inputSpecList; + } finally { + readLock.unlock(); } - for(Vertex vertex : getInputVertices().keySet()) { - /** - * It is possible that setParallelism is in the middle of processing in target vertex with - * its write lock. So we need to get inputspec by acquiring read lock in target vertex to - * get consistent view. - * Refer TEZ-2251 - */ - InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex); - // TODO DAGAM This should be based on the edge type. - inputSpecList.add(inputSpec); - } - return inputSpecList; } - // TODO Eventually remove synchronization. @Override - public synchronized List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException { - List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount() - + this.additionalOutputSpecs.size()); - outputSpecList.addAll(additionalOutputSpecs); - for(Vertex vertex : targetVertices.keySet()) { - /** - * It is possible that setParallelism (which could change numTasks) is in the middle of - * processing in target vertex with its write lock. So we need to get outputspec by - * acquiring read lock in target vertex to get consistent view. - * Refer TEZ-2251 - */ - OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex); - outputSpecList.add(outputSpec); - } - return outputSpecList; + public List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException { + readLock.lock(); + try { + List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount() + + this.additionalOutputSpecs.size()); + outputSpecList.addAll(additionalOutputSpecs); + for(Vertex vertex : targetVertices.keySet()) { + /** + * It is possible that setParallelism (which could change numTasks) is in the middle of + * processing in target vertex with its write lock. So we need to get outputspec by + * acquiring read lock in target vertex to get consistent view. + * Refer TEZ-2251 + */ + OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex); + outputSpecList.add(outputSpec); + } + return outputSpecList; + } finally { + readLock.unlock(); + } } private OutputSpec getSourceSpecFor(VertexImpl vertex, int taskIndex) throws @@ -4703,10 +4741,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } - //TODO Eventually remove synchronization. @Override - public synchronized List<GroupInputSpec> getGroupInputSpecList(int taskIndex) { - return groupInputSpecList; + public List<GroupInputSpec> getGroupInputSpecList(int taskIndex) { + readLock.lock(); + try { + return groupInputSpecList; + } finally { + readLock.unlock(); + } } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/d3d4b051/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index d2aa2d0..fff95b5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -93,7 +93,6 @@ import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.DAGTerminationCause; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttempt; -import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; import org.apache.tez.dag.app.dag.TestStateChangeNotifier.StateChangeNotifierForTest; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexState; @@ -975,12 +974,7 @@ public class TestDAGImpl { dispatcher.await(); VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2"); - LOG.info(String.valueOf(v2.getTasks().size())); - Task t1= v2.getTask(0); - TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0)); - - Assert.assertEquals(TaskAttemptStateInternal.FAILED, ta1.getInternalState()); - String diag = StringUtils.join(ta1.getDiagnostics(), ","); + String diag = StringUtils.join(v2.getDiagnostics(), ","); Assert.assertTrue(diag.contains(ExceptionLocation.GetNumDestinationTaskPhysicalInputs.name())); } @@ -998,11 +992,7 @@ public class TestDAGImpl { Assert.assertEquals(DAGState.FAILED, dagWithCustomEdge.getState()); VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1"); - Task t1= v1.getTask(0); - TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0)); - - Assert.assertEquals(TaskAttemptStateInternal.FAILED, ta1.getInternalState()); - String diag = StringUtils.join(ta1.getDiagnostics(), ","); + String diag = StringUtils.join(v1.getDiagnostics(), ","); Assert.assertTrue(diag.contains(ExceptionLocation.GetNumSourceTaskPhysicalOutputs.name())); } http://git-wip-us.apache.org/repos/asf/tez/blob/d3d4b051/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 60c4c88..86251cc 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -65,6 +65,7 @@ import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; +import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.DAGEvent; @@ -100,6 +101,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -113,11 +115,19 @@ public class TestTaskAttempt { return new FileStatus(1, false, 1, 1, 1, f); } } + + Task mockTask; + TaskLocationHint locationHint; @BeforeClass public static void setup() { MockDNSToSwitchMapping.initializeMockRackResolver(); } + + @Before + public void setupTest() { + mockTask = mock(Task.class); + } @Test(timeout = 5000) public void testLocalityRequest() { @@ -129,14 +139,14 @@ public class TestTaskAttempt { hosts.add("host1"); hosts.add("host2"); hosts.add("host3"); - TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(hosts, null); + locationHint = TaskLocationHint.createTaskLocationHint(hosts, null); TezTaskID taskID = TezTaskID.getInstance( TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), mock(AppContext.class), - locationHint, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); + false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class); @@ -148,6 +158,8 @@ public class TestTaskAttempt { fail("Second event not of type " + AMSchedulerEventTALaunchRequest.class.getName()); } + + verify(mockTask, times(1)).getTaskLocationHint(); // TODO Move the Rack request check to the client after TEZ-125 is fixed. Set<String> requestedRacks = taImpl.taskRacks; assertEquals(1, requestedRacks.size()); @@ -169,12 +181,12 @@ public class TestTaskAttempt { TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), mock(AppContext.class), - null, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); + false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1, eventHandler, mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), mock(AppContext.class), - null, true, Resource.newInstance(1024, 1), createFakeContainerContext(), false); + true, Resource.newInstance(1024, 1), createFakeContainerContext(), false); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); @@ -224,7 +236,7 @@ public class TestTaskAttempt { String hosts[] = new String[] { "127.0.0.1", "host2", "host3" }; Set<String> resolved = new TreeSet<String>( Arrays.asList(new String[]{ "host1", "host2", "host3" })); - TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( + locationHint = TaskLocationHint.createTaskLocationHint( new TreeSet<String>(Arrays.asList(hosts)), null); TezTaskID taskID = TezTaskID.getInstance( @@ -232,7 +244,7 @@ public class TestTaskAttempt { TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), - mock(AppContext.class), locationHint, false, Resource.newInstance(1024, + mock(AppContext.class), false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); TaskAttemptImpl spyTa = spy(taImpl); @@ -280,7 +292,7 @@ public class TestTaskAttempt { taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); taskConf.setBoolean("fs.file.impl.disable.cache", true); - TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( + locationHint = TaskLocationHint.createTaskLocationHint( new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); @@ -289,7 +301,7 @@ public class TestTaskAttempt { TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener, taskConf, new SystemClock(), - mock(TaskHeartbeatHandler.class), mockAppContext, locationHint, false, + mock(TaskHeartbeatHandler.class), mockAppContext, false, resource, createFakeContainerContext(), false); NodeId nid = NodeId.newInstance("127.0.0.1", 0); @@ -330,7 +342,7 @@ public class TestTaskAttempt { taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); taskConf.setBoolean("fs.file.impl.disable.cache", true); - TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( + locationHint = TaskLocationHint.createTaskLocationHint( new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); @@ -353,7 +365,7 @@ public class TestTaskAttempt { TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener, taskConf, new SystemClock(), - mockHeartbeatHandler, appCtx, locationHint, false, + mockHeartbeatHandler, appCtx, false, resource, createFakeContainerContext(), false); TezTaskAttemptID taskAttemptID = taImpl.getID(); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); @@ -431,7 +443,7 @@ public class TestTaskAttempt { taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); taskConf.setBoolean("fs.file.impl.disable.cache", true); - TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( + locationHint = TaskLocationHint.createTaskLocationHint( new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); @@ -454,7 +466,7 @@ public class TestTaskAttempt { TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener, taskConf, new SystemClock(), - mockHeartbeatHandler, appCtx, locationHint, false, + mockHeartbeatHandler, appCtx, false, resource, createFakeContainerContext(), false); TezTaskAttemptID taskAttemptID = taImpl.getID(); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); @@ -496,7 +508,7 @@ public class TestTaskAttempt { taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); taskConf.setBoolean("fs.file.impl.disable.cache", true); - TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( + locationHint = TaskLocationHint.createTaskLocationHint( new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); @@ -519,7 +531,7 @@ public class TestTaskAttempt { TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener, taskConf, new SystemClock(), - mockHeartbeatHandler, appCtx, locationHint, false, + mockHeartbeatHandler, appCtx, false, resource, createFakeContainerContext(), false); TezTaskAttemptID taskAttemptID = taImpl.getID(); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); @@ -589,7 +601,7 @@ public class TestTaskAttempt { taskConf.setBoolean("fs.file.impl.disable.cache", true); taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true); - TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( + locationHint = TaskLocationHint.createTaskLocationHint( new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); @@ -612,7 +624,7 @@ public class TestTaskAttempt { TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener, taskConf, new SystemClock(), - mockHeartbeatHandler, appCtx, locationHint, false, + mockHeartbeatHandler, appCtx, false, resource, createFakeContainerContext(), false); TezTaskAttemptID taskAttemptID = taImpl.getID(); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); @@ -720,7 +732,7 @@ public class TestTaskAttempt { taskConf.setBoolean("fs.file.impl.disable.cache", true); taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true); - TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( + locationHint = TaskLocationHint.createTaskLocationHint( new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); @@ -743,7 +755,7 @@ public class TestTaskAttempt { TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener, taskConf, new SystemClock(), - mockHeartbeatHandler, appCtx, locationHint, false, + mockHeartbeatHandler, appCtx, false, resource, createFakeContainerContext(), false); TezTaskAttemptID taskAttemptID = taImpl.getID(); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); @@ -811,7 +823,7 @@ public class TestTaskAttempt { taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); taskConf.setBoolean("fs.file.impl.disable.cache", true); - TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( + locationHint = TaskLocationHint.createTaskLocationHint( new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); @@ -834,7 +846,7 @@ public class TestTaskAttempt { TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener, taskConf, new SystemClock(), - mockHeartbeatHandler, appCtx, locationHint, false, + mockHeartbeatHandler, appCtx, false, resource, createFakeContainerContext(), false); TezTaskAttemptID taskAttemptID = taImpl.getID(); @@ -906,7 +918,7 @@ public class TestTaskAttempt { taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); taskConf.setBoolean("fs.file.impl.disable.cache", true); - TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( + locationHint = TaskLocationHint.createTaskLocationHint( new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); @@ -929,7 +941,7 @@ public class TestTaskAttempt { TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener, taskConf, new SystemClock(), - mockHeartbeatHandler, appCtx, locationHint, false, + mockHeartbeatHandler, appCtx, false, resource, createFakeContainerContext(), false); TezTaskAttemptID taskAttemptID = taImpl.getID(); @@ -1009,7 +1021,7 @@ public class TestTaskAttempt { taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); taskConf.setBoolean("fs.file.impl.disable.cache", true); - TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( + locationHint = TaskLocationHint.createTaskLocationHint( new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); @@ -1032,7 +1044,7 @@ public class TestTaskAttempt { TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener, taskConf, new SystemClock(), - mockHeartbeatHandler, appCtx, locationHint, false, + mockHeartbeatHandler, appCtx, false, resource, createFakeContainerContext(), true); TezTaskAttemptID taskAttemptID = taImpl.getID(); @@ -1109,7 +1121,7 @@ public class TestTaskAttempt { taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); taskConf.setBoolean("fs.file.impl.disable.cache", true); - TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint( + locationHint = TaskLocationHint.createTaskLocationHint( new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); @@ -1132,7 +1144,7 @@ public class TestTaskAttempt { TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener, taskConf, new SystemClock(), - mockHeartbeatHandler, appCtx, locationHint, false, + mockHeartbeatHandler, appCtx, false, resource, createFakeContainerContext(), false); TezTaskAttemptID taskAttemptID = taImpl.getID(); @@ -1231,29 +1243,24 @@ public class TestTaskAttempt { }; private class MockTaskAttemptImpl extends TaskAttemptImpl { - TaskLocationHint locationHint; - + public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, TaskAttemptListener tal, Configuration conf, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, - TaskLocationHint locationHint, boolean isRescheduled, + boolean isRescheduled, Resource resource, ContainerContext containerContext, boolean leafVertex) { super(taskId, attemptNumber, eventHandler, tal, conf, clock, taskHeartbeatHandler, appContext, - isRescheduled, resource, containerContext, leafVertex, mock(TaskImpl.class)); - this.locationHint = locationHint; + isRescheduled, resource, containerContext, leafVertex, mockTask); + when(mockTask.getTaskLocationHint()).thenReturn(locationHint); } + Vertex mockVertex = mock(Vertex.class); boolean inputFailedReported = false; @Override - public TaskLocationHint getTaskLocationHint() { - return locationHint; - } - - @Override protected Vertex getVertex() { return mockVertex; } http://git-wip-us.apache.org/repos/asf/tez/blob/d3d4b051/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 9da3fab..1ecabef 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -56,7 +56,7 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.TaskStateInternal; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate; -import org.apache.tez.dag.app.dag.event.TaskEvent; +import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; import org.apache.tez.dag.app.dag.event.TaskEventTermination; import org.apache.tez.dag.app.dag.event.TaskEventType; @@ -70,6 +70,7 @@ import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.junit.Assert; import org.junit.Before; @@ -105,6 +106,7 @@ public class TestTaskImpl { private NodeId mockNodeId; private MockTaskImpl mockTask; + private TaskSpec mockTaskSpec; @SuppressWarnings("rawtypes") class TestEventHandler implements EventHandler<Event> { @@ -149,8 +151,9 @@ public class TestTaskImpl { mockTask = new MockTaskImpl(vertexId, partition, eventHandler, conf, taskAttemptListener, clock, - taskHeartbeatHandler, appContext, leafVertex, locationHint, + taskHeartbeatHandler, appContext, leafVertex, taskResource, containerContext, vertex); + mockTaskSpec = mock(TaskSpec.class); } private TezTaskID getNewTaskID() { @@ -159,8 +162,10 @@ public class TestTaskImpl { } private void scheduleTaskAttempt(TezTaskID taskId) { - mockTask.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE)); + mockTask.handle(new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint)); assertTaskScheduledState(); + assertEquals(mockTaskSpec, mockTask.getBaseTaskSpec()); + assertEquals(locationHint, mockTask.getTaskLocationHint()); } private void sendTezEventsToTask(TezTaskID taskId, int numTezEvents) { @@ -671,19 +676,17 @@ public class TestTaskImpl { private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>(); private Vertex vertex; - TaskLocationHint locationHint; public MockTaskImpl(TezVertexID vertexId, int partition, EventHandler eventHandler, Configuration conf, TaskAttemptListener taskAttemptListener, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex, - TaskLocationHint locationHint, Resource resource, + Resource resource, ContainerContext containerContext, Vertex vertex) { super(vertexId, partition, eventHandler, conf, taskAttemptListener, clock, thh, appContext, leafVertex, resource, containerContext, mock(StateChangeNotifier.class), vertex); this.vertex = vertex; - this.locationHint = locationHint; } @Override @@ -691,7 +694,7 @@ public class TestTaskImpl { MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, - locationHint, true, taskResource, containerContext); + true, taskResource, containerContext); taskAttempts.add(attempt); return attempt; } @@ -730,21 +733,14 @@ public class TestTaskImpl { private float progress = 0; private TaskAttemptState state = TaskAttemptState.NEW; - TaskLocationHint locationHint; public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, TaskAttemptListener tal, Configuration conf, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, - TaskLocationHint locationHint, boolean isRescheduled, + boolean isRescheduled, Resource resource, ContainerContext containerContext) { super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh, appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class)); - this.locationHint = locationHint; - } - - @Override - public TaskLocationHint getTaskLocationHint() { - return locationHint; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/d3d4b051/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index a8eaca1..6c94465 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -131,6 +131,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; +import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEvent; @@ -352,9 +353,11 @@ public class TestVertexImpl { } private class TaskEventDispatcher implements EventHandler<TaskEvent> { + List<TaskEvent> events = Lists.newArrayList(); @SuppressWarnings("unchecked") @Override public void handle(TaskEvent event) { + events.add(event); VertexImpl vertex = vertexIdMap.get(event.getTaskID().getVertexID()); Task task = vertex.getTask(event.getTaskID()); if (task != null) { @@ -2706,6 +2709,30 @@ public class TestVertexImpl { } @Test(timeout = 5000) + public void testVertexScheduleSendEvent() throws Exception { + VertexImpl v3 = vertices.get("vertex3"); + v3.vertexReconfigurationPlanned(); + initAllVertices(VertexState.INITED); + Assert.assertEquals(2, v3.getTotalTasks()); + Map<TezTaskID, Task> tasks = v3.getTasks(); + Assert.assertEquals(2, tasks.size()); + + VertexImpl v1 = vertices.get("vertex1"); + startVertex(vertices.get("vertex2")); + startVertex(v1); + v3.reconfigureVertex(10, null, null); + checkTasks(v3, 10); + taskEventDispatcher.events.clear(); + TaskLocationHint mockLocation = mock(TaskLocationHint.class); + v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), mockLocation))); + dispatcher.await(); + Assert.assertEquals(1, taskEventDispatcher.events.size()); + TaskEventScheduleTask event = (TaskEventScheduleTask) taskEventDispatcher.events.get(0); + Assert.assertEquals(mockLocation, event.getTaskLocationHint()); + Assert.assertNotNull(event.getBaseTaskSpec()); + } + + @Test(timeout = 5000) public void testVertexSetParallelismFailAfterSchedule() throws Exception { VertexImpl v3 = vertices.get("vertex3"); v3.vertexReconfigurationPlanned(); http://git-wip-us.apache.org/repos/asf/tez/blob/d3d4b051/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java index cce063f..4dc57e2 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java @@ -45,6 +45,35 @@ public class TaskSpec implements Writable { public TaskSpec() { } + + public static TaskSpec createBaseTaskSpec(String dagName, String vertexName, + int vertexParallelism, ProcessorDescriptor processorDescriptor, + List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, + @Nullable List<GroupInputSpec> groupInputSpecList) { + return new TaskSpec(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList, + outputSpecList, groupInputSpecList); + } + + public TaskSpec( + String dagName, String vertexName, + int vertexParallelism, + ProcessorDescriptor processorDescriptor, + List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, + @Nullable List<GroupInputSpec> groupInputSpecList) { + checkNotNull(dagName, "dagName is null"); + checkNotNull(vertexName, "vertexName is null"); + checkNotNull(processorDescriptor, "processorDescriptor is null"); + checkNotNull(inputSpecList, "inputSpecList is null"); + checkNotNull(outputSpecList, "outputSpecList is null"); + this.taskAttemptId = null; + this.dagName = StringInterner.weakIntern(dagName); + this.vertexName = StringInterner.weakIntern(vertexName); + this.processorDescriptor = processorDescriptor; + this.inputSpecList = inputSpecList; + this.outputSpecList = outputSpecList; + this.groupInputSpecList = groupInputSpecList; + this.vertexParallelism = vertexParallelism; + } public TaskSpec(TezTaskAttemptID taskAttemptID, String dagName, String vertexName,
