This is an automated email from the ASF dual-hosted git repository.

Gargi-jais11 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 8322282a8aa HDDS-15524. [DiskBalancer] Container parallel moves can overwrite pending source replica deletions (#10489).
8322282a8aa is described below

commit 8322282a8aa63c8cc70869a830a979ae688beefe
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Mon Jun 15 09:59:14 2026 +0530

    HDDS-15524. [DiskBalancer] Container parallel moves can overwrite pending source replica deletions (#10489).
---
 .../diskbalancer/DiskBalancerService.java          | 50 ++++++++-----
 .../diskbalancer/TestDiskBalancerTask.java         | 82 ++++++++++++++++++++++
 2 files changed, 116 insertions(+), 16 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index 5de1ee198a3..fdf90afb61b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -38,8 +38,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -112,7 +114,8 @@ public class DiskBalancerService extends BackgroundService {
   private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
 
   private Set<ContainerID> inProgressContainers;
-  private ConcurrentSkipListMap<Long, Container> pendingDeletionContainers = 
new ConcurrentSkipListMap();
+  private final ConcurrentSkipListMap<Long, Queue<Container>> 
pendingDeletionContainers =
+      new ConcurrentSkipListMap<>();
   private static FaultInjector injector;
 
   /**
@@ -639,14 +642,16 @@ public BackgroundTaskResult call() {
         }
         if (moveSucceeded && newContainer != null) {
           // Add current old container to pendingDeletionContainers.
-          pendingDeletionContainers.put(clock.millis() + replicaDeletionDelay,
-              container);
+          long deadline = clock.millis() + replicaDeletionDelay;
+          pendingDeletionContainers
+              .computeIfAbsent(deadline, ignored -> new 
ConcurrentLinkedQueue<>())
+              .add(container);
           ContainerLogger.logMoveSuccess(newContainer.getContainerData(), 
sourceVolume,
               destVolume, containerSize, Time.monotonicNow() - startTime);
         }
         postCall(moveSucceeded && newContainer != null, startTime);
 
-        // pick one expired container from pendingDeletionContainers to delete
+        // Attempt to delete any pending-deletion buckets whose deadline has 
elapsed.
         tryCleanupOnePendingDeletionContainer();
       }
       return BackgroundTaskResult.EmptyTaskResult.newResult();
@@ -686,7 +691,8 @@ private void deleteContainer(Container container) {
     }
   }
 
-  private void cleanupPendingDeletionContainers() {
+  @VisibleForTesting
+  public void cleanupPendingDeletionContainers() {
     // delete all pending deletion containers before stop the service
     boolean ret;
     do {
@@ -695,18 +701,18 @@ private void cleanupPendingDeletionContainers() {
   }
 
   private boolean tryCleanupOnePendingDeletionContainer() {
-    Map.Entry<Long, Container> entry = 
pendingDeletionContainers.pollFirstEntry();
-    if (entry != null) {
-      if (entry.getKey() <= clock.millis()) {
-        // entry container is expired
-        deleteContainer(entry.getValue());
-        return true;
-      } else {
-        // put back the container
-        pendingDeletionContainers.put(entry.getKey(), entry.getValue());
-      }
+    // peek first, only remove when expired
+    Map.Entry<Long, Queue<Container>> entry = 
pendingDeletionContainers.firstEntry();
+    if (entry == null || entry.getKey() > clock.millis()) {
+      return false;
     }
-    return false;
+    if (!pendingDeletionContainers.remove(entry.getKey(), entry.getValue())) {
+      return false;
+    }
+    for (Container pending : entry.getValue()) {
+      deleteContainer(pending);
+    }
+    return true;
   }
 
   public DiskBalancerInfo getDiskBalancerInfo() {
@@ -887,4 +893,16 @@ private static void pauseInjector() {
   public void setReplicaDeletionDelay(long durationMills) {
     this.replicaDeletionDelay = durationMills;
   }
+
+  @VisibleForTesting
+  public int getPendingDeletionDeadlineCount() {
+    return pendingDeletionContainers.size();
+  }
+
+  @VisibleForTesting
+  public int getPendingDeletionQueueSize() {
+    return pendingDeletionContainers.values().stream()
+        .mapToInt(Queue::size)
+        .sum();
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
index 7e4b49ccb40..ef9dc8f57b8 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
@@ -39,6 +39,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -47,6 +48,10 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -58,6 +63,8 @@
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
 import org.apache.hadoop.hdds.utils.FaultInjector;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -679,6 +686,81 @@ public void testMoveSkippedWhenContainerStateChanged(State 
invalidState)
     assertEquals(initialSourceDelta, 
diskBalancerService.getDeltaSizes().get(sourceVolume));
   }
 
+  @ContainerTestVersionInfo.ContainerTest
+  public void testPendingDeletionDoesNotDropReplicasOnSameMillisecondKey(
+      ContainerTestVersionInfo versionInfo)
+      throws Exception {
+    setLayoutAndSchemaForTest(versionInfo);
+
+    long delayMs = 2_000L;
+    diskBalancerService.setReplicaDeletionDelay(delayMs);
+
+    long id1 = CONTAINER_ID;
+    long id2 = CONTAINER_ID + 1;
+    long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
+
+    Container c1 = createContainer(id1, sourceVolume, State.CLOSED);
+    Container c2 = createContainer(id2, sourceVolume, State.CLOSED);
+
+    File oldDir1 = new File(c1.getContainerData().getContainerPath());
+    File oldDir2 = new File(c2.getContainerData().getContainerPath());
+    assertTrue(oldDir1.exists());
+    assertTrue(oldDir2.exists());
+
+    // Reserve dest space like the choosing policy would.
+    destVolume.incCommittedBytes(c1.getContainerData().getBytesUsed());
+    destVolume.incCommittedBytes(c2.getContainerData().getBytesUsed());
+
+    // Schedule two moves (parallelThread default is 5 in config).
+    BackgroundTaskQueue queue = diskBalancerService.getTasks();
+    assertEquals(2, queue.size());
+    DiskBalancerService.DiskBalancerTask task1 =
+        (DiskBalancerService.DiskBalancerTask) queue.poll();
+    DiskBalancerService.DiskBalancerTask task2 =
+        (DiskBalancerService.DiskBalancerTask) queue.poll();
+    assertNotNull(task1);
+    assertNotNull(task2);
+
+    // Run both moves concurrently; fixed TestClock => same deadline key.
+    ExecutorService pool = Executors.newFixedThreadPool(2);
+    try {
+      List<Future<BackgroundTaskResult>> futures = 
pool.invokeAll(Arrays.asList(
+          task1::call,
+          task2::call));
+      for (Future<BackgroundTaskResult> future : futures) {
+        future.get(30, TimeUnit.SECONDS);
+      }
+    } finally {
+      pool.shutdownNow();
+    }
+
+    assertEquals(2, diskBalancerService.getMetrics().getSuccessCount());
+
+    assertEquals(1, diskBalancerService.getPendingDeletionDeadlineCount(),
+        "both moves should share one deadline key");
+    assertEquals(2, diskBalancerService.getPendingDeletionQueueSize(),
+        "both container replicas should be queued for deletion");
+
+    // Not deleted yet — delay has not elapsed.
+    assertTrue(oldDir1.exists());
+    assertTrue(oldDir2.exists());
+
+    clock.fastForward(delayMs);
+    diskBalancerService.cleanupPendingDeletionContainers();
+
+    assertEquals(0, diskBalancerService.getPendingDeletionQueueSize());
+    assertFalse(oldDir1.exists());
+    assertFalse(oldDir2.exists());
+    assertFalse(sourceVolume.getContainerIterator().hasNext(),
+        "source volume should have no containers after delayed deletion");
+    assertEquals(initialSourceUsed, 
sourceVolume.getCurrentUsage().getUsedSpace(),
+        "source volume used space should return to pre-move level after old 
replicas are deleted");
+
+    // New replicas live on dest volume.
+    assertTrue(new 
File(containerSet.getContainer(id1).getContainerData().getContainerPath()).exists());
+    assertTrue(new 
File(containerSet.getContainer(id2).getContainerData().getContainerPath()).exists());
+  }
+
   private KeyValueContainer createContainer(long containerId, HddsVolume vol, 
State state)
       throws IOException {
     KeyValueContainerData containerData = new KeyValueContainerData(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to