This is an automated email from the ASF dual-hosted git repository. eyang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 7b03072 YARN-9080. Added clean up of bucket directories. Contributed by Prabhu Joseph, Peter Bacsko, Szilard Nemeth 7b03072 is described below commit 7b03072fd466de5817fdcd65f9dd88fd59c0bb00 Author: Eric Yang <ey...@apache.org> AuthorDate: Thu May 23 12:08:44 2019 -0400 YARN-9080. Added clean up of bucket directories. Contributed by Prabhu Joseph, Peter Bacsko, Szilard Nemeth --- .../timeline/EntityGroupFSTimelineStore.java | 65 +++++++++++++++++----- .../timeline/TestEntityGroupFSTimelineStore.java | 53 +++++++++++++++++- 2 files changed, 100 insertions(+), 18 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index 80baf89..498230a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.type.TypeFactory; import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -456,43 +458,76 @@ public class EntityGroupFSTimelineStore extends CompositeService * dirpath should be a directory that contains a set of * application log directories. The cleaner method will not * work if the given dirpath itself is an application log dir. - * @param fs * @param retainMillis * @throws IOException */ @InterfaceAudience.Private @VisibleForTesting - void cleanLogs(Path dirpath, FileSystem fs, long retainMillis) + void cleanLogs(Path dirpath, long retainMillis) throws IOException { long now = Time.now(); + RemoteIterator<FileStatus> iter = list(dirpath); + while (iter.hasNext()) { + FileStatus stat = iter.next(); + Path clusterTimeStampPath = stat.getPath(); + if (isValidClusterTimeStampDir(clusterTimeStampPath)) { + MutableBoolean appLogDirPresent = new MutableBoolean(false); + cleanAppLogDir(clusterTimeStampPath, retainMillis, appLogDirPresent); + if (appLogDirPresent.isFalse() && + (now - stat.getModificationTime() > retainMillis)) { + deleteDir(clusterTimeStampPath); + } + } + } + } + + + private void cleanAppLogDir(Path dirpath, long retainMillis, + MutableBoolean appLogDirPresent) throws IOException { + long now = Time.now(); // Depth first search from root directory for all application log dirs RemoteIterator<FileStatus> iter = list(dirpath); while (iter.hasNext()) { FileStatus stat = iter.next(); + Path childPath = stat.getPath(); if (stat.isDirectory()) { // If current is an application log dir, decide if we need to remove it // and remove if necessary. // Otherwise, keep iterating into it. - ApplicationId appId = parseApplicationId(dirpath.getName()); + ApplicationId appId = parseApplicationId(childPath.getName()); if (appId != null) { // Application log dir - if (shouldCleanAppLogDir(dirpath, now, fs, retainMillis)) { - try { - LOG.info("Deleting {}", dirpath); - if (!fs.delete(dirpath, true)) { - LOG.error("Unable to remove " + dirpath); - } - metrics.incrLogsDirsCleaned(); - } catch (IOException e) { - LOG.error("Unable to remove " + dirpath, e); - } + appLogDirPresent.setTrue(); + if (shouldCleanAppLogDir(childPath, now, fs, retainMillis)) { + deleteDir(childPath); } } else { // Keep cleaning inside - cleanLogs(stat.getPath(), fs, retainMillis); + cleanAppLogDir(childPath, retainMillis, appLogDirPresent); } } } } + private void deleteDir(Path path) { + try { + LOG.info("Deleting {}", path); + if (fs.delete(path, true)) { + metrics.incrLogsDirsCleaned(); + } else { + LOG.error("Unable to remove {}", path); + } + } catch (IOException e) { + LOG.error("Unable to remove {}", path, e); + } + } + + private boolean isValidClusterTimeStampDir(Path clusterTimeStampPath) + throws IOException { + FileStatus stat = fs.getFileStatus(clusterTimeStampPath); + return stat.isDirectory() && + StringUtils.isNumeric(clusterTimeStampPath.getName()); + } + + private static boolean shouldCleanAppLogDir(Path appLogPath, long now, FileSystem fs, long logRetainMillis) throws IOException { RemoteIterator<FileStatus> iter = fs.listStatusIterator(appLogPath); @@ -908,7 +943,7 @@ public class EntityGroupFSTimelineStore extends CompositeService LOG.debug("Cleaner starting"); long startTime = Time.monotonicNow(); try { - cleanLogs(doneRootPath, fs, logRetainMillis); + cleanLogs(doneRootPath, logRetainMillis); } catch (Exception e) { Throwable t = extract(e); if (t instanceof InterruptedException) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java index 6435416..dc10912 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java @@ -268,7 +268,8 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { Path irrelevantDirPath = new Path(testDoneDirPath, "irrelevant"); fs.mkdirs(irrelevantDirPath); - Path doneAppHomeDir = new Path(new Path(testDoneDirPath, "0000"), "001"); + Path doneAppHomeDir = new Path(new Path(new Path(testDoneDirPath, + Long.toString(mainTestAppId.getClusterTimestamp())), "0000"), "001"); // First application, untouched after creation Path appDirClean = new Path(doneAppHomeDir, appDirName); Path attemptDirClean = new Path(appDirClean, attemptDirName); @@ -300,7 +301,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { // Should retain all logs after this run MutableCounterLong dirsCleaned = store.metrics.getLogsDirsCleaned(); long before = dirsCleaned.value(); - store.cleanLogs(testDoneDirPath, fs, 10000); + store.cleanLogs(testDoneDirPath, 10000); assertTrue(fs.exists(irrelevantDirPath)); assertTrue(fs.exists(irrelevantFilePath)); assertTrue(fs.exists(filePath)); @@ -317,7 +318,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { // Touch the third application by creating a new dir fs.mkdirs(new Path(dirPathHold, "holdByMe")); - store.cleanLogs(testDoneDirPath, fs, 1000); + store.cleanLogs(testDoneDirPath, 1000); // Verification after the second cleaner call assertTrue(fs.exists(irrelevantDirPath)); @@ -333,6 +334,52 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { } @Test + public void testCleanBuckets() throws Exception { + // ClusterTimeStampDir with App Log Dirs + Path clusterTimeStampDir1 = new Path(testDoneDirPath, + Long.toString(sampleAppIds.get(0).getClusterTimestamp())); + Path appDir1 = new Path(new Path(new Path( + clusterTimeStampDir1, "0000"), "000"), sampleAppIds.get(0).toString()); + Path appDir2 = new Path(new Path(new Path( + clusterTimeStampDir1, "0000"), "001"), sampleAppIds.get(1).toString()); + Path appDir3 = new Path(new Path(new Path( + clusterTimeStampDir1, "0000"), "002"), sampleAppIds.get(2).toString()); + Path appDir4 = new Path(new Path(new Path( + clusterTimeStampDir1, "0001"), "000"), sampleAppIds.get(3).toString()); + + // ClusterTimeStampDir with no App Log Dirs + Path clusterTimeStampDir2 = new Path(testDoneDirPath, "1235"); + + // Irrevelant ClusterTimeStampDir + Path clusterTimeStampDir3 = new Path(testDoneDirPath, "irrevelant"); + Path appDir5 = new Path(new Path(new Path( + clusterTimeStampDir3, "0000"), "000"), sampleAppIds.get(4).toString()); + + fs.mkdirs(appDir1); + fs.mkdirs(appDir2); + fs.mkdirs(appDir3); + fs.mkdirs(appDir4); + fs.mkdirs(clusterTimeStampDir2); + fs.mkdirs(appDir5); + + Thread.sleep(2000); + + store.cleanLogs(testDoneDirPath, 1000); + + // ClusterTimeStampDir will be removed only if no App Log Dir Present + assertTrue(fs.exists(clusterTimeStampDir1)); + assertFalse(fs.exists(appDir1)); + assertFalse(fs.exists(appDir2)); + assertFalse(fs.exists(appDir3)); + assertFalse(fs.exists(appDir4)); + assertFalse(fs.exists(clusterTimeStampDir2)); + assertTrue(fs.exists(appDir5)); + + store.cleanLogs(testDoneDirPath, 1000); + assertFalse(fs.exists(clusterTimeStampDir1)); + } + + @Test public void testPluginRead() throws Exception { // Verify precondition assertEquals(EntityGroupPlugInForTest.class.getName(), --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org