http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 6a3d270..77d75ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -86,6 +86,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -99,11 +101,13 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; -import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; -import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; @@ -191,12 +195,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { app1LogDir.mkdir(); logAggregationService .handle(new LogHandlerAppStartedEvent( - application1, this.user, null, - ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + application1, this.user, null, this.acls)); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(application1, 1); - ContainerId container11 = BuilderUtils.newContainerId(appAttemptId, 1); + ContainerId container11 = createContainer(appAttemptId, 1, + ContainerType.APPLICATION_MASTER); // Simulate log-file creation writeContainerLogs(app1LogDir, container11, new String[] { "stdout", "stderr", "syslog" }); @@ -302,11 +306,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { LogAggregationContext context = LogAggregationContext.newInstance("HOST*", "sys*"); logAggregationService.handle(new LogHandlerAppStartedEvent(app, this.user, - null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, context)); + null, this.acls, context)); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(app, 1); - ContainerId cont = BuilderUtils.newContainerId(appAttemptId, 1); + ContainerId cont = createContainer(appAttemptId, 1, + ContainerType.APPLICATION_MASTER); writeContainerLogs(appLogDir, cont, new String[] { "stdout", "stderr", "syslog" }); logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont, 0)); @@ -337,8 +342,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { app1LogDir.mkdir(); logAggregationService .handle(new LogHandlerAppStartedEvent( - application1, this.user, null, - ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + application1, this.user, null, this.acls)); logAggregationService.handle(new LogHandlerAppFinishedEvent( application1)); @@ -388,13 +392,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest { app1LogDir.mkdir(); logAggregationService .handle(new LogHandlerAppStartedEvent( - application1, this.user, null, - ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + application1, this.user, null, this.acls)); ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(application1, 1); - ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1); - + ContainerId container11 = createContainer(appAttemptId1, 1, + ContainerType.APPLICATION_MASTER); + // Simulate log-file creation writeContainerLogs(app1LogDir, container11, fileNames); logAggregationService.handle( @@ -407,18 +411,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest { File app2LogDir = new File(localLogDir, ConverterUtils.toString(application2)); app2LogDir.mkdir(); + LogAggregationContext contextWithAMOnly = + Records.newRecord(LogAggregationContext.class); + contextWithAMOnly.setLogAggregationPolicyClassName( + AMOnlyLogAggregationPolicy.class.getName()); + logAggregationService.handle(new LogHandlerAppStartedEvent( - application2, this.user, null, - ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls)); + application2, this.user, null, this.acls, contextWithAMOnly)); + + ContainerId container21 = createContainer(appAttemptId2, 1, + ContainerType.APPLICATION_MASTER); - - ContainerId container21 = BuilderUtils.newContainerId(appAttemptId2, 1); - writeContainerLogs(app2LogDir, container21, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container21, 0)); - ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2); + ContainerId container12 = createContainer(appAttemptId1, 2, + ContainerType.TASK); writeContainerLogs(app1LogDir, container12, fileNames); logAggregationService.handle( @@ -431,9 +440,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest { File app3LogDir = new File(localLogDir, ConverterUtils.toString(application3)); app3LogDir.mkdir(); + LogAggregationContext contextWithAMAndFailed = + Records.newRecord(LogAggregationContext.class); + contextWithAMAndFailed.setLogAggregationPolicyClassName( + AMOrFailedContainerLogAggregationPolicy.class.getName()); + logAggregationService.handle(new LogHandlerAppStartedEvent(application3, - this.user, null, - ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls)); + this.user, null, this.acls, contextWithAMAndFailed)); dispatcher.await(); ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{ @@ -450,22 +463,26 @@ public class TestLogAggregationService extends BaseContainerManagerTest { checkEvents(appEventHandler, expectedInitEvents, false, "getType", "getApplicationID"); reset(appEventHandler); - ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1); + ContainerId container31 = createContainer(appAttemptId3, 1, + ContainerType.APPLICATION_MASTER); writeContainerLogs(app3LogDir, container31, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container31, 0)); - ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2); + ContainerId container32 = createContainer(appAttemptId3, 2, + ContainerType.TASK); writeContainerLogs(app3LogDir, container32, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container32, 1)); // Failed - ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2); + ContainerId container22 = createContainer(appAttemptId2, 2, + ContainerType.TASK); writeContainerLogs(app2LogDir, container22, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container22, 0)); - ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3); + ContainerId container33 = createContainer(appAttemptId3, 3, + ContainerType.TASK); writeContainerLogs(app3LogDir, container33, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container33, 0)); @@ -528,10 +545,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest { ApplicationId appId = BuilderUtils.newApplicationId(System.currentTimeMillis(), (int) (Math.random() * 1000)); + LogAggregationContext contextWithAMAndFailed = + Records.newRecord(LogAggregationContext.class); + contextWithAMAndFailed.setLogAggregationPolicyClassName( + AMOrFailedContainerLogAggregationPolicy.class.getName()); + logAggregationService.handle(new LogHandlerAppStartedEvent(appId, - this.user, null, - ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, - this.acls)); + this.user, null, this.acls, contextWithAMAndFailed)); dispatcher.await(); // Verify that it failed @@ -551,11 +571,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest { File appLogDir = new File(localLogDir, ConverterUtils.toString(appId2)); appLogDir.mkdir(); - logAggregationService.handle(new LogHandlerAppStartedEvent(appId2, - this.user, null, - ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, - this.acls)); + this.user, null, this.acls, contextWithAMAndFailed)); dispatcher.await(); // Verify that it worked @@ -627,8 +644,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { remoteRootLogDir.getAbsolutePath(), this.user)); Path suffixDir = new Path(userDir, logSuffix); Path appDir = new Path(suffixDir, appId.toString()); + LogAggregationContext contextWithAllContainers = + Records.newRecord(LogAggregationContext.class); + contextWithAllContainers.setLogAggregationPolicyClassName( + AllContainerLogAggregationPolicy.class.getName()); aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null, - ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + this.acls, contextWithAllContainers)); verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class)); @@ -637,7 +658,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2); Path appDir2 = new Path(suffixDir, appId2.toString()); aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, - ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + this.acls, contextWithAllContainers)); verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class)); // start another application with the app dir already created and verify @@ -646,7 +667,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { Path appDir3 = new Path(suffixDir, appId3.toString()); new File(appDir3.toUri().getPath()).mkdir(); aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, - ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + this.acls, contextWithAllContainers)); verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class)); aggSvc.stop(); aggSvc.close(); @@ -674,13 +695,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest { doThrow(new YarnRuntimeException("KABOOM!")) .when(logAggregationService).initAppAggregator( eq(appId), eq(user), any(Credentials.class), - any(ContainerLogsRetentionPolicy.class), anyMap(), - any(LogAggregationContext.class)); - + anyMap(), any(LogAggregationContext.class)); + LogAggregationContext contextWithAMAndFailed = + Records.newRecord(LogAggregationContext.class); + contextWithAMAndFailed.setLogAggregationPolicyClassName( + AMOrFailedContainerLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(appId, - this.user, null, - ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, - this.acls)); + this.user, null, this.acls, contextWithAMAndFailed)); dispatcher.await(); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ @@ -724,10 +745,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest { doThrow(e) .when(logAggregationService).createAppDir(any(String.class), any(ApplicationId.class), any(UserGroupInformation.class)); + LogAggregationContext contextWithAMAndFailed = + Records.newRecord(LogAggregationContext.class); + contextWithAMAndFailed.setLogAggregationPolicyClassName( + AMOrFailedContainerLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(appId, - this.user, null, - ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls)); - + this.user, null, this.acls, contextWithAMAndFailed)); + dispatcher.await(); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ new ApplicationEvent(appId, @@ -765,10 +789,27 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } } - private LogFileStatusInLastCycle verifyContainerLogs(LogAggregationService logAggregationService, + private LogFileStatusInLastCycle verifyContainerLogs( + LogAggregationService logAggregationService, ApplicationId appId, ContainerId[] expectedContainerIds, - String[] logFiles, int numOfContainerLogs, boolean multiLogs) - throws IOException { + String[] logFiles, int numOfLogsPerContainer, + boolean multiLogs) throws IOException { + return verifyContainerLogs(logAggregationService, appId, + expectedContainerIds, expectedContainerIds.length, + expectedContainerIds.length, logFiles, numOfLogsPerContainer, + multiLogs); + } + + // expectedContainerIds is the minimal set of containers to check. + // The actual list of containers could be more than that. + // Verify the size of the actual list is in the range of + // [minNumOfContainers, maxNumOfContainers]. + private LogFileStatusInLastCycle verifyContainerLogs( + LogAggregationService logAggregationService, + ApplicationId appId, ContainerId[] expectedContainerIds, + int minNumOfContainers, int maxNumOfContainers, + String[] logFiles, int numOfLogsPerContainer, boolean multiLogs) + throws IOException { Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user); RemoteIterator<FileStatus> nodeFiles = null; try { @@ -780,6 +821,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } catch (FileNotFoundException fnf) { Assert.fail("Should have log files"); } + if (numOfLogsPerContainer == 0) { + Assert.assertTrue(!nodeFiles.hasNext()); + return null; + } Assert.assertTrue(nodeFiles.hasNext()); FileStatus targetNodeFile = null; @@ -865,11 +910,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } // 1 for each container - Assert.assertEquals(expectedContainerIds.length, logMap.size()); + Assert.assertTrue("number of containers with logs should be at least " + + minNumOfContainers,logMap.size() >= minNumOfContainers); + Assert.assertTrue("number of containers with logs should be at most " + + minNumOfContainers,logMap.size() <= maxNumOfContainers); for (ContainerId cId : expectedContainerIds) { String containerStr = ConverterUtils.toString(cId); Map<String, String> thisContainerMap = logMap.remove(containerStr); - Assert.assertEquals(numOfContainerLogs, thisContainerMap.size()); + Assert.assertEquals(numOfLogsPerContainer, thisContainerMap.size()); for (String fileType : logFiles) { String expectedValue = containerStr + " Hello " + fileType + "!End of LogType:" @@ -882,8 +930,15 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } Assert.assertEquals(0, thisContainerMap.size()); } - Assert.assertEquals(0, logMap.size()); - return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(), fileTypes); + Assert.assertTrue("number of remaining containers should be at least " + + (minNumOfContainers - expectedContainerIds.length), + logMap.size() >= minNumOfContainers - expectedContainerIds.length); + Assert.assertTrue("number of remaining containers should be at most " + + (maxNumOfContainers - expectedContainerIds.length), + logMap.size() <= maxNumOfContainers - expectedContainerIds.length); + + return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(), + fileTypes); } finally { reader.close(); } @@ -991,9 +1046,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.start(); ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1); + LogAggregationContext contextWithAllContainers = + Records.newRecord(LogAggregationContext.class); + contextWithAllContainers.setLogAggregationPolicyClassName( + AllContainerLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent( - application1, this.user, null, - ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + application1, this.user, null, this.acls, contextWithAllContainers)); logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); @@ -1015,8 +1073,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1); logAggregationService.handle(new LogHandlerAppStartedEvent( - application1, this.user, null, - ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + application1, this.user, null, this.acls)); logAggregationService.handle(new LogHandlerAppFinishedEvent(application1)); dispatcher.await(); @@ -1216,12 +1273,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest { new File(localLogDir, ConverterUtils.toString(application1)); appLogDir1.mkdir(); logAggregationService.handle(new LogHandlerAppStartedEvent(application1, - this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, + this.user, null, this.acls, logAggregationContextWithIncludePatterns)); ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(application1, 1); - ContainerId container1 = BuilderUtils.newContainerId(appAttemptId1, 1); + ContainerId container1 = createContainer(appAttemptId1, 1, + ContainerType.APPLICATION_MASTER); // Simulate log-file creation writeContainerLogs(appLogDir1, container1, new String[] { "stdout", @@ -1239,10 +1297,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { File app2LogDir = new File(localLogDir, ConverterUtils.toString(application2)); app2LogDir.mkdir(); + LogAggregationContextWithExcludePatterns.setLogAggregationPolicyClassName( + AMOnlyLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(application2, - this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, - this.acls, LogAggregationContextWithExcludePatterns)); - ContainerId container2 = BuilderUtils.newContainerId(appAttemptId2, 1); + this.user, null, this.acls, LogAggregationContextWithExcludePatterns)); + ContainerId container2 = createContainer(appAttemptId2, 1, + ContainerType.APPLICATION_MASTER); writeContainerLogs(app2LogDir, container2, new String[] { "stdout", "stderr", "syslog" }); @@ -1262,10 +1322,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { File app3LogDir = new File(localLogDir, ConverterUtils.toString(application3)); app3LogDir.mkdir(); + context1.setLogAggregationPolicyClassName( + AMOnlyLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(application3, - this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, - this.acls, context1)); - ContainerId container3 = BuilderUtils.newContainerId(appAttemptId3, 1); + this.user, null, this.acls, context1)); + ContainerId container3 = createContainer(appAttemptId3, 1, + ContainerType.APPLICATION_MASTER); writeContainerLogs(app3LogDir, container3, new String[] { "stdout", "sys.log", "std.log", "out.log", "err.log", "log" }); logAggregationService.handle( @@ -1285,10 +1347,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { File app4LogDir = new File(localLogDir, ConverterUtils.toString(application4)); app4LogDir.mkdir(); + context2.setLogAggregationPolicyClassName( + AMOnlyLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(application4, - this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, - this.acls, context2)); - ContainerId container4 = BuilderUtils.newContainerId(appAttemptId4, 1); + this.user, null, this.acls, context2)); + ContainerId container4 = createContainer(appAttemptId4, 1, + ContainerType.APPLICATION_MASTER); writeContainerLogs(app4LogDir, container4, new String[] { "stdout", "sys.log", "std.log", "out.log", "err.log", "log" }); logAggregationService.handle( @@ -1347,6 +1411,471 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testNoneContainerPolicy() throws Exception { + ApplicationId appId = createApplication(); + // LogContext specifies policy to not aggregate any container logs + LogAggregationService logAggregationService = createLogAggregationService( + appId, NoneContainerLogAggregationPolicy.class, null); + + String[] logFiles = new String[] { "stdout" }; + ContainerId container1 = finishContainer(appId, logAggregationService, + ContainerType.APPLICATION_MASTER, 1, 0, logFiles); + + finishApplication(appId, logAggregationService); + + verifyContainerLogs(logAggregationService, appId, + new ContainerId[] { container1 }, logFiles, 0, false); + + verifyLogAggFinishEvent(appId); + } + + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testFailedContainerPolicy() throws Exception { + ApplicationId appId = createApplication(); + LogAggregationService logAggregationService = createLogAggregationService( + appId, FailedContainerLogAggregationPolicy.class, null); + + String[] logFiles = new String[] { "stdout" }; + ContainerId container1 = finishContainer( + appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 1, + logFiles); + finishContainer(appId, logAggregationService, ContainerType.TASK, 2, 0, + logFiles); + finishContainer(appId, logAggregationService, ContainerType.TASK, 3, + ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles); + + finishApplication(appId, logAggregationService); + + verifyContainerLogs(logAggregationService, appId, + new ContainerId[] { container1 }, logFiles, 1, false); + + verifyLogAggFinishEvent(appId); + } + + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testAMOrFailedContainerPolicy() throws Exception { + ApplicationId appId = createApplication(); + LogAggregationService logAggregationService = createLogAggregationService( + appId, AMOrFailedContainerLogAggregationPolicy.class, null); + + String[] logFiles = new String[] { "stdout" }; + ContainerId container1 = finishContainer( + appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0, + logFiles); + ContainerId container2= finishContainer(appId, + logAggregationService, ContainerType.TASK, 2, 1, logFiles); + finishContainer(appId, logAggregationService, ContainerType.TASK, 3, + ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles); + + finishApplication(appId, logAggregationService); + + verifyContainerLogs(logAggregationService, appId, + new ContainerId[] { container1, container2 }, logFiles, 1, false); + + verifyLogAggFinishEvent(appId); + } + + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testFailedOrKilledContainerPolicy() throws Exception { + ApplicationId appId = createApplication(); + LogAggregationService logAggregationService = createLogAggregationService( + appId, FailedOrKilledContainerLogAggregationPolicy.class, null); + + String[] logFiles = new String[] { "stdout" }; + finishContainer(appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0, + logFiles); + ContainerId container2 = finishContainer(appId, + logAggregationService, ContainerType.TASK, 2, 1, logFiles); + ContainerId container3 = finishContainer(appId, logAggregationService, + ContainerType.TASK, 3, + ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles); + + finishApplication(appId, logAggregationService); + + verifyContainerLogs(logAggregationService, appId, + new ContainerId[] { container2, container3 }, logFiles, 1, false); + + verifyLogAggFinishEvent(appId); + } + + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testAMOnlyContainerPolicy() throws Exception { + ApplicationId appId = createApplication(); + LogAggregationService logAggregationService = createLogAggregationService( + appId, AMOnlyLogAggregationPolicy.class, null); + + String[] logFiles = new String[] { "stdout" }; + ContainerId container1 = finishContainer(appId, logAggregationService, + ContainerType.APPLICATION_MASTER, 1, 0, logFiles); + finishContainer(appId, logAggregationService, ContainerType.TASK, 2, 1, + logFiles); + finishContainer(appId, logAggregationService, ContainerType.TASK, 3, 0, + logFiles); + + finishApplication(appId, logAggregationService); + + verifyContainerLogs(logAggregationService, appId, + new ContainerId[] { container1 }, logFiles, 1, false); + + verifyLogAggFinishEvent(appId); + } + + // Test sample container policy with an app that has + // the same number of successful containers as + // SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD. + // and verify all those containers' logs are aggregated. + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testSampleContainerPolicyWithSmallApp() throws Exception { + setupAndTestSampleContainerPolicy( + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD, + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE, + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD, + false); + } + + // Test sample container policy with an app that has + // more successful containers than + // SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD. + // and verify some of those containers' logs are aggregated. + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testSampleContainerPolicyWithLargeApp() throws Exception { + setupAndTestSampleContainerPolicy( + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10, + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE, + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD, + false); + } + + // Test sample container policy with zero sample rate. + // and verify there is no sampling beyond the MIN_THRESHOLD containers. + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testSampleContainerPolicyWithZeroSampleRate() throws Exception { + setupAndTestSampleContainerPolicy( + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10, + 0, SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD, + false); + } + + // Test sample container policy with 100 percent sample rate. + // and verify all containers' logs are aggregated. + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testSampleContainerPolicyWith100PercentSampleRate() + throws Exception { + setupAndTestSampleContainerPolicy( + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10, + 1.0f, + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD, + false); + } + + // Test sample container policy with zero min threshold. + // and verify some containers' logs are aggregated. + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testSampleContainerPolicyWithZeroMinThreshold() + throws Exception { + setupAndTestSampleContainerPolicy( + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10, + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE, 0, false); + } + + // Test sample container policy with customized settings. + // and verify some containers' logs are aggregated. + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testSampleContainerPolicyWithCustomizedSettings() + throws Exception { + setupAndTestSampleContainerPolicy(500, 0.5f, 50, false); + } + + // Test cluster-wide sample container policy. + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testClusterSampleContainerPolicy() + throws Exception { + setupAndTestSampleContainerPolicy(500, 0.5f, 50, true); + } + + // Test the default cluster-wide sample container policy. + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testDefaultClusterSampleContainerPolicy() throws Exception { + setupAndTestSampleContainerPolicy( + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10, + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE, + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD, + true); + } + + // The application specifies invalid policy class + // NM should fallback to the default policy which is to aggregate all + // containers. + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testInvalidPolicyClassName() throws Exception { + ApplicationId appId = createApplication(); + LogAggregationService logAggregationService = createLogAggregationService( + appId, "foo", null, true); + verifyDefaultPolicy(appId, logAggregationService); + } + + // The application specifies LogAggregationContext, but not policy class. + // NM should fallback to the default policy which is to aggregate all + // containers. + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testNullPolicyClassName() throws Exception { + ApplicationId appId = createApplication(); + LogAggregationService logAggregationService = createLogAggregationService( + appId, null, null, true); + verifyDefaultPolicy(appId, logAggregationService); + } + + // The application doesn't specifies LogAggregationContext. + // NM should fallback to the default policy which is to aggregate all + // containers. + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testDefaultPolicyWithoutLogAggregationContext() + throws Exception { + ApplicationId appId = createApplication(); + LogAggregationService logAggregationService = createLogAggregationService( + appId, null, null, false); + verifyDefaultPolicy(appId, logAggregationService); + } + + private void verifyDefaultPolicy(ApplicationId appId, + LogAggregationService logAggregationService) throws Exception { + String[] logFiles = new String[] { "stdout" }; + ContainerId container1 = finishContainer( + appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0, + logFiles); + ContainerId container2 = finishContainer(appId, + logAggregationService, ContainerType.TASK, 2, 1, logFiles); + ContainerId container3 = finishContainer(appId, logAggregationService, + ContainerType.TASK, 3, + ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles); + + finishApplication(appId, logAggregationService); + + verifyContainerLogs(logAggregationService, appId, + new ContainerId[] { container1, container2, container3 }, + logFiles, 1, false); + + verifyLogAggFinishEvent(appId); + } + + // If enableAtClusterLevel is false, the policy is set up via + // LogAggregationContext at the application level. If it is true, + // the policy is set up via Configuration at the cluster level. + private void setupAndTestSampleContainerPolicy(int successfulContainers, + float sampleRate, int minThreshold, boolean enableAtClusterLevel) + throws Exception { + ApplicationId appId = createApplication(); + String policyParameters = null; + if (sampleRate != SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE + || minThreshold != + SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD) { + policyParameters = SampleContainerLogAggregationPolicy.buildParameters( + sampleRate, minThreshold); + } + + if (enableAtClusterLevel) { + this.conf.set(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS, + SampleContainerLogAggregationPolicy.class.getName()); + if (policyParameters != null) { + this.conf.set(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS, + policyParameters); + } + } + LogAggregationService logAggregationService = createLogAggregationService( + appId, SampleContainerLogAggregationPolicy.class.getName(), + policyParameters, !enableAtClusterLevel); + + ArrayList<ContainerId> containerIds = new ArrayList<ContainerId>(); + String[] logFiles = new String[] { "stdout" }; + int cid = 1; + // AM container + containerIds.add(finishContainer(appId, logAggregationService, + ContainerType.APPLICATION_MASTER, cid++, 0, logFiles)); + // Successful containers + // We expect the minThreshold containers will be log aggregated. + if (minThreshold > 0) { + containerIds.addAll(finishContainers(appId, logAggregationService, cid, + (successfulContainers > minThreshold) ? minThreshold : + successfulContainers, 0, logFiles)); + } + cid = containerIds.size() + 1; + if (successfulContainers > minThreshold) { + List<ContainerId> restOfSuccessfulContainers = finishContainers(appId, + logAggregationService, cid, successfulContainers - minThreshold, 0, + logFiles); + cid += successfulContainers - minThreshold; + // If the sample rate is 100 percent, restOfSuccessfulContainers will be + // all be log aggregated. + if (sampleRate == 1.0) { + containerIds.addAll(restOfSuccessfulContainers); + } + } + // Failed container + containerIds.add(finishContainer(appId, logAggregationService, + ContainerType.TASK, cid++, 1, logFiles)); + // Killed container + containerIds.add(finishContainer(appId, logAggregationService, + ContainerType.TASK, cid++, + ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles)); + + finishApplication(appId, logAggregationService); + + // The number of containers with logs should be 3(AM + failed + killed) + + // DEFAULT_SAMPLE_MIN_THRESHOLD + + // ( successfulContainers - DEFAULT_SAMPLE_MIN_THRESHOLD ) * SAMPLE_RATE + // Due to the sampling nature, the exact number could vary. + // So we only check for a range. + // For the cases where successfulContainers is the same as minThreshold + // or sampleRate is zero, minOfContainersWithLogs and + // maxOfContainersWithLogs will the same. + int minOfContainersWithLogs = 3 + minThreshold + + (int)((successfulContainers - minThreshold) * sampleRate / 2); + int maxOfContainersWithLogs = 3 + minThreshold + + (int)((successfulContainers - minThreshold) * sampleRate * 2); + verifyContainerLogs(logAggregationService, appId, + containerIds.toArray(new ContainerId[containerIds.size()]), + minOfContainersWithLogs, maxOfContainersWithLogs, + logFiles, 1, false); + + verifyLogAggFinishEvent(appId); + } + + private ApplicationId createApplication() { + this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + + ApplicationId appId = BuilderUtils.newApplicationId(1234, 1); + Application mockApp = mock(Application.class); + when(mockApp.getContainers()).thenReturn( + new HashMap<ContainerId, Container>()); + + this.context.getApplications().put(appId, mockApp); + return appId; + } + + private LogAggregationService createLogAggregationService( + ApplicationId appId, + Class<? extends ContainerLogAggregationPolicy> policy, + String parameters) { + return createLogAggregationService(appId, policy.getName(), parameters, + true); + } + + private LogAggregationService createLogAggregationService( + ApplicationId appId, String className, String parameters, + boolean createLogAggContext) { + ConcurrentHashMap<ContainerId, Container> containers = + new ConcurrentHashMap<ContainerId, Container>(); + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, this.delSrvc, + super.dirsHandler); + logAggregationService.init(this.conf); + logAggregationService.start(); + LogAggregationContext logAggContext = null; + + if (createLogAggContext) { + logAggContext = Records.newRecord(LogAggregationContext.class); + logAggContext.setLogAggregationPolicyClassName(className); + if (parameters != null) { + logAggContext.setLogAggregationPolicyParameters(parameters); + } + } + logAggregationService.handle(new LogHandlerAppStartedEvent(appId, + this.user, null, this.acls, logAggContext)); + + return logAggregationService; + } + + private ContainerId createContainer(ApplicationAttemptId appAttemptId1, + long cId, ContainerType containerType) { + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId1, + cId); + Resource r = BuilderUtils.newResource(1024, 1); + ContainerTokenIdentifier containerToken = new ContainerTokenIdentifier( + containerId, context.getNodeId().toString(), user, r, + System.currentTimeMillis() + 100000L, 123, DUMMY_RM_IDENTIFIER, + Priority.newInstance(0), 0, null, null, containerType); + Container container = mock(Container.class); + context.getContainers().put(containerId, container); + when(container.getContainerTokenIdentifier()).thenReturn(containerToken); + when(container.getContainerId()).thenReturn(containerId); + return containerId; + } + + private ContainerId finishContainer(ApplicationId application1, + LogAggregationService logAggregationService, ContainerType containerType, + long cId, int exitCode, String[] logFiles) throws IOException { + ApplicationAttemptId appAttemptId1 = + BuilderUtils.newApplicationAttemptId(application1, 1); + ContainerId containerId = createContainer(appAttemptId1, cId, + containerType); + // Simulate log-file creation + File appLogDir1 = + new File(localLogDir, ConverterUtils.toString(application1)); + appLogDir1.mkdir(); + writeContainerLogs(appLogDir1, containerId, logFiles); + + logAggregationService.handle(new LogHandlerContainerFinishedEvent( + containerId, exitCode)); + return containerId; + + } + + private List<ContainerId> finishContainers(ApplicationId appId, + LogAggregationService logAggregationService, long startingCid, int count, + int exitCode, String[] logFiles) throws IOException { + ArrayList<ContainerId> containerIds = new ArrayList<ContainerId>(); + for (long cid = startingCid; cid < startingCid + count; cid++) { + containerIds.add(finishContainer( + appId, logAggregationService, ContainerType.TASK, cid, exitCode, + logFiles)); + } + return containerIds; + } + + private void finishApplication(ApplicationId appId, + LogAggregationService logAggregationService) throws Exception { + dispatcher.await(); + ApplicationEvent expectedInitEvents[] = + new ApplicationEvent[] { new ApplicationEvent(appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) }; + checkEvents(appEventHandler, expectedInitEvents, false, "getType", + "getApplicationID"); + reset(appEventHandler); + + logAggregationService.handle(new LogHandlerAppFinishedEvent(appId)); + logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); + } + + private void verifyLogAggFinishEvent(ApplicationId appId) throws Exception { + dispatcher.await(); + + ApplicationEvent[] expectedFinishedEvents = + new ApplicationEvent[] { new ApplicationEvent(appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) }; + checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", + "getApplicationID"); + } + + @Test (timeout = 50000) public void testLogAggregationServiceWithInterval() throws Exception { testLogAggregationService(false); } @@ -1391,17 +1920,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest { ApplicationId application = BuilderUtils.newApplicationId(123456, 1); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(application, 1); - ContainerId container = BuilderUtils.newContainerId(appAttemptId, 1); + ContainerId container = createContainer(appAttemptId, 1, + ContainerType.APPLICATION_MASTER); - Context context = spy(this.context); ConcurrentMap<ApplicationId, Application> maps = - new ConcurrentHashMap<ApplicationId, Application>(); + this.context.getApplications(); Application app = mock(Application.class); - Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>(); - containers.put(container, mock(Container.class)); maps.put(application, app); - when(app.getContainers()).thenReturn(containers); - when(context.getApplications()).thenReturn(maps); + when(app.getContainers()).thenReturn(this.context.getContainers()); LogAggregationService logAggregationService = new LogAggregationService(dispatcher, context, this.delSrvc, @@ -1415,8 +1941,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { new File(localLogDir, ConverterUtils.toString(application)); appLogDir.mkdir(); logAggregationService.handle(new LogHandlerAppStartedEvent(application, - this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, - logAggregationContextWithInterval)); + this.user, null, this.acls, logAggregationContextWithInterval)); LogFileStatusInLastCycle logFileStatusInLastCycle = null; // Simulate log-file creation @@ -1536,7 +2061,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.init(this.conf); logAggregationService.start(); logAggregationService.handle(new LogHandlerAppStartedEvent(application1, - this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, + this.user, null, this.acls, Records.newRecord(LogAggregationContext.class))); // Inject new token for log-aggregation after app log-aggregator init
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java index 0bab5ea..46d06da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; -import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -148,8 +147,7 @@ public class TestNonAggregatingLogHandler { logHandler.init(conf); logHandler.start(); - logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, - ContainerLogsRetentionPolicy.ALL_CONTAINERS, null)); + logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null)); logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); @@ -189,8 +187,7 @@ public class TestNonAggregatingLogHandler { logHandler.init(conf); logHandler.start(); - logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, - ContainerLogsRetentionPolicy.ALL_CONTAINERS, null)); + logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null)); logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); @@ -357,8 +354,7 @@ public class TestNonAggregatingLogHandler { logHandler.init(conf); logHandler.start(); - logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, - ContainerLogsRetentionPolicy.ALL_CONTAINERS, null)); + logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null)); logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); logHandler.handle(new LogHandlerAppFinishedEvent(appId)); @@ -445,7 +441,7 @@ public class TestNonAggregatingLogHandler { doReturn(localLogDirPaths).when(dirsHandler).getLogDirsForCleanup(); logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, - ContainerLogsRetentionPolicy.ALL_CONTAINERS, appAcls)); + appAcls)); // test case where some dirs have the log dir to delete // mock some dirs throwing various exceptions http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 4cb8e1a..769041b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -231,16 +231,22 @@ public class TestContainerAllocation { LogAggregationContext.newInstance( "includePattern", "excludePattern", "rolledLogsIncludePattern", - "rolledLogsExcludePattern"); + "rolledLogsExcludePattern", + "policyClass", + "policyParameters"); LogAggregationContext returned = getLogAggregationContextFromContainerToken(rm1, nm2, logAggregationContext); Assert.assertEquals("includePattern", returned.getIncludePattern()); Assert.assertEquals("excludePattern", returned.getExcludePattern()); Assert.assertEquals("rolledLogsIncludePattern", - returned.getRolledLogsIncludePattern()); + returned.getRolledLogsIncludePattern()); Assert.assertEquals("rolledLogsExcludePattern", - returned.getRolledLogsExcludePattern()); + returned.getRolledLogsExcludePattern()); + Assert.assertEquals("policyClass", + returned.getLogAggregationPolicyClassName()); + Assert.assertEquals("policyParameters", + returned.getLogAggregationPolicyParameters()); rm1.stop(); }