http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 6a3d270..77d75ca 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -86,6 +86,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -99,11 +101,13 @@ import 
org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@@ -191,12 +195,12 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     app1LogDir.mkdir();
     logAggregationService
         .handle(new LogHandlerAppStartedEvent(
-            application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+            application1, this.user, null, this.acls));
 
     ApplicationAttemptId appAttemptId =
         BuilderUtils.newApplicationAttemptId(application1, 1);
-    ContainerId container11 = BuilderUtils.newContainerId(appAttemptId, 1);
+    ContainerId container11 = createContainer(appAttemptId, 1,
+        ContainerType.APPLICATION_MASTER);
     // Simulate log-file creation
     writeContainerLogs(app1LogDir, container11, new String[] { "stdout",
         "stderr", "syslog" });
@@ -302,11 +306,12 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     LogAggregationContext context =
         LogAggregationContext.newInstance("HOST*", "sys*");
     logAggregationService.handle(new LogHandlerAppStartedEvent(app, this.user,
-        null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, 
context));
+        null, this.acls, context));
 
     ApplicationAttemptId appAttemptId =
         BuilderUtils.newApplicationAttemptId(app, 1);
-    ContainerId cont = BuilderUtils.newContainerId(appAttemptId, 1);
+    ContainerId cont = createContainer(appAttemptId, 1,
+        ContainerType.APPLICATION_MASTER);
     writeContainerLogs(appLogDir, cont, new String[] { "stdout",
         "stderr", "syslog" });
     logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont, 
0));
@@ -337,8 +342,7 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     app1LogDir.mkdir();
     logAggregationService
         .handle(new LogHandlerAppStartedEvent(
-            application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+            application1, this.user, null, this.acls));
 
     logAggregationService.handle(new LogHandlerAppFinishedEvent(
         application1));
@@ -388,13 +392,13 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     app1LogDir.mkdir();
     logAggregationService
         .handle(new LogHandlerAppStartedEvent(
-            application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+            application1, this.user, null, this.acls));
 
     ApplicationAttemptId appAttemptId1 =
         BuilderUtils.newApplicationAttemptId(application1, 1);
-    ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
-    
+    ContainerId container11 = createContainer(appAttemptId1, 1,
+        ContainerType.APPLICATION_MASTER);
+
     // Simulate log-file creation
     writeContainerLogs(app1LogDir, container11, fileNames);
     logAggregationService.handle(
@@ -407,18 +411,23 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     File app2LogDir =
       new File(localLogDir, ConverterUtils.toString(application2));
     app2LogDir.mkdir();
+    LogAggregationContext contextWithAMOnly =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMOnly.setLogAggregationPolicyClassName(
+        AMOnlyLogAggregationPolicy.class.getName());
+
     logAggregationService.handle(new LogHandlerAppStartedEvent(
-        application2, this.user, null,
-        ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls));
+        application2, this.user, null, this.acls, contextWithAMOnly));
+
+    ContainerId container21 = createContainer(appAttemptId2, 1,
+        ContainerType.APPLICATION_MASTER);
 
-    
-    ContainerId container21 = BuilderUtils.newContainerId(appAttemptId2, 1);
-    
     writeContainerLogs(app2LogDir, container21, fileNames);
     logAggregationService.handle(
         new LogHandlerContainerFinishedEvent(container21, 0));
 
-    ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2);
+    ContainerId container12 = createContainer(appAttemptId1, 2,
+        ContainerType.TASK);
 
     writeContainerLogs(app1LogDir, container12, fileNames);
     logAggregationService.handle(
@@ -431,9 +440,13 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     File app3LogDir =
       new File(localLogDir, ConverterUtils.toString(application3));
     app3LogDir.mkdir();
+    LogAggregationContext contextWithAMAndFailed =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMAndFailed.setLogAggregationPolicyClassName(
+        AMOrFailedContainerLogAggregationPolicy.class.getName());
+
     logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
-        this.user, null,
-        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, 
this.acls));        
+        this.user, null, this.acls, contextWithAMAndFailed));
 
     dispatcher.await();
     ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{
@@ -450,22 +463,26 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     checkEvents(appEventHandler, expectedInitEvents, false, "getType", 
"getApplicationID");
     reset(appEventHandler);
     
-    ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
+    ContainerId container31 = createContainer(appAttemptId3, 1,
+        ContainerType.APPLICATION_MASTER);
     writeContainerLogs(app3LogDir, container31, fileNames);
     logAggregationService.handle(
         new LogHandlerContainerFinishedEvent(container31, 0));
 
-    ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2);
+    ContainerId container32 = createContainer(appAttemptId3, 2,
+        ContainerType.TASK);
     writeContainerLogs(app3LogDir, container32, fileNames);
     logAggregationService.handle(
         new LogHandlerContainerFinishedEvent(container32, 1)); // Failed 
 
