Repository: hadoop
Updated Branches:
  refs/heads/YARN-1051 9fbe77a24 -> ba72e4f1c


rebasing branch yarn-1051 with trunk to keep pace


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e209552c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e209552c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e209552c

Branch: refs/heads/YARN-1051
Commit: e209552c6b525f09dc092bf94b56d4f15c1908d7
Parents: addc053 6b66ce3
Author: subru <[email protected]>
Authored: Wed Sep 24 14:41:50 2014 -0700
Committer: subru <[email protected]>
Committed: Wed Sep 24 14:47:52 2014 -0700

----------------------------------------------------------------------
 .../src/main/proto/yarn_protos.proto            | 31 ++++++++
 .../server/resourcemanager/rmapp/RMAppImpl.java | 82 +++++++++++++++++---
 2 files changed, 101 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e209552c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --cc hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index a66257d,a66257d..4e415d6
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@@ -342,6 -342,6 +342,37 @@@ message QueueUserACLInfoProto 
  }
  
  ////////////////////////////////////////////////////////////////////////
++////// From reservation_protocol /////////////////////////////////////
++////////////////////////////////////////////////////////////////////////
++
++message ReservationIdProto {
++optional int64 id = 1;
++optional int64 cluster_timestamp = 2;
++}
++message ReservationRequestProto {
++optional ResourceProto capability = 1;
++optional int32 num_containers = 2 [default = 1];
++optional int32 concurrency = 3 [default = 1];
++optional int64 duration = 4 [default = -1];
++}
++message ReservationRequestsProto {
++repeated ReservationRequestProto reservation_resources = 1;
++optional ReservationRequestInterpreterProto interpreter = 2 [default = R_ALL];
++}
++message ReservationDefinitionProto {
++optional ReservationRequestsProto reservation_requests = 1;
++optional int64 arrival = 2;
++optional int64 deadline = 3;
++optional string reservation_name = 4;
++}
++enum ReservationRequestInterpreterProto {
++R_ANY = 0;
++R_ALL = 1;
++R_ORDER = 2;
++R_ORDER_NO_GAP = 3;
++}
++
++////////////////////////////////////////////////////////////////////////
  ////// From container_manager //////////////////////////////////////////
  ////////////////////////////////////////////////////////////////////////
  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e209552c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --cc hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index cf59c7f,cf59c7f..20d773c
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@@ -79,12 -79,12 +79,18 @@@ import org.apache.hadoop.yarn.server.re
  import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
  import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
  import org.apache.hadoop.yarn.server.utils.BuilderUtils;
++import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
  import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
  import org.apache.hadoop.yarn.state.MultipleArcTransition;
  import org.apache.hadoop.yarn.state.SingleArcTransition;
  import org.apache.hadoop.yarn.state.StateMachine;
  import org.apache.hadoop.yarn.state.StateMachineFactory;
++import org.apache.hadoop.yarn.util.Clock;
++import org.apache.hadoop.yarn.util.SystemClock;
  import org.apache.hadoop.yarn.util.resource.Resources;
++import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
++
++import com.google.common.annotations.VisibleForTesting;
  
  @SuppressWarnings({ "rawtypes", "unchecked" })
  public class RMAppImpl implements RMApp, Recoverable {
@@@ -113,6 -113,6 +119,12 @@@
    private final String applicationType;
    private final Set<String> applicationTags;
  
++  private final long attemptFailuresValidityInterval;
++
++  private Clock systemClock;
++
++  private boolean isNumAttemptsBeyondThreshold = false;
++
    // Mutable fields
    private long startTime;
    private long finishTime = 0;
@@@ -331,6 -331,6 +343,8 @@@
        ApplicationMasterService masterService, long submitTime,
        String applicationType, Set<String> applicationTags) {
  
++    this.systemClock = new SystemClock();
++
      this.applicationId = applicationId;
      this.name = name;
      this.rmContext = rmContext;
@@@ -343,7 -343,7 +357,7 @@@
      this.scheduler = scheduler;
      this.masterService = masterService;
      this.submitTime = submitTime;
--    this.startTime = System.currentTimeMillis();
++    this.startTime = this.systemClock.getTime();
      this.applicationType = applicationType;
      this.applicationTags = applicationTags;
  
@@@ -361,6 -361,6 +375,9 @@@
        this.maxAppAttempts = individualMaxAppAttempts;
      }
  
++    this.attemptFailuresValidityInterval =
++        submissionContext.getAttemptFailuresValidityInterval();
++
      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      this.readLock = lock.readLock();
      this.writeLock = lock.writeLock();
@@@ -368,6 -368,6 +385,7 @@@
      this.stateMachine = stateMachineFactory.make(this);
  
      rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
++    rmContext.getSystemMetricsPublisher().appCreated(this, startTime);
    }
  
    @Override
@@@ -529,6 -529,6 +547,7 @@@
        float progress = 0.0f;
        org.apache.hadoop.yarn.api.records.Token amrmToken = null;
        if (allowAccess) {
++        trackingUrl = getDefaultProxyTrackingUrl();
          if (this.currentAttempt != null) {
            currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
            trackingUrl = this.currentAttempt.getTrackingUrl();
@@@ -589,6 -589,6 +608,20 @@@
      }
    }
  
