Repository: tez
Updated Branches: refs/heads/master 1d11ad275 -> 51fb3e4cf
TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/51fb3e4c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/51fb3e4c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/51fb3e4c Branch: refs/heads/master Commit: 51fb3e4cf0213a39e52476127f390639247e089b Parents: 1d11ad2 Author: Rajesh Balamohan <[email protected]> Authored: Mon Jun 13 05:15:51 2016 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Mon Jun 13 05:15:51 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 120 ++++++++++++------- 2 files changed, 80 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/51fb3e4c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2229708..73a11fa 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce. TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870 TEZ-3295. TestOrderedWordCount should handle relative input/output paths. TEZ-3290. Set full task attempt id string in MRInput configuration object. @@ -58,6 +59,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce. TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870 TEZ-3290. Set full task attempt id string in MRInput configuration object. TEZ-3280. LOG MRInputHelpers split generation message as INFO @@ -506,6 +508,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce. TEZ-3296. 
Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870 TEZ-3280. LOG MRInputHelpers split generation message as INFO TEZ-3257. Fix flaky test TestUnorderedPartitionedKVWriter. http://git-wip-us.apache.org/repos/asf/tez/blob/51fb3e4c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index b22af1a..6b79e98 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1593,18 +1593,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } finally { writeLock.unlock(); } - - readLock.lock(); - try { - for (ScheduleTaskRequest task : tasksToSchedule) { - TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex()); - TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId()); - boolean fromRecovery = recoveryData == null ? false : recoveryData.getTaskRecoveryData(taskId) != null; - eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec, - getTaskLocationHint(taskId), fromRecovery)); - } - } finally { - readLock.unlock(); + + /** + * read lock is not needed here. For e.g after starting task + * scheduling on the vertex, it would not change numTasks. Rest of + * the methods creating remote task specs have their + * own locking mechanisms. Ref: TEZ-3297 + */ + for (ScheduleTaskRequest task : tasksToSchedule) { + TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex()); + TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId()); + boolean fromRecovery = recoveryData == null ? 
false : recoveryData.getTaskRecoveryData(taskId) != null; + eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec, + getTaskLocationHint(taskId), fromRecovery)); } } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier(); @@ -4040,17 +4041,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @Override public void setInputVertices(Map<Vertex, Edge> inVertices) { - this.sourceVertices = inVertices; - for (Vertex vertex : sourceVertices.keySet()) { - addIO(vertex.getName()); + writeLock.lock(); + try { + this.sourceVertices = inVertices; + for (Vertex vertex : sourceVertices.keySet()) { + addIO(vertex.getName()); + } + } finally { + writeLock.unlock(); } } @Override public void setOutputVertices(Map<Vertex, Edge> outVertices) { - this.targetVertices = outVertices; - for (Vertex vertex : targetVertices.keySet()) { - addIO(vertex.getName()); + writeLock.lock(); + try { + this.targetVertices = outVertices; + for (Vertex vertex : targetVertices.keySet()) { + addIO(vertex.getName()); + } + } finally { + writeLock.unlock();; } } @@ -4250,9 +4261,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @Override public List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException { + // For locking strategy, please refer to getOutputSpecList() readLock.lock(); + List<InputSpec> inputSpecList = null; try { - List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount() + inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount() + (rootInputDescriptors == null ? 
0 : rootInputDescriptors.size())); if (rootInputDescriptors != null) { for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> @@ -4262,44 +4275,65 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex))); } } - for(Vertex vertex : getInputVertices().keySet()) { - /** - * It is possible that setParallelism is in the middle of processing in target vertex with - * its write lock. So we need to get inputspec by acquiring read lock in target vertex to - * get consistent view. - * Refer TEZ-2251 - */ - InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex); - // TODO DAGAM This should be based on the edge type. - inputSpecList.add(inputSpec); - } - return inputSpecList; } finally { readLock.unlock(); } + + for(Vertex vertex : getInputVertices().keySet()) { + /** + * It is possible that setParallelism is in the middle of processing in target vertex with + * its write lock. So we need to get inputspec by acquiring read lock in target vertex to + * get consistent view. + * Refer TEZ-2251 + */ + InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex); + // TODO DAGAM This should be based on the edge type. + inputSpecList.add(inputSpec); + } + return inputSpecList; } @Override public List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException { + /** + * Ref: TEZ-3297 + * Locking entire method could introduce a nested lock and + * could lead to deadlock in corner cases. Example of deadlock with nested lock here: + * 1. In thread#1, Downstream vertex is in the middle of processing setParallelism and gets + * writeLock. + * 2. In thread#2, currentVertex acquires read lock + * 3. In thread#3, central dispatcher tries to process an event for current vertex, + * so tries to acquire write lock. + * + * In further processing, + * 4. 
In thread#1, it tries to acquire readLock on current vertex for setting edges. But + * this would be blocked as #3 already requested for write lock + * 5. In thread#2, getting readLock on downstream vertex would be blocked as writeLock + * is held by thread#1. + * 6. thread#3 is anyways blocked due to thread#2's read lock on current vertex. + */ + + List<OutputSpec> outputSpecList = null; readLock.lock(); try { - List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount() + outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount() + this.additionalOutputSpecs.size()); outputSpecList.addAll(additionalOutputSpecs); - for(Vertex vertex : targetVertices.keySet()) { - /** - * It is possible that setParallelism (which could change numTasks) is in the middle of - * processing in target vertex with its write lock. So we need to get outputspec by - * acquiring read lock in target vertex to get consistent view. - * Refer TEZ-2251 - */ - OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex); - outputSpecList.add(outputSpec); - } - return outputSpecList; } finally { readLock.unlock(); } + + for(Vertex vertex : targetVertices.keySet()) { + /** + * It is possible that setParallelism (which could change numTasks) is in the middle of + * processing in target vertex with its write lock. So we need to get outputspec by + * acquiring read lock in target vertex to get consistent view. + * Refer TEZ-2251 + */ + OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex); + outputSpecList.add(outputSpec); + } + return outputSpecList; } private OutputSpec getSourceSpecFor(VertexImpl vertex, int taskIndex) throws