-    ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2);
+    ContainerId container22 = createContainer(appAttemptId2, 2,
+        ContainerType.TASK);
     writeContainerLogs(app2LogDir, container22, fileNames);
     logAggregationService.handle(
         new LogHandlerContainerFinishedEvent(container22, 0));
 
-    ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3);
+    ContainerId container33 = createContainer(appAttemptId3, 3,
+        ContainerType.TASK);
     writeContainerLogs(app3LogDir, container33, fileNames);
     logAggregationService.handle(
         new LogHandlerContainerFinishedEvent(container33, 0));
@@ -528,10 +545,13 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     ApplicationId appId =
         BuilderUtils.newApplicationId(System.currentTimeMillis(),
           (int) (Math.random() * 1000));
+    LogAggregationContext contextWithAMAndFailed =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMAndFailed.setLogAggregationPolicyClassName(
+        AMOrFailedContainerLogAggregationPolicy.class.getName());
+
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
-        this.user, null,
-        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
-        this.acls));
+        this.user, null, this.acls, contextWithAMAndFailed));
     dispatcher.await();
     
     // Verify that it failed
@@ -551,11 +571,8 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     File appLogDir =
         new File(localLogDir, ConverterUtils.toString(appId2));
     appLogDir.mkdir();
-    
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId2,
-        this.user, null,
-        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
-        this.acls));
+        this.user, null, this.acls, contextWithAMAndFailed));
     dispatcher.await();
     
     // Verify that it worked
@@ -627,8 +644,12 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
         remoteRootLogDir.getAbsolutePath(), this.user));
     Path suffixDir = new Path(userDir, logSuffix);
     Path appDir = new Path(suffixDir, appId.toString());
+    LogAggregationContext contextWithAllContainers =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAllContainers.setLogAggregationPolicyClassName(
+        AllContainerLogAggregationPolicy.class.getName());
     aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null,
-        ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+        this.acls, contextWithAllContainers));
     verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class));
     verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class));
     verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class));
@@ -637,7 +658,7 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
     Path appDir2 = new Path(suffixDir, appId2.toString());
     aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
-        ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+        this.acls, contextWithAllContainers));
     verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class));
 
     // start another application with the app dir already created and verify
@@ -646,7 +667,7 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     Path appDir3 = new Path(suffixDir, appId3.toString());
     new File(appDir3.toUri().getPath()).mkdir();
     aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
-        ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+        this.acls, contextWithAllContainers));
     verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
     aggSvc.stop();
     aggSvc.close();
@@ -674,13 +695,13 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     doThrow(new YarnRuntimeException("KABOOM!"))
       .when(logAggregationService).initAppAggregator(
           eq(appId), eq(user), any(Credentials.class),
-          any(ContainerLogsRetentionPolicy.class), anyMap(),
-          any(LogAggregationContext.class));
-
+          anyMap(), any(LogAggregationContext.class));
+    LogAggregationContext contextWithAMAndFailed =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMAndFailed.setLogAggregationPolicyClassName(
+        AMOrFailedContainerLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
-        this.user, null,
-        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
-        this.acls));
+        this.user, null, this.acls, contextWithAMAndFailed));
 
     dispatcher.await();
     ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
@@ -724,10 +745,13 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     doThrow(e)
       .when(logAggregationService).createAppDir(any(String.class),
           any(ApplicationId.class), any(UserGroupInformation.class));
+    LogAggregationContext contextWithAMAndFailed =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMAndFailed.setLogAggregationPolicyClassName(
+        AMOrFailedContainerLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
-        this.user, null,
-        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, 
this.acls));        
-    
+        this.user, null, this.acls, contextWithAMAndFailed));
+
     dispatcher.await();
     ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
         new ApplicationEvent(appId, 
@@ -765,10 +789,27 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     }
   }
 