++  private String getDefaultProxyTrackingUrl() {
++    try {
++      final String scheme = WebAppUtils.getHttpSchemePrefix(conf);
++      String proxy = WebAppUtils.getProxyHostAndPort(conf);
++      URI proxyUri = ProxyUriUtils.getUriFromAMUrl(scheme, proxy);
++      URI result = ProxyUriUtils.getProxyUri(null, proxyUri, applicationId);
++      return result.toASCIIString();
++    } catch (URISyntaxException e) {
++      LOG.warn("Could not generate default proxy tracking URL for "
++          + applicationId);
++      return UNAVAILABLE;
++    }
++  }
++
    @Override
    public long getFinishTime() {
      this.readLock.lock();
@@@ -631,6 -631,6 +664,20 @@@
    }
  
    @Override
++  public String getOriginalTrackingUrl() {
++    this.readLock.lock();
++    
++    try {
++      if (this.currentAttempt != null) {
++        return this.currentAttempt.getOriginalTrackingUrl();
++      }
++      return null;
++    } finally {
++      this.readLock.unlock();
++    }
++  }
++
++  @Override
    public StringBuilder getDiagnostics() {
      this.readLock.lock();
  
@@@ -888,7 -888,7 +935,7 @@@
        msg = "Unmanaged application " + this.getApplicationId()
                + " failed due to " + failedEvent.getDiagnostics()
                + ". Failing the application.";
--    } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
++    } else if (this.isNumAttemptsBeyondThreshold) {
        msg = "Application " + this.getApplicationId() + " failed "
                + this.maxAppAttempts + " times due to "
                + failedEvent.getDiagnostics() + ". Failing the application.";
@@@ -921,7 -921,7 +968,7 @@@
        RMAppState stateToBeStored) {
      rememberTargetTransitions(event, transitionToDo, targetFinalState);
      this.stateBeforeFinalSaving = getState();
--    this.storedFinishTime = System.currentTimeMillis();
++    this.storedFinishTime = this.systemClock.getTime();
  
      LOG.info("Updating application " + this.applicationId
          + " with final state: " + this.targetedFinalState);
@@@ -1088,7 -1088,7 +1135,7 @@@
        }
        app.finishTime = app.storedFinishTime;
        if (app.finishTime == 0 ) {
--        app.finishTime = System.currentTimeMillis();
++        app.finishTime = app.systemClock.getTime();
        }
        // Recovered apps that are completed were not added to scheduler, so no
        // need to remove them from scheduler.
@@@ -1102,16 -1102,16 +1149,23 @@@
  
        app.rmContext.getRMApplicationHistoryWriter()
            .applicationFinished(app, finalState);
++      app.rmContext.getSystemMetricsPublisher()
++          .appFinished(app, finalState, app.finishTime);
      };
    }
  
    private int getNumFailedAppAttempts() {
      int completedAttempts = 0;
++    long endTime = this.systemClock.getTime();
      // Do not count AM preemption, hardware failures or NM resync
      // as attempt failure.
      for (RMAppAttempt attempt : attempts.values()) {
        if (attempt.shouldCountTowardsMaxAttemptRetry()) {
--        completedAttempts++;
++        if (this.attemptFailuresValidityInterval <= 0
++            || (attempt.getFinishTime() > endTime
++                - this.attemptFailuresValidityInterval)) {
++          completedAttempts++;
++        }
        }
      }
      return completedAttempts;
@@@ -1128,9 -1128,9 +1182,10 @@@
  
      @Override
      public RMAppState transition(RMAppImpl app, RMAppEvent event) {
++      int numberOfFailure = app.getNumFailedAppAttempts();
        if (!app.submissionContext.getUnmanagedAM()
--          && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
--        boolean transferStateFromPreviousAttempt = false;
++          && numberOfFailure < app.maxAppAttempts) {
++        boolean transferStateFromPreviousAttempt;
          RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
          transferStateFromPreviousAttempt =
              failedEvent.getTransferStateFromPreviousAttempt();
@@@ -1140,13 -1140,13 +1195,16 @@@
          // Transfer the state from the previous attempt to the current attempt.
          // Note that the previous failed attempt may still be collecting the
          // container events from the scheduler and update its data structures
--        // before the new attempt is created.
--        if (transferStateFromPreviousAttempt) {
--          ((RMAppAttemptImpl) app.currentAttempt)
--            .transferStateFromPreviousAttempt(oldAttempt);
--        }
++        // before the new attempt is created. We always transferState for
++        // finished containers so that they can be acked to NM,
++        // but when pulling finished container we will check this flag again.
++        ((RMAppAttemptImpl) app.currentAttempt)
++          .transferStateFromPreviousAttempt(oldAttempt);
          return initialState;
        } else {
++        if (numberOfFailure >= app.maxAppAttempts) {
++          app.isNumAttemptsBeyondThreshold = true;
++        }
          app.rememberTargetTransitionsAndStoreState(event,
            new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
            RMAppState.FAILED);

Reply via email to