YARN-5028. RMStateStore should trim down app state for completed applications. 
Contributed by Gergo Repas.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/92cbbfe7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/92cbbfe7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/92cbbfe7

Branch: refs/heads/HDFS-12996
Commit: 92cbbfe79ec009a19a71a7f44329a4b2f9fa9be6
Parents: 004b722
Author: Yufei Gu <yu...@apache.org>
Authored: Wed Feb 21 11:42:26 2018 -0800
Committer: Yufei Gu <yu...@apache.org>
Committed: Wed Feb 21 11:42:51 2018 -0800

----------------------------------------------------------------------
 .../resourcemanager/recovery/RMStateStore.java  | 34 +++++++++-
 .../recovery/RMStateStoreTestBase.java          |  3 +
 .../recovery/TestZKRMStateStore.java            | 66 ++++++++++++++++++++
 3 files changed, 102 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/92cbbfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index f0ab324..bbe208d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -257,6 +259,9 @@ public abstract class RMStateStore extends AbstractService {
           appState.getApplicationSubmissionContext().getApplicationId();
       LOG.info("Updating info for app: " + appId);
       try {
+        if (isAppStateFinal(appState)) {
+          pruneAppState(appState);
+        }
         store.updateApplicationStateInternal(appId, appState);
         if (((RMStateUpdateAppEvent) event).isNotifyApplication()) {
           store.notifyApplication(new RMAppEvent(appId,
@@ -276,7 +281,34 @@ public abstract class RMStateStore extends AbstractService {
         }
       }
       return finalState(isFenced);
-    };
+    }
+
+    /**
+     * Returns whether the stored application has reached a terminal state
+     * (FINISHED, FAILED or KILLED). A missing state counts as non-terminal.
+     */
+    private boolean isAppStateFinal(ApplicationStateData appState) {
+      RMAppState current = appState.getState();
+      if (current == null) {
+        return false;
+      }
+      switch (current) {
+      case FINISHED:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
+      }
+    }
+
+    /**
+     * Replaces the submission context of a completed application with a
+     * trimmed copy, so that the state store keeps only the fields listed
+     * below. Of the AM container spec, only the application ACLs are kept.
+     * <p>
+     * A missing AM container spec is tolerated: an empty spec is stored
+     * instead of throwing a NullPointerException.
+     *
+     * @param appState application state whose submission context is
+     *                 replaced in place with the pruned copy
+     */
+    private void pruneAppState(ApplicationStateData appState) {
+      ApplicationSubmissionContext srcCtx =
+          appState.getApplicationSubmissionContext();
+      ApplicationSubmissionContextPBImpl context =
+          new ApplicationSubmissionContextPBImpl();
+      // most fields in the ApplicationSubmissionContext are not needed,
+      // but the following few need to be present for recovery to succeed
+      context.setApplicationId(srcCtx.getApplicationId());
+      context.setResource(srcCtx.getResource());
+      context.setQueue(srcCtx.getQueue());
+      context.setAMContainerResourceRequests(
+          srcCtx.getAMContainerResourceRequests());
+      context.setApplicationType(srcCtx.getApplicationType());
+      ContainerLaunchContextPBImpl amContainerSpec =
+          new ContainerLaunchContextPBImpl();
+      // getAMContainerSpec() may return null when no spec was set. An NPE
+      // here would skip the updateApplicationStateInternal call that runs
+      // right after pruning, so only copy the ACLs when a spec exists.
+      if (srcCtx.getAMContainerSpec() != null) {
+        amContainerSpec.setApplicationACLs(
+            srcCtx.getAMContainerSpec().getApplicationACLs());
+      }
+      context.setAMContainerSpec(amContainerSpec);
+      appState.setApplicationSubmissionContext(context);
+    }
   }
 
   private static class RemoveAppTransition implements

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92cbbfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 453d805..dbb2148 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -162,6 +163,7 @@ public class RMStateStoreTestBase {
     ApplicationSubmissionContext context =
         new ApplicationSubmissionContextPBImpl();
     context.setApplicationId(appId);
+    context.setAMContainerSpec(new ContainerLaunchContextPBImpl());
 
     RMApp mockApp = mock(RMApp.class);
     when(mockApp.getApplicationId()).thenReturn(appId);
@@ -378,6 +380,7 @@ public class RMStateStoreTestBase {
     ApplicationSubmissionContext dummyContext =
         new ApplicationSubmissionContextPBImpl();
     dummyContext.setApplicationId(dummyAppId);
+    dummyContext.setAMContainerSpec(new ContainerLaunchContextPBImpl());
     ApplicationStateData dummyApp =
         ApplicationStateData.newInstance(appState.getSubmitTime(),
             appState.getStartTime(), appState.getUser(), dummyContext,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92cbbfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 6a8f47d..0a1b152 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -35,7 +35,9 @@ import org.apache.hadoop.service.Service;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.records.*;
 import 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Event;
@@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -83,6 +86,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -845,6 +849,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
       ApplicationSubmissionContext context =
           new ApplicationSubmissionContextPBImpl();
       context.setApplicationId(appId);
+      context.setAMContainerSpec(new ContainerLaunchContextPBImpl());
       appStateNew = createAppState(context, submitTime, startTime, finishTime,
           true);
     } else {
@@ -1488,4 +1493,65 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
         tokensWithIndex, sequenceNumber, 3);
     store.close();
   }
+
+  /**
+   * Verifies that an application's stored submission context is left intact
+   * by an update in the RUNNING state and is pruned down to the recovery
+   * fields (id, queue, resource, AM container spec ACLs) once the
+   * application is updated in the FINISHED state.
+   */
+  @Test
+  public void testAppSubmissionContextIsPrunedInFinalApplicationState()
+      throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    ApplicationId appId = ApplicationId.fromString("application_1234_0010");
+
+    Configuration conf = createConfForDelegationTokenNodeSplit(1);
+    RMStateStore store = zkTester.getRMStateStore(conf);
+    ApplicationSubmissionContext ctx =
+        new ApplicationSubmissionContextPBImpl();
+    ctx.setApplicationId(appId);
+    ctx.setQueue("a_queue");
+    ContainerLaunchContextPBImpl containerLaunchCtx =
+        new ContainerLaunchContextPBImpl();
+    containerLaunchCtx.setCommands(Collections.singletonList("a_command"));
+    ctx.setAMContainerSpec(containerLaunchCtx);
+    Resource resource = new ResourcePBImpl();
+    resource.setMemorySize(17L);
+    ctx.setResource(resource);
+    Map<String, String> schedulingPropertiesMap =
+        Collections.singletonMap("a_key", "a_value");
+    ctx.setApplicationSchedulingPropertiesMap(schedulingPropertiesMap);
+    ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl();
+    appState.setState(RMAppState.RUNNING);
+    appState.setApplicationSubmissionContext(ctx);
+    store.storeApplicationStateInternal(appId, appState);
+
+    RMState rmState = store.loadState();
+    assertEquals(1, rmState.getApplicationState().size());
+
+    // appState is still RUNNING: an update in a non-final state must not
+    // prune anything from the submission context.
+    store.handleStoreEvent(new RMStateUpdateAppEvent(appState, false, null));
+
+    rmState = store.loadState();
+    ctx = rmState.getApplicationState().get(appId)
+        .getApplicationSubmissionContext();
+
+    assertEquals("ApplicationSchedulingPropertiesMap should not have been "
+        + "pruned from the application submission context before the "
+        + "FINISHED state",
+        schedulingPropertiesMap, ctx.getApplicationSchedulingPropertiesMap());
+    assertEquals(Collections.singletonList("a_command"),
+        ctx.getAMContainerSpec().getCommands());
+
+    // Once FINISHED, the update must store the pruned context.
+    appState.setState(RMAppState.FINISHED);
+    store.handleStoreEvent(new RMStateUpdateAppEvent(appState, false, null));
+
+    rmState = store.loadState();
+    ctx = rmState.getApplicationState().get(appId)
+        .getApplicationSubmissionContext();
+
+    assertEquals(appId, ctx.getApplicationId());
+    assertEquals("a_queue", ctx.getQueue());
+    assertNotNull(ctx.getAMContainerSpec());
+    assertEquals(17L, ctx.getResource().getMemorySize());
+    assertEquals("ApplicationSchedulingPropertiesMap should have been pruned"
+        + " from the application submission context when in FINISHED STATE",
+        Collections.emptyMap(), ctx.getApplicationSchedulingPropertiesMap());
+    // Only the ACLs survive in the AM container spec; commands are dropped.
+    assertEquals(Collections.emptyList(),
+        ctx.getAMContainerSpec().getCommands());
+    store.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to