-  private LogFileStatusInLastCycle verifyContainerLogs(LogAggregationService 
logAggregationService,
+  private LogFileStatusInLastCycle verifyContainerLogs(
+      LogAggregationService logAggregationService,
       ApplicationId appId, ContainerId[] expectedContainerIds,
-      String[] logFiles, int numOfContainerLogs, boolean multiLogs)
-      throws IOException {
+      String[] logFiles, int numOfLogsPerContainer,
+      boolean multiLogs) throws IOException {
+    return verifyContainerLogs(logAggregationService, appId,
+        expectedContainerIds, expectedContainerIds.length,
+        expectedContainerIds.length, logFiles, numOfLogsPerContainer,
+        multiLogs);
+  }
+
+  // expectedContainerIds is the minimal set of containers to check.
+  // The actual list of containers could be more than that.
+  // Verify the size of the actual list is in the range of
+  // [minNumOfContainers, maxNumOfContainers].
+  private LogFileStatusInLastCycle verifyContainerLogs(
+      LogAggregationService logAggregationService,
+      ApplicationId appId, ContainerId[] expectedContainerIds,
+      int minNumOfContainers, int maxNumOfContainers,
+      String[] logFiles, int numOfLogsPerContainer, boolean multiLogs)
+    throws IOException {
     Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, 
this.user);
     RemoteIterator<FileStatus> nodeFiles = null;
     try {
@@ -780,6 +821,10 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     } catch (FileNotFoundException fnf) {
       Assert.fail("Should have log files");
     }
+    if (numOfLogsPerContainer == 0) {
+      Assert.assertTrue(!nodeFiles.hasNext());
+      return null;
+    }
 
     Assert.assertTrue(nodeFiles.hasNext());
     FileStatus targetNodeFile = null;
@@ -865,11 +910,14 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
       }
 
       // 1 for each container
-      Assert.assertEquals(expectedContainerIds.length, logMap.size());
+      Assert.assertTrue("number of containers with logs should be at least " +
+          minNumOfContainers,logMap.size() >= minNumOfContainers);
+      Assert.assertTrue("number of containers with logs should be at most " +
+          minNumOfContainers,logMap.size() <= maxNumOfContainers);
       for (ContainerId cId : expectedContainerIds) {
         String containerStr = ConverterUtils.toString(cId);
         Map<String, String> thisContainerMap = logMap.remove(containerStr);
-        Assert.assertEquals(numOfContainerLogs, thisContainerMap.size());
+        Assert.assertEquals(numOfLogsPerContainer, thisContainerMap.size());
         for (String fileType : logFiles) {
           String expectedValue =
               containerStr + " Hello " + fileType + "!End of LogType:"
@@ -882,8 +930,15 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
         }
         Assert.assertEquals(0, thisContainerMap.size());
       }
-      Assert.assertEquals(0, logMap.size());
-      return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(), 
fileTypes);
+      Assert.assertTrue("number of remaining containers should be at least " +
+          (minNumOfContainers - expectedContainerIds.length),
+          logMap.size() >= minNumOfContainers - expectedContainerIds.length);
+      Assert.assertTrue("number of remaining containers should be at most " +
+          (maxNumOfContainers - expectedContainerIds.length),
+          logMap.size() <= maxNumOfContainers - expectedContainerIds.length);
+
+      return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(),
+          fileTypes);
     } finally {
       reader.close();
     }
@@ -991,9 +1046,12 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     logAggregationService.start();
 
     ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+    LogAggregationContext contextWithAllContainers =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAllContainers.setLogAggregationPolicyClassName(
+        AllContainerLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(
-            application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+        application1, this.user, null, this.acls, contextWithAllContainers));
 
     logAggregationService.stop();
     assertEquals(0, logAggregationService.getNumAggregators());
@@ -1015,8 +1073,7 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
 
     ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
     logAggregationService.handle(new LogHandlerAppStartedEvent(
-            application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+            application1, this.user, null, this.acls));
 
     logAggregationService.handle(new LogHandlerAppFinishedEvent(application1));
     dispatcher.await();
@@ -1216,12 +1273,13 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
         new File(localLogDir, ConverterUtils.toString(application1));
     appLogDir1.mkdir();
     logAggregationService.handle(new LogHandlerAppStartedEvent(application1,
-      this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
+      this.user, null, this.acls,
       logAggregationContextWithIncludePatterns));
 
     ApplicationAttemptId appAttemptId1 =
         BuilderUtils.newApplicationAttemptId(application1, 1);
