This is an automated email from the ASF dual-hosted git repository.

eyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7b03072  YARN-9080.  Added clean up of bucket directories. Contributed by Prabhu Joseph, Peter Bacsko, Szilard Nemeth
7b03072 is described below

commit 7b03072fd466de5817fdcd65f9dd88fd59c0bb00
Author: Eric Yang <ey...@apache.org>
AuthorDate: Thu May 23 12:08:44 2019 -0400

    YARN-9080.  Added clean up of bucket directories.
                Contributed by Prabhu Joseph, Peter Bacsko, Szilard Nemeth
---
 .../timeline/EntityGroupFSTimelineStore.java       | 65 +++++++++++++++++-----
 .../timeline/TestEntityGroupFSTimelineStore.java   | 53 +++++++++++++++++-
 2 files changed, 100 insertions(+), 18 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
index 80baf89..498230a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -456,43 +458,76 @@ public class EntityGroupFSTimelineStore extends CompositeService
    *                dirpath should be a directory that contains a set of
    *                application log directories. The cleaner method will not
    *                work if the given dirpath itself is an application log dir.
-   * @param fs
    * @param retainMillis
    * @throws IOException
    */
   @InterfaceAudience.Private
   @VisibleForTesting
-  void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
+  void cleanLogs(Path dirpath, long retainMillis)
       throws IOException {
     long now = Time.now();
+    RemoteIterator<FileStatus> iter = list(dirpath);
+    while (iter.hasNext()) {
+      FileStatus stat = iter.next();
+      Path clusterTimeStampPath = stat.getPath();
+      if (isValidClusterTimeStampDir(clusterTimeStampPath)) {
+        MutableBoolean appLogDirPresent = new MutableBoolean(false);
+        cleanAppLogDir(clusterTimeStampPath, retainMillis, appLogDirPresent);
+        if (appLogDirPresent.isFalse() &&
+            (now - stat.getModificationTime() > retainMillis)) {
+          deleteDir(clusterTimeStampPath);
+        }
+      }
+    }
+  }
+
+
+  private void cleanAppLogDir(Path dirpath, long retainMillis,
+      MutableBoolean appLogDirPresent) throws IOException {
+    long now = Time.now();
     // Depth first search from root directory for all application log dirs
     RemoteIterator<FileStatus> iter = list(dirpath);
     while (iter.hasNext()) {
       FileStatus stat = iter.next();
+      Path childPath = stat.getPath();
       if (stat.isDirectory()) {
         // If current is an application log dir, decide if we need to remove it
         // and remove if necessary.
         // Otherwise, keep iterating into it.
-        ApplicationId appId = parseApplicationId(dirpath.getName());
+        ApplicationId appId = parseApplicationId(childPath.getName());
         if (appId != null) { // Application log dir
-          if (shouldCleanAppLogDir(dirpath, now, fs, retainMillis)) {
-            try {
-              LOG.info("Deleting {}", dirpath);
-              if (!fs.delete(dirpath, true)) {
-                LOG.error("Unable to remove " + dirpath);
-              }
-              metrics.incrLogsDirsCleaned();
-            } catch (IOException e) {
-              LOG.error("Unable to remove " + dirpath, e);
-            }
+          appLogDirPresent.setTrue();
+          if (shouldCleanAppLogDir(childPath, now, fs, retainMillis)) {
+            deleteDir(childPath);
           }
         } else { // Keep cleaning inside
-          cleanLogs(stat.getPath(), fs, retainMillis);
+          cleanAppLogDir(childPath, retainMillis, appLogDirPresent);
         }
       }
     }
   }
 
+  private void deleteDir(Path path) {
+    try {
+      LOG.info("Deleting {}", path);
+      if (fs.delete(path, true)) {
+        metrics.incrLogsDirsCleaned();
+      } else {
+        LOG.error("Unable to remove {}", path);
+      }
+    } catch (IOException e) {
+      LOG.error("Unable to remove {}", path, e);
+    }
+  }
+
+  private boolean isValidClusterTimeStampDir(Path clusterTimeStampPath)
+      throws IOException {
+    FileStatus stat = fs.getFileStatus(clusterTimeStampPath);
+    return stat.isDirectory() &&
+        StringUtils.isNumeric(clusterTimeStampPath.getName());
+  }
+
+
   private static boolean shouldCleanAppLogDir(Path appLogPath, long now,
       FileSystem fs, long logRetainMillis) throws IOException {
     RemoteIterator<FileStatus> iter = fs.listStatusIterator(appLogPath);
@@ -908,7 +943,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
       LOG.debug("Cleaner starting");
       long startTime = Time.monotonicNow();
       try {
-        cleanLogs(doneRootPath, fs, logRetainMillis);
+        cleanLogs(doneRootPath, logRetainMillis);
       } catch (Exception e) {
         Throwable t = extract(e);
         if (t instanceof InterruptedException) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
index 6435416..dc10912 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
@@ -268,7 +268,8 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
     Path irrelevantDirPath = new Path(testDoneDirPath, "irrelevant");
     fs.mkdirs(irrelevantDirPath);
 
-    Path doneAppHomeDir = new Path(new Path(testDoneDirPath, "0000"), "001");
+    Path doneAppHomeDir = new Path(new Path(new Path(testDoneDirPath,
+        Long.toString(mainTestAppId.getClusterTimestamp())), "0000"), "001");
     // First application, untouched after creation
     Path appDirClean = new Path(doneAppHomeDir, appDirName);
     Path attemptDirClean = new Path(appDirClean, attemptDirName);
@@ -300,7 +301,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
     // Should retain all logs after this run
     MutableCounterLong dirsCleaned = store.metrics.getLogsDirsCleaned();
     long before = dirsCleaned.value();
-    store.cleanLogs(testDoneDirPath, fs, 10000);
+    store.cleanLogs(testDoneDirPath, 10000);
     assertTrue(fs.exists(irrelevantDirPath));
     assertTrue(fs.exists(irrelevantFilePath));
     assertTrue(fs.exists(filePath));
@@ -317,7 +318,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
     // Touch the third application by creating a new dir
     fs.mkdirs(new Path(dirPathHold, "holdByMe"));
 
-    store.cleanLogs(testDoneDirPath, fs, 1000);
+    store.cleanLogs(testDoneDirPath, 1000);
 
     // Verification after the second cleaner call
     assertTrue(fs.exists(irrelevantDirPath));
@@ -333,6 +334,52 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
   }
 
   @Test
+  public void testCleanBuckets() throws Exception {
+    // ClusterTimeStampDir with App Log Dirs
+    Path clusterTimeStampDir1 = new Path(testDoneDirPath,
+        Long.toString(sampleAppIds.get(0).getClusterTimestamp()));
+    Path appDir1 = new Path(new Path(new Path(
+        clusterTimeStampDir1, "0000"), "000"), sampleAppIds.get(0).toString());
+    Path appDir2 = new Path(new Path(new Path(
+        clusterTimeStampDir1, "0000"), "001"), sampleAppIds.get(1).toString());
+    Path appDir3 = new Path(new Path(new Path(
+        clusterTimeStampDir1, "0000"), "002"), sampleAppIds.get(2).toString());
+    Path appDir4 = new Path(new Path(new Path(
+        clusterTimeStampDir1, "0001"), "000"), sampleAppIds.get(3).toString());
+
+    // ClusterTimeStampDir with no App Log Dirs
+    Path clusterTimeStampDir2 = new Path(testDoneDirPath, "1235");
+
+    // Irrelevant ClusterTimeStampDir
+    Path clusterTimeStampDir3 = new Path(testDoneDirPath, "irrevelant");
+    Path appDir5 = new Path(new Path(new Path(
+        clusterTimeStampDir3, "0000"), "000"), sampleAppIds.get(4).toString());
+
+    fs.mkdirs(appDir1);
+    fs.mkdirs(appDir2);
+    fs.mkdirs(appDir3);
+    fs.mkdirs(appDir4);
+    fs.mkdirs(clusterTimeStampDir2);
+    fs.mkdirs(appDir5);
+
+    Thread.sleep(2000);
+
+    store.cleanLogs(testDoneDirPath, 1000);
+
+    // ClusterTimeStampDir will be removed only if no App Log Dir Present
+    assertTrue(fs.exists(clusterTimeStampDir1));
+    assertFalse(fs.exists(appDir1));
+    assertFalse(fs.exists(appDir2));
+    assertFalse(fs.exists(appDir3));
+    assertFalse(fs.exists(appDir4));
+    assertFalse(fs.exists(clusterTimeStampDir2));
+    assertTrue(fs.exists(appDir5));
+
+    store.cleanLogs(testDoneDirPath, 1000);
+    assertFalse(fs.exists(clusterTimeStampDir1));
+  }
+
+  @Test
   public void testPluginRead() throws Exception {
     // Verify precondition
     assertEquals(EntityGroupPlugInForTest.class.getName(),


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to