This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4769febaccc  HADOOP-19554. LocalDirAllocator still doesn't always recover from directory deletion (#7651)
4769febaccc is described below

commit 4769febaccc3a55ca61f600db0a2eaf11459b333
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Mon May 12 17:31:31 2025 +0100

    HADOOP-19554. LocalDirAllocator still doesn't always recover from directory deletion (#7651)
    
    
    The LocalDirAllocator class, used to allocate disk space for HDFS block
    storage and cloud connector buffers, should now recover from directory
    deletion on all codepaths. This situation can occur in the cloud buffer
    use case, where cron jobs may clean up temp directories on a schedule.
    
    There is also significantly better logging of the allocation process in
    the case of failure: all steps attempted before concluding there was no
    disk space will be logged at error on the failing host.
    
    Contributed by Steve Loughran
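
As a usage sketch only (not part of this commit): this is roughly how a cloud
connector exercises the codepath being fixed. The key "fs.s3a.buffer.dir" is a
real S3A configuration option; the directory paths and sizes are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    public class BufferDirRecoverySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // comma-separated list of local buffer directories
        conf.set("fs.s3a.buffer.dir", "/tmp/buf1,/tmp/buf2");
        LocalDirAllocator allocator = new LocalDirAllocator("fs.s3a.buffer.dir");

        // known size: linear search (random start) for a directory with capacity
        Path p1 = allocator.getLocalPathForWrite("part-0001", 512, conf);

        // unknown size: "roulette" selection weighted by free space; with this
        // fix both codepaths recreate a buffer dir that a tmp-cleaning cron job
        // deleted between calls
        Path p2 = allocator.getLocalPathForWrite("part-0002",
            LocalDirAllocator.SIZE_UNKNOWN, conf);

        System.out.println(p1 + "\n" + p2);
      }
    }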
---
 .../org/apache/hadoop/fs/LocalDirAllocator.java    | 118 ++++++++++++++++-----
 .../apache/hadoop/fs/TestLocalDirAllocator.java    |  44 ++++++--
 2 files changed, 129 insertions(+), 33 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
index d8ab16f41d3..ba7a5418e61 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
@@ -65,7 +65,10 @@
 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
 @InterfaceStability.Unstable
 public class LocalDirAllocator {
-  
+
+  static final String E_NO_SPACE_AVAILABLE =
+      "No space available in any of the local directories";
+
   //A Map from the config item names like "mapred.local.dir"
   //to the instance of the AllocatorPerContext. This
   //is a static object to make sure there exists exactly one instance per JVM
@@ -384,6 +387,24 @@ int getCurrentDirectoryIndex() {
       return currentContext.get().dirNumLastAccessed.get();
     }
 
+    /**
+     * Format a string, log at debug and append it to the history as a new line.
+     *
+     * @param history history to fill in
+     * @param fmt format string
+     * @param args varargs
+     */
+    private void note(StringBuilder history, String fmt, Object... args) {
+      try {
+        final String s = String.format(fmt, args);
+        history.append(s).append("\n");
+        LOG.debug(s);
+      } catch (Exception e) {
+        // some resilience in case the format string is wrong
+        LOG.debug(fmt, e);
+      }
+    }
+
     /** Get a path from the local FS. If size is known, we go
      *  round-robin over the set of disks (via the configured dirs) and return
      *  the first complete path which has enough space.
@@ -393,6 +414,12 @@ int getCurrentDirectoryIndex() {
      */
     public Path getLocalPathForWrite(String pathStr, long size,
         Configuration conf, boolean checkWrite) throws IOException {
+
+      // history is built up and logged at error if the allocation fails
+      StringBuilder history = new StringBuilder();
+
+      note(history, "Searchng for a directory for file at %s, size = %,d; 
checkWrite=%s",
+          pathStr, size, checkWrite);
       Context ctx = confChanged(conf);
       int numDirs = ctx.localDirs.length;
       int numDirsSearched = 0;
@@ -406,27 +433,56 @@ public Path getLocalPathForWrite(String pathStr, long size,
         pathStr = pathStr.substring(1);
       }
       Path returnPath = null;
-      
-      if(size == SIZE_UNKNOWN) {  //do roulette selection: pick dir with probability
-                    //proportional to available size
-        long[] availableOnDisk = new long[ctx.dirDF.length];
-        long totalAvailable = 0;
-        
-            //build the "roulette wheel"
-        for(int i =0; i < ctx.dirDF.length; ++i) {
-          final DF target = ctx.dirDF[i];
-          // attempt to recreate the dir so that getAvailable() is valid
-          // if it fails, getAvailable() will return 0, so the dir will
-          // be declared unavailable.
-          // return value is logged at debug to keep spotbugs quiet.
-          final boolean b = new File(target.getDirPath()).mkdirs();
-          LOG.debug("mkdirs of {}={}", target, b);
-          availableOnDisk[i] = target.getAvailable();
+
+      final int dirCount = ctx.dirDF.length;
+      long[] availableOnDisk = new long[dirCount];
+      long totalAvailable = 0;
+
+      StringBuilder pathNames = new StringBuilder();
+
+      //build the "roulette wheel"
+      for (int i = 0; i < dirCount; ++i) {
+        final DF target = ctx.dirDF[i];
+        // attempt to recreate the dir so that getAvailable() is valid
+        // if it fails, getAvailable() will return 0, so the dir will
+        // be declared unavailable.
+        // return value is logged at debug to keep spotbugs quiet.
+        final String name = target.getDirPath();
+        pathNames.append(" ").append(name);
+        final File dirPath = new File(name);
+
+        // existence probe
+        if (!dirPath.exists()) {
+          LOG.debug("creating buffer dir {}", name);
+          if (dirPath.mkdirs()) {
+            note(history, "Created buffer dir %s", name);
+          } else {
+            note(history, "Failed to create buffer dir %s", name);
+          }
+        }
+
+        // path already existed or the mkdir call had an outcome
+        // make sure the path is present and a dir, and if so add its availability
+        if (dirPath.isDirectory()) {
+          final long available = target.getAvailable();
+          availableOnDisk[i] = available;
+          note(history, "%s available under path %s",  pathStr, available);
           totalAvailable += availableOnDisk[i];
+        } else {
+          note(history, "%s does not exist/is not a directory",  pathStr);
         }
+      }
+
+      note(history, "Directory count is %d; total available capacity is %,d{}",
+          dirCount, totalAvailable);
 
-        if (totalAvailable == 0){
-          throw new DiskErrorException("No space available in any of the local 
directories.");
+      if (size == SIZE_UNKNOWN) {
+        //do roulette selection: pick dir with probability
+        // proportional to available size
+        note(history, "Size not specified, so picking at random.");
+
+        if (totalAvailable == 0) {
+          throw new DiskErrorException(E_NO_SPACE_AVAILABLE + pathNames + "; history=" + history);
         }
 
         // Keep rolling the wheel till we get a valid path
@@ -439,14 +495,19 @@ public Path getLocalPathForWrite(String pathStr, long size,
             dir++;
           }
           ctx.dirNumLastAccessed.set(dir);
-          returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite);
+          final Path localDir = ctx.localDirs[dir];
+          returnPath = createPath(localDir, pathStr, checkWrite);
           if (returnPath == null) {
             totalAvailable -= availableOnDisk[dir];
             availableOnDisk[dir] = 0; // skip this disk
             numDirsSearched++;
+            note(history, "No capacity in %s", localDir);
+          } else {
+            note(history, "Allocated file %s in %s", returnPath, localDir);
           }
         }
       } else {
+        note(history, "Size is %,d; searching", size);
         // Start linear search with random increment if possible
         int randomInc = 1;
         if (numDirs > 2) {
@@ -459,17 +520,22 @@ public Path getLocalPathForWrite(String pathStr, long size,
             maxCapacity = capacity;
           }
           if (capacity > size) {
+            final Path localDir = ctx.localDirs[dirNum];
             try {
-              returnPath = createPath(ctx.localDirs[dirNum], pathStr,
-                  checkWrite);
+              returnPath = createPath(localDir, pathStr, checkWrite);
             } catch (IOException e) {
               errorText = e.getMessage();
               diskException = e;
-              LOG.debug("DiskException caught for dir {}", 
ctx.localDirs[dirNum], e);
+              note(history, "Exception while creating path %s: %s", localDir, 
errorText);
+              LOG.debug("DiskException caught for dir {}", localDir, e);
             }
             if (returnPath != null) {
+              // success
               ctx.getAndIncrDirNumLastAccessed(numDirsSearched);
+              note(history, "Allocated file %s in %s", returnPath, localDir);
               break;
+            } else {
+              note(history, "No capacity in %s", localDir);
             }
           }
           dirNum++;
@@ -484,10 +550,14 @@ public Path getLocalPathForWrite(String pathStr, long size,
       //no path found
       String newErrorText = "Could not find any valid local directory for " +
           pathStr + " with requested size " + size +
-          " as the max capacity in any directory is " + maxCapacity;
+          " as the max capacity in any directory"
+          + " (" + pathNames + " )"
+          + " is " + maxCapacity;
       if (errorText != null) {
         newErrorText = newErrorText + " due to " + errorText;
       }
+      LOG.error(newErrorText);
+      LOG.error(history.toString());
       throw new DiskErrorException(newErrorText, diskException);
     }
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
index eb6d251add0..0e89e29dace 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
@@ -26,14 +26,16 @@
 import java.util.NoSuchElementException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Shell;
 
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import static org.apache.hadoop.fs.LocalDirAllocator.E_NO_SPACE_AVAILABLE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -564,13 +566,8 @@ public void testGetLocalPathForWriteForInvalidPaths(String paramRoot, String par
       throws Exception {
     initTestLocalDirAllocator(paramRoot, paramPrefix);
     conf.set(CONTEXT, " ");
-    try {
-      dirAllocator.getLocalPathForWrite("/test", conf);
-      fail("not throwing the exception");
-    } catch (IOException e) {
-      assertEquals("No space available in any of the local directories.",
-          e.getMessage(), "Incorrect exception message");
-    }
+    intercept(IOException.class, E_NO_SPACE_AVAILABLE, () ->
+        dirAllocator.getLocalPathForWrite("/test", conf));
   }
 
   /**
@@ -587,10 +584,13 @@ public void testGetLocalPathForWriteForLessSpace(String paramRoot, String paramP
     String dir0 = buildBufferDir(root, 0);
     String dir1 = buildBufferDir(root, 1);
     conf.set(CONTEXT, dir0 + "," + dir1);
-    LambdaTestUtils.intercept(DiskErrorException.class,
+    final DiskErrorException ex = intercept(DiskErrorException.class,
        String.format("Could not find any valid local directory for %s with requested size %s",
             "p1/x", Long.MAX_VALUE - 1), "Expect a DiskErrorException.",
        () -> dirAllocator.getLocalPathForWrite("p1/x", Long.MAX_VALUE - 1, conf));
+    Assertions.assertThat(ex.getMessage())
+        .contains(new File(dir0).getName())
+        .contains(new File(dir1).getName());
   }
 
   /**
@@ -614,5 +614,31 @@ public void testDirectoryRecovery(String paramRoot, String paramPrefix) throws T
     // and expect to get a new file back
     dirAllocator.getLocalPathForWrite("file2", -1, conf);
   }
+
+
+  /**
+   * Test for HADOOP-19554. LocalDirAllocator still doesn't always recover
+   * from directory tree deletion.
+   */
+  @Timeout(value = 30)
+  @MethodSource("params")
+  @ParameterizedTest
+  public void testDirectoryRecoveryKnownSize(String paramRoot, String paramPrefix) throws Throwable {
+    initTestLocalDirAllocator(paramRoot, paramPrefix);
+    String dir0 = buildBufferDir(root, 0);
+    String subdir = dir0 + "/subdir1/subdir2";
+
+    conf.set(CONTEXT, subdir);
+    // get local path and an ancestor
+    final Path pathForWrite = dirAllocator.getLocalPathForWrite("file", 512, conf);
+    final Path ancestor = pathForWrite.getParent().getParent();
+
+    // delete that ancestor
+    localFs.delete(ancestor, true);
+    // and expect to get a new file back
+    dirAllocator.getLocalPathForWrite("file2", -1, conf);
+  }
+
+
 }
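
For reference, a sketch (not from this commit) of the LambdaTestUtils.intercept()
pattern the updated tests switch to: it evaluates a closure, fails unless the named
exception type is thrown with the given message fragment, and returns the caught
exception for follow-up assertions. The config key and paths here are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.test.LambdaTestUtils;
    import org.apache.hadoop.util.DiskChecker.DiskErrorException;

    public class InterceptSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("test.buffer.dirs", "/tmp/sketch-buf");
        LocalDirAllocator allocator = new LocalDirAllocator("test.buffer.dirs");

        // a request larger than any disk must throw DiskErrorException;
        // intercept() returns it so its message can be inspected, e.g. for
        // the directory names this commit adds to the error text
        DiskErrorException ex = LambdaTestUtils.intercept(
            DiskErrorException.class,
            "Could not find any valid local directory",
            () -> allocator.getLocalPathForWrite("p1/x", Long.MAX_VALUE - 1, conf));
        System.out.println(ex.getMessage());
      }
    }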
 

