Repository: tez
Updated Branches:
  refs/heads/branch-0.7 8bcf6302a -> f478befcc
TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce. (rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f478befc
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f478befc
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f478befc

Branch: refs/heads/branch-0.7
Commit: f478befcc4ac7bae3af0516ce13e3e2144be7afd
Parents: 8bcf630
Author: Rajesh Balamohan <[email protected]>
Authored: Mon Jun 13 05:30:21 2016 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Mon Jun 13 05:30:21 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 118 ++++++++++++-------
 2 files changed, 78 insertions(+), 42 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/f478befc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 79d813d..8d70d97 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Release 0.7.2 Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+
+  TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
   TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
   TEZ-3278. Hide Swimlane from Tez UI
   TEZ-909. 
Provide support for application tags http://git-wip-us.apache.org/repos/asf/tez/blob/f478befc/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 2df1a3d..0d6bc68 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1564,17 +1564,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } finally { writeLock.unlock(); } - - readLock.lock(); - try { - for (ScheduleTaskRequest task : tasksToSchedule) { - TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex()); - TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId()); - eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec, - getTaskLocationHint(taskId))); - } - } finally { - readLock.unlock(); + + /** + * read lock is not needed here. For e.g after starting task + * scheduling on the vertex, it would not change numTasks. Rest of + * the methods creating remote task specs have their + * own locking mechanisms. 
Ref: TEZ-3297 + */ + for (ScheduleTaskRequest task : tasksToSchedule) { + TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex()); + TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId()); + eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec, + getTaskLocationHint(taskId))); } } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier(); @@ -4671,17 +4672,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @Override public void setInputVertices(Map<Vertex, Edge> inVertices) { - this.sourceVertices = inVertices; - for (Vertex vertex : sourceVertices.keySet()) { - addIO(vertex.getName()); + writeLock.lock(); + try { + this.sourceVertices = inVertices; + for (Vertex vertex : sourceVertices.keySet()) { + addIO(vertex.getName()); + } + } finally { + writeLock.unlock(); } } @Override public void setOutputVertices(Map<Vertex, Edge> outVertices) { - this.targetVertices = outVertices; - for (Vertex vertex : targetVertices.keySet()) { - addIO(vertex.getName()); + writeLock.lock(); + try { + this.targetVertices = outVertices; + for (Vertex vertex : targetVertices.keySet()) { + addIO(vertex.getName()); + } + } finally { + writeLock.unlock();; } } @@ -4881,9 +4892,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @Override public List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException { + // For locking strategy, please refer to getOutputSpecList() readLock.lock(); + List<InputSpec> inputSpecList = null; try { - List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount() + inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount() + (rootInputDescriptors == null ? 
0 : rootInputDescriptors.size())); if (rootInputDescriptors != null) { for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> @@ -4893,44 +4906,65 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex))); } } - for(Vertex vertex : getInputVertices().keySet()) { - /** - * It is possible that setParallelism is in the middle of processing in target vertex with - * its write lock. So we need to get inputspec by acquiring read lock in target vertex to - * get consistent view. - * Refer TEZ-2251 - */ - InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex); - // TODO DAGAM This should be based on the edge type. - inputSpecList.add(inputSpec); - } - return inputSpecList; } finally { readLock.unlock(); } + + for(Vertex vertex : getInputVertices().keySet()) { + /** + * It is possible that setParallelism is in the middle of processing in target vertex with + * its write lock. So we need to get inputspec by acquiring read lock in target vertex to + * get consistent view. + * Refer TEZ-2251 + */ + InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex); + // TODO DAGAM This should be based on the edge type. + inputSpecList.add(inputSpec); + } + return inputSpecList; } @Override public List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException { + /** + * Ref: TEZ-3297 + * Locking entire method could introduce a nested lock and + * could lead to deadlock in corner cases. Example of deadlock with nested lock here: + * 1. In thread#1, Downstream vertex is in the middle of processing setParallelism and gets + * writeLock. + * 2. In thread#2, currentVertex acquires read lock + * 3. In thread#3, central dispatcher tries to process an event for current vertex, + * so tries to acquire write lock. + * + * In further processing, + * 4. 
In thread#1, it tries to acquire readLock on current vertex for setting edges. But + * this would be blocked as #3 already requested for write lock + * 5. In thread#2, getting readLock on downstream vertex would be blocked as writeLock + * is held by thread#1. + * 6. thread#3 is anyways blocked due to thread#2's read lock on current vertex. + */ + + List<OutputSpec> outputSpecList = null; readLock.lock(); try { - List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount() + outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount() + this.additionalOutputSpecs.size()); outputSpecList.addAll(additionalOutputSpecs); - for(Vertex vertex : targetVertices.keySet()) { - /** - * It is possible that setParallelism (which could change numTasks) is in the middle of - * processing in target vertex with its write lock. So we need to get outputspec by - * acquiring read lock in target vertex to get consistent view. - * Refer TEZ-2251 - */ - OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex); - outputSpecList.add(outputSpec); - } - return outputSpecList; } finally { readLock.unlock(); } + + for(Vertex vertex : targetVertices.keySet()) { + /** + * It is possible that setParallelism (which could change numTasks) is in the middle of + * processing in target vertex with its write lock. So we need to get outputspec by + * acquiring read lock in target vertex to get consistent view. + * Refer TEZ-2251 + */ + OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex); + outputSpecList.add(outputSpec); + } + return outputSpecList; } private OutputSpec getSourceSpecFor(VertexImpl vertex, int taskIndex) throws
