This is an automated email from the ASF dual-hosted git repository.
Gargi-jais11 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8322282a8aa HDDS-15524. [DiskBalancer] Container parallel moves can overwrite pending source replica deletions (#10489).
8322282a8aa is described below
commit 8322282a8aa63c8cc70869a830a979ae688beefe
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Mon Jun 15 09:59:14 2026 +0530
HDDS-15524. [DiskBalancer] Container parallel moves can overwrite pending source replica deletions (#10489).
---
.../diskbalancer/DiskBalancerService.java | 50 ++++++++-----
.../diskbalancer/TestDiskBalancerTask.java | 82 ++++++++++++++++++++++
2 files changed, 116 insertions(+), 16 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index 5de1ee198a3..fdf90afb61b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -38,8 +38,10 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -112,7 +114,8 @@ public class DiskBalancerService extends BackgroundService {
private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
private Set<ContainerID> inProgressContainers;
- private ConcurrentSkipListMap<Long, Container> pendingDeletionContainers =
new ConcurrentSkipListMap();
+ private final ConcurrentSkipListMap<Long, Queue<Container>>
pendingDeletionContainers =
+ new ConcurrentSkipListMap<>();
private static FaultInjector injector;
/**
@@ -639,14 +642,16 @@ public BackgroundTaskResult call() {
}
if (moveSucceeded && newContainer != null) {
// Add current old container to pendingDeletionContainers.
- pendingDeletionContainers.put(clock.millis() + replicaDeletionDelay,
- container);
+ long deadline = clock.millis() + replicaDeletionDelay;
+ pendingDeletionContainers
+ .computeIfAbsent(deadline, ignored -> new
ConcurrentLinkedQueue<>())
+ .add(container);
ContainerLogger.logMoveSuccess(newContainer.getContainerData(),
sourceVolume,
destVolume, containerSize, Time.monotonicNow() - startTime);
}
postCall(moveSucceeded && newContainer != null, startTime);
- // pick one expired container from pendingDeletionContainers to delete
+ // Attempt to delete any pending-deletion buckets whose deadline has
elapsed.
tryCleanupOnePendingDeletionContainer();
}
return BackgroundTaskResult.EmptyTaskResult.newResult();
@@ -686,7 +691,8 @@ private void deleteContainer(Container container) {
}
}
- private void cleanupPendingDeletionContainers() {
+ @VisibleForTesting
+ public void cleanupPendingDeletionContainers() {
// delete all pending deletion containers before stop the service
boolean ret;
do {
@@ -695,18 +701,18 @@ private void cleanupPendingDeletionContainers() {
}
private boolean tryCleanupOnePendingDeletionContainer() {
- Map.Entry<Long, Container> entry =
pendingDeletionContainers.pollFirstEntry();
- if (entry != null) {
- if (entry.getKey() <= clock.millis()) {
- // entry container is expired
- deleteContainer(entry.getValue());
- return true;
- } else {
- // put back the container
- pendingDeletionContainers.put(entry.getKey(), entry.getValue());
- }
+ // peek first, only remove when expired
+ Map.Entry<Long, Queue<Container>> entry =
pendingDeletionContainers.firstEntry();
+ if (entry == null || entry.getKey() > clock.millis()) {
+ return false;
}
- return false;
+ if (!pendingDeletionContainers.remove(entry.getKey(), entry.getValue())) {
+ return false;
+ }
+ for (Container pending : entry.getValue()) {
+ deleteContainer(pending);
+ }
+ return true;
}
public DiskBalancerInfo getDiskBalancerInfo() {
@@ -887,4 +893,16 @@ private static void pauseInjector() {
public void setReplicaDeletionDelay(long durationMills) {
this.replicaDeletionDelay = durationMills;
}
+
+ @VisibleForTesting
+ public int getPendingDeletionDeadlineCount() {
+ return pendingDeletionContainers.size();
+ }
+
+ @VisibleForTesting
+ public int getPendingDeletionQueueSize() {
+ return pendingDeletionContainers.values().stream()
+ .mapToInt(Queue::size)
+ .sum();
+ }
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
index 7e4b49ccb40..ef9dc8f57b8 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
@@ -39,6 +39,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -47,6 +48,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -58,6 +63,8 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -679,6 +686,81 @@ public void testMoveSkippedWhenContainerStateChanged(State invalidState)
assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
+ @ContainerTestVersionInfo.ContainerTest
+ public void testPendingDeletionDoesNotDropReplicasOnSameMillisecondKey(
+ ContainerTestVersionInfo versionInfo)
+ throws Exception {
+ setLayoutAndSchemaForTest(versionInfo);
+
+ long delayMs = 2_000L;
+ diskBalancerService.setReplicaDeletionDelay(delayMs);
+
+ long id1 = CONTAINER_ID;
+ long id2 = CONTAINER_ID + 1;
+ long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
+
+ Container c1 = createContainer(id1, sourceVolume, State.CLOSED);
+ Container c2 = createContainer(id2, sourceVolume, State.CLOSED);
+
+ File oldDir1 = new File(c1.getContainerData().getContainerPath());
+ File oldDir2 = new File(c2.getContainerData().getContainerPath());
+ assertTrue(oldDir1.exists());
+ assertTrue(oldDir2.exists());
+
+ // Reserve dest space like the choosing policy would.
+ destVolume.incCommittedBytes(c1.getContainerData().getBytesUsed());
+ destVolume.incCommittedBytes(c2.getContainerData().getBytesUsed());
+
+ // Schedule two moves (parallelThread default is 5 in config).
+ BackgroundTaskQueue queue = diskBalancerService.getTasks();
+ assertEquals(2, queue.size());
+ DiskBalancerService.DiskBalancerTask task1 =
+ (DiskBalancerService.DiskBalancerTask) queue.poll();
+ DiskBalancerService.DiskBalancerTask task2 =
+ (DiskBalancerService.DiskBalancerTask) queue.poll();
+ assertNotNull(task1);
+ assertNotNull(task2);
+
+ // Run both moves concurrently; fixed TestClock => same deadline key.
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ try {
+ List<Future<BackgroundTaskResult>> futures =
pool.invokeAll(Arrays.asList(
+ task1::call,
+ task2::call));
+ for (Future<BackgroundTaskResult> future : futures) {
+ future.get(30, TimeUnit.SECONDS);
+ }
+ } finally {
+ pool.shutdownNow();
+ }
+
+ assertEquals(2, diskBalancerService.getMetrics().getSuccessCount());
+
+ assertEquals(1, diskBalancerService.getPendingDeletionDeadlineCount(),
+ "both moves should share one deadline key");
+ assertEquals(2, diskBalancerService.getPendingDeletionQueueSize(),
+ "both container replicas should be queued for deletion");
+
+ // Not deleted yet — delay has not elapsed.
+ assertTrue(oldDir1.exists());
+ assertTrue(oldDir2.exists());
+
+ clock.fastForward(delayMs);
+ diskBalancerService.cleanupPendingDeletionContainers();
+
+ assertEquals(0, diskBalancerService.getPendingDeletionQueueSize());
+ assertFalse(oldDir1.exists());
+ assertFalse(oldDir2.exists());
+ assertFalse(sourceVolume.getContainerIterator().hasNext(),
+ "source volume should have no containers after delayed deletion");
+ assertEquals(initialSourceUsed,
sourceVolume.getCurrentUsage().getUsedSpace(),
+ "source volume used space should return to pre-move level after old
replicas are deleted");
+
+ // New replicas live on dest volume.
+ assertTrue(new
File(containerSet.getContainer(id1).getContainerData().getContainerPath()).exists());
+ assertTrue(new
File(containerSet.getContainer(id2).getContainerData().getContainerPath()).exists());
+ }
+
private KeyValueContainer createContainer(long containerId, HddsVolume vol,
State state)
throws IOException {
KeyValueContainerData containerData = new KeyValueContainerData(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]