Repository: hadoop Updated Branches: refs/heads/branch-2.8 ea2e7321d -> 810470508
YARN-5237. Fix missing log files issue in rolling log aggregation. Contributed by Xuan Gong. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81047050 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81047050 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81047050 Branch: refs/heads/branch-2.8 Commit: 810470508bacfddcbb54241e27046168edca2f48 Parents: ea2e732 Author: Junping Du <junping...@apache.org> Authored: Thu Jun 16 07:18:36 2016 -0700 Committer: Junping Du <junping...@apache.org> Committed: Thu Jun 16 07:18:36 2016 -0700 ---------------------------------------------------------------------- .../logaggregation/AggregatedLogFormat.java | 47 ++++++--- .../logaggregation/AppLogAggregatorImpl.java | 9 +- .../TestLogAggregationService.java | 100 ++++++++++++++++++- 3 files changed, 134 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/81047050/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index f5dbc92..5051731 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -168,17 +168,20 @@ public class AggregatedLogFormat { private final Set<String> alreadyUploadedLogFiles; private Set<String> allExistingFileMeta = new HashSet<String>(); private final boolean appFinished; + private final boolean containerFinished; // TODO Maybe add a version string here. Instead of changing the version of // the entire k-v format public LogValue(List<String> rootLogDirs, ContainerId containerId, String user) { - this(rootLogDirs, containerId, user, null, new HashSet<String>(), true); + this(rootLogDirs, containerId, user, null, new HashSet<String>(), true, + true); } public LogValue(List<String> rootLogDirs, ContainerId containerId, String user, LogAggregationContext logAggregationContext, - Set<String> alreadyUploadedLogFiles, boolean appFinished) { + Set<String> alreadyUploadedLogFiles, boolean appFinished, + boolean containerFinished) { this.rootLogDirs = new ArrayList<String>(rootLogDirs); this.containerId = containerId; this.user = user; @@ -188,6 +191,7 @@ public class AggregatedLogFormat { this.logAggregationContext = logAggregationContext; this.alreadyUploadedLogFiles = alreadyUploadedLogFiles; this.appFinished = appFinished; + this.containerFinished = containerFinished; } private Set<File> getPendingLogFilesToUploadForThisContainer() { @@ -294,28 +298,39 @@ public class AggregatedLogFormat { this.allExistingFileMeta.add(getLogFileMetaData(logFile)); } + Set<File> fileCandidates = new HashSet<File>(candidates); if (this.logAggregationContext != null && candidates.size() > 0) { - filterFiles( - this.appFinished ? this.logAggregationContext.getIncludePattern() + fileCandidates = getFileCandidates(fileCandidates, this.appFinished); + if (!this.appFinished && this.containerFinished) { + Set<File> addition = new HashSet<File>(candidates); + addition = getFileCandidates(addition, true); + fileCandidates.addAll(addition); + } + } + return fileCandidates; + } + + private Set<File> getFileCandidates(Set<File> candidates, + boolean useRegularPattern) { + filterFiles( + useRegularPattern ? this.logAggregationContext.getIncludePattern() : this.logAggregationContext.getRolledLogsIncludePattern(), candidates, false); - filterFiles( - this.appFinished ? this.logAggregationContext.getExcludePattern() + filterFiles( + useRegularPattern ? this.logAggregationContext.getExcludePattern() : this.logAggregationContext.getRolledLogsExcludePattern(), candidates, true); - Iterable<File> mask = - Iterables.filter(candidates, new Predicate<File>() { - @Override - public boolean apply(File next) { - return !alreadyUploadedLogFiles + Iterable<File> mask = + Iterables.filter(candidates, new Predicate<File>() { + @Override + public boolean apply(File next) { + return !alreadyUploadedLogFiles .contains(getLogFileMetaData(next)); - } - }); - candidates = Sets.newHashSet(mask); - } - return candidates; + } + }); + return Sets.newHashSet(mask); } private void filterFiles(String pattern, Set<File> candidates, http://git-wip-us.apache.org/repos/asf/hadoop/blob/81047050/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index a5b1e2c..fa07c59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -348,7 +348,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { containerLogAggregators.put(container, aggregator); } Set<Path> uploadedFilePathsInThisCycle = - aggregator.doContainerLogAggregation(writer, appFinished); + aggregator.doContainerLogAggregation(writer, appFinished, + finishedContainers.contains(container)); if (uploadedFilePathsInThisCycle.size() > 0) { uploadedLogsInThisCycle = true; this.delService.delete(this.userUgi.getShortUserName(), null, @@ -650,15 +651,15 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } public Set<Path> doContainerLogAggregation(LogWriter writer, - boolean appFinished) { + boolean appFinished, boolean containerFinished) { LOG.info("Uploading logs for container " + containerId + ". Current good log dirs are " + StringUtils.join(",", dirsHandler.getLogDirsForRead())); final LogKey logKey = new LogKey(containerId); final LogValue logValue = new LogValue(dirsHandler.getLogDirsForRead(), containerId, - userUgi.getShortUserName(), logAggregationContext, - this.uploadedFileMeta, appFinished); + userUgi.getShortUserName(), logAggregationContext, + this.uploadedFileMeta, appFinished, containerFinished); try { writer.append(logKey, logValue); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/81047050/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index d56d030..c0a3f3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -1435,6 +1435,102 @@ public class TestLogAggregationService extends BaseContainerManagerTest { "getApplicationID"); } + @SuppressWarnings("resource") + @Test (timeout = 50000) + public void testLogAggregationServiceWithPatternsAndIntervals() + throws Exception { + LogAggregationContext logAggregationContext = + Records.newRecord(LogAggregationContext.class); + // set IncludePattern and RolledLogsIncludePattern. + // When the app is running, we only aggregate the log with + // the name stdout. After the app finishes, we only aggregate + // the log with the name std_final. + logAggregationContext.setRolledLogsIncludePattern("stdout"); + logAggregationContext.setIncludePattern("std_final"); + this.conf.set( + YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + //configure YarnConfiguration.NM_REMOTE_APP_LOG_DIR to + //have fully qualified path + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.toURI().toString()); + this.conf.setLong( + YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, + 3600); + + this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); + + ApplicationId application = + BuilderUtils.newApplicationId(System.currentTimeMillis(), 1); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(application, 1); + ContainerId container = createContainer(appAttemptId, 1, + ContainerType.APPLICATION_MASTER); + + ConcurrentMap<ApplicationId, Application> maps = + this.context.getApplications(); + Application app = mock(Application.class); + maps.put(application, app); + when(app.getContainers()).thenReturn(this.context.getContainers()); + + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, context, this.delSrvc, + super.dirsHandler); + + logAggregationService.init(this.conf); + logAggregationService.start(); + + // AppLogDir should be created + File appLogDir = + new File(localLogDir, ConverterUtils.toString(application)); + appLogDir.mkdir(); + logAggregationService.handle(new LogHandlerAppStartedEvent(application, + this.user, null, this.acls, logAggregationContext)); + + // Simulate log-file creation + // create std_final in log directory which will not be aggregated + // until the app finishes. + String[] logFilesWithFinalLog = + new String[] {"stdout", "std_final"}; + writeContainerLogs(appLogDir, container, logFilesWithFinalLog); + + // Do log aggregation + AppLogAggregatorImpl aggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators() + .get(application); + + aggregator.doLogAggregationOutOfBand(); + + Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, + 50, 1, false, null)); + + String[] logFiles = new String[] { "stdout" }; + verifyContainerLogs(logAggregationService, application, + new ContainerId[] {container}, logFiles, 1, true); + + logAggregationService.handle( + new LogHandlerContainerFinishedEvent(container, 0)); + + dispatcher.await(); + + // Do the log aggregation after ContainerFinishedEvent but before + // AppFinishedEvent. The std_final is expected to be aggregated this time + // even if the app is running but the container finishes. + aggregator.doLogAggregationOutOfBand(); + + Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, + 50, 2, false, null)); + + // This container finishes. + // The log "std_final" should be aggregated this time. + String[] logFinalLog = new String[] {"std_final"}; + verifyContainerLogs(logAggregationService, application, + new ContainerId[] {container}, logFinalLog, 1, true); + + logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); + + logAggregationService.stop(); + } + @Test (timeout = 50000) @SuppressWarnings("unchecked") public void testNoneContainerPolicy() throws Exception { @@ -1443,14 +1539,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest { LogAggregationService logAggregationService = createLogAggregationService( appId, NoneContainerLogAggregationPolicy.class, null); - String[] logFiles = new String[] { "stdout" }; + String[] logFiles = new String[] {"stdout"}; ContainerId container1 = finishContainer(appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0, logFiles); finishApplication(appId, logAggregationService); verifyContainerLogs(logAggregationService, appId, - new ContainerId[] { container1 }, logFiles, 0, false); + new ContainerId[] {container1}, logFiles, 0, false); verifyLogAggFinishEvent(appId); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org