This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0be30ef55e6 atomic move segment files to staging location prior to 
delete (#18696)
0be30ef55e6 is described below

commit 0be30ef55e65e6ccd26f5444a6f8a11268831d74
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Oct 27 11:08:18 2025 -0700

    atomic move segment files to staging location prior to delete (#18696)
    
    * atomically move segment files to a staging location prior to drop, to ensure that no partial segment files are left behind if a failure occurs during a delete operation
---
 .../msq/exec/TaskDataSegmentProviderTest.java      |  4 +-
 .../segment/loading/SegmentLocalCacheManager.java  | 54 ++++++++++++++++++----
 .../SegmentLocalCacheManagerConcurrencyTest.java   |  5 +-
 3 files changed, 50 insertions(+), 13 deletions(-)

diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
index f22e7cb4fe0..56cea64175a 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
@@ -214,9 +214,9 @@ class TaskDataSegmentProviderTest extends 
InitializedNullHandlingTest
       Assertions.assertTrue(FutureUtils.getUnchecked(testFuture, false), "Test 
iteration #" + i);
     }
 
-    // Cache dir should exist, but be empty, since we've closed all holders.
+    // Cache dir should exist, but be (mostly) empty, since we've closed all 
holders.
     Assertions.assertTrue(cacheDir.exists());
-    Assertions.assertEquals(List.of(), List.of(cacheDir.list()));
+    Assertions.assertEquals(List.of("__drop"), List.of(cacheDir.list()));
   }
 
   private class TestCoordinatorClientImpl extends NoopCoordinatorClient
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 67c30e779e7..04cb3dbca98 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -71,6 +71,8 @@ import java.util.function.Supplier;
  */
 public class SegmentLocalCacheManager implements SegmentCacheManager
 {
+  private static final String DROP_PATH = "__drop";
+
   @VisibleForTesting
   static final String DOWNLOAD_START_MARKER_FILE_NAME = "downloadStartMarker";
 
@@ -191,6 +193,25 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
       );
     }
 
+    // clean up any dropping files
+    for (StorageLocation location : locations) {
+      File dropFiles = new File(location.getPath(), DROP_PATH);
+      if (dropFiles.exists()) {
+        final File[] dropping = dropFiles.listFiles();
+        if (dropping != null) {
+          log.debug("cleaning up[%s] segments in[%s]", dropping.length, 
dropFiles);
+          for (File droppedFile : dropping) {
+            try {
+              FileUtils.deleteDirectory(droppedFile);
+            }
+            catch (Exception e) {
+              log.warn(e, "Unable to remove dropped segment directory[%s]", 
droppedFile);
+            }
+          }
+        }
+      }
+    }
+
     final List<DataSegment> cachedSegments = new ArrayList<>();
     final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
 
@@ -717,7 +738,7 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
             }
           } else {
             // entry is not reserved, clean it up
-            
deleteCacheEntryDirectory(cacheEntry.toPotentialLocation(location.getPath()));
+            
atomicMoveAndDeleteCacheEntryDirectory(cacheEntry.toPotentialLocation(location.getPath()));
           }
         }
       }
@@ -747,13 +768,22 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
   }
 
   /**
-   * Deletes a directory and logs about it. This method should only be called 
under the lock of a {@link #segmentLocks}
+   * Performs an atomic move to a sibling {@link #DROP_PATH} directory, and 
then deletes the directory and logs about
+   * it. This method should only be called under the lock of a {@link 
#segmentLocks}.
    */
-  private static void deleteCacheEntryDirectory(final File path)
+  private static void atomicMoveAndDeleteCacheEntryDirectory(final File path)
   {
-    log.info("Deleting directory[%s]", path);
+    final File parent = path.getParentFile();
+    final File tempLocation = new File(parent, DROP_PATH);
     try {
-      FileUtils.deleteDirectory(path);
+      if (!tempLocation.exists()) {
+        FileUtils.mkdirp(tempLocation);
+      }
+      final File tempPath = new File(tempLocation, path.getName());
+      log.debug("moving[%s] to temp location[%s]", path, tempLocation);
+      Files.move(path.toPath(), tempPath.toPath(), 
StandardCopyOption.ATOMIC_MOVE);
+      log.info("Deleting directory[%s]", path);
+      FileUtils.deleteDirectory(tempPath);
     }
     catch (Exception e) {
       log.error(e, "Unable to remove directory[%s]", path);
@@ -761,7 +791,7 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
   }
 
   /**
-   * Calls {@link #deleteCacheEntryDirectory(File)} and then checks parent 
path if it is empty, and recursively
+   * Calls {@link FileUtils#deleteDirectory(File)} and then checks parent path 
if it is empty, and recursively
    * continues until a non-empty directory or the base path is reached. This 
method is not thread-safe, and should only
    * be used by a single caller.
    */
@@ -771,7 +801,13 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
       return;
     }
 
-    deleteCacheEntryDirectory(cacheFile);
+    try {
+      log.info("Deleting migrated segment directory[%s]", cacheFile);
+      FileUtils.deleteDirectory(cacheFile);
+    }
+    catch (Exception e) {
+      log.warn(e, "Unable to remove directory[%s]", cacheFile);
+    }
 
     File parent = cacheFile.getParentFile();
     if (parent != null) {
@@ -898,7 +934,7 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
                   "[%s] may be damaged. Delete all the segment files and pull 
from DeepStorage again.",
                   storageDir.getAbsolutePath()
               );
-              deleteCacheEntryDirectory(storageDir);
+              atomicMoveAndDeleteCacheEntryDirectory(storageDir);
             } else {
               needsLoad = false;
             }
@@ -973,7 +1009,7 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
             return;
           }
           if (storageDir != null) {
-            deleteCacheEntryDirectory(storageDir);
+            atomicMoveAndDeleteCacheEntryDirectory(storageDir);
             storageDir = null;
             location = null;
           }
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index 366b3cbbc33..f0ae0042364 100644
--- 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -716,8 +716,9 @@ class SegmentLocalCacheManagerConcurrencyTest
     Assertions.assertEquals(0, location2.getActiveWeakHolds());
     Assertions.assertTrue(4 >= location.getWeakEntryCount());
     Assertions.assertTrue(4 >= location2.getWeakEntryCount());
-    Assertions.assertTrue(4 >= location.getPath().listFiles().length);
-    Assertions.assertTrue(4 >= location2.getPath().listFiles().length);
+    // 5 because __drop path
+    Assertions.assertTrue(5 >= location.getPath().listFiles().length);
+    Assertions.assertTrue(5 >= location2.getPath().listFiles().length);
     Assertions.assertTrue(location.getStats().getLoadCount() >= 4);
     Assertions.assertTrue(location2.getStats().getLoadCount() >= 4);
     Assertions.assertEquals(location.getStats().getEvictionCount(), 
location.getStats().getUnmountCount());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to