-    ContainerId container1 = BuilderUtils.newContainerId(appAttemptId1, 1);
+    ContainerId container1 = createContainer(appAttemptId1, 1,
+        ContainerType.APPLICATION_MASTER);
 
     // Simulate log-file creation
     writeContainerLogs(appLogDir1, container1, new String[] { "stdout",
@@ -1239,10 +1297,12 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     File app2LogDir =
         new File(localLogDir, ConverterUtils.toString(application2));
     app2LogDir.mkdir();
+    LogAggregationContextWithExcludePatterns.setLogAggregationPolicyClassName(
+        AMOnlyLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(application2,
-      this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY,
-      this.acls, LogAggregationContextWithExcludePatterns));
-    ContainerId container2 = BuilderUtils.newContainerId(appAttemptId2, 1);
+      this.user, null, this.acls, LogAggregationContextWithExcludePatterns));
+    ContainerId container2 = createContainer(appAttemptId2, 1,
+        ContainerType.APPLICATION_MASTER);
 
     writeContainerLogs(app2LogDir, container2, new String[] { "stdout",
         "stderr", "syslog" });
@@ -1262,10 +1322,12 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     File app3LogDir =
         new File(localLogDir, ConverterUtils.toString(application3));
     app3LogDir.mkdir();
+    context1.setLogAggregationPolicyClassName(
+        AMOnlyLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
-      this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY,
-      this.acls, context1));
-    ContainerId container3 = BuilderUtils.newContainerId(appAttemptId3, 1);
+      this.user, null, this.acls, context1));
+    ContainerId container3 = createContainer(appAttemptId3, 1,
+        ContainerType.APPLICATION_MASTER);
     writeContainerLogs(app3LogDir, container3, new String[] { "stdout",
         "sys.log", "std.log", "out.log", "err.log", "log" });
     logAggregationService.handle(
@@ -1285,10 +1347,12 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     File app4LogDir =
         new File(localLogDir, ConverterUtils.toString(application4));
     app4LogDir.mkdir();
+    context2.setLogAggregationPolicyClassName(
+        AMOnlyLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(application4,
-      this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY,
-      this.acls, context2));
-    ContainerId container4 = BuilderUtils.newContainerId(appAttemptId4, 1);
+      this.user, null, this.acls, context2));
+    ContainerId container4 = createContainer(appAttemptId4, 1,
+        ContainerType.APPLICATION_MASTER);
     writeContainerLogs(app4LogDir, container4, new String[] { "stdout",
         "sys.log", "std.log", "out.log", "err.log", "log" });
     logAggregationService.handle(
@@ -1347,6 +1411,471 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
   }
 
   @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testNoneContainerPolicy() throws Exception {
+    ApplicationId appId = createApplication();
+    // LogContext specifies policy to not aggregate any container logs
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, NoneContainerLogAggregationPolicy.class, null);
+
+    String[] logFiles = new String[] { "stdout" };
+    ContainerId container1 = finishContainer(appId, logAggregationService,
+        ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
+
+    finishApplication(appId, logAggregationService);
+
+    verifyContainerLogs(logAggregationService, appId,
+        new ContainerId[] { container1 }, logFiles, 0, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testFailedContainerPolicy() throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, FailedContainerLogAggregationPolicy.class, null);
+
+    String[] logFiles = new String[] { "stdout" };
+    ContainerId container1 = finishContainer(
+        appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 1,
+            logFiles);
+    finishContainer(appId, logAggregationService, ContainerType.TASK, 2, 0,
+        logFiles);
+    finishContainer(appId, logAggregationService, ContainerType.TASK, 3,
+        ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles);
+
+    finishApplication(appId, logAggregationService);
+
+    verifyContainerLogs(logAggregationService, appId,
+        new ContainerId[] { container1 }, logFiles, 1, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testAMOrFailedContainerPolicy() throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, AMOrFailedContainerLogAggregationPolicy.class, null);
+
+    String[] logFiles = new String[] { "stdout" };
+    ContainerId container1 = finishContainer(
+        appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0,
+            logFiles);
+    ContainerId container2= finishContainer(appId,
+        logAggregationService, ContainerType.TASK, 2, 1, logFiles);
+    finishContainer(appId, logAggregationService, ContainerType.TASK, 3,
+        ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles);
+
+    finishApplication(appId, logAggregationService);
+
+    verifyContainerLogs(logAggregationService, appId,
+        new ContainerId[] { container1, container2 }, logFiles, 1, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testFailedOrKilledContainerPolicy() throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, FailedOrKilledContainerLogAggregationPolicy.class, null);
+
+    String[] logFiles = new String[] { "stdout" };
+    finishContainer(appId, logAggregationService, 
ContainerType.APPLICATION_MASTER, 1, 0,
+        logFiles);
+    ContainerId container2 = finishContainer(appId,
+        logAggregationService, ContainerType.TASK, 2, 1, logFiles);
+    ContainerId container3 = finishContainer(appId, logAggregationService,
+        ContainerType.TASK, 3,
+        ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles);
+
+    finishApplication(appId, logAggregationService);
+
+    verifyContainerLogs(logAggregationService, appId,
+        new ContainerId[] { container2, container3 }, logFiles, 1, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testAMOnlyContainerPolicy() throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, AMOnlyLogAggregationPolicy.class, null);
+
+    String[] logFiles = new String[] { "stdout" };
+    ContainerId container1 = finishContainer(appId, logAggregationService,
+        ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
+    finishContainer(appId, logAggregationService, ContainerType.TASK, 2, 1,
+        logFiles);
+    finishContainer(appId, logAggregationService, ContainerType.TASK, 3, 0,
+        logFiles);
+
+    finishApplication(appId, logAggregationService);
+
+    verifyContainerLogs(logAggregationService, appId,
+        new ContainerId[] { container1 }, logFiles, 1, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  // Test sample container policy with an app that has
+  // the same number of successful containers as
+  // SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD.
+  // and verify all those containers' logs are aggregated.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testSampleContainerPolicyWithSmallApp() throws Exception {
+    setupAndTestSampleContainerPolicy(
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
+        false);
+  }
+
+  // Test sample container policy with an app that has
+  // more successful containers than
+  // SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD.
+  // and verify some of those containers' logs are aggregated.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testSampleContainerPolicyWithLargeApp() throws Exception {
+    setupAndTestSampleContainerPolicy(
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
+        false);
+  }
+
+  // Test sample container policy with zero sample rate.
+  // and verify there is no sampling beyond the MIN_THRESHOLD containers.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testSampleContainerPolicyWithZeroSampleRate() throws Exception {
+    setupAndTestSampleContainerPolicy(
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
+        0, SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
+        false);
+  }
+
+  // Test sample container policy with 100 percent sample rate.
+  // and verify all containers' logs are aggregated.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testSampleContainerPolicyWith100PercentSampleRate()
+      throws Exception {
+    setupAndTestSampleContainerPolicy(
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
+        1.0f,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
+        false);
+  }
+
+  // Test sample container policy with zero min threshold.
+  // and verify some containers' logs are aggregated.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testSampleContainerPolicyWithZeroMinThreshold()
+      throws Exception {
+    setupAndTestSampleContainerPolicy(
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE, 0, false);
+  }
+
+  // Test sample container policy with customized settings.
+  // and verify some containers' logs are aggregated.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testSampleContainerPolicyWithCustomizedSettings()
+      throws Exception {
+    setupAndTestSampleContainerPolicy(500, 0.5f, 50, false);
+  }
+
+  // Test cluster-wide sample container policy.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testClusterSampleContainerPolicy()
+      throws Exception {
+    setupAndTestSampleContainerPolicy(500, 0.5f, 50, true);
+  }
+
+  // Test the default cluster-wide sample container policy.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testDefaultClusterSampleContainerPolicy() throws Exception {
+    setupAndTestSampleContainerPolicy(
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
+        true);
+  }
+
+  // The application specifies invalid policy class
+  // NM should fallback to the default policy which is to aggregate all
+  // containers.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testInvalidPolicyClassName() throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, "foo", null, true);
+    verifyDefaultPolicy(appId, logAggregationService);
+  }
+
+  // The application specifies LogAggregationContext, but not policy class.
+  // NM should fallback to the default policy which is to aggregate all
+  // containers.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testNullPolicyClassName() throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, null, null, true);
+    verifyDefaultPolicy(appId, logAggregationService);
+  }
+
+  // The application doesn't specifies LogAggregationContext.
+  // NM should fallback to the default policy which is to aggregate all
+  // containers.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testDefaultPolicyWithoutLogAggregationContext()
+      throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, null, null, false);
+    verifyDefaultPolicy(appId, logAggregationService);
+  }
+
+  private void verifyDefaultPolicy(ApplicationId appId,
+      LogAggregationService logAggregationService) throws Exception {
+    String[] logFiles = new String[] { "stdout" };
+    ContainerId container1 = finishContainer(
+        appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0,
+            logFiles);
+    ContainerId container2 = finishContainer(appId,
+        logAggregationService, ContainerType.TASK, 2, 1, logFiles);
+    ContainerId container3 = finishContainer(appId, logAggregationService,
+        ContainerType.TASK, 3,
+        ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles);
+
+    finishApplication(appId, logAggregationService);
+
+    verifyContainerLogs(logAggregationService, appId,
+        new ContainerId[] { container1, container2, container3 },
+            logFiles, 1, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  // If enableAtClusterLevel is false, the policy is set up via
+  // LogAggregationContext at the application level. If it is true,
+  // the policy is set up via Configuration at the cluster level.
+  private void setupAndTestSampleContainerPolicy(int successfulContainers,
+      float sampleRate, int minThreshold, boolean enableAtClusterLevel)
+      throws Exception {
+    ApplicationId appId = createApplication();
+    String policyParameters = null;
+    if (sampleRate != SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE
+        || minThreshold !=
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD) {
+      policyParameters = SampleContainerLogAggregationPolicy.buildParameters(
+          sampleRate, minThreshold);
+    }
+
+    if (enableAtClusterLevel) {
+      this.conf.set(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS,
+          SampleContainerLogAggregationPolicy.class.getName());
+      if (policyParameters != null) {
+        this.conf.set(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS,
+            policyParameters);
+      }
+    }
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, SampleContainerLogAggregationPolicy.class.getName(),
+            policyParameters, !enableAtClusterLevel);
+
+    ArrayList<ContainerId> containerIds = new ArrayList<ContainerId>();
+    String[] logFiles = new String[] { "stdout" };
+    int cid = 1;
+    // AM container
+    containerIds.add(finishContainer(appId, logAggregationService,
+        ContainerType.APPLICATION_MASTER, cid++, 0, logFiles));
+    // Successful containers
+    // We expect the minThreshold containers will be log aggregated.
+    if (minThreshold > 0) {
+      containerIds.addAll(finishContainers(appId, logAggregationService, cid,
+          (successfulContainers > minThreshold) ? minThreshold :
+              successfulContainers, 0, logFiles));
+    }
+    cid = containerIds.size() + 1;
+    if (successfulContainers > minThreshold) {
+      List<ContainerId> restOfSuccessfulContainers = finishContainers(appId,
+          logAggregationService, cid, successfulContainers - minThreshold, 0,
+          logFiles);
+      cid += successfulContainers - minThreshold;
+      // If the sample rate is 100 percent, restOfSuccessfulContainers will be
+      // all be log aggregated.
+      if (sampleRate == 1.0) {
+        containerIds.addAll(restOfSuccessfulContainers);
+      }
+    }
+    // Failed container
+    containerIds.add(finishContainer(appId, logAggregationService,
+        ContainerType.TASK, cid++, 1, logFiles));
+    // Killed container
+    containerIds.add(finishContainer(appId, logAggregationService,
+        ContainerType.TASK, cid++,
+        ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles));
+
+    finishApplication(appId, logAggregationService);
+
+    // The number of containers with logs should be 3(AM + failed + killed) +
+    // DEFAULT_SAMPLE_MIN_THRESHOLD +
+    // ( successfulContainers - DEFAULT_SAMPLE_MIN_THRESHOLD ) * SAMPLE_RATE
+    // Due to the sampling nature, the exact number could vary.
+    // So we only check for a range.
+    // For the cases where successfulContainers is the same as minThreshold
+    // or sampleRate is zero, minOfContainersWithLogs and
+    // maxOfContainersWithLogs will the same.
+    int minOfContainersWithLogs = 3 + minThreshold +
+        (int)((successfulContainers - minThreshold) * sampleRate / 2);
+    int maxOfContainersWithLogs = 3 + minThreshold +
+        (int)((successfulContainers - minThreshold) * sampleRate * 2);
+    verifyContainerLogs(logAggregationService, appId,
+        containerIds.toArray(new ContainerId[containerIds.size()]),
+        minOfContainersWithLogs, maxOfContainersWithLogs,
+        logFiles, 1, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  private ApplicationId createApplication() {
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, 
localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+
+    ApplicationId appId = BuilderUtils.newApplicationId(1234, 1);
+    Application mockApp = mock(Application.class);
+    when(mockApp.getContainers()).thenReturn(
+        new HashMap<ContainerId, Container>());
+
+    this.context.getApplications().put(appId, mockApp);
+    return appId;
+  }
+
+  private LogAggregationService createLogAggregationService(
+      ApplicationId appId,
+      Class<? extends ContainerLogAggregationPolicy> policy,
+      String parameters) {
+    return createLogAggregationService(appId, policy.getName(), parameters,
+        true);
+  }
+
+  private LogAggregationService createLogAggregationService(
+      ApplicationId appId, String className, String parameters,
+      boolean createLogAggContext) {
+    ConcurrentHashMap<ContainerId, Container> containers =
+        new ConcurrentHashMap<ContainerId, Container>();
+    LogAggregationService logAggregationService =
+        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+            super.dirsHandler);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+    LogAggregationContext logAggContext = null;
+
+    if (createLogAggContext) {
+      logAggContext = Records.newRecord(LogAggregationContext.class);
+      logAggContext.setLogAggregationPolicyClassName(className);
+      if (parameters != null) {
+        logAggContext.setLogAggregationPolicyParameters(parameters);
+      }
+    }
+    logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+        this.user, null, this.acls, logAggContext));
+
+    return logAggregationService;
+  }
+
+  private ContainerId createContainer(ApplicationAttemptId appAttemptId1,
+      long cId, ContainerType containerType) {
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId1,
+        cId);
+    Resource r = BuilderUtils.newResource(1024, 1);
+    ContainerTokenIdentifier containerToken = new ContainerTokenIdentifier(
+        containerId, context.getNodeId().toString(), user, r,
+        System.currentTimeMillis() + 100000L, 123, DUMMY_RM_IDENTIFIER,
+        Priority.newInstance(0), 0, null, null, containerType);
+    Container container = mock(Container.class);
+    context.getContainers().put(containerId, container);
+    when(container.getContainerTokenIdentifier()).thenReturn(containerToken);
+    when(container.getContainerId()).thenReturn(containerId);
+    return containerId;
+  }
+
+  private ContainerId finishContainer(ApplicationId application1,
+      LogAggregationService logAggregationService, ContainerType containerType,
+      long cId, int exitCode, String[] logFiles) throws IOException {
+    ApplicationAttemptId appAttemptId1 =
+        BuilderUtils.newApplicationAttemptId(application1, 1);
+    ContainerId containerId = createContainer(appAttemptId1, cId,
+        containerType);
+    // Simulate log-file creation
+    File appLogDir1 =
+        new File(localLogDir, ConverterUtils.toString(application1));
+    appLogDir1.mkdir();
+    writeContainerLogs(appLogDir1, containerId, logFiles);
+
+    logAggregationService.handle(new LogHandlerContainerFinishedEvent(
+        containerId, exitCode));
+    return containerId;
+
+  }
+
+  private List<ContainerId> finishContainers(ApplicationId appId,
+      LogAggregationService logAggregationService, long startingCid, int count,
+      int exitCode, String[] logFiles) throws IOException {
+    ArrayList<ContainerId> containerIds = new ArrayList<ContainerId>();
+    for (long cid = startingCid; cid < startingCid + count; cid++) {
+      containerIds.add(finishContainer(
+          appId, logAggregationService, ContainerType.TASK, cid, exitCode,
+              logFiles));
+    }
+    return containerIds;
+  }
+
+  private void finishApplication(ApplicationId appId,
+      LogAggregationService logAggregationService) throws Exception {
+    dispatcher.await();
+    ApplicationEvent expectedInitEvents[] =
+        new ApplicationEvent[] { new ApplicationEvent(appId,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) };
+    checkEvents(appEventHandler, expectedInitEvents, false, "getType",
+        "getApplicationID");
+    reset(appEventHandler);
+
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(appId));
+    logAggregationService.stop();
+    assertEquals(0, logAggregationService.getNumAggregators());
+  }
+
+  private void verifyLogAggFinishEvent(ApplicationId appId) throws Exception {
+    dispatcher.await();
+
+    ApplicationEvent[] expectedFinishedEvents =
+        new ApplicationEvent[] { new ApplicationEvent(appId,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) };
+    checkEvents(appEventHandler, expectedFinishedEvents, false, "getType",
+            "getApplicationID");
+  }
+
+  @Test (timeout = 50000)
   public void testLogAggregationServiceWithInterval() throws Exception {
     testLogAggregationService(false);
   }
@@ -1391,17 +1920,14 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     ApplicationId application = BuilderUtils.newApplicationId(123456, 1);
     ApplicationAttemptId appAttemptId =
         BuilderUtils.newApplicationAttemptId(application, 1);
-    ContainerId container = BuilderUtils.newContainerId(appAttemptId, 1);
+    ContainerId container = createContainer(appAttemptId, 1,
+        ContainerType.APPLICATION_MASTER);
 
-    Context context = spy(this.context);
     ConcurrentMap<ApplicationId, Application> maps =
-        new ConcurrentHashMap<ApplicationId, Application>();
+        this.context.getApplications();
     Application app = mock(Application.class);
-    Map<ContainerId, Container> containers = new HashMap<ContainerId, 
Container>();
-    containers.put(container, mock(Container.class));
     maps.put(application, app);
-    when(app.getContainers()).thenReturn(containers);
-    when(context.getApplications()).thenReturn(maps);
+    when(app.getContainers()).thenReturn(this.context.getContainers());
 
     LogAggregationService logAggregationService =
         new LogAggregationService(dispatcher, context, this.delSrvc,
@@ -1415,8 +1941,7 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
         new File(localLogDir, ConverterUtils.toString(application));
     appLogDir.mkdir();
     logAggregationService.handle(new LogHandlerAppStartedEvent(application,
-      this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
-      logAggregationContextWithInterval));
+      this.user, null, this.acls, logAggregationContextWithInterval));
 
     LogFileStatusInLastCycle logFileStatusInLastCycle = null;
     // Simulate log-file creation
@@ -1536,7 +2061,7 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     logAggregationService.init(this.conf);
     logAggregationService.start();
     logAggregationService.handle(new LogHandlerAppStartedEvent(application1,
-      this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
+      this.user, null, this.acls,
       Records.newRecord(LogAggregationContext.class)));
 
     // Inject new token for log-aggregation after app log-aggregator init

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
index 0bab5ea..46d06da 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -148,8 +147,7 @@ public class TestNonAggregatingLogHandler {
     logHandler.init(conf);
     logHandler.start();
 
-    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
-        ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
 
     logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
 
@@ -189,8 +187,7 @@ public class TestNonAggregatingLogHandler {
     logHandler.init(conf);
     logHandler.start();
 
-    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
-        ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
 
     logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
 
@@ -357,8 +354,7 @@ public class TestNonAggregatingLogHandler {
     logHandler.init(conf);
     logHandler.start();
 
-    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
-        ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
     logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
     logHandler.handle(new LogHandlerAppFinishedEvent(appId));
 
@@ -445,7 +441,7 @@ public class TestNonAggregatingLogHandler {
     doReturn(localLogDirPaths).when(dirsHandler).getLogDirsForCleanup();
 
     logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
-      ContainerLogsRetentionPolicy.ALL_CONTAINERS, appAcls));
+        appAcls));
 
     // test case where some dirs have the log dir to delete
     // mock some dirs throwing various exceptions

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 4cb8e1a..769041b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -231,16 +231,22 @@ public class TestContainerAllocation {
         LogAggregationContext.newInstance(
           "includePattern", "excludePattern",
           "rolledLogsIncludePattern",
-          "rolledLogsExcludePattern");
+          "rolledLogsExcludePattern",
+          "policyClass",
+          "policyParameters");
     LogAggregationContext returned =
         getLogAggregationContextFromContainerToken(rm1, nm2,
           logAggregationContext);
     Assert.assertEquals("includePattern", returned.getIncludePattern());
     Assert.assertEquals("excludePattern", returned.getExcludePattern());
     Assert.assertEquals("rolledLogsIncludePattern",
-      returned.getRolledLogsIncludePattern());
+        returned.getRolledLogsIncludePattern());
     Assert.assertEquals("rolledLogsExcludePattern",
-      returned.getRolledLogsExcludePattern());
+        returned.getRolledLogsExcludePattern());
+    Assert.assertEquals("policyClass",
+        returned.getLogAggregationPolicyClassName());
+    Assert.assertEquals("policyParameters",
+        returned.getLogAggregationPolicyParameters());
     rm1.stop();
   }
 

Reply via email to