This is an automated email from the ASF dual-hosted git repository.

Gargi-jais11 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a60b0236413 HDDS-15066. Read-Write Lock race leave stale references to container creating orphan replicas (#10109).
a60b0236413 is described below

commit a60b02364135a5a24ea0797fa5971bcd8992397a
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Thu May 14 16:11:49 2026 +0530

    HDDS-15066. Read-Write Lock race leave stale references to container creating orphan replicas (#10109).
---
 .../ozone/container/common/impl/ContainerSet.java  |  54 ++
 .../commandhandler/DeleteBlocksCommandHandler.java |  28 +-
 .../diskbalancer/DiskBalancerService.java          |  41 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  | 107 ++--
 .../statemachine/background/BlockDeletingTask.java |  52 +-
 .../container/common/impl/TestContainerSet.java    |  98 ++++
 .../TestDeleteBlocksCommandHandler.java            |  89 +++-
 ...tDiskBalancerWithConcurrentBackgroundTasks.java | 591 +++++++++++++++++++++
 .../container/keyvalue/TestKeyValueHandler.java    |  29 +-
 .../TestKeyValueHandlerWithUnhealthyContainer.java |  14 +-
 10 files changed, 1001 insertions(+), 102 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 13bf7789929..d122dfb587f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -63,6 +63,12 @@ public class ContainerSet implements Iterable<Container<?>> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerSet.class);
 
+  /**
+   * Max attempts to acquire {@link Container#writeLock()} while verifying 
this set's id → container mapping
+   * is same (e.g. another thread may {@link #updateContainer} / DiskBalancer 
swap the instance).
+   */
+  private static final int MAX_CONTAINER_MAP_SWAP_RETRIES = 5;
+
   private final ConcurrentSkipListMap<Long, Container<?>> containerMap = new
       ConcurrentSkipListMap<>();
   private final ConcurrentSkipListSet<Long> missingContainerSet =
@@ -279,6 +285,54 @@ public Container<?> getContainer(long containerId) {
     return containerMap.get(containerId);
   }
 
+  /**
+   * Returns the max retry for a container map swap while acquiring container 
lock.
+   * @return max retry count
+   */
+  public static int maxContainerMapSwapRetries() {
+    return MAX_CONTAINER_MAP_SWAP_RETRIES;
+  }
+
+  /**
+   * Locks the container mapped to {@code containerId} for write, and verifies 
that the instance locked is still
+   * the one stored in this set. If the mapping is swapped, unlocks and 
retries up to
+   * {@link #maxContainerMapSwapRetries()} times, then returns {@code null}.
+   *
+   * @return the locked container, or {@code null} if the mapping could not be 
stabilized after all retries
+   * @throws StorageContainerException with {@code CONTAINER_NOT_FOUND}
+   */
+  @Nullable
+  public Container<?> getContainerWithWriteLock(long containerId) throws 
StorageContainerException {
+    for (int retry = 0; retry < MAX_CONTAINER_MAP_SWAP_RETRIES; retry++) {
+      Container<?> candidate = getContainer(containerId);
+      if (candidate == null) {
+        throw new StorageContainerException(
+            "Container " + containerId + " not found in ContainerSet.",
+            ContainerProtos.Result.CONTAINER_NOT_FOUND);
+      }
+      candidate.writeLock();
+      Container<?> current = getContainer(containerId);
+      if (current == null) {
+        candidate.writeUnlock();
+        throw new StorageContainerException(
+            "Container " + containerId + " not found in ContainerSet.",
+            ContainerProtos.Result.CONTAINER_NOT_FOUND);
+      }
+      if (current != candidate) {
+        candidate.writeUnlock();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Container {} mapping changed during lock acquisition 
(attempt {}); retrying.",
+              containerId, retry);
+        }
+        continue;
+      }
+      return candidate;
+    }
+    LOG.warn("Container {} mapping kept changing after {} attempts; giving 
up.",
+        containerId, MAX_CONTAINER_MAP_SWAP_RETRIES);
+    return null;
+  }
+
   /**
    * Removes container from both memory and database. This should be used when 
the containerData on disk has been
    * removed completely from the node.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 48d5053eb76..45ea5bffdec 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -303,19 +303,31 @@ public DeleteBlockTransactionExecutionResult call() {
           if (keyValueContainer.
               writeLockTryLock(tryLockTimeoutMs, TimeUnit.MILLISECONDS)) {
             try {
-              String schemaVersion = containerData
-                  .getSupportedSchemaVersionOrDefault();
-              if (getSchemaHandlers().containsKey(schemaVersion)) {
-                schemaHandlers.get(schemaVersion).handle(containerData, tx);
+              // Re-fetch the container after acquiring the lock. DiskBalancer 
may have relocated
+              // this container to a different disk while we waited — in that 
case, the container
+              // object in ContainerSet has changed and containerData points 
to the old replica.
+              Container<?> current = containerSet.getContainer(containerId);
+              if (current == null || current.getContainerData() != 
containerData) {
+                LOG.debug("DeleteBlocks: containerData for container {} is 
stale "
+                    + ", Will retry on the new replica.",
+                    containerId);
+                lockAcquisitionFailed = true;
+                txResultBuilder.setContainerID(containerId).setSuccess(false);
               } else {
-                throw new UnsupportedOperationException(
-                    "Only schema version 1,2,3 are supported.");
+                String schemaVersion = containerData
+                    .getSupportedSchemaVersionOrDefault();
+                if (getSchemaHandlers().containsKey(schemaVersion)) {
+                  schemaHandlers.get(schemaVersion).handle(containerData, tx);
+                } else {
+                  throw new UnsupportedOperationException(
+                      "Only schema version 1,2,3 are supported.");
+                }
+                txResultBuilder.setContainerID(containerId)
+                    .setSuccess(true);
               }
             } finally {
               keyValueContainer.writeUnlock();
             }
-            txResultBuilder.setContainerID(containerId)
-                .setSuccess(true);
           } else {
             lockAcquisitionFailed = true;
             txResultBuilder.setContainerID(containerId)
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index bf22d811ff2..b1bac677178 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -512,19 +512,19 @@ public BackgroundTaskResult call() {
         return BackgroundTaskResult.EmptyTaskResult.newResult();
       }
 
-      // Double check container state before acquiring lock to start move 
process.
-      // Container state may have changed after selection.
-      State containerState = container.getContainerData().getState();
-      if (!movableContainerStates.contains(containerState)) {
-        LOG.warn("Container {} is in {} state, skipping move process.", 
containerId, containerState);
-        postCall(false, startTime);
-        return BackgroundTaskResult.EmptyTaskResult.newResult();
-      }
-
       // hold read lock on the container first, to avoid other threads to 
update the container state,
       // such as block deletion.
       container.readLock();
       try {
+        // Double check container state after acquiring lock to start move 
process.
+        // Container state may have changed after selection.
+        State containerState = container.getContainerData().getState();
+        if (!movableContainerStates.contains(containerState)) {
+          LOG.warn("Container {} is in {} state, skipping move process.", 
containerId, containerState);
+          moveSucceeded = false;
+          return BackgroundTaskResult.EmptyTaskResult.newResult();
+        }
+
         // Step 1: Copy container to new Volume's tmp Dir
         diskBalancerTmpDir = getDiskBalancerTmpDir(destVolume)
             .resolve(String.valueOf(containerId));
@@ -580,6 +580,10 @@ public BackgroundTaskResult call() {
         // old caller can still hold the old Container object.
         ozoneContainer.getContainerSet().updateContainer(newContainer);
         destVolume.incrementUsedSpace(containerSize);
+
+        // Test injector: ContainerSet now references newContainer while this 
thread still holds
+        // readLock on the old replica.
+        pauseInjector();
         // Mark old container as DELETED and persist state.
         // markContainerForDelete require writeLock, so release readLock first
         container.readUnlock();
@@ -597,13 +601,7 @@ public BackgroundTaskResult call() {
         metrics.incrSuccessBytes(containerSize);
         totalBalancedBytes.addAndGet(containerSize);
       } catch (IOException e) {
-        if (injector != null) {
-          try {
-            injector.pause();
-          } catch (IOException ex) {
-            // do nothing
-          }
-        }
+        pauseInjector();
         moveSucceeded = false;
         LOG.warn("Failed to move container {}", containerId, e);
         if (diskBalancerTmpDir != null) {
@@ -861,6 +859,17 @@ public static void setInjector(FaultInjector instance) {
     injector = instance;
   }
 
+  // call FaultInjector#pause when an injector is registered; ignore 
IOException.
+  private static void pauseInjector() {
+    if (injector != null) {
+      try {
+        injector.pause();
+      } catch (IOException ex) {
+        // do nothing
+      }
+    }
+  }
+
   @VisibleForTesting
   public void setReplicaDeletionDelay(long durationMills) {
     this.replicaDeletionDelay = durationMills;
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 6f93bcef4fd..474b7458644 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1511,10 +1511,17 @@ private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksum(Cont
   @Override
   public void markContainerUnhealthy(Container container, ScanResult reason)
       throws IOException {
-    container.writeLock();
     long containerID = container.getContainerData().getContainerID();
+    Container<?> lockedContainer = 
containerSet.getContainerWithWriteLock(containerID);
+    if (lockedContainer == null) {
+      // null means container retries exhausted ;
+      // container not-found throws StorageContainerException.
+      LOG.warn("Exceeded {} attempts locking live container {}; skipping 
markContainerUnhealthy.",
+          ContainerSet.maxContainerMapSwapRetries(), containerID);
+      return;
+    }
     try {
-      if (container.getContainerState() == State.UNHEALTHY) {
+      if (lockedContainer.getContainerState() == State.UNHEALTHY) {
         LOG.debug("Call to mark already unhealthy container {} as unhealthy",
             containerID);
         return;
@@ -1522,25 +1529,25 @@ public void markContainerUnhealthy(Container container, ScanResult reason)
       // If the volume is unhealthy, no action is needed. The container has
       // already been discarded and SCM notified. Once a volume is failed, it
       // cannot be restored without a restart.
-      HddsVolume containerVolume = container.getContainerData().getVolume();
+      HddsVolume containerVolume = 
lockedContainer.getContainerData().getVolume();
       if (containerVolume.isFailed()) {
         LOG.debug("Ignoring unhealthy container {} detected on an " +
             "already failed volume {}", containerID, containerVolume);
         return;
       }
-      container.markContainerUnhealthy();
+      lockedContainer.markContainerUnhealthy();
     } catch (StorageContainerException ex) {
       LOG.warn("Unexpected error while marking container {} unhealthy",
           containerID, ex);
     } finally {
-      container.writeUnlock();
+      lockedContainer.writeUnlock();
     }
-    updateContainerChecksumFromMetadataIfNeeded(container);
+    updateContainerChecksumFromMetadataIfNeeded(lockedContainer);
     // Even if the container file is corrupted/missing and the unhealthy
     // update fails, the unhealthy state is kept in memory and sent to
     // SCM. Write a corresponding entry to the container log as well.
-    ContainerLogger.logUnhealthy(container.getContainerData(), reason);
-    sendICR(container);
+    ContainerLogger.logUnhealthy(lockedContainer.getContainerData(), reason);
+    sendICR(lockedContainer);
   }
 
   @Override
@@ -1574,17 +1581,24 @@ public void quasiCloseContainer(Container container, String reason)
   @Override
   public void closeContainer(Container container)
       throws IOException {
-    container.writeLock();
+    long containerID = container.getContainerData().getContainerID();
+    Container<?> lockedContainer = 
containerSet.getContainerWithWriteLock(containerID);
+    if (lockedContainer == null) {
+      // null means container locking retries exhausted ;
+      // container not-found throws StorageContainerException.
+      LOG.warn("Exceeded {} attempts locking live container {}; skipping 
closeContainer.",
+          ContainerSet.maxContainerMapSwapRetries(), containerID);
+      return;
+    }
     try {
-      final State state = container.getContainerState();
+      final State state = lockedContainer.getContainerState();
       // Close call is idempotent.
       if (state == State.CLOSED) {
         return;
       }
       if (state == State.UNHEALTHY) {
         throw new StorageContainerException(
-            "Cannot close container #" + container.getContainerData()
-                .getContainerID() + " while in " + state + " state.",
+            "Cannot close container #" + containerID + " while in " + state + 
" state.",
             ContainerProtos.Result.CONTAINER_UNHEALTHY);
       }
       // The container has to be either in CLOSING or in QUASI_CLOSED state.
@@ -1593,16 +1607,15 @@ public void closeContainer(Container container)
             state == State.INVALID ? INVALID_CONTAINER_STATE :
                 CONTAINER_INTERNAL_ERROR;
         throw new StorageContainerException(
-            "Cannot close container #" + container.getContainerData()
-                .getContainerID() + " while in " + state + " state.", error);
+            "Cannot close container #" + containerID + " while in " + state + 
" state.", error);
       }
-      container.close();
+      lockedContainer.close();
     } finally {
-      container.writeUnlock();
+      lockedContainer.writeUnlock();
     }
-    updateContainerChecksumFromMetadataIfNeeded(container);
-    ContainerLogger.logClosed(container.getContainerData());
-    sendICR(container);
+    updateContainerChecksumFromMetadataIfNeeded(lockedContainer);
+    ContainerLogger.logClosed(lockedContainer.getContainerData());
+    sendICR(lockedContainer);
   }
 
   @Override
@@ -2293,24 +2306,33 @@ private boolean logBlocksFoundOnDisk(Container container) throws IOException {
 
   private void deleteInternal(Container container, boolean force)
       throws StorageContainerException {
+    final long containerId = container.getContainerData().getContainerID();
     long startTime = clock.millis();
-    container.writeLock();
+    Container<?> containerLocked = 
containerSet.getContainerWithWriteLock(containerId);
+    if (containerLocked == null) {
+      // null means container locking retries exhausted ;
+      // container not-found throws StorageContainerException.
+      LOG.info("Exceeded {} retries to lock container {}; Now SCM will resend 
for delete with " +
+              "the current container replica", 
ContainerSet.maxContainerMapSwapRetries(),
+          containerId);
+      return;
+    }
     try {
-      final ContainerData data = container.getContainerData();
-      if (container.getContainerData().getVolume().isFailed()) {
+      final ContainerData data = containerLocked.getContainerData();
+      if (containerLocked.getContainerData().getVolume().isFailed()) {
         // if the  volume in which the container resides fails
         // don't attempt to delete/move it. When a volume fails,
         // failedVolumeListener will pick it up and clear the container
         // from the container set.
         LOG.info("Delete container issued on containerID {} which is in a " +
-                "failed volume. Skipping", container.getContainerData()
+                "failed volume. Skipping", containerLocked.getContainerData()
             .getContainerID());
         return;
       }
       // If force is false, we check container state.
       if (!force) {
         // Check if container is open
-        if (container.getContainerData().isOpen()) {
+        if (containerLocked.getContainerData().isOpen()) {
           throw new StorageContainerException(
               "Deletion of Open Container is not allowed.",
               DELETE_ON_OPEN_CONTAINER);
@@ -2319,14 +2341,14 @@ private void deleteInternal(Container container, boolean force)
         // If the container is not empty, it should not be deleted unless the
         // container is being forcefully deleted (which happens when
         // container is unhealthy or over-replicated).
-        if (container.hasBlocks()) {
+        if (containerLocked.hasBlocks()) {
           metrics.incContainerDeleteFailedNonEmpty();
           LOG.error("Received container deletion command for non-empty {}: 
{}", data, data.getStatistics());
           // blocks table for future debugging.
           // List blocks
-          logBlocksIfNonZero(container);
+          logBlocksIfNonZero(containerLocked);
           // Log chunks
-          logBlocksFoundOnDisk(container);
+          logBlocksFoundOnDisk(containerLocked);
           throw new StorageContainerException("Non-force deletion of " +
               "non-empty container is not allowed.",
               DELETE_ON_NON_EMPTY_CONTAINER);
@@ -2334,9 +2356,9 @@ private void deleteInternal(Container container, boolean force)
       } else {
         metrics.incContainersForceDelete();
       }
-      if (container.getContainerData() instanceof KeyValueContainerData) {
+      if (containerLocked.getContainerData() instanceof KeyValueContainerData) 
{
         KeyValueContainerData keyValueContainerData =
-            (KeyValueContainerData) container.getContainerData();
+            (KeyValueContainerData) containerLocked.getContainerData();
         HddsVolume hddsVolume = keyValueContainerData.getVolume();
 
         // Steps to delete
@@ -2350,21 +2372,20 @@ private void deleteInternal(Container container, boolean force)
           if (waitTime > maxDeleteLockWaitMs) {
             LOG.warn("An attempt to delete container {} took {} ms acquiring 
locks and pre-checks. " +
                     "The delete has been skipped and should be retried 
automatically by SCM.",
-                container.getContainerData().getContainerID(), waitTime);
+                containerLocked.getContainerData().getContainerID(), waitTime);
             return;
           }
-          container.markContainerForDelete();
-          long containerId = container.getContainerData().getContainerID();
+          containerLocked.markContainerForDelete();
           containerSet.removeContainer(containerId);
-          ContainerLogger.logDeleted(container.getContainerData(), force);
+          ContainerLogger.logDeleted(containerLocked.getContainerData(), 
force);
           KeyValueContainerUtil.removeContainer(keyValueContainerData, conf);
         } catch (IOException ioe) {
           LOG.error("Failed to move container under " + hddsVolume
               .getDeletedContainerDir());
           String errorMsg =
-              "Failed to move container" + container.getContainerData()
+              "Failed to move container" + containerLocked.getContainerData()
                   .getContainerID();
-          triggerVolumeScanAndThrowException(container, errorMsg,
+          triggerVolumeScanAndThrowException(containerLocked, errorMsg,
               CONTAINER_INTERNAL_ERROR);
         }
       }
@@ -2374,20 +2395,20 @@ private void deleteInternal(Container container, boolean force)
       // All other IO Exceptions should be treated as if the container is not
       // empty as a defensive check.
       LOG.error("Could not determine if the container {} is empty",
-          container.getContainerData().getContainerID(), e);
+          containerLocked.getContainerData().getContainerID(), e);
       String errorMsg =
-          "Failed to read container dir" + container.getContainerData()
+          "Failed to read container dir" + containerLocked.getContainerData()
               .getContainerID();
-      triggerVolumeScanAndThrowException(container, errorMsg,
+      triggerVolumeScanAndThrowException(containerLocked, errorMsg,
           CONTAINER_INTERNAL_ERROR);
     } finally {
-      container.writeUnlock();
+      containerLocked.writeUnlock();
     }
     // Avoid holding write locks for disk operations
-    sendICR(container);
-    long bytesUsed = container.getContainerData().getBytesUsed();
-    HddsVolume volume = container.getContainerData().getVolume();
-    container.delete();
+    sendICR(containerLocked);
+    long bytesUsed = containerLocked.getContainerData().getBytesUsed();
+    HddsVolume volume = containerLocked.getContainerData().getVolume();
+    containerLocked.delete();
     volume.decrementUsedSpace(bytesUsed);
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java
index 1f66cad476f..a4ab1a7ef0e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import 
org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
 import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -67,7 +68,7 @@ public class BlockDeletingTask implements BackgroundTask {
 
   private final BlockDeletingServiceMetrics metrics;
   private final int priority;
-  private final KeyValueContainerData containerData;
+  private KeyValueContainerData containerData;
   private long blocksToDelete;
   private final OzoneContainer ozoneContainer;
   private final ConfigurationSource conf;
@@ -139,24 +140,39 @@ public BackgroundTaskResult call() throws Exception {
 
   private ContainerBackgroundTaskResult handleDeleteTask() throws Exception {
     ContainerBackgroundTaskResult crr;
-    final Container container = ozoneContainer.getContainerSet()
-        .getContainer(containerData.getContainerID());
-    container.writeLock();
-    File dataDir = new File(containerData.getChunksPath());
-    long startTime = Time.monotonicNow();
-    // Scan container's db and get list of under deletion blocks
-    try (DBHandle meta = BlockUtils.getDB(containerData, conf)) {
-      if (containerData.hasSchema(SCHEMA_V1)) {
-        crr = deleteViaSchema1(meta, container, dataDir, startTime);
-      } else if (containerData.hasSchema(SCHEMA_V2)) {
-        crr = deleteViaSchema2(meta, container, dataDir, startTime);
-      } else if (containerData.hasSchema(SCHEMA_V3)) {
-        crr = deleteViaSchema3(meta, container, dataDir, startTime);
-      } else {
-        throw new UnsupportedOperationException(
-            "Only schema version 1,2,3 are supported.");
+    ContainerSet cs = ozoneContainer.getContainerSet();
+    final long containerId = containerData.getContainerID();
+
+    Container<?> container = cs.getContainerWithWriteLock(containerId);
+    if (container == null) {
+      // null means container locking retries exhausted ;
+      // container not-found throws StorageContainerException.
+      LOG.info("Exceeded {} retries to lock container {}; Now DN will resend 
for delete with " +
+              "the current container replica", 
ContainerSet.maxContainerMapSwapRetries(),
+          containerId);
+      return new ContainerBackgroundTaskResult();
+    }
+    try {
+      // Always use ContainerData from the locked live Container so paths / 
RocksDB locations match deleteViaSchema*.
+      containerData = (KeyValueContainerData) container.getContainerData();
+
+      File dataDir = new File(containerData.getChunksPath());
+      long startTime = Time.monotonicNow();
+      // Scan container's db and get list of under deletion blocks
+      try (DBHandle meta = BlockUtils.getDB(containerData, conf)) {
+        if (containerData.hasSchema(SCHEMA_V1)) {
+          crr = deleteViaSchema1(meta, container, dataDir, startTime);
+        } else if (containerData.hasSchema(SCHEMA_V2)) {
+          crr = deleteViaSchema2(meta, container, dataDir, startTime);
+        } else if (containerData.hasSchema(SCHEMA_V3)) {
+          crr = deleteViaSchema3(meta, container, dataDir, startTime);
+        } else {
+          throw new UnsupportedOperationException(
+              "Only schema version 1,2,3 are supported.");
+        }
+        return crr;
+
       }
-      return crr;
     } finally {
       container.writeUnlock();
     }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
index efb4be86e8d..33b87f220ed 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
@@ -23,6 +23,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -30,6 +31,10 @@
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -55,6 +60,7 @@
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner;
+import org.junit.jupiter.api.Test;
 
 /**
  * Class used to test ContainerSet operations.
@@ -371,6 +377,98 @@ public void testContainerScanHandlerWithoutGap(ContainerLayoutVersion layout) th
     assertEquals(1, invocationCount.get());
   }
 
+  // -------------------------------------------------------------------------
+  // getContainerWithWriteLock tests
+  // -------------------------------------------------------------------------
+
+  /**
+   * Happy path: container is in the map and the mapping is stable.
+   * Expect the locked container to be returned, with writeLock called and 
writeUnlock NOT yet called
+   */
+  @Test
+  public void testAcquireContainerLockStableMapping() throws 
StorageContainerException {
+    ContainerSet cs = spy(newContainerSet());
+    Container<?> c1 = mock(Container.class);
+    doAnswer(inv -> c1).when(cs).getContainer(1L);
+
+    Container<?> result = cs.getContainerWithWriteLock(1L);
+
+    assertSame(c1, result);
+    verify(c1).writeLock();
+    verify(c1, never()).writeUnlock();
+  }
+
+  /**
+   * Container is present when first fetched, but removed from the map after 
writeLock is acquired
+   * (second getContainer check returns null).
+   * Expect StorageContainerException(CONTAINER_NOT_FOUND), and the lock 
released before throwing (no lock leak).
+   */
+  @Test
+  public void testContainerRemovedAfterWriteLock() {
+    ContainerSet cs = spy(newContainerSet());
+    Container<?> c1 = mock(Container.class);
+    int[] callCount = {0};
+    // First call → candidate c1; second call (re-check after lock) → null 
(container removed)
+    doAnswer(inv -> callCount[0]++ == 0 ? c1 : null).when(cs).getContainer(1L);
+
+    assertThrows(StorageContainerException.class, () -> 
cs.getContainerWithWriteLock(1L));
+    verify(c1).writeLock();
+    verify(c1).writeUnlock();  // lock must be released before throwing
+  }
+
+  /**
+   * Mapping is swapped once (DiskBalancer moves container from C1 to C2) 
while the lock is being
+   * acquired. The first attempt detects the mismatch (current=C2 ≠ 
candidate=C1), releases C1's lock,
+   * and retries. The second attempt finds C2 stable and returns it locked.
+   */
+  @Test
+  public void testRetriesOnMappingSwapThenSucceeds()
+      throws StorageContainerException {
+    ContainerSet cs = spy(newContainerSet());
+    Container<?> c1 = mock(Container.class);
+    Container<?> c2 = mock(Container.class);
+    // Sequence: c1 (candidate retry-0), c2 (current retry-0 → mismatch),
+    // c2 (candidate retry-1), c2 (current retry-1 → match)
+    int[] n = {0};
+    Container<?>[] seq = {c1, c2, c2, c2};
+    doAnswer(inv -> seq[Math.min(n[0]++, seq.length - 
1)]).when(cs).getContainer(1L);
+
+    Container<?> result = cs.getContainerWithWriteLock(1L);
+
+    assertSame(c2, result);
+    // c1 was locked then released during the retry
+    verify(c1).writeLock();
+    verify(c1).writeUnlock();
+    // c2 was locked and is held by the caller
+    verify(c2).writeLock();
+    verify(c2, never()).writeUnlock();
+  }
+
+  /**
+   * The mapping keeps changing on every retry. After {@link 
ContainerSet#maxContainerMapSwapRetries()}
+   * retries, null is returned. All intermediate locks on C1 must be released 
(no lock leak).
+   */
+  @Test
+  public void testExhaustsMaxRetriesReturnsNull()
+      throws StorageContainerException {
+    ContainerSet cs = spy(newContainerSet());
+    Container<?> c1 = mock(Container.class);
+    Container<?> c2 = mock(Container.class);
+    // Alternate: c1 as candidate, c2 as current → always mismatched → all 
retries fail
+    int[] n = {0};
+    doAnswer(inv -> n[0]++ % 2 == 0 ? c1 : c2).when(cs).getContainer(1L);
+
+    Container<?> result = cs.getContainerWithWriteLock(1L);
+
+    assertNull(result);
+    int maxRetries = ContainerSet.maxContainerMapSwapRetries();
+    // c1 is locked and released once per retry
+    verify(c1, times(maxRetries)).writeLock();
+    verify(c1, times(maxRetries)).writeUnlock();
+    // c2 is only ever seen as "current" — it is never locked
+    verify(c2, never()).writeLock();
+  }
+
   /**
    * Verify that {@code result} contains {@code count} containers
    * with IDs in increasing order starting at {@code startId}.
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java
index 2b6b387dbe7..6ac39d9ba4f 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java
@@ -29,10 +29,12 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -53,6 +55,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -96,6 +99,7 @@ public class TestDeleteBlocksCommandHandler {
   private String schemaVersion;
   private HddsVolume volume1;
   private BlockDeletingServiceMetrics blockDeleteMetrics;
+  private static final long STALE_CONTAINER_ID = 5L;
 
   private void prepareTest(ContainerTestVersionInfo versionInfo)
       throws Exception {
@@ -481,6 +485,89 @@ public void 
testDuplicateTxFromSCMHandledByDeleteBlocksCommandHandler(
     assertEquals(afterSecondPendingBytes + 768L, 
containerData.getBlockPendingDeletionBytes());
   }
 
+  /**
+   * Simulates DiskBalancer swapping the {@link ContainerSet} entry after the delete-blocks worker
+   * has read the container once but before it re-checks the map after {@code writeLockTryLock}.
+   * The first attempt must treat {@code containerData} as stale and fail without invoking the
+   * schema handler on the old replica; the built-in retry must then succeed against the live
+   * replica.
+   */
+  @Test
+  public void deleteBlocksRetriesWhenContainerDataStale() throws Exception {
+    schemaVersion = SCHEMA_V3;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+    OzoneContainer ozoneContainer = mock(OzoneContainer.class);
+    ContainerLayoutVersion layout = ContainerLayoutVersion.FILE_PER_BLOCK;
+    ContainerSet realSet = newContainerSet();
+    volume1 = mockHddsVolume("uuid-1");
+    for (int i = 0; i <= 10; i++) {
+      KeyValueContainerData data =
+          new KeyValueContainerData(i,
+              layout,
+              ContainerTestHelper.CONTAINER_MAX_SIZE,
+              UUID.randomUUID().toString(),
+              UUID.randomUUID().toString());
+      data.setSchemaVersion(schemaVersion);
+      data.setVolume(volume1);
+      KeyValueContainer container = new KeyValueContainer(data, conf);
+      data.closeContainer();
+      realSet.addContainer(container);
+    }
+
+    // "Old" replica: the instance the worker resolves before it takes the lock.
+    KeyValueContainer oldContainer =
+        (KeyValueContainer) realSet.getContainer(STALE_CONTAINER_ID);
+    KeyValueContainerData oldReplicaData = oldContainer.getContainerData();
+    // "New" replica: a copy standing in for the instance DiskBalancer swaps into the map.
+    KeyValueContainerData newReplicaData = new KeyValueContainerData(oldReplicaData);
+    newReplicaData.setVolume(volume1);
+    newReplicaData.closeContainer();
+    KeyValueContainer newContainer = new KeyValueContainer(newReplicaData, conf);
+
+    AtomicInteger staleIdLookups = new AtomicInteger(0);
+    ContainerSet containerSetSpy = spy(realSet);
+    doAnswer(invocation -> {
+      long id = invocation.getArgument(0);
+      if (id != STALE_CONTAINER_ID) {
+        return invocation.callRealMethod();
+      }
+      // Only the very first lookup sees the old replica; every later lookup (the post-lock
+      // re-check and the whole retry) sees the swapped-in replica, so one swap is observed.
+      return staleIdLookups.getAndIncrement() == 0 ? oldContainer : newContainer;
+    }).when(containerSetSpy).getContainer(anyLong());
+
+    when(ozoneContainer.getContainerSet()).thenReturn(containerSetSpy);
+    containerSet = containerSetSpy;
+
+    DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
+    handler = spy(new DeleteBlocksCommandHandler(ozoneContainer, conf, dnConf, ""));
+    blockDeleteMetrics = handler.getBlockDeleteMetrics();
+    TestSchemaHandler testSchemaHandler1 = spy(new TestSchemaHandler());
+    TestSchemaHandler testSchemaHandler2 = spy(new TestSchemaHandler());
+    TestSchemaHandler testSchemaHandler3 = spy(new TestSchemaHandler());
+    handler.getSchemaHandlers().put(SCHEMA_V1, testSchemaHandler1);
+    handler.getSchemaHandlers().put(SCHEMA_V2, testSchemaHandler2);
+    handler.getSchemaHandlers().put(SCHEMA_V3, testSchemaHandler3);
+
+    DeletedBlocksTransaction transaction =
+        createDeletedBlocksTransaction(77L, STALE_CONTAINER_ID);
+    List<DeleteBlockTransactionResult> results =
+        handler.executeCmdWithRetry(Collections.singletonList(transaction));
+
+    assertEquals(1, results.size());
+    assertTrue(results.get(0).getSuccess());
+    // One failed attempt against the stale reference, then one successful retry.
+    verify(handler, times(2)).submitTasks(any());
+    verify(testSchemaHandler3, times(1)).handle(eq(newReplicaData), eq(transaction));
+    verify(testSchemaHandler3, never()).handle(eq(oldReplicaData), any());
+    // Detecting the swap must be handled as a retry, not counted as a lock timeout.
+    assertEquals(0, blockDeleteMetrics.getTotalLockTimeoutTransactionCount());
+  }
+
   private DeletedBlocksTransaction createDeletedBlocksTransaction(long txID,
       long containerID) {
     return DeletedBlocksTransaction.newBuilder()
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerWithConcurrentBackgroundTasks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerWithConcurrentBackgroundTasks.java
new file mode 100644
index 00000000000..2463d0f872f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerWithConcurrentBackgroundTasks.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyDataScanResult;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.fs.SpaceUsageSource;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.utils.FaultInjector;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
+import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingTask;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * The balancer thread holds a <em>read</em> lock on the old replica while it copies the
+ * container and then calls {@link ContainerSet#updateContainer} so the map points at the new
+ * replica on another disk. Concurrent delete / block-deletion / unhealthy / close paths resolve
+ * the live container by id via {@link ContainerSet#getContainerWithWriteLock} and then operate on
+ * that instance so paths, DB, and state match the destination replica.
+ */
+@Timeout(60)
+class TestDiskBalancerWithConcurrentBackgroundTasks {
+
+  @TempDir
+  private java.nio.file.Path tmpDir;
+
+  private File testDir;
+  private final String scmId = UUID.randomUUID().toString();
+  private final String datanodeUuid = UUID.randomUUID().toString();
+  private final OzoneConfiguration conf = new OzoneConfiguration();
+
+  private OzoneContainer ozoneContainer;
+  private ContainerSet containerSet;
+  private MutableVolumeSet volumeSet;
+  private KeyValueHandler keyValueHandler;
+  private ContainerChecksumTreeManager checksumTreeManager;
+  private DiskBalancerServiceTestImpl diskBalancerService;
+
+  private HddsVolume hotVolume;
+  private HddsVolume coldVolume;
+
+  private static final long CONTAINER_ID = 42L;
+  private static final long CONTAINER_SIZE = 1024L * 1024L;
+  private static final long SEEDED_PENDING_BLOCKS = 2L;
+  private static final long SEEDED_PENDING_BYTES = 2048L;
+
+  /** Log line DiskBalancerService emits once the source replica of the moved container is removed. */
+  private static final String SOURCE_CLEANUP_LOG =
+      "Deleted expired container " + CONTAINER_ID + " after delay";
+
+  /**
+   * Best-effort pause that gives the concurrently started operation time to reach its lock
+   * acquisition before the balancer is released. This is a heuristic, not a synchronization point.
+   */
+  private static final long RACE_WINDOW_MILLIS = 200L;
+
+  /**
+   * Pauses disk balancer immediately after {@link ContainerSet#updateContainer} (map points at
+   * the new replica) but before readUnlock, so another thread can run while the balancer
+   * still holds readLock on the old container.
+   */
+  private static final class AfterInMemoryUpdateInjector extends FaultInjector {
+    private final CountDownLatch reachedSwapPoint = new CountDownLatch(1);
+    private final CountDownLatch continueBalancer = new CountDownLatch(1);
+
+    @Override
+    public void pause() throws IOException {
+      // Signal the test thread that the race window has started, then block the balancer.
+      reachedSwapPoint.countDown();
+      try {
+        continueBalancer.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+    }
+
+    /** Blocks the test thread until the balancer has reached the post-swap hook. */
+    void awaitSwapPoint() throws InterruptedException, TimeoutException {
+      if (!reachedSwapPoint.await(60, TimeUnit.SECONDS)) {
+        throw new TimeoutException("balancer did not reach post-updateContainer hook");
+      }
+    }
+
+    /** Unblocks the balancer so it can readUnlock and finish the move. */
+    void continueBalancer() {
+      continueBalancer.countDown();
+    }
+  }
+
+  @BeforeEach
+  void setup() throws Exception {
+    testDir = tmpDir.toFile();
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+        testDir.getAbsolutePath() + "/vol0,"
+            + testDir.getAbsolutePath() + "/vol1,"
+            + testDir.getAbsolutePath() + "/vol2");
+    conf.setClass(SpaceUsageCheckFactory.Conf.configKeyForClassName(),
+        MockSpaceUsageCheckFactory.HalfTera.class,
+        SpaceUsageCheckFactory.class);
+
+    volumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    createDbInstancesForTestIfNeeded(volumeSet, scmId, scmId, conf);
+
+    List<StorageVolume> volumes = volumeSet.getVolumesList();
+    HddsVolume v0 = (HddsVolume) volumes.get(0);
+    HddsVolume v1 = (HddsVolume) volumes.get(1);
+    HddsVolume v2 = (HddsVolume) volumes.get(2);
+
+    // Reset reported usage to zero, then load v2 far more heavily than v0 and v1.
+    for (HddsVolume v : new HddsVolume[] {v0, v1, v2}) {
+      v.incrementUsedSpace(0 - v.getCurrentUsage().getUsedSpace());
+    }
+
+    long capacity = v0.getCurrentUsage().getCapacity();
+    v0.incrementUsedSpace(capacity / 20);
+    v1.incrementUsedSpace(capacity / 20);
+    v2.incrementUsedSpace(capacity / 2);
+
+    coldVolume = coldestVolume(v0, v1, v2);
+    hotVolume = hottestVolume(v0, v1, v2);
+
+    containerSet = ContainerSet.newReadOnlyContainerSet(1000);
+    ContainerMetrics containerMetrics = ContainerMetrics.create(conf);
+    checksumTreeManager = new ContainerChecksumTreeManager(conf);
+    keyValueHandler = new KeyValueHandler(conf, datanodeUuid,
+        containerSet, volumeSet, containerMetrics, c -> { },
+        checksumTreeManager);
+    keyValueHandler.setClusterID(scmId);
+
+    Map<ContainerProtos.ContainerType, Handler> handlers = new HashMap<>();
+    handlers.put(ContainerProtos.ContainerType.KeyValueContainer, keyValueHandler);
+    ContainerController controller = new ContainerController(containerSet, handlers);
+    ContainerDispatcher dispatcher = mock(ContainerDispatcher.class);
+    when(dispatcher.getHandler(ContainerProtos.ContainerType.KeyValueContainer))
+        .thenReturn(keyValueHandler);
+
+    ozoneContainer = mock(OzoneContainer.class);
+    when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
+    when(ozoneContainer.getVolumeSet()).thenReturn(volumeSet);
+    when(ozoneContainer.getController()).thenReturn(controller);
+    when(ozoneContainer.getDispatcher()).thenReturn(dispatcher);
+
+    DiskBalancerConfiguration diskBalancerConfiguration =
+        conf.getObject(DiskBalancerConfiguration.class);
+    diskBalancerConfiguration.setDiskBalancerShouldRun(true);
+    conf.setFromObject(diskBalancerConfiguration);
+    diskBalancerService = new DiskBalancerServiceTestImpl(ozoneContainer, 100, conf, 1);
+    // Immediate cleanup of the source replica after a move
+    diskBalancerService.setReplicaDeletionDelay(0);
+  }
+
+  @AfterEach
+  void cleanup() throws IOException {
+    DiskBalancerService.setInjector(null);
+    if (diskBalancerService != null) {
+      diskBalancerService.shutdown();
+    }
+    BlockUtils.shutdownCache(conf);
+    if (volumeSet != null) {
+      volumeSet.shutdown();
+    }
+    if (testDir != null && testDir.exists()) {
+      FileUtils.deleteDirectory(testDir);
+    }
+  }
+
+  /**
+   * Force-delete with a stale {@link Container} handle still targets the container id; after a
+   * DiskBalancer swap, {@code deleteInternal} locks the live map entry (destination replica) and
+   * applies deletion there — not on the old source object passed in from the RPC.
+   */
+  @ContainerTestVersionInfo.ContainerTest
+  void containerDeleteStaleRefKeepsSwappedReplica(ContainerTestVersionInfo versionInfo)
+      throws Exception {
+    // Capture delete path logs: deleteInternal and diskBalancer
+    LogCapturer kvLogs = LogCapturer.captureLogs(KeyValueHandler.class);
+    LogCapturer diskBalancerLogs = LogCapturer.captureLogs(DiskBalancerService.class);
+
+    String schemaVersion = versionInfo.getSchemaVersion();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+
+    KeyValueContainer oldReplica = createClosedContainer(CONTAINER_ID, hotVolume, versionInfo);
+    Container<?> staleContainerRef = oldReplica;
+    String oldReplicaPathOnHot = oldReplica.getContainerData().getContainerPath();
+    assertThat(new File(oldReplicaPathOnHot)).exists();
+
+    // Install injector so balancer pauses right after ContainerSet points at the new replica.
+    AfterInMemoryUpdateInjector raceInjector = new AfterInMemoryUpdateInjector();
+    DiskBalancerService.setInjector(raceInjector);
+
+    DiskBalancerService.DiskBalancerTask task =
+        (DiskBalancerService.DiskBalancerTask) diskBalancerService.getTasks().poll();
+    assertNotNull(task);
+
+    // Run the move on a background thread; it will block inside the injector.
+    CompletableFuture<Void> balancerDone = CompletableFuture.runAsync(task::call);
+
+    // Wait until updateContainer(newReplica) is done; balancer still read-locks the old replica.
+    raceInjector.awaitSwapPoint();
+
+    Container<?> currentContainerRef = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(currentContainerRef);
+    assertNotEquals(staleContainerRef, currentContainerRef,
+        "ContainerSet should reference the new replica before readUnlock");
+    assertEquals(coldVolume, currentContainerRef.getContainerData().getVolume(),
+        "Replica should already be on the destination volume");
+
+    String destinationPathBeforeDelete =
+        currentContainerRef.getContainerData().getContainerPath();
+
+    // Start RM-style force delete using the stale Container handle; deleteInternal resolves the
+    // id and acquires writeLock on the live map entry (destination), not on the stale handle.
+    CountDownLatch deleteThreadStarted = new CountDownLatch(1);
+    CompletableFuture<Void> deleteDone = CompletableFuture.runAsync(() -> {
+      deleteThreadStarted.countDown();
+      try {
+        keyValueHandler.deleteContainer(staleContainerRef, true);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    assertThat(deleteThreadStarted.await(10, TimeUnit.SECONDS)).isTrue();
+    Thread.sleep(RACE_WINDOW_MILLIS);
+
+    raceInjector.continueBalancer();
+    CompletableFuture.allOf(balancerDone, deleteDone).get(60, TimeUnit.SECONDS);
+
+    assertThat(kvLogs.getOutput()).doesNotContain("reference is stale");
+
+    // Old replica: disk balancer marks it DELETED after the move.
+    assertEquals(State.DELETED, staleContainerRef.getContainerState());
+    assertEquals(hotVolume, staleContainerRef.getContainerData().getVolume());
+
+    // Live map entry was removed by force-delete on the destination replica.
+    assertNull(containerSet.getContainer(CONTAINER_ID));
+    assertThat(new File(destinationPathBeforeDelete)).doesNotExist();
+
+    // Disk balancer delayed cleanup removes the old replica from the source path — not RM delete.
+    // Wait for the cleanup to be reported before checking the directory, to avoid racing it.
+    awaitSourceReplicaCleanup(diskBalancerLogs);
+    assertThat(new File(oldReplicaPathOnHot)).doesNotExist();
+  }
+
+  /**
+   * BlockDeletingTask queued with a stale KeyValueContainerData reference still resolves the
+   * live replica by id; after {@code getContainerWithWriteLock}, it uses the destination
+   * replica's DB and paths (updated {@code containerData} field).
+   */
+  @ContainerTestVersionInfo.ContainerTest
+  void blockTaskStaleDataKeepsPendingOnDestination(ContainerTestVersionInfo versionInfo)
+      throws Exception {
+    LogCapturer logs = LogCapturer.captureLogs(BlockDeletingTask.class);
+    LogCapturer diskBalancerLogs = LogCapturer.captureLogs(DiskBalancerService.class);
+    String schemaVersion = versionInfo.getSchemaVersion();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+
+    KeyValueContainer oldReplica = createClosedContainer(CONTAINER_ID, hotVolume, versionInfo);
+    seedPendingDeletionInMetadata(oldReplica);
+    KeyValueContainerData staleReplicaData = oldReplica.getContainerData();
+
+    // Balancer injector — pause after map swap, before readUnlock.
+    AfterInMemoryUpdateInjector raceInjector = new AfterInMemoryUpdateInjector();
+    DiskBalancerService.setInjector(raceInjector);
+
+    DiskBalancerService.DiskBalancerTask balancerTask =
+        (DiskBalancerService.DiskBalancerTask) diskBalancerService.getTasks().poll();
+    assertNotNull(balancerTask);
+    CompletableFuture<Void> balancerDone = CompletableFuture.runAsync(balancerTask::call);
+
+    // Past updateContainer: the new replica is in ContainerSet; balancer still read-locks old one.
+    raceInjector.awaitSwapPoint();
+
+    // The live ContainerData instance must differ from what the queued block task still holds.
+    Container<?> liveReplica = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(liveReplica);
+    assertNotEquals(staleReplicaData, liveReplica.getContainerData());
+    KeyValueContainerData liveData = (KeyValueContainerData) liveReplica.getContainerData();
+    long pendingBefore = readPendingDeleteBlockCount(liveData);
+    assertEquals(SEEDED_PENDING_BLOCKS, pendingBefore,
+        "new replica should report same pending deletions from copied metadata");
+
+    BlockDeletingService blockDeletingService =
+        new BlockDeletingService(ozoneContainer, 500, 500, TimeUnit.MILLISECONDS, 1, conf,
+            checksumTreeManager);
+    BlockDeletingService.ContainerBlockInfo blockInfo =
+        new BlockDeletingService.ContainerBlockInfo(staleReplicaData, SEEDED_PENDING_BLOCKS + 10);
+    BlockDeletingTask blockDeletingTask =
+        new BlockDeletingTask(blockDeletingService, blockInfo, checksumTreeManager, 1);
+
+    // Run block deletion concurrently; getContainerWithWriteLock targets the live map entry.
+    CountDownLatch blockThreadStarted = new CountDownLatch(1);
+    CompletableFuture<Void> blockDone = CompletableFuture.runAsync(() -> {
+      blockThreadStarted.countDown();
+      try {
+        blockDeletingTask.call();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    assertThat(blockThreadStarted.await(10, TimeUnit.SECONDS)).isTrue();
+    Thread.sleep(RACE_WINDOW_MILLIS);
+
+    raceInjector.continueBalancer();
+    CompletableFuture.allOf(balancerDone, blockDone).get(60, TimeUnit.SECONDS);
+
+    assertThat(logs.getOutput()).doesNotContain("reference is stale");
+
+    // Null-check the container before dereferencing it so a missing entry fails as an assertion.
+    Container<?> afterMove = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(afterMove);
+    KeyValueContainerData afterMoveData = (KeyValueContainerData) afterMove.getContainerData();
+    assertThat(readPendingDeleteBlockCount(afterMoveData)).isLessThanOrEqualTo(pendingBefore);
+    assertThat(new File(afterMoveData.getChunksPath())).exists();
+
+    // Disk balancer delayed cleanup removes the old replica from the source path — not RM delete.
+    awaitSourceReplicaCleanup(diskBalancerLogs);
+    assertThat(new File(staleReplicaData.getContainerPath())).doesNotExist();
+  }
+
+  /**
+   * {@link KeyValueHandler#markContainerUnhealthy} with a stale container reference while
+   * DiskBalancer has already run {@link ContainerSet#updateContainer} (map = destination).
+   * Without {@link ContainerSet#getContainerWithWriteLock}, the handler would take writeLock on
+   * the old object and mark it unhealthy after {@code markContainerForDelete} turns it
+   * {@link State#DELETED}, sending a false UNHEALTHY ICR for a replica SCM no longer tracks.
+   * With the fix, the live map entry (destination) is locked and marked {@link State#UNHEALTHY}.
+   */
+  @ContainerTestVersionInfo.ContainerTest
+  void markUnhealthyAppliedOnDestVolumeContainer(
+      ContainerTestVersionInfo versionInfo) throws Exception {
+    LogCapturer diskBalancerLogs = LogCapturer.captureLogs(DiskBalancerService.class);
+    String schemaVersion = versionInfo.getSchemaVersion();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+
+    KeyValueContainer oldReplica = createClosedContainer(CONTAINER_ID, hotVolume, versionInfo);
+    Container<?> staleContainerRef = oldReplica;
+
+    AfterInMemoryUpdateInjector raceInjector = new AfterInMemoryUpdateInjector();
+    DiskBalancerService.setInjector(raceInjector);
+
+    DiskBalancerService.DiskBalancerTask balancerTask =
+        (DiskBalancerService.DiskBalancerTask) diskBalancerService.getTasks().poll();
+    assertNotNull(balancerTask);
+    CompletableFuture<Void> balancerDone = CompletableFuture.runAsync(balancerTask::call);
+
+    raceInjector.awaitSwapPoint();
+
+    Container<?> liveReplica = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(liveReplica);
+    assertNotEquals(staleContainerRef, liveReplica,
+        "ContainerSet should reference the destination replica at the race hook");
+    assertEquals(State.CLOSED, liveReplica.getContainerState());
+
+    ScanResult reason = getUnhealthyDataScanResult();
+    CountDownLatch unhealthyThreadStarted = new CountDownLatch(1);
+    CompletableFuture<Void> unhealthyDone = CompletableFuture.runAsync(() -> {
+      unhealthyThreadStarted.countDown();
+      try {
+        keyValueHandler.markContainerUnhealthy(staleContainerRef, reason);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    assertThat(unhealthyThreadStarted.await(10, TimeUnit.SECONDS)).isTrue();
+    Thread.sleep(RACE_WINDOW_MILLIS);
+
+    raceInjector.continueBalancer();
+    CompletableFuture.allOf(balancerDone, unhealthyDone).get(60, TimeUnit.SECONDS);
+
+    Container<?> afterMove = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(afterMove);
+    assertSame(liveReplica, afterMove);
+    assertEquals(State.UNHEALTHY, afterMove.getContainerState(),
+        "UNHEALTHY must apply to the live destination replica, not the stale source handle");
+
+    assertEquals(State.DELETED, staleContainerRef.getContainerState());
+    assertEquals(hotVolume, staleContainerRef.getContainerData().getVolume());
+
+    awaitSourceReplicaCleanup(diskBalancerLogs);
+  }
+
+  /**
+   * SCM closeContainer with a stale source container while the map already references the
+   * destination after {@link ContainerSet#updateContainer}. Without resolving the live instance,
+   * the close would run on the source after it is DELETED and throw, leaving the destination
+   * {@link State#QUASI_CLOSED}. With {@link ContainerSet#getContainerWithWriteLock}, the
+   * destination is closed to {@link State#CLOSED}.
+   */
+  @ContainerTestVersionInfo.ContainerTest
+  void closeContainerAppliesOnDestVolumeContainer(
+      ContainerTestVersionInfo versionInfo) throws Exception {
+    LogCapturer diskBalancerLogs = LogCapturer.captureLogs(DiskBalancerService.class);
+    String schemaVersion = versionInfo.getSchemaVersion();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+
+    KeyValueContainer oldReplica = createClosedContainer(CONTAINER_ID, hotVolume, versionInfo);
+    persistQuasiClosedState(oldReplica);
+    Container<?> staleContainerRef = oldReplica;
+
+    AfterInMemoryUpdateInjector raceInjector = new AfterInMemoryUpdateInjector();
+    DiskBalancerService.setInjector(raceInjector);
+
+    DiskBalancerService.DiskBalancerTask balancerTask =
+        (DiskBalancerService.DiskBalancerTask) diskBalancerService.getTasks().poll();
+    assertNotNull(balancerTask);
+    CompletableFuture<Void> balancerDone = CompletableFuture.runAsync(balancerTask::call);
+
+    raceInjector.awaitSwapPoint();
+
+    Container<?> liveReplica = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(liveReplica);
+    assertNotEquals(staleContainerRef, liveReplica);
+    assertEquals(State.QUASI_CLOSED, liveReplica.getContainerState());
+
+    CountDownLatch closeThreadStarted = new CountDownLatch(1);
+    CompletableFuture<Void> closeDone = CompletableFuture.runAsync(() -> {
+      closeThreadStarted.countDown();
+      try {
+        keyValueHandler.closeContainer(staleContainerRef);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    assertThat(closeThreadStarted.await(10, TimeUnit.SECONDS)).isTrue();
+    Thread.sleep(RACE_WINDOW_MILLIS);
+
+    raceInjector.continueBalancer();
+    CompletableFuture.allOf(balancerDone, closeDone).get(60, TimeUnit.SECONDS);
+
+    Container<?> afterMove = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(afterMove);
+    assertSame(liveReplica, afterMove);
+    assertEquals(State.CLOSED, afterMove.getContainerState(),
+        "CLOSED transition must apply to the live destination replica");
+
+    assertEquals(State.DELETED, staleContainerRef.getContainerState());
+
+    awaitSourceReplicaCleanup(diskBalancerLogs);
+  }
+
+  /**
+   * Waits (polling every 100 ms, up to 10 s) until DiskBalancer logs that its delayed cleanup
+   * removed the source replica of {@link #CONTAINER_ID}.
+   */
+  private static void awaitSourceReplicaCleanup(LogCapturer diskBalancerLogs) throws Exception {
+    GenericTestUtils.waitFor(
+        () -> diskBalancerLogs.getOutput().contains(SOURCE_CLEANUP_LOG),
+        100, 10_000);
+  }
+
+  /**
+   * Makes {@link State#QUASI_CLOSED} visible on disk so import/copy sees the same state
+   * the in-memory container had before the move.
+   */
+  private void persistQuasiClosedState(KeyValueContainer container) throws IOException {
+    KeyValueContainerData data = container.getContainerData();
+    data.setState(State.QUASI_CLOSED);
+    File containerFile = ContainerUtils.getContainerFile(new File(data.getContainerPath()));
+    ContainerDataYaml.createContainerFile(data, containerFile);
+  }
+
+  /** Reads the persisted pending-delete block count from the container's metadata table. */
+  private long readPendingDeleteBlockCount(KeyValueContainerData data) throws IOException {
+    try (DBHandle db = BlockUtils.getDB(data, conf)) {
+      Table<String, Long> meta = db.getStore().getMetadataTable();
+      Long v = meta.get(data.getPendingDeleteBlockCountKey());
+      return v == null ? 0L : v;
+    }
+  }
+
+  /**
+   * Persists pending-deletion counters in container metadata so they survive
+   * disk balancer copy/import and can be verified on the destination replica.
+   */
+  private void seedPendingDeletionInMetadata(KeyValueContainer container) throws IOException {
+    KeyValueContainerData data = container.getContainerData();
+    try (DBHandle metadata = BlockUtils.getDB(data, conf)) {
+      Table<String, Long> meta = metadata.getStore().getMetadataTable();
+      meta.put(data.getPendingDeleteBlockCountKey(), SEEDED_PENDING_BLOCKS);
+      meta.put(data.getPendingDeleteBlockBytesKey(), SEEDED_PENDING_BYTES);
+    }
+    data.incrPendingDeletionBlocks(SEEDED_PENDING_BLOCKS, SEEDED_PENDING_BYTES);
+  }
+
+  private static HddsVolume coldestVolume(HddsVolume... volumes) {
+    return Arrays.stream(volumes)
+        .min(volumePolicyOrder())
+        .get();
+  }
+
+  private static HddsVolume hottestVolume(HddsVolume... volumes) {
+    return Arrays.stream(volumes)
+        .max(volumePolicyOrder())
+        .get();
+  }
+
+  /** Orders volumes by used fraction of capacity, breaking ties by storage id. */
+  private static Comparator<HddsVolume> volumePolicyOrder() {
+    return Comparator
+        .comparingDouble((HddsVolume v) -> {
+          SpaceUsageSource usage = v.getCurrentUsage();
+          return (double) (usage.getCapacity() - usage.getAvailable()) / usage.getCapacity();
+        })
+        .thenComparing(HddsVolume::getStorageID);
+  }
+
+  /** Creates a CLOSED container on {@code vol}, registers it, and charges its size to the volume. */
+  private KeyValueContainer createClosedContainer(long containerId, HddsVolume vol,
+      ContainerTestVersionInfo versionInfo)
+      throws IOException {
+    KeyValueContainerData containerData = new KeyValueContainerData(
+        containerId, versionInfo.getLayout(), CONTAINER_SIZE,
+        UUID.randomUUID().toString(), datanodeUuid);
+    containerData.setState(State.CLOSED);
+    containerData.getStatistics().setBlockBytesForTesting(CONTAINER_SIZE);
+    containerData.setSchemaVersion(versionInfo.getSchemaVersion());
+
+    KeyValueContainer container = new KeyValueContainer(containerData, conf);
+    VolumeChoosingPolicy policy = mock(VolumeChoosingPolicy.class);
+    when(policy.chooseVolume(any(List.class), anyLong())).thenReturn(vol);
+    container.create((VolumeSet) volumeSet, policy, scmId);
+    containerSet.addContainer(container);
+    vol.incrementUsedSpace(containerData.getBytesUsed());
+    return container;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 0385564ebf7..7ba862389d1 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -370,18 +370,20 @@ private ContainerCommandRequestProto getDummyCommandRequestProto(
   @ContainerLayoutTestInfo.ContainerTest
   public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion)
       throws IOException {
-    KeyValueHandler keyValueHandler = createKeyValueHandler(tempDir);
     conf = new OzoneConfiguration();
-    KeyValueContainerData kvData = new 
KeyValueContainerData(DUMMY_CONTAINER_ID,
-        layoutVersion,
-        (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
-        UUID.randomUUID().toString());
-    kvData.setMetadataPath(tempDir.toString());
-    kvData.setDbFile(dbFile.toFile());
-    KeyValueContainer container = new KeyValueContainer(kvData, conf);
+    conf.set(OZONE_SCM_CONTAINER_LAYOUT_KEY, layoutVersion.name());
+    HandlerWithVolumeSet handlerCtx = createKeyValueHandler(tempDir);
+    KeyValueHandler keyValueHandler = handlerCtx.getHandler();
+    ContainerSet containerSet = handlerCtx.getContainerSet();
+
+    long containerId = DUMMY_CONTAINER_ID + layoutVersion.getVersion();
     ContainerCommandRequestProto createContainerRequest =
-        createContainerRequest(DATANODE_UUID, DUMMY_CONTAINER_ID);
-    keyValueHandler.handleCreateContainer(createContainerRequest, container);
+        createContainerRequest(DATANODE_UUID, containerId);
+    keyValueHandler.handleCreateContainer(createContainerRequest, null);
+
+    KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+    KeyValueContainerData kvData = container.getContainerData();
 
     // Make the container state as invalid.
     kvData.setState(ContainerProtos.ContainerDataProto.State.INVALID);
@@ -390,12 +392,11 @@ public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion)
     ContainerCommandRequestProto closeContainerRequest =
         ContainerProtos.ContainerCommandRequestProto.newBuilder()
             .setCmdType(ContainerProtos.Type.CloseContainer)
-            .setContainerID(DUMMY_CONTAINER_ID)
+            .setContainerID(containerId)
             .setDatanodeUuid(DATANODE_UUID)
             .setCloseContainer(ContainerProtos.CloseContainerRequestProto
                 .getDefaultInstance())
             .build();
-    dispatcher.dispatch(closeContainerRequest, null);
 
     // Closing invalid container should return error response.
     ContainerProtos.ContainerCommandResponseProto response =
@@ -923,7 +924,7 @@ private static ContainerCommandRequestProto createContainerRequest(
         .build();
   }
 
-  private KeyValueHandler createKeyValueHandler(Path path) throws IOException {
+  private HandlerWithVolumeSet createKeyValueHandler(Path path) throws 
IOException {
     final ContainerSet containerSet = newContainerSet();
     final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
 
@@ -950,7 +951,7 @@ private KeyValueHandler createKeyValueHandler(Path path) throws IOException {
         conf.getObject(ContainerScannerConfiguration.class), controller);
     containerSet.registerOnDemandScanner(onDemandScanner);
 
-    return kvHandler;
+    return new HandlerWithVolumeSet(kvHandler, volumeSet, containerSet);
   }
 
   private static class HandlerWithVolumeSet {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
index 8361959e6da..358b56157fc 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -29,6 +29,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atMostOnce;
 import static org.mockito.Mockito.mock;
@@ -220,7 +221,8 @@ void testNPEFromPutBlock() throws IOException {
 
   @Test
   public void testMarkContainerUnhealthyInFailedVolume() throws IOException {
-    KeyValueHandler handler = getDummyHandler();
+    ContainerSet containerSet = mock(ContainerSet.class);
+    KeyValueHandler handler = getDummyHandler(containerSet);
     KeyValueContainerData kvData = new KeyValueContainerData(1L,
         ContainerLayoutVersion.FILE_PER_BLOCK,
         (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
@@ -233,6 +235,10 @@ public void testMarkContainerUnhealthyInFailedVolume() throws IOException {
         .build();
     kvData.setVolume(hddsVolume);
     KeyValueContainer container = new KeyValueContainer(kvData, conf);
+    when(containerSet.getContainerWithWriteLock(eq(1L))).thenAnswer(invocation 
-> {
+      container.writeLock();
+      return container;
+    });
 
     // When volume is failed, the call to mark the container unhealthy should
     // be ignored.
@@ -252,6 +258,10 @@ public void testMarkContainerUnhealthyInFailedVolume() throws IOException {
   // -- Helper methods below.
 
   private KeyValueHandler getDummyHandler() {
+    // Delegate with a fresh, unstubbed ContainerSet mock; tests that need
+    // container lookups to return a real container pass their own mock.
+    return getDummyHandler(mock(ContainerSet.class));
+  }
+
+  private KeyValueHandler getDummyHandler(ContainerSet containerSet) {
     DatanodeDetails dnDetails = DatanodeDetails.newBuilder()
         .setUuid(UUID.fromString(DATANODE_UUID))
         .setHostName("dummyHost")
@@ -263,7 +273,7 @@ private KeyValueHandler getDummyHandler() {
     return new KeyValueHandler(
         conf,
         stateMachine.getDatanodeDetails().getUuidString(),
-        mock(ContainerSet.class),
+        containerSet,
         mock(MutableVolumeSet.class),
         mock(ContainerMetrics.class), mockIcrSender, new 
ContainerChecksumTreeManager(conf));
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to