TEZ-2269. Fix for DAGAppMaster becoming unresponsive. Contributed by Rajesh Balamohan and Siddharth Seth.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f2d560cb Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f2d560cb Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f2d560cb Branch: refs/heads/TEZ-2003 Commit: f2d560cbeb85066d241cedb8023b6f7e61b36d86 Parents: b87a36f Author: Siddharth Seth <[email protected]> Authored: Tue Apr 7 00:09:24 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue Apr 7 00:09:24 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/DAGImpl.java | 69 +++++++++++++------- 2 files changed, 45 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f2d560cb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 26a9d75..11b843d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2269. DAGAppMaster becomes unresponsive (post TEZ-2149). TEZ-2243. documentation should explicitly specify protobuf 2.5.0. TEZ-2232. 
Allow setParallelism to be called multiple times before tasks get scheduled http://git-wip-us.apache.org/repos/asf/tez/blob/f2d560cb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index e685f1b..3b282d6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -33,13 +33,17 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.state.OnStateChangedCallback; +import org.apache.tez.state.StateMachineTez; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -144,13 +148,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // TODO Recovery //private final List<AMInfo> amInfos; + private final Lock dagStatusLock = new ReentrantLock(); + private final Condition dagCompletionCondition = dagStatusLock.newCondition(); + private final AtomicBoolean isFinalState = new AtomicBoolean(false); private final Lock readLock; private final Lock writeLock; private final String dagName; private final TaskAttemptListener taskAttemptListener; private final TaskHeartbeatHandler taskHeartbeatHandler; private final Object tasksSyncHandle = new Object(); - private final Condition dagCompleteCondition; private volatile 
boolean committedOrAborted = false; private volatile boolean allOutputsCommitted = false; @@ -193,6 +199,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption; + private static final DagStateChangedCallback STATE_CHANGED_CALLBACK = new DagStateChangedCallback(); + private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); private static final InternalErrorTransition @@ -366,7 +374,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // create the topology tables .installTopology(); - private final StateMachine<DAGState, DAGEventType, DAGEvent> stateMachine; + private final StateMachineTez<DAGState, DAGEventType, DAGEvent, DAGImpl> stateMachine; //changing fields while the job is running @VisibleForTesting @@ -446,7 +454,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); - this.dagCompleteCondition = writeLock.newCondition(); this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan(jobPlan .getLocalResourceList()); @@ -472,10 +479,38 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, this.taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(dagConf); // This "this leak" is okay because the retained pointer is in an // instance variable. 
- stateMachine = stateMachineFactory.make(this); + stateMachine = new StateMachineTez<DAGState, DAGEventType, DAGEvent, DAGImpl>( + stateMachineFactory.make(this), this); + augmentStateMachine(); this.entityUpdateTracker = new StateChangeNotifier(this); } + private void augmentStateMachine() { + stateMachine + .registerStateEnteredCallback(DAGState.SUCCEEDED, + STATE_CHANGED_CALLBACK) + .registerStateEnteredCallback(DAGState.FAILED, + STATE_CHANGED_CALLBACK) + .registerStateEnteredCallback(DAGState.KILLED, + STATE_CHANGED_CALLBACK) + .registerStateEnteredCallback(DAGState.ERROR, + STATE_CHANGED_CALLBACK); + } + + private static class DagStateChangedCallback + implements OnStateChangedCallback<DAGState, DAGImpl> { + @Override + public void onStateChanged(DAGImpl dag, DAGState dagState) { + dag.isFinalState.set(true); + dag.dagStatusLock.lock(); + try { + dag.dagCompletionCondition.signal(); + } finally { + dag.dagStatusLock.unlock(); + } + } + } + protected StateMachine<DAGState, DAGEventType, DAGEvent> getStateMachine() { return stateMachine; } @@ -749,22 +784,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // Return only on SUCCESS timeoutNanos = Long.MAX_VALUE; } - if (isComplete()) { + if (timeoutMillis == 0 || isComplete()) { return getDAGStatus(statusOptions); } while (true) { long nanosLeft; - writeLock.lock(); + dagStatusLock.lock(); try { // Check within the lock to ensure we don't end up waiting after the notify has happened - if (isComplete()) { + if (isFinalState.get()) { break; } - nanosLeft = dagCompleteCondition.awaitNanos(timeoutNanos); + nanosLeft = dagCompletionCondition.awaitNanos(timeoutNanos); } catch (InterruptedException e) { throw new TezException("Interrupted while waiting for dag to complete", e); } finally { - writeLock.unlock(); + dagStatusLock.unlock(); } if (nanosLeft <= 0) { // Time expired. 
@@ -1219,25 +1254,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, LOG.info("DAG: " + getID() + " finished with state: " + finalState); - // Signal dag completion. - // The state will move to the final state after the Transition which invoked this method completes. - // However, it is OK to send the signal from here itself. - // This happens within a writeLock. The dagCompletionCondition check attempts to check for - // dagCompletion within the associated lock - so it will block till the full transition - // completes and the state updates. - notifyDagFinished(); return finalState; } - private void notifyDagFinished() { - writeLock.lock(); - try { - dagCompleteCondition.signal(); - } finally { - writeLock.unlock(); - } - } - @Override public String getUserName() { return userName;
