Repository: tez Updated Branches: refs/heads/master a812c3462 -> de3a0748f
TEZ-3117. Deadlock in Edge and Vertex code (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/de3a0748 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/de3a0748 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/de3a0748 Branch: refs/heads/master Commit: de3a0748ff19b5ced87050596d088bdb573cae05 Parents: a812c34 Author: Bikas Saha <[email protected]> Authored: Wed Feb 17 17:48:55 2016 -0800 Committer: Bikas Saha <[email protected]> Committed: Wed Feb 17 17:48:55 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/tez/dag/app/dag/impl/Edge.java | 59 ++++++++++++-------- 2 files changed, 37 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/de3a0748/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index af643dd..d10b47a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3117. Deadlock in Edge and Vertex code TEZ-3103. Shuffle can hang when memory to memory merging enabled TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime). TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier @@ -334,6 +335,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES + TEZ-3117. Deadlock in Edge and Vertex code TEZ-3103. Shuffle can hang when memory to memory merging enabled TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime). TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier http://git-wip-us.apache.org/repos/asf/tez/blob/de3a0748/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java index 0be7790..bb4d319 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java @@ -174,19 +174,24 @@ public class Edge { + getEdgeInfo(), e); } } - destinationMetaInfo = new EventMetaData(EventProducerConsumerType.INPUT, - destinationVertex.getName(), - sourceVertex.getName(), - null); + synchronized (this) { + destinationMetaInfo = new EventMetaData(EventProducerConsumerType.INPUT, + destinationVertex.getName(), + sourceVertex.getName(), + null); + } } - public synchronized void setEdgeProperty(EdgeProperty newEdgeProperty) throws AMUserCodeException { - this.edgeProperty = newEdgeProperty; - boolean wasUnInitialized = (edgeManager == null); - try { - createEdgeManager(); - } catch (TezException e) { - throw new AMUserCodeException(Source.EdgeManager, e); + public void setEdgeProperty(EdgeProperty newEdgeProperty) throws AMUserCodeException { + boolean wasUnInitialized; + synchronized (this) { + this.edgeProperty = newEdgeProperty; + wasUnInitialized = (edgeManager == null); + try { + createEdgeManager(); + } catch (TezException e) { + throw new AMUserCodeException(Source.EdgeManager, e); + } } initialize(); if (wasUnInitialized) { @@ -199,7 +204,7 @@ public class Edge { // Test only method for creating specific scenarios @VisibleForTesting - synchronized void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor) + void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor) throws AMUserCodeException { EdgeProperty modifiedEdgeProperty = EdgeProperty.create(descriptor, @@ -210,22 +215,28 @@ public class Edge { setEdgeProperty(modifiedEdgeProperty); } - public synchronized void routingToBegin() throws AMUserCodeException { - if (edgeManagerContext.getDestinationVertexNumTasks() == 0) { - routingNeeded = false; - } else if (edgeManagerContext.getDestinationVertexNumTasks() < 0) { - throw new TezUncheckedException( - "Internal error. Not expected to route events to a destination until parallelism is determined" + - " sourceVertex=" + sourceVertex.getLogIdentifier() + - " edgeManager=" + edgeManager.getClass().getName()); + public void routingToBegin() throws AMUserCodeException { + int numDestTasks = edgeManagerContext.getDestinationVertexNumTasks(); + synchronized (this) { + if (numDestTasks == 0) { + routingNeeded = false; + } else if (numDestTasks < 0) { + throw new TezUncheckedException( + "Internal error. Not expected to route events to a destination until parallelism is determined" + + " sourceVertex=" + sourceVertex.getLogIdentifier() + + " edgeManager=" + edgeManager.getClass().getName()); + } + if (edgeManager instanceof EdgeManagerPluginOnDemand) { + onDemandRouting = true; + } } - if (edgeManager instanceof EdgeManagerPluginOnDemand) { - onDemandRouting = true; + + if (onDemandRouting) { try { - ((EdgeManagerPluginOnDemand)edgeManager).prepareForRouting(); + ((EdgeManagerPluginOnDemand) edgeManager).prepareForRouting(); } catch (Exception e) { throw new AMUserCodeException(Source.EdgeManager, - "Fail to prepareForRouting " + getEdgeInfo(), e); + "Fail to prepareForRouting " + getEdgeInfo(), e); } }
