Repository: tez
Updated Branches:
  refs/heads/branch-0.7 c9ef246a6 -> e969a8f70
TEZ-2307. Possible wrong error message when submitting new dag (zjffdu)

(cherry picked from commit 235841f77ebf88994c8d7af189cf1000aedbd69f)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e969a8f7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e969a8f7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e969a8f7

Branch: refs/heads/branch-0.7
Commit: e969a8f70ef3204b01b2accf667dc45cbe05a081
Parents: c9ef246
Author: Jeff Zhang <[email protected]>
Authored: Tue Feb 2 13:21:45 2016 +0800
Committer: Jeff Zhang <[email protected]>
Committed: Tue Feb 2 16:00:38 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                  |  1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java | 33 ++++++++++++++------
 2 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/e969a8f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f97bf5c..a1c31e2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES
+  TEZ-2307. Possible wrong error message when submitting new dag
   TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs
   TEZ-3066. TaskAttemptFinishedEvent ConcurrentModificationException in recovery or history logging services.
   TEZ-3036. Tez AM can hang on startup with no indication of error

http://git-wip-us.apache.org/repos/asf/tez/blob/e969a8f7/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c163b62..5acee83 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -264,6 +264,7 @@ public class DAGAppMaster extends AbstractService {
   private final UserGroupInformation appMasterUgi;
 
   private AtomicBoolean sessionStopped = new AtomicBoolean(false);
+  private final Object idleStateLock = new Object();
   private long sessionTimeoutInterval;
   private long lastDAGCompletionTime;
   private Timer dagSubmissionTimer;
@@ -728,7 +729,6 @@ public class DAGAppMaster extends AbstractService {
           // Leaving the taskSchedulerEventHandler here for now. Doesn't generate new events.
           // However, eventually it needs to be moved out.
           this.taskSchedulerEventHandler.dagCompleted();
-          state = DAGAppMasterState.IDLE;
         } else {
           LOG.info("Session shutting down now.");
           this.taskSchedulerEventHandler.setShouldUnregisterFlag();
@@ -768,6 +768,10 @@ public class DAGAppMaster extends AbstractService {
       TezDAGID.clearCache();
       LOG.info("Completed cleanup for DAG: name=" + cleanupEvent.getDag().getName() + ", with id="
           + cleanupEvent.getDag().getID());
+      synchronized (idleStateLock) {
+        state = DAGAppMasterState.IDLE;
+        idleStateLock.notify();
+      }
       break;
     case NEW_DAG_SUBMITTED:
       // Inform sub-components that a new DAG has been submitted.
@@ -1246,21 +1250,33 @@ public class DAGAppMaster extends AbstractService {
       throw new SessionNotRunning("AM unable to accept new DAG submissions."
           + " In the process of shutting down");
     }
+
+    // dag is in cleanup when dag state is completed but AM state is still RUNNING
+    synchronized (idleStateLock) {
+      while (currentDAG != null && currentDAG.isComplete() && state == DAGAppMasterState.RUNNING) {
+        try {
+          LOG.info("wait for previous dag cleanup");
+          idleStateLock.wait();
+        } catch (InterruptedException e) {
+          throw new TezException(e);
+        }
+      }
+    }
+
     synchronized (this) {
       if (this.versionMismatch) {
         throw new TezException("Unable to accept DAG submissions as the ApplicationMaster is"
             + " incompatible with the client. " + versionMismatchDiagnostics);
       }
-      if (currentDAG != null
-          && !state.equals(DAGAppMasterState.IDLE)) {
-        throw new TezException("App master already running a DAG");
-      }
       if (state.equals(DAGAppMasterState.ERROR)
-          || sessionStopped.get()) {
+              || sessionStopped.get()) {
         throw new SessionNotRunning("AM unable to accept new DAG submissions."
-            + " In the process of shutting down");
+                + " In the process of shutting down");
+      }
+      if (currentDAG != null
+          && !currentDAG.isComplete()) {
+        throw new TezException("App master already running a DAG");
       }
-
       // RPC server runs in the context of the job user as it was started in
       // the job user's UGI context
       LOG.info("Starting DAG submitted via RPC: " + dagPlan.getName());
@@ -2289,7 +2305,6 @@ public class DAGAppMaster extends AbstractService {
     }
 
     startDAGExecution(newDAG, lrDiff);
-
     // set state after curDag is set
     this.state = DAGAppMasterState.RUNNING;
   }
