This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-5713 by this push:
     new 6ab87743145 HDDS-13602. Delay delete source container replica to avoid read failure due to read thread holds the old containerData (#8965)
6ab87743145 is described below

commit 6ab87743145b5189c2f4a859d458c54e56f2a518
Author: Sammi Chen <[email protected]>
AuthorDate: Tue Dec 2 10:19:48 2025 +0800

    HDDS-13602. Delay delete source container replica to avoid read failure due to read thread holds the old containerData (#8965)
---
 .../diskbalancer/DiskBalancerService.java          | 63 ++++++++++++++++++----
 .../diskbalancer/TestDiskBalancerTask.java         | 35 ++++++++++--
 2 files changed, 85 insertions(+), 13 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index 04bd959eb4d..37984848e9f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -32,6 +32,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -85,6 +86,7 @@ public class DiskBalancerService extends BackgroundService {
       LoggerFactory.getLogger(DiskBalancerService.class);
 
   public static final String DISK_BALANCER_DIR = "diskBalancer";
+  private static long replicaDeletionDelayMills = 60 * 60 * 1000L; // 60 
minutes
 
   private OzoneContainer ozoneContainer;
   private final ConfigurationSource conf;
@@ -104,6 +106,7 @@ public class DiskBalancerService extends BackgroundService {
   private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
 
   private Set<ContainerID> inProgressContainers;
+  private ConcurrentSkipListMap<Long, Container> pendingDeletionContainers = 
new ConcurrentSkipListMap();
   private static FaultInjector injector;
 
   /**
@@ -339,12 +342,14 @@ public BackgroundTaskQueue getTasks() {
 
     if (this.operationalState == DiskBalancerRunningStatus.STOPPED ||
         this.operationalState == DiskBalancerRunningStatus.PAUSED) {
+      cleanupPendingDeletionContainers();
       return queue;
     }
     metrics.incrRunningLoopCount();
 
     if (shouldDelay()) {
       metrics.incrIdleLoopExceedsBandwidthCount();
+      cleanupPendingDeletionContainers();
       return queue;
     }
 
@@ -391,6 +396,7 @@ public BackgroundTaskQueue getTasks() {
         }
       }
       metrics.incrIdleLoopNoAvailableVolumePairCount();
+      cleanupPendingDeletionContainers();
     }
 
     return queue;
@@ -554,20 +560,15 @@ public BackgroundTaskResult call() {
           container.readUnlock();
         }
         if (moveSucceeded) {
-          // Remove the old container from the KeyValueContainerUtil.
-          try {
-            KeyValueContainerUtil.removeContainer(
-                (KeyValueContainerData) container.getContainerData(), conf);
-            container.delete();
-            
container.getContainerData().getVolume().decrementUsedSpace(containerSize);
-          } catch (IOException ex) {
-            LOG.warn("Failed to move or delete old container {} after it's 
marked as DELETED. " +
-                    "It will be handled by background scanners.", containerId, 
ex);
-          }
+          // Add current old container to pendingDeletionContainers.
+          pendingDeletionContainers.put(System.currentTimeMillis() + 
replicaDeletionDelayMills, container);
           ContainerLogger.logMoveSuccess(containerId, sourceVolume,
               destVolume, containerSize, Time.monotonicNow() - startTime);
         }
         postCall(moveSucceeded, startTime);
+
+        // pick one expired container from pendingDeletionContainers to delete
+        tryCleanupOnePendingDeletionContainer();
       }
       return BackgroundTaskResult.EmptyTaskResult.newResult();
     }
@@ -593,6 +594,43 @@ private void postCall(boolean success, long startTime) {
     }
   }
 
+  private void deleteContainer(Container container) {
+    try {
+      KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+      KeyValueContainerUtil.removeContainer(containerData, conf);
+      container.delete();
+      
container.getContainerData().getVolume().decrementUsedSpace(containerData.getBytesUsed());
+      LOG.info("Deleted expired container {} after delay {} ms.",
+          containerData.getContainerID(), replicaDeletionDelayMills);
+    } catch (IOException ex) {
+      LOG.warn("Failed to delete old container {} after it's marked as 
DELETED. " +
+          "It will be handled by background scanners.", 
container.getContainerData().getContainerID(), ex);
+    }
+  }
+
+  private void cleanupPendingDeletionContainers() {
+    // delete all pending deletion containers before stop the service
+    boolean ret;
+    do {
+      ret = tryCleanupOnePendingDeletionContainer();
+    } while (ret);
+  }
+
+  private boolean tryCleanupOnePendingDeletionContainer() {
+    Map.Entry<Long, Container> entry = 
pendingDeletionContainers.pollFirstEntry();
+    if (entry != null) {
+      if (entry.getKey() <= System.currentTimeMillis()) {
+        // entry container is expired
+        deleteContainer(entry.getValue());
+        return true;
+      } else {
+        // put back the container
+        pendingDeletionContainers.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return false;
+  }
+
   public DiskBalancerInfo getDiskBalancerInfo() {
     ImmutableList<HddsVolume> immutableVolumeSet = 
DiskBalancerVolumeCalculation.getImmutableVolumeSet(volumeSet);
 
@@ -737,4 +775,9 @@ public void shutdown() {
   public static void setInjector(FaultInjector instance) {
     injector = instance;
   }
+
+  @VisibleForTesting
+  public static void setReplicaDeletionDelayMills(long durationMills) {
+    replicaDeletionDelayMills = durationMills;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
index 60fb52eba10..76a3992658b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
@@ -19,6 +19,7 @@
 
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
 import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
+import static 
org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DISK_BALANCER_DIR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -243,7 +244,7 @@ public void setup() throws Exception {
     conf.setFromObject(diskBalancerConfiguration);
     diskBalancerService = new DiskBalancerServiceTestImpl(ozoneContainer,
         100, conf, 1);
-
+    DiskBalancerService.setReplicaDeletionDelayMills(0);
     KeyValueContainer.setInjector(kvFaultInjector);
   }
 
@@ -320,7 +321,7 @@ public void moveFailsAfterCopy(ContainerTestVersionInfo 
versionInfo)
 
     // verify temp container directory doesn't exist before task execution
     Path tempContainerDir = destVolume.getTmpDir().toPath()
-        
.resolve(DiskBalancerService.DISK_BALANCER_DIR).resolve(String.valueOf(CONTAINER_ID));
+        .resolve(DISK_BALANCER_DIR).resolve(String.valueOf(CONTAINER_ID));
     File dir = new File(String.valueOf(tempContainerDir));
     assertFalse(dir.exists(), "Temp container directory should not exist 
before task starts");
 
@@ -370,7 +371,7 @@ public void moveFailsOnAtomicMove(ContainerTestVersionInfo 
versionInfo)
         0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
     String oldContainerPath = container.getContainerData().getContainerPath();
     Path tempDir = destVolume.getTmpDir().toPath()
-        .resolve(DiskBalancerService.DISK_BALANCER_DIR)
+        .resolve(DISK_BALANCER_DIR)
         .resolve(String.valueOf(CONTAINER_ID));
     assertFalse(Files.exists(tempDir), "Temp container directory should not 
exist");
     Path destDirPath = Paths.get(
@@ -573,6 +574,34 @@ public void 
testDestVolumeCommittedSpaceReleased(ContainerTestVersionInfo versio
     assertEquals(initialSourceDelta, 
diskBalancerService.getDeltaSizes().get(sourceVolume));
   }
 
+  @ContainerTestVersionInfo.ContainerTest
+  public void testOldReplicaDelayedDeletion(ContainerTestVersionInfo 
versionInfo)
+      throws IOException, InterruptedException {
+    setLayoutAndSchemaForTest(versionInfo);
+    long delay = 2000L; // 2 second delay
+    DiskBalancerService.setReplicaDeletionDelayMills(delay);
+
+    Container container = createContainer(CONTAINER_ID, sourceVolume, 
State.CLOSED);
+    KeyValueContainerData keyValueContainerData = (KeyValueContainerData) 
container.getContainerData();
+    File oldContainerDir = new File(keyValueContainerData.getContainerPath());
+    assertTrue(oldContainerDir.exists());
+
+    DiskBalancerService.DiskBalancerTask task = getTask();
+    task.call();
+    assertEquals(State.DELETED, container.getContainerState());
+    // Verify that the old container is not deleted immediately
+    assertTrue(oldContainerDir.exists());
+
+    // create another container to trigger the deletion of old replicas
+    createContainer(CONTAINER_ID + 1, sourceVolume, State.CLOSED);
+    task = getTask();
+    // Wait for the delay to pass
+    Thread.sleep(delay);
+    task.call();
+    // Verify that the old container is deleted
+    assertFalse(oldContainerDir.exists());
+  }
+
   private KeyValueContainer createContainer(long containerId, HddsVolume vol, 
State state)
       throws IOException {
     KeyValueContainerData containerData = new KeyValueContainerData(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to