YARN-2080. Integrating reservation system with ResourceManager and client-RM protocol. Contributed by Subru Krishnan and Carlo Curino.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7b5b71f7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7b5b71f7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7b5b71f7
Branch: refs/heads/YARN-1051
Commit: 7b5b71f7899e490cfe0aa00d835ffb3dba39e9e0
Parents: fd86531
Author: subru <[email protected]>
Authored: Thu Sep 18 15:30:27 2014 -0700
Committer: subru <[email protected]>
Committed: Thu Sep 25 13:18:11 2014 -0700
----------------------------------------------------------------------
YARN-1051-CHANGES.txt | 3 +
.../yarn/client/api/impl/TestYarnClient.java | 1 -
.../yarn/server/resourcemanager/RMContext.java | 3 -
.../server/resourcemanager/rmapp/RMAppImpl.java | 82 +++-----------------
4 files changed, 15 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5b71f7/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index 56b3c12..c4106b2 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -20,3 +20,6 @@ on user reservations. (Carlo Curino and Subru Krishnan via curino) YARN-1712. Plan follower that synchronizes the current state of reservation subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru)
+
+YARN-2080. Integrating reservation system with ResourceManager and
+client-RM protocol.
(Subru Krishnan and Carlo Curino via subru) http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5b71f7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 97dee15..d7bea7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -949,5 +949,4 @@ public class TestYarnClient { ReservationSystemTestUtil.reservationQ); return request; } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5b71f7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 66cdfd7..31c9ef5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -110,10 +110,7 @@ public interface 
RMContext { long getEpoch(); - ReservationSystem getReservationSystem(); - boolean isSchedulerReadyForAllocatingContainers(); ReservationSystem getReservationSystem(); - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5b71f7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 20d773c..cf59c7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -79,18 +79,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; import 
org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.hadoop.yarn.webapp.util.WebAppUtils; - -import com.google.common.annotations.VisibleForTesting; @SuppressWarnings({ "rawtypes", "unchecked" }) public class RMAppImpl implements RMApp, Recoverable { @@ -119,12 +113,6 @@ public class RMAppImpl implements RMApp, Recoverable { private final String applicationType; private final Set<String> applicationTags; - private final long attemptFailuresValidityInterval; - - private Clock systemClock; - - private boolean isNumAttemptsBeyondThreshold = false; - // Mutable fields private long startTime; private long finishTime = 0; @@ -343,8 +331,6 @@ public class RMAppImpl implements RMApp, Recoverable { ApplicationMasterService masterService, long submitTime, String applicationType, Set<String> applicationTags) { - this.systemClock = new SystemClock(); - this.applicationId = applicationId; this.name = name; this.rmContext = rmContext; @@ -357,7 +343,7 @@ public class RMAppImpl implements RMApp, Recoverable { this.scheduler = scheduler; this.masterService = masterService; this.submitTime = submitTime; - this.startTime = this.systemClock.getTime(); + this.startTime = System.currentTimeMillis(); this.applicationType = applicationType; this.applicationTags = applicationTags; @@ -375,9 +361,6 @@ public class RMAppImpl implements RMApp, Recoverable { this.maxAppAttempts = individualMaxAppAttempts; } - this.attemptFailuresValidityInterval = - submissionContext.getAttemptFailuresValidityInterval(); - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -385,7 +368,6 @@ public class RMAppImpl implements RMApp, Recoverable { this.stateMachine = stateMachineFactory.make(this); rmContext.getRMApplicationHistoryWriter().applicationStarted(this); - rmContext.getSystemMetricsPublisher().appCreated(this, startTime); } @Override @@ -547,7 +529,6 @@ public class RMAppImpl implements RMApp, Recoverable { 
float progress = 0.0f; org.apache.hadoop.yarn.api.records.Token amrmToken = null; if (allowAccess) { - trackingUrl = getDefaultProxyTrackingUrl(); if (this.currentAttempt != null) { currentApplicationAttemptId = this.currentAttempt.getAppAttemptId(); trackingUrl = this.currentAttempt.getTrackingUrl(); @@ -608,20 +589,6 @@ public class RMAppImpl implements RMApp, Recoverable { } } - private String getDefaultProxyTrackingUrl() { - try { - final String scheme = WebAppUtils.getHttpSchemePrefix(conf); - String proxy = WebAppUtils.getProxyHostAndPort(conf); - URI proxyUri = ProxyUriUtils.getUriFromAMUrl(scheme, proxy); - URI result = ProxyUriUtils.getProxyUri(null, proxyUri, applicationId); - return result.toASCIIString(); - } catch (URISyntaxException e) { - LOG.warn("Could not generate default proxy tracking URL for " - + applicationId); - return UNAVAILABLE; - } - } - @Override public long getFinishTime() { this.readLock.lock(); @@ -664,20 +631,6 @@ public class RMAppImpl implements RMApp, Recoverable { } @Override - public String getOriginalTrackingUrl() { - this.readLock.lock(); - - try { - if (this.currentAttempt != null) { - return this.currentAttempt.getOriginalTrackingUrl(); - } - return null; - } finally { - this.readLock.unlock(); - } - } - - @Override public StringBuilder getDiagnostics() { this.readLock.lock(); @@ -935,7 +888,7 @@ public class RMAppImpl implements RMApp, Recoverable { msg = "Unmanaged application " + this.getApplicationId() + " failed due to " + failedEvent.getDiagnostics() + ". Failing the application."; - } else if (this.isNumAttemptsBeyondThreshold) { + } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) { msg = "Application " + this.getApplicationId() + " failed " + this.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() + ". 
Failing the application."; @@ -968,7 +921,7 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppState stateToBeStored) { rememberTargetTransitions(event, transitionToDo, targetFinalState); this.stateBeforeFinalSaving = getState(); - this.storedFinishTime = this.systemClock.getTime(); + this.storedFinishTime = System.currentTimeMillis(); LOG.info("Updating application " + this.applicationId + " with final state: " + this.targetedFinalState); @@ -1135,7 +1088,7 @@ public class RMAppImpl implements RMApp, Recoverable { } app.finishTime = app.storedFinishTime; if (app.finishTime == 0 ) { - app.finishTime = app.systemClock.getTime(); + app.finishTime = System.currentTimeMillis(); } // Recovered apps that are completed were not added to scheduler, so no // need to remove them from scheduler. @@ -1149,23 +1102,16 @@ public class RMAppImpl implements RMApp, Recoverable { app.rmContext.getRMApplicationHistoryWriter() .applicationFinished(app, finalState); - app.rmContext.getSystemMetricsPublisher() - .appFinished(app, finalState, app.finishTime); }; } private int getNumFailedAppAttempts() { int completedAttempts = 0; - long endTime = this.systemClock.getTime(); // Do not count AM preemption, hardware failures or NM resync // as attempt failure. 
for (RMAppAttempt attempt : attempts.values()) { if (attempt.shouldCountTowardsMaxAttemptRetry()) { - if (this.attemptFailuresValidityInterval <= 0 - || (attempt.getFinishTime() > endTime - - this.attemptFailuresValidityInterval)) { - completedAttempts++; - } + completedAttempts++; } } return completedAttempts; @@ -1182,10 +1128,9 @@ public class RMAppImpl implements RMApp, Recoverable { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - int numberOfFailure = app.getNumFailedAppAttempts(); if (!app.submissionContext.getUnmanagedAM() - && numberOfFailure < app.maxAppAttempts) { - boolean transferStateFromPreviousAttempt; + && app.getNumFailedAppAttempts() < app.maxAppAttempts) { + boolean transferStateFromPreviousAttempt = false; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = failedEvent.getTransferStateFromPreviousAttempt(); @@ -1195,16 +1140,13 @@ public class RMAppImpl implements RMApp, Recoverable { // Transfer the state from the previous attempt to the current attempt. // Note that the previous failed attempt may still be collecting the // container events from the scheduler and update its data structures - // before the new attempt is created. We always transferState for - // finished containers so that they can be acked to NM, - // but when pulling finished container we will check this flag again. - ((RMAppAttemptImpl) app.currentAttempt) - .transferStateFromPreviousAttempt(oldAttempt); + // before the new attempt is created. + if (transferStateFromPreviousAttempt) { + ((RMAppAttemptImpl) app.currentAttempt) + .transferStateFromPreviousAttempt(oldAttempt); + } return initialState; } else { - if (numberOfFailure >= app.maxAppAttempts) { - app.isNumAttemptsBeyondThreshold = true; - } app.rememberTargetTransitionsAndStoreState(event, new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, RMAppState.FAILED);
