Rebasing with trunk to create YARN-1051 merge patch
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1c7d1df1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1c7d1df1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1c7d1df1

Branch: refs/heads/YARN-1051
Commit: 1c7d1df1deb88cd02ec8ba1604bf890424c7c94a
Parents: 90ac0be
Author: subru <[email protected]>
Authored: Thu Sep 25 13:25:00 2014 -0700
Committer: subru <[email protected]>
Committed: Thu Sep 25 13:25:00 2014 -0700

----------------------------------------------------------------------
 .../server/resourcemanager/rmapp/RMAppImpl.java | 82 +++++++++++++++++---
 1 file changed, 70 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c7d1df1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index cf59c7f..20d773c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -79,12 +79,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +import com.google.common.annotations.VisibleForTesting; @SuppressWarnings({ "rawtypes", "unchecked" }) public class RMAppImpl implements RMApp, Recoverable { @@ -113,6 +119,12 @@ public class RMAppImpl implements RMApp, Recoverable { private final String applicationType; private final Set<String> applicationTags; + private final long attemptFailuresValidityInterval; + + private Clock systemClock; + + private boolean isNumAttemptsBeyondThreshold = false; + // Mutable fields private long startTime; private long finishTime = 0; @@ -331,6 +343,8 @@ public class RMAppImpl implements RMApp, Recoverable { ApplicationMasterService masterService, long submitTime, String applicationType, Set<String> applicationTags) { + this.systemClock = new SystemClock(); + this.applicationId = applicationId; this.name = name; this.rmContext = rmContext; @@ -343,7 +357,7 @@ public class RMAppImpl implements RMApp, Recoverable { this.scheduler = scheduler; this.masterService = masterService; this.submitTime = submitTime; - this.startTime = System.currentTimeMillis(); + this.startTime = this.systemClock.getTime(); this.applicationType = applicationType; this.applicationTags = applicationTags; @@ -361,6 +375,9 @@ public class RMAppImpl implements RMApp, Recoverable { this.maxAppAttempts = 
individualMaxAppAttempts; } + this.attemptFailuresValidityInterval = + submissionContext.getAttemptFailuresValidityInterval(); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -368,6 +385,7 @@ public class RMAppImpl implements RMApp, Recoverable { this.stateMachine = stateMachineFactory.make(this); rmContext.getRMApplicationHistoryWriter().applicationStarted(this); + rmContext.getSystemMetricsPublisher().appCreated(this, startTime); } @Override @@ -529,6 +547,7 @@ public class RMAppImpl implements RMApp, Recoverable { float progress = 0.0f; org.apache.hadoop.yarn.api.records.Token amrmToken = null; if (allowAccess) { + trackingUrl = getDefaultProxyTrackingUrl(); if (this.currentAttempt != null) { currentApplicationAttemptId = this.currentAttempt.getAppAttemptId(); trackingUrl = this.currentAttempt.getTrackingUrl(); @@ -589,6 +608,20 @@ public class RMAppImpl implements RMApp, Recoverable { } } + private String getDefaultProxyTrackingUrl() { + try { + final String scheme = WebAppUtils.getHttpSchemePrefix(conf); + String proxy = WebAppUtils.getProxyHostAndPort(conf); + URI proxyUri = ProxyUriUtils.getUriFromAMUrl(scheme, proxy); + URI result = ProxyUriUtils.getProxyUri(null, proxyUri, applicationId); + return result.toASCIIString(); + } catch (URISyntaxException e) { + LOG.warn("Could not generate default proxy tracking URL for " + + applicationId); + return UNAVAILABLE; + } + } + @Override public long getFinishTime() { this.readLock.lock(); @@ -631,6 +664,20 @@ public class RMAppImpl implements RMApp, Recoverable { } @Override + public String getOriginalTrackingUrl() { + this.readLock.lock(); + + try { + if (this.currentAttempt != null) { + return this.currentAttempt.getOriginalTrackingUrl(); + } + return null; + } finally { + this.readLock.unlock(); + } + } + + @Override public StringBuilder getDiagnostics() { this.readLock.lock(); @@ -888,7 +935,7 @@ public class RMAppImpl 
implements RMApp, Recoverable { msg = "Unmanaged application " + this.getApplicationId() + " failed due to " + failedEvent.getDiagnostics() + ". Failing the application."; - } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) { + } else if (this.isNumAttemptsBeyondThreshold) { msg = "Application " + this.getApplicationId() + " failed " + this.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() + ". Failing the application."; @@ -921,7 +968,7 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppState stateToBeStored) { rememberTargetTransitions(event, transitionToDo, targetFinalState); this.stateBeforeFinalSaving = getState(); - this.storedFinishTime = System.currentTimeMillis(); + this.storedFinishTime = this.systemClock.getTime(); LOG.info("Updating application " + this.applicationId + " with final state: " + this.targetedFinalState); @@ -1088,7 +1135,7 @@ public class RMAppImpl implements RMApp, Recoverable { } app.finishTime = app.storedFinishTime; if (app.finishTime == 0 ) { - app.finishTime = System.currentTimeMillis(); + app.finishTime = app.systemClock.getTime(); } // Recovered apps that are completed were not added to scheduler, so no // need to remove them from scheduler. @@ -1102,16 +1149,23 @@ public class RMAppImpl implements RMApp, Recoverable { app.rmContext.getRMApplicationHistoryWriter() .applicationFinished(app, finalState); + app.rmContext.getSystemMetricsPublisher() + .appFinished(app, finalState, app.finishTime); }; } private int getNumFailedAppAttempts() { int completedAttempts = 0; + long endTime = this.systemClock.getTime(); // Do not count AM preemption, hardware failures or NM resync // as attempt failure. 
for (RMAppAttempt attempt : attempts.values()) { if (attempt.shouldCountTowardsMaxAttemptRetry()) { - completedAttempts++; + if (this.attemptFailuresValidityInterval <= 0 + || (attempt.getFinishTime() > endTime + - this.attemptFailuresValidityInterval)) { + completedAttempts++; + } } } return completedAttempts; @@ -1128,9 +1182,10 @@ public class RMAppImpl implements RMApp, Recoverable { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { + int numberOfFailure = app.getNumFailedAppAttempts(); if (!app.submissionContext.getUnmanagedAM() - && app.getNumFailedAppAttempts() < app.maxAppAttempts) { - boolean transferStateFromPreviousAttempt = false; + && numberOfFailure < app.maxAppAttempts) { + boolean transferStateFromPreviousAttempt; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = failedEvent.getTransferStateFromPreviousAttempt(); @@ -1140,13 +1195,16 @@ public class RMAppImpl implements RMApp, Recoverable { // Transfer the state from the previous attempt to the current attempt. // Note that the previous failed attempt may still be collecting the // container events from the scheduler and update its data structures - // before the new attempt is created. - if (transferStateFromPreviousAttempt) { - ((RMAppAttemptImpl) app.currentAttempt) - .transferStateFromPreviousAttempt(oldAttempt); - } + // before the new attempt is created. We always transferState for + // finished containers so that they can be acked to NM, + // but when pulling finished container we will check this flag again. + ((RMAppAttemptImpl) app.currentAttempt) + .transferStateFromPreviousAttempt(oldAttempt); return initialState; } else { + if (numberOfFailure >= app.maxAppAttempts) { + app.isNumAttemptsBeyondThreshold = true; + } app.rememberTargetTransitionsAndStoreState(event, new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, RMAppState.FAILED);
