Repository: hadoop Updated Branches: refs/heads/YARN-1051 9fbe77a24 -> ba72e4f1c
rebasing branch yarn-1051 with trunk to keep pace Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e209552c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e209552c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e209552c Branch: refs/heads/YARN-1051 Commit: e209552c6b525f09dc092bf94b56d4f15c1908d7 Parents: addc053 6b66ce3 Author: subru <[email protected]> Authored: Wed Sep 24 14:41:50 2014 -0700 Committer: subru <[email protected]> Committed: Wed Sep 24 14:47:52 2014 -0700 ---------------------------------------------------------------------- .../src/main/proto/yarn_protos.proto | 31 ++++++++ .../server/resourcemanager/rmapp/RMAppImpl.java | 82 +++++++++++++++++--- 2 files changed, 101 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e209552c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --cc hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index a66257d,a66257d..4e415d6 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@@ -342,6 -342,6 +342,37 @@@ message QueueUserACLInfoProto } //////////////////////////////////////////////////////////////////////// ++////// From reservation_protocol ///////////////////////////////////// ++//////////////////////////////////////////////////////////////////////// ++ ++message ReservationIdProto { ++optional int64 id = 1; ++optional int64 cluster_timestamp = 2; ++} ++message ReservationRequestProto { ++optional ResourceProto capability = 1; ++optional int32 num_containers = 2 [default = 1]; ++optional int32 concurrency = 3 [default = 1]; ++optional int64 duration = 4 [default = -1]; ++} ++message ReservationRequestsProto { ++repeated ReservationRequestProto reservation_resources = 1; ++optional ReservationRequestInterpreterProto interpreter = 2 [default = R_ALL]; ++} ++message ReservationDefinitionProto { ++optional ReservationRequestsProto reservation_requests = 1; ++optional int64 arrival = 2; ++optional int64 deadline = 3; ++optional string reservation_name = 4; ++} ++enum ReservationRequestInterpreterProto { ++R_ANY = 0; ++R_ALL = 1; ++R_ORDER = 2; ++R_ORDER_NO_GAP = 3; ++} ++ ++//////////////////////////////////////////////////////////////////////// ////// From container_manager ////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/e209552c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --cc hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index cf59c7f,cf59c7f..20d773c --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@@ -79,12 -79,12 +79,18 @@@ import org.apache.hadoop.yarn.server.re import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; ++import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; ++import org.apache.hadoop.yarn.util.Clock; ++import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; ++import org.apache.hadoop.yarn.webapp.util.WebAppUtils; ++ ++import com.google.common.annotations.VisibleForTesting; @SuppressWarnings({ "rawtypes", "unchecked" }) public class RMAppImpl implements RMApp, Recoverable { @@@ -113,6 -113,6 +119,12 @@@ private final String applicationType; private final Set<String> applicationTags; ++ private final long attemptFailuresValidityInterval; ++ ++ private Clock systemClock; ++ ++ private boolean isNumAttemptsBeyondThreshold = false; ++ // Mutable fields private long startTime; private long finishTime = 0; @@@ -331,6 -331,6 +343,8 @@@ ApplicationMasterService masterService, long submitTime, String applicationType, Set<String> applicationTags) { ++ this.systemClock = new SystemClock(); ++ this.applicationId = applicationId; this.name = name; this.rmContext = rmContext; @@@ -343,7 -343,7 +357,7 @@@ this.scheduler = scheduler; this.masterService = masterService; this.submitTime = submitTime; -- this.startTime = System.currentTimeMillis(); ++ this.startTime = this.systemClock.getTime(); this.applicationType = applicationType; this.applicationTags = applicationTags; @@@ -361,6 -361,6 +375,9 @@@ this.maxAppAttempts = individualMaxAppAttempts; } ++ this.attemptFailuresValidityInterval = ++ submissionContext.getAttemptFailuresValidityInterval(); ++ ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@@ -368,6 -368,6 +385,7 @@@ this.stateMachine = stateMachineFactory.make(this); rmContext.getRMApplicationHistoryWriter().applicationStarted(this); ++ rmContext.getSystemMetricsPublisher().appCreated(this, startTime); } @Override @@@ -529,6 -529,6 +547,7 @@@ float progress = 0.0f; org.apache.hadoop.yarn.api.records.Token amrmToken = null; if (allowAccess) { ++ trackingUrl = getDefaultProxyTrackingUrl(); if (this.currentAttempt != null) { currentApplicationAttemptId = this.currentAttempt.getAppAttemptId(); trackingUrl = this.currentAttempt.getTrackingUrl(); @@@ -589,6 -589,6 +608,20 @@@ } } ++ private String getDefaultProxyTrackingUrl() { ++ try { ++ final String scheme = WebAppUtils.getHttpSchemePrefix(conf); ++ String proxy = WebAppUtils.getProxyHostAndPort(conf); ++ URI proxyUri = ProxyUriUtils.getUriFromAMUrl(scheme, proxy); ++ URI result = ProxyUriUtils.getProxyUri(null, proxyUri, applicationId); ++ return result.toASCIIString(); ++ } catch (URISyntaxException e) { ++ LOG.warn("Could not generate default proxy tracking URL for " ++ + applicationId); ++ return UNAVAILABLE; ++ } ++ } ++ @Override public long getFinishTime() { this.readLock.lock(); @@@ -631,6 -631,6 +664,20 @@@ } @Override ++ public String getOriginalTrackingUrl() { ++ this.readLock.lock(); ++ ++ try { ++ if (this.currentAttempt != null) { ++ return this.currentAttempt.getOriginalTrackingUrl(); ++ } ++ return null; ++ } finally { ++ this.readLock.unlock(); ++ } ++ } ++ ++ @Override public StringBuilder getDiagnostics() { this.readLock.lock(); @@@ -888,7 -888,7 +935,7 @@@ msg = "Unmanaged application " + this.getApplicationId() + " failed due to " + failedEvent.getDiagnostics() + ". Failing the application."; -- } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) { ++ } else if (this.isNumAttemptsBeyondThreshold) { msg = "Application " + this.getApplicationId() + " failed " + this.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() + ". Failing the application."; @@@ -921,7 -921,7 +968,7 @@@ RMAppState stateToBeStored) { rememberTargetTransitions(event, transitionToDo, targetFinalState); this.stateBeforeFinalSaving = getState(); -- this.storedFinishTime = System.currentTimeMillis(); ++ this.storedFinishTime = this.systemClock.getTime(); LOG.info("Updating application " + this.applicationId + " with final state: " + this.targetedFinalState); @@@ -1088,7 -1088,7 +1135,7 @@@ } app.finishTime = app.storedFinishTime; if (app.finishTime == 0 ) { -- app.finishTime = System.currentTimeMillis(); ++ app.finishTime = app.systemClock.getTime(); } // Recovered apps that are completed were not added to scheduler, so no // need to remove them from scheduler. @@@ -1102,16 -1102,16 +1149,23 @@@ app.rmContext.getRMApplicationHistoryWriter() .applicationFinished(app, finalState); ++ app.rmContext.getSystemMetricsPublisher() ++ .appFinished(app, finalState, app.finishTime); }; } private int getNumFailedAppAttempts() { int completedAttempts = 0; ++ long endTime = this.systemClock.getTime(); // Do not count AM preemption, hardware failures or NM resync // as attempt failure. for (RMAppAttempt attempt : attempts.values()) { if (attempt.shouldCountTowardsMaxAttemptRetry()) { -- completedAttempts++; ++ if (this.attemptFailuresValidityInterval <= 0 ++ || (attempt.getFinishTime() > endTime ++ - this.attemptFailuresValidityInterval)) { ++ completedAttempts++; ++ } } } return completedAttempts; @@@ -1128,9 -1128,9 +1182,10 @@@ @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { ++ int numberOfFailure = app.getNumFailedAppAttempts(); if (!app.submissionContext.getUnmanagedAM() -- && app.getNumFailedAppAttempts() < app.maxAppAttempts) { -- boolean transferStateFromPreviousAttempt = false; ++ && numberOfFailure < app.maxAppAttempts) { ++ boolean transferStateFromPreviousAttempt; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = failedEvent.getTransferStateFromPreviousAttempt(); @@@ -1140,13 -1140,13 +1195,16 @@@ // Transfer the state from the previous attempt to the current attempt. // Note that the previous failed attempt may still be collecting the // container events from the scheduler and update its data structures -- // before the new attempt is created. -- if (transferStateFromPreviousAttempt) { -- ((RMAppAttemptImpl) app.currentAttempt) -- .transferStateFromPreviousAttempt(oldAttempt); -- } ++ // before the new attempt is created. We always transferState for ++ // finished containers so that they can be acked to NM, ++ // but when pulling finished container we will check this flag again. ++ ((RMAppAttemptImpl) app.currentAttempt) ++ .transferStateFromPreviousAttempt(oldAttempt); return initialState; } else { ++ if (numberOfFailure >= app.maxAppAttempts) { ++ app.isNumAttemptsBeyondThreshold = true; ++ } app.rememberTargetTransitionsAndStoreState(event, new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, RMAppState.FAILED);
