YARN-5095. flow activities and flow runs are populated with wrong timestamp 
when RM restarts w/ recovery enabled (Varun Saxena via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/70223612
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/70223612
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/70223612

Branch: refs/heads/trunk
Commit: 702236129b930a799a5a3295f0aa0dc7b619c354
Parents: 831a3ff
Author: Sangjin Lee <sj...@apache.org>
Authored: Wed May 25 16:56:49 2016 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Sun Jul 10 08:46:00 2016 -0700

----------------------------------------------------------------------
 .../server/resourcemanager/RMAppManager.java    | 12 ++--
 .../server/resourcemanager/rmapp/RMAppImpl.java | 19 +++++-
 .../server/resourcemanager/TestRMRestart.java   | 63 ++++++++++++++++++++
 3 files changed, 87 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/70223612/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 836fe90..49daedb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -288,8 +288,10 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
       String user) throws YarnException, AccessControlException {
     ApplicationId applicationId = submissionContext.getApplicationId();
 
-    RMAppImpl application =
-        createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
+    // Passing start time as -1. It will be eventually set in RMAppImpl
+    // constructor.
+    RMAppImpl application = createAndPopulateNewRMApp(
+        submissionContext, submitTime, user, false, -1);
     Credentials credentials = null;
     try {
       credentials = parseCredentials(submissionContext);
@@ -327,14 +329,14 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
     // create and recover app.
     RMAppImpl application =
         createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
-            appState.getUser(), true);
+            appState.getUser(), true, appState.getStartTime());
 
     application.handle(new RMAppRecoverEvent(appId, rmState));
   }
 
   private RMAppImpl createAndPopulateNewRMApp(
       ApplicationSubmissionContext submissionContext, long submitTime,
-      String user, boolean isRecovery)
+      String user, boolean isRecovery, long startTime)
       throws YarnException, AccessControlException {
     // Do queue mapping
     if (!isRecovery) {
@@ -391,7 +393,7 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
             submissionContext.getQueue(),
             submissionContext, this.scheduler, this.masterService,
             submitTime, submissionContext.getApplicationType(),
-            submissionContext.getApplicationTags(), amReq);
+            submissionContext.getApplicationTags(), amReq, startTime);
     // Concurrent app submissions with same applicationId will fail here
     // Concurrent app submissions with different applicationIds will not
     // influence each other

http://git-wip-us.apache.org/repos/asf/hadoop/blob/70223612/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 8463f3a..6e448f7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -416,8 +416,19 @@ public class RMAppImpl implements RMApp, Recoverable {
       Configuration config, String name, String user, String queue,
       ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
       ApplicationMasterService masterService, long submitTime,
-      String applicationType, Set<String> applicationTags, 
+      String applicationType, Set<String> applicationTags,
       ResourceRequest amReq) {
+    this(applicationId, rmContext, config, name, user, queue, 
submissionContext,
+      scheduler, masterService, submitTime, applicationType, applicationTags,
+      amReq, -1);
+  }
+
+  public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
+      Configuration config, String name, String user, String queue,
+      ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
+      ApplicationMasterService masterService, long submitTime,
+      String applicationType, Set<String> applicationTags,
+      ResourceRequest amReq, long startTime) {
 
     this.systemClock = SystemClock.getInstance();
 
@@ -433,7 +444,11 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.scheduler = scheduler;
     this.masterService = masterService;
     this.submitTime = submitTime;
-    this.startTime = this.systemClock.getTime();
+    if (startTime <= 0) {
+      this.startTime = this.systemClock.getTime();
+    } else {
+      this.startTime = startTime;
+    }
     this.applicationType = applicationType;
     this.applicationTags = applicationTags;
     this.amReq = amReq;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/70223612/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 364f9d1..bbb1d15 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -109,6 +109,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
@@ -1126,6 +1127,68 @@ public class TestRMRestart extends 
ParameterizedSchedulerTestBase {
   }
 
   @Test (timeout = 60000)
+  public void testRMRestartTimelineCollectorContext() throws Exception {
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+    Map<ApplicationId, ApplicationStateData> rmAppState =
+        rmState.getApplicationState();
+    MockRM rm1 = null;
+    MockRM rm2 = null;
+    try {
+      rm1 = createMockRM(conf, memStore);
+      rm1.start();
+      MockNM nm1 =
+          new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+      nm1.registerNode();
+
+      // submit an app.
+      RMApp app = rm1.submitApp(200, "name", "user",
+          new HashMap<ApplicationAccessType, String>(), false, "default", -1,
+          null);
+      // Check if app info has been saved.
+      ApplicationStateData appState = rmAppState.get(app.getApplicationId());
+      Assert.assertNotNull(appState);
+      Assert.assertEquals(0, appState.getAttemptCount());
+      Assert.assertEquals(appState.getApplicationSubmissionContext()
+          .getApplicationId(), app.getApplicationSubmissionContext()
+          .getApplicationId());
+
+      // Allocate the AM
+      nm1.nodeHeartbeat(true);
+      RMAppAttempt attempt = app.getCurrentAppAttempt();
+      ApplicationAttemptId attemptId1 = attempt.getAppAttemptId();
+      rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+
+      ApplicationId appId = app.getApplicationId();
+      TimelineCollectorContext contextBeforeRestart =
+          rm1.getRMContext().getRMTimelineCollectorManager().get(appId).
+              getTimelineEntityContext();
+
+      // Restart RM.
+      rm2 = createMockRM(conf, memStore);
+      rm2.start();
+      Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
+      rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+      TimelineCollectorContext contextAfterRestart =
+          rm2.getRMContext().getRMTimelineCollectorManager().get(appId).
+              getTimelineEntityContext();
+      Assert.assertEquals("Collector contexts for an app should be same " +
+          "across restarts", contextBeforeRestart, contextAfterRestart);
+    } finally {
+      conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
+      if (rm1 != null) {
+        rm1.close();
+      }
+      if (rm2 != null) {
+        rm2.close();
+      }
+    }
+  }
+
+  @Test (timeout = 60000)
   public void testDelegationTokenRestoredInDelegationTokenRenewer()
       throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to