This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new e6cf6e6d10b HADOOP-19554. LocalDirAllocator still doesn't always recover from directory deletion (#7683) (#7685)
e6cf6e6d10b is described below

commit e6cf6e6d10bb2e8e4963e9e78d9c3ade3f0bf161
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Mon May 19 15:26:06 2025 +0100

    HADOOP-19554. LocalDirAllocator still doesn't always recover from directory deletion (#7683) (#7685)

    The LocalDirAllocator class, used to allocate disk space for HDFS block
    storage and cloud connector buffers, should now recover from directory
    deletion on all codepaths. This situation can occur in cloud connector
    use when cron jobs clean up temp directories on a schedule.

    There is also significantly better logging of failures in the
    allocation process.

    Contributed by Steve Loughran
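For readers new to the class, here is a minimal sketch of the allocation
pattern this fix hardens. It is not part of the patch: the context key
"example.buffer.dirs" and the paths are invented for illustration (real
deployments pass keys such as fs.s3a.buffer.dir).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;

public class BufferDirExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hypothetical context key listing two local buffer directories
    conf.set("example.buffer.dirs", "/tmp/buffer-a,/tmp/buffer-b");
    LocalDirAllocator allocator = new LocalDirAllocator("example.buffer.dirs");

    // first allocation: the chosen buffer dir is created as needed
    Path p1 = allocator.getLocalPathForWrite("block-0001", 512, conf);

    // even if a scheduled cleanup deletes that directory tree between
    // calls, the allocator should now recreate it rather than fail
    Path p2 = allocator.getLocalPathForWrite("block-0002", 512, conf);
    System.out.println(p1 + "\n" + p2);
  }
}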
---
 .../org/apache/hadoop/fs/LocalDirAllocator.java    | 133 +++++++++++++++++----
 .../apache/hadoop/fs/TestLocalDirAllocator.java    |  33 +++--
 2 files changed, 133 insertions(+), 33 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
index d8ab16f41d3..6808bdc5953 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
@@ -31,6 +31,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.net.NetUtils.getHostname;
+
 /** An implementation of a round-robin scheme for disk allocation for creating
  * files. The way it works is that it is kept track what disk was last
  * allocated for a file write. For the current request, the next disk from
@@ -65,7 +67,10 @@
 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
 @InterfaceStability.Unstable
 public class LocalDirAllocator {
-
+
+  static final String E_NO_SPACE_AVAILABLE =
+      "No space available in any of the local directories";
+
   //A Map from the config item names like "mapred.local.dir"
   //to the instance of the AllocatorPerContext. This
   //is a static object to make sure there exists exactly one instance per JVM
@@ -384,6 +389,24 @@ int getCurrentDirectoryIndex() {
     return currentContext.get().dirNumLastAccessed.get();
   }
 
+  /**
+   * Format a string, log at debug and append it to the history as a new line.
+   *
+   * @param history history to fill in
+   * @param fmt format string
+   * @param args varargs
+   */
+  private void note(StringBuilder history, String fmt, Object... args) {
+    try {
+      final String s = String.format(fmt, args);
+      history.append(s).append("\n");
+      LOG.debug(s);
+    } catch (Exception e) {
+      // some resilience in case the format string is wrong
+      LOG.debug(fmt, e);
+    }
+  }
+
   /** Get a path from the local FS. If size is known, we go
    * round-robin over the set of disks (via the configured dirs) and return
    * the first complete path which has enough space.
@@ -393,6 +416,12 @@ int getCurrentDirectoryIndex() {
    */
   public Path getLocalPathForWrite(String pathStr, long size,
       Configuration conf, boolean checkWrite) throws IOException {
+
+    // history is built up and logged at error if the alloc fails
+    StringBuilder history = new StringBuilder();
+
+    note(history, "Searching for a directory for file \"%s\", size = %,d; checkWrite=%s",
+        pathStr, size, checkWrite);
     Context ctx = confChanged(conf);
     int numDirs = ctx.localDirs.length;
     int numDirsSearched = 0;
@@ -406,27 +435,62 @@ public Path getLocalPathForWrite(String pathStr, long size,
       pathStr = pathStr.substring(1);
     }
     Path returnPath = null;
-
-    if(size == SIZE_UNKNOWN) {  //do roulette selection: pick dir with probability
-                    //proportional to available size
-      long[] availableOnDisk = new long[ctx.dirDF.length];
-      long totalAvailable = 0;
-
-      //build the "roulette wheel"
-      for(int i =0; i < ctx.dirDF.length; ++i) {
-        final DF target = ctx.dirDF[i];
-        // attempt to recreate the dir so that getAvailable() is valid
-        // if it fails, getAvailable() will return 0, so the dir will
-        // be declared unavailable.
-        // return value is logged at debug to keep spotbugs quiet.
-        final boolean b = new File(target.getDirPath()).mkdirs();
-        LOG.debug("mkdirs of {}={}", target, b);
-        availableOnDisk[i] = target.getAvailable();
+
+    final int dirCount = ctx.dirDF.length;
+    long[] availableOnDisk = new long[dirCount];
+    long totalAvailable = 0;
+
+    StringBuilder pathNames = new StringBuilder();
+
+    //build the "roulette wheel"
+    for (int i = 0; i < dirCount; ++i) {
+      final DF target = ctx.dirDF[i];
+      // attempt to recreate the dir so that getAvailable() is valid
+      // if it fails, getAvailable() will return 0, so the dir will
+      // be declared unavailable.
+      // return value is logged at debug to keep spotbugs quiet.
+      final String name = target.getDirPath();
+      pathNames.append(" ").append(name);
+      final File dirPath = new File(name);
+
+      // existence probe with directory recreation
+      if (!dirPath.exists()) {
+        LOG.debug("Creating buffer dir {}", name);
+        if (dirPath.mkdirs()) {
+          note(history, "Created buffer dir %s", name);
+        } else {
+          note(history, "Failed to create buffer dir %s", name);
+        }
+      }
+
+      // path already existed or the mkdir call had an outcome
+      // make sure the path is present and a dir, and if so add its availability
+      if (dirPath.isDirectory()) {
+        final long available = target.getAvailable();
+        availableOnDisk[i] = available;
+        note(history, "%,d bytes available under path %s", available, name);
         totalAvailable += availableOnDisk[i];
+      } else {
+        note(history, "%s does not exist/is not a directory", name);
       }
+    }
 
-      if (totalAvailable == 0){
-        throw new DiskErrorException("No space available in any of the local directories.");
+    note(history, "Directory count is %d; total available capacity is %,d",
+        dirCount, totalAvailable);
+
+    if (size == SIZE_UNKNOWN) {
+      //do roulette selection: pick dir with probability
+      // proportional to available size
+      note(history, "Size not specified, so picking directories at random.");
+
+      if (totalAvailable == 0) {
+        // log error and history
+        String newErrorText = E_NO_SPACE_AVAILABLE + pathNames
+            + " on host " + getHostname();
+        LOG.error(newErrorText);
+        LOG.error(history.toString());
+        // then raise the exception
+        throw new DiskErrorException(newErrorText);
      }
 
      // Keep rolling the wheel till we get a valid path
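An aside on the "roulette wheel" built above: when the file size is unknown,
a directory is picked with probability proportional to its free space. A
standalone sketch of that selection idea follows; the class and method names
are invented here, and this is not the committed code.

import java.util.Random;

public final class RouletteWheelSketch {

  private RouletteWheelSketch() {
  }

  /** Pick an index with probability proportional to availableOnDisk[i]. */
  static int pickDir(long[] availableOnDisk, Random random) {
    long totalAvailable = 0;
    for (long avail : availableOnDisk) {
      totalAvailable += avail;
    }
    if (totalAvailable == 0) {
      throw new IllegalStateException("no space in any directory");
    }
    // a random point in [0, totalAvailable) lands in the segment of exactly
    // one directory; wider segments (more free space) are hit more often
    long position = (long) (random.nextDouble() * totalAvailable);
    int dir = 0;
    while (position >= availableOnDisk[dir]) {
      position -= availableOnDisk[dir];
      dir++;
    }
    return dir;
  }

  public static void main(String[] args) {
    // 100MB free on disk 0, 900MB free on disk 1: index 1 wins ~90% of spins
    long[] available = {100L << 20, 900L << 20};
    System.out.println(pickDir(available, new Random()));
  }
}

Directories with zero free space are never selected, which is why the patch
can treat a missing, unrecreatable directory as simply "0 bytes available".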
@@ -439,14 +503,20 @@ public Path getLocalPathForWrite(String pathStr, long size,
           dir++;
         }
         ctx.dirNumLastAccessed.set(dir);
-        returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite);
+        final Path localDir = ctx.localDirs[dir];
+        returnPath = createPath(localDir, pathStr, checkWrite);
         if (returnPath == null) {
           totalAvailable -= availableOnDisk[dir];
           availableOnDisk[dir] = 0; // skip this disk
           numDirsSearched++;
+          note(history, "No capacity in %s", localDir);
+        } else {
+          note(history, "Allocated file %s in %s", returnPath, localDir);
         }
       }
     } else {
+      note(history, "Requested file size is %,d; searching for a suitable directory",
+          size);
       // Start linear search with random increment if possible
       int randomInc = 1;
       if (numDirs > 2) {
@@ -459,17 +529,22 @@ public Path getLocalPathForWrite(String pathStr, long size,
           maxCapacity = capacity;
         }
         if (capacity > size) {
+          final Path localDir = ctx.localDirs[dirNum];
           try {
-            returnPath = createPath(ctx.localDirs[dirNum], pathStr,
-                checkWrite);
+            returnPath = createPath(localDir, pathStr, checkWrite);
           } catch (IOException e) {
             errorText = e.getMessage();
             diskException = e;
-            LOG.debug("DiskException caught for dir {}", ctx.localDirs[dirNum], e);
+            note(history, "Exception while creating path %s: %s", localDir, errorText);
+            LOG.debug("DiskException caught for dir {}", localDir, e);
           }
           if (returnPath != null) {
+            // success
             ctx.getAndIncrDirNumLastAccessed(numDirsSearched);
+            note(history, "Allocated file %s in %s", returnPath, localDir);
             break;
+          } else {
+            note(history, "No capacity in %s", localDir);
           }
         }
         dirNum++;
@@ -482,12 +557,18 @@ public Path getLocalPathForWrite(String pathStr, long size,
       }
 
      //no path found
-      String newErrorText = "Could not find any valid local directory for " +
-          pathStr + " with requested size " + size +
-          " as the max capacity in any directory is " + maxCapacity;
+      String hostname = getHostname();
+      String newErrorText = "Could not find any valid local directory for " +
+          pathStr + " with requested size " + size +
+          " on host " + hostname +
+          " as the max capacity in any directory" +
+          " (" + pathNames + " )" +
+          " is " + maxCapacity;
       if (errorText != null) {
         newErrorText = newErrorText + " due to " + errorText;
       }
+      LOG.error(newErrorText);
+      LOG.error(history.toString());
       throw new DiskErrorException(newErrorText, diskException);
     }
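The history mechanism is the other half of the change: each decision is
appended to a StringBuilder and logged at debug as it happens, and the full
trace surfaces at error level only when allocation fails. A self-contained
sketch of that pattern follows; the class name is invented, while note()
mirrors the helper the patch adds.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class AllocationHistorySketch {

  private static final Logger LOG =
      LoggerFactory.getLogger(AllocationHistorySketch.class);

  private AllocationHistorySketch() {
  }

  /** Format a string, log at debug and append it to the history. */
  static void note(StringBuilder history, String fmt, Object... args) {
    try {
      final String s = String.format(fmt, args);
      history.append(s).append("\n");
      LOG.debug(s);
    } catch (Exception e) {
      // resilience in case the format string is wrong
      LOG.debug(fmt, e);
    }
  }

  public static void main(String[] args) {
    StringBuilder history = new StringBuilder();
    note(history, "Searching for a directory for file \"%s\", size = %,d",
        "file1", 512L);
    note(history, "No capacity in %s", "/tmp/buffer-a");

    boolean allocationFailed = true; // simulate the failure path
    if (allocationFailed) {
      // only now does the accumulated history reach the error log
      LOG.error("Allocation failed:\n{}", history);
    }
  }
}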
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
index 3693b4f0acd..841b09be3f0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
@@ -35,6 +35,8 @@
 import org.junit.runners.Parameterized.Parameters;
 import org.junit.Test;
 
+import static org.apache.hadoop.fs.LocalDirAllocator.E_NO_SPACE_AVAILABLE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
 import static org.junit.Assert.*;
 
@@ -524,13 +526,8 @@ public void testRemoveContext() throws IOException {
   @Test(timeout = 30000)
   public void testGetLocalPathForWriteForInvalidPaths() throws Exception {
     conf.set(CONTEXT, " ");
-    try {
-      dirAllocator.getLocalPathForWrite("/test", conf);
-      fail("not throwing the exception");
-    } catch (IOException e) {
-      assertEquals("Incorrect exception message",
-          "No space available in any of the local directories.", e.getMessage());
-    }
+    intercept(IOException.class, E_NO_SPACE_AVAILABLE, () ->
+        dirAllocator.getLocalPathForWrite("/test", conf));
   }
 
   /**
@@ -567,5 +564,27 @@ public void testDirectoryRecovery() throws Throwable {
     // and expect to get a new file back
     dirAllocator.getLocalPathForWrite("file2", -1, conf);
   }
+
+  /**
+   * Test for HADOOP-19554. LocalDirAllocator still doesn't always recover
+   * from directory tree deletion.
+   */
+  @Test(timeout = 30000)
+  public void testDirectoryRecoveryKnownSize() throws Throwable {
+    String dir0 = buildBufferDir(ROOT, 0);
+    String subdir = dir0 + "/subdir1/subdir2";
+
+    conf.set(CONTEXT, subdir);
+    // get local path and an ancestor
+    final Path pathForWrite = dirAllocator.getLocalPathForWrite("file", 512, conf);
+    final Path ancestor = pathForWrite.getParent().getParent();
+
+    // delete that ancestor
+    localFs.delete(ancestor, true);
+    // and expect to get a new file back
+    dirAllocator.getLocalPathForWrite("file2", -1, conf);
+  }
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org