This is an automated email from the ASF dual-hosted git repository.
Gargi-jais11 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a60b0236413 HDDS-15066. Read-Write Lock race leave stale references to
container creating orphan replicas (#10109).
a60b0236413 is described below
commit a60b02364135a5a24ea0797fa5971bcd8992397a
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Thu May 14 16:11:49 2026 +0530
HDDS-15066. Read-Write Lock race leave stale references to container
creating orphan replicas (#10109).
---
.../ozone/container/common/impl/ContainerSet.java | 54 ++
.../commandhandler/DeleteBlocksCommandHandler.java | 28 +-
.../diskbalancer/DiskBalancerService.java | 41 +-
.../ozone/container/keyvalue/KeyValueHandler.java | 107 ++--
.../statemachine/background/BlockDeletingTask.java | 52 +-
.../container/common/impl/TestContainerSet.java | 98 ++++
.../TestDeleteBlocksCommandHandler.java | 89 +++-
...tDiskBalancerWithConcurrentBackgroundTasks.java | 591 +++++++++++++++++++++
.../container/keyvalue/TestKeyValueHandler.java | 29 +-
.../TestKeyValueHandlerWithUnhealthyContainer.java | 14 +-
10 files changed, 1001 insertions(+), 102 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 13bf7789929..d122dfb587f 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -63,6 +63,12 @@ public class ContainerSet implements Iterable<Container<?>> {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerSet.class);
+ /**
+ * Max attempts to acquire {@link Container#writeLock()} while verifying
that this set's id → container mapping
+ * is unchanged (e.g. another thread may call {@link #updateContainer}, or
DiskBalancer may swap the instance).
+ */
+ private static final int MAX_CONTAINER_MAP_SWAP_RETRIES = 5;
+
private final ConcurrentSkipListMap<Long, Container<?>> containerMap = new
ConcurrentSkipListMap<>();
private final ConcurrentSkipListSet<Long> missingContainerSet =
@@ -279,6 +285,54 @@ public Container<?> getContainer(long containerId) {
return containerMap.get(containerId);
}
+ /**
+ * Returns the maximum number of attempts made to lock a container while its
mapping in this set keeps changing.
+ * @return max retry count
+ */
+ public static int maxContainerMapSwapRetries() {
+ return MAX_CONTAINER_MAP_SWAP_RETRIES;
+ }
+
+ /**
+ * Locks the container mapped to {@code containerId} for write, and verifies
that the instance locked is still
+ * the one stored in this set. If the mapping is swapped, unlocks and
retries up to
+ * {@link #maxContainerMapSwapRetries()} times, then returns {@code null}.
+ *
+ * @return the locked container, or {@code null} if the mapping could not be
stabilized after all retries
+ * @throws StorageContainerException with {@code CONTAINER_NOT_FOUND} if no container is mapped to {@code containerId}
+ */
+ @Nullable
+ public Container<?> getContainerWithWriteLock(long containerId) throws
StorageContainerException {
+ for (int retry = 0; retry < MAX_CONTAINER_MAP_SWAP_RETRIES; retry++) {
+ Container<?> candidate = getContainer(containerId);
+ if (candidate == null) {
+ throw new StorageContainerException(
+ "Container " + containerId + " not found in ContainerSet.",
+ ContainerProtos.Result.CONTAINER_NOT_FOUND);
+ }
+ candidate.writeLock();
+ Container<?> current = getContainer(containerId);
+ if (current == null) {
+ candidate.writeUnlock();
+ throw new StorageContainerException(
+ "Container " + containerId + " not found in ContainerSet.",
+ ContainerProtos.Result.CONTAINER_NOT_FOUND);
+ }
+ if (current != candidate) {
+ candidate.writeUnlock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Container {} mapping changed during lock acquisition
(attempt {}); retrying.",
+ containerId, retry);
+ }
+ continue;
+ }
+ return candidate;
+ }
+ LOG.warn("Container {} mapping kept changing after {} attempts; giving
up.",
+ containerId, MAX_CONTAINER_MAP_SWAP_RETRIES);
+ return null;
+ }
+
/**
* Removes container from both memory and database. This should be used when
the containerData on disk has been
* removed completely from the node.
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 48d5053eb76..45ea5bffdec 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -303,19 +303,31 @@ public DeleteBlockTransactionExecutionResult call() {
if (keyValueContainer.
writeLockTryLock(tryLockTimeoutMs, TimeUnit.MILLISECONDS)) {
try {
- String schemaVersion = containerData
- .getSupportedSchemaVersionOrDefault();
- if (getSchemaHandlers().containsKey(schemaVersion)) {
- schemaHandlers.get(schemaVersion).handle(containerData, tx);
+ // Re-fetch the container after acquiring the lock. DiskBalancer
may have relocated
+ // this container to a different disk while we waited — in that
case, the container
+ // object in ContainerSet has changed and containerData points
to the old replica.
+ Container<?> current = containerSet.getContainer(containerId);
+ if (current == null || current.getContainerData() !=
containerData) {
+ LOG.debug("DeleteBlocks: containerData for container {} is
stale "
+ + ", Will retry on the new replica.",
+ containerId);
+ lockAcquisitionFailed = true;
+ txResultBuilder.setContainerID(containerId).setSuccess(false);
} else {
- throw new UnsupportedOperationException(
- "Only schema version 1,2,3 are supported.");
+ String schemaVersion = containerData
+ .getSupportedSchemaVersionOrDefault();
+ if (getSchemaHandlers().containsKey(schemaVersion)) {
+ schemaHandlers.get(schemaVersion).handle(containerData, tx);
+ } else {
+ throw new UnsupportedOperationException(
+ "Only schema version 1,2,3 are supported.");
+ }
+ txResultBuilder.setContainerID(containerId)
+ .setSuccess(true);
}
} finally {
keyValueContainer.writeUnlock();
}
- txResultBuilder.setContainerID(containerId)
- .setSuccess(true);
} else {
lockAcquisitionFailed = true;
txResultBuilder.setContainerID(containerId)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index bf22d811ff2..b1bac677178 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -512,19 +512,19 @@ public BackgroundTaskResult call() {
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
- // Double check container state before acquiring lock to start move
process.
- // Container state may have changed after selection.
- State containerState = container.getContainerData().getState();
- if (!movableContainerStates.contains(containerState)) {
- LOG.warn("Container {} is in {} state, skipping move process.",
containerId, containerState);
- postCall(false, startTime);
- return BackgroundTaskResult.EmptyTaskResult.newResult();
- }
-
// hold read lock on the container first, to avoid other threads to
update the container state,
// such as block deletion.
container.readLock();
try {
+ // Double check container state after acquiring lock to start move
process.
+ // Container state may have changed after selection.
+ State containerState = container.getContainerData().getState();
+ if (!movableContainerStates.contains(containerState)) {
+ LOG.warn("Container {} is in {} state, skipping move process.",
containerId, containerState);
+ moveSucceeded = false;
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+
// Step 1: Copy container to new Volume's tmp Dir
diskBalancerTmpDir = getDiskBalancerTmpDir(destVolume)
.resolve(String.valueOf(containerId));
@@ -580,6 +580,10 @@ public BackgroundTaskResult call() {
// old caller can still hold the old Container object.
ozoneContainer.getContainerSet().updateContainer(newContainer);
destVolume.incrementUsedSpace(containerSize);
+
+ // Test injector: ContainerSet now references newContainer while this
thread still holds
+ // readLock on the old replica.
+ pauseInjector();
// Mark old container as DELETED and persist state.
// markContainerForDelete require writeLock, so release readLock first
container.readUnlock();
@@ -597,13 +601,7 @@ public BackgroundTaskResult call() {
metrics.incrSuccessBytes(containerSize);
totalBalancedBytes.addAndGet(containerSize);
} catch (IOException e) {
- if (injector != null) {
- try {
- injector.pause();
- } catch (IOException ex) {
- // do nothing
- }
- }
+ pauseInjector();
moveSucceeded = false;
LOG.warn("Failed to move container {}", containerId, e);
if (diskBalancerTmpDir != null) {
@@ -861,6 +859,17 @@ public static void setInjector(FaultInjector instance) {
injector = instance;
}
+ // call FaultInjector#pause when an injector is registered; ignore
IOException.
+ private static void pauseInjector() {
+ if (injector != null) {
+ try {
+ injector.pause();
+ } catch (IOException ex) {
+ // do nothing
+ }
+ }
+ }
+
@VisibleForTesting
public void setReplicaDeletionDelay(long durationMills) {
this.replicaDeletionDelay = durationMills;
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 6f93bcef4fd..474b7458644 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1511,10 +1511,17 @@ private ContainerProtos.ContainerChecksumInfo
updateAndGetContainerChecksum(Cont
@Override
public void markContainerUnhealthy(Container container, ScanResult reason)
throws IOException {
- container.writeLock();
long containerID = container.getContainerData().getContainerID();
+ Container<?> lockedContainer =
containerSet.getContainerWithWriteLock(containerID);
+ if (lockedContainer == null) {
+ // null means the lock-acquisition retries were exhausted;
+ // a missing container throws StorageContainerException instead.
+ LOG.warn("Exceeded {} attempts locking live container {}; skipping
markContainerUnhealthy.",
+ ContainerSet.maxContainerMapSwapRetries(), containerID);
+ return;
+ }
try {
- if (container.getContainerState() == State.UNHEALTHY) {
+ if (lockedContainer.getContainerState() == State.UNHEALTHY) {
LOG.debug("Call to mark already unhealthy container {} as unhealthy",
containerID);
return;
@@ -1522,25 +1529,25 @@ public void markContainerUnhealthy(Container container,
ScanResult reason)
// If the volume is unhealthy, no action is needed. The container has
// already been discarded and SCM notified. Once a volume is failed, it
// cannot be restored without a restart.
- HddsVolume containerVolume = container.getContainerData().getVolume();
+ HddsVolume containerVolume =
lockedContainer.getContainerData().getVolume();
if (containerVolume.isFailed()) {
LOG.debug("Ignoring unhealthy container {} detected on an " +
"already failed volume {}", containerID, containerVolume);
return;
}
- container.markContainerUnhealthy();
+ lockedContainer.markContainerUnhealthy();
} catch (StorageContainerException ex) {
LOG.warn("Unexpected error while marking container {} unhealthy",
containerID, ex);
} finally {
- container.writeUnlock();
+ lockedContainer.writeUnlock();
}
- updateContainerChecksumFromMetadataIfNeeded(container);
+ updateContainerChecksumFromMetadataIfNeeded(lockedContainer);
// Even if the container file is corrupted/missing and the unhealthy
// update fails, the unhealthy state is kept in memory and sent to
// SCM. Write a corresponding entry to the container log as well.
- ContainerLogger.logUnhealthy(container.getContainerData(), reason);
- sendICR(container);
+ ContainerLogger.logUnhealthy(lockedContainer.getContainerData(), reason);
+ sendICR(lockedContainer);
}
@Override
@@ -1574,17 +1581,24 @@ public void quasiCloseContainer(Container container,
String reason)
@Override
public void closeContainer(Container container)
throws IOException {
- container.writeLock();
+ long containerID = container.getContainerData().getContainerID();
+ Container<?> lockedContainer =
containerSet.getContainerWithWriteLock(containerID);
+ if (lockedContainer == null) {
+ // null means the lock-acquisition retries were exhausted;
+ // a missing container throws StorageContainerException instead.
+ LOG.warn("Exceeded {} attempts locking live container {}; skipping
closeContainer.",
+ ContainerSet.maxContainerMapSwapRetries(), containerID);
+ return;
+ }
try {
- final State state = container.getContainerState();
+ final State state = lockedContainer.getContainerState();
// Close call is idempotent.
if (state == State.CLOSED) {
return;
}
if (state == State.UNHEALTHY) {
throw new StorageContainerException(
- "Cannot close container #" + container.getContainerData()
- .getContainerID() + " while in " + state + " state.",
+ "Cannot close container #" + containerID + " while in " + state +
" state.",
ContainerProtos.Result.CONTAINER_UNHEALTHY);
}
// The container has to be either in CLOSING or in QUASI_CLOSED state.
@@ -1593,16 +1607,15 @@ public void closeContainer(Container container)
state == State.INVALID ? INVALID_CONTAINER_STATE :
CONTAINER_INTERNAL_ERROR;
throw new StorageContainerException(
- "Cannot close container #" + container.getContainerData()
- .getContainerID() + " while in " + state + " state.", error);
+ "Cannot close container #" + containerID + " while in " + state +
" state.", error);
}
- container.close();
+ lockedContainer.close();
} finally {
- container.writeUnlock();
+ lockedContainer.writeUnlock();
}
- updateContainerChecksumFromMetadataIfNeeded(container);
- ContainerLogger.logClosed(container.getContainerData());
- sendICR(container);
+ updateContainerChecksumFromMetadataIfNeeded(lockedContainer);
+ ContainerLogger.logClosed(lockedContainer.getContainerData());
+ sendICR(lockedContainer);
}
@Override
@@ -2293,24 +2306,33 @@ private boolean logBlocksFoundOnDisk(Container
container) throws IOException {
private void deleteInternal(Container container, boolean force)
throws StorageContainerException {
+ final long containerId = container.getContainerData().getContainerID();
long startTime = clock.millis();
- container.writeLock();
+ Container<?> containerLocked =
containerSet.getContainerWithWriteLock(containerId);
+ if (containerLocked == null) {
+ // null means the lock-acquisition retries were exhausted;
+ // a missing container throws StorageContainerException instead.
+ LOG.info("Exceeded {} retries to lock container {}; Now SCM will resend
for delete with " +
+ "the current container replica",
ContainerSet.maxContainerMapSwapRetries(),
+ containerId);
+ return;
+ }
try {
- final ContainerData data = container.getContainerData();
- if (container.getContainerData().getVolume().isFailed()) {
+ final ContainerData data = containerLocked.getContainerData();
+ if (containerLocked.getContainerData().getVolume().isFailed()) {
// if the volume in which the container resides fails
// don't attempt to delete/move it. When a volume fails,
// failedVolumeListener will pick it up and clear the container
// from the container set.
LOG.info("Delete container issued on containerID {} which is in a " +
- "failed volume. Skipping", container.getContainerData()
+ "failed volume. Skipping", containerLocked.getContainerData()
.getContainerID());
return;
}
// If force is false, we check container state.
if (!force) {
// Check if container is open
- if (container.getContainerData().isOpen()) {
+ if (containerLocked.getContainerData().isOpen()) {
throw new StorageContainerException(
"Deletion of Open Container is not allowed.",
DELETE_ON_OPEN_CONTAINER);
@@ -2319,14 +2341,14 @@ private void deleteInternal(Container container,
boolean force)
// If the container is not empty, it should not be deleted unless the
// container is being forcefully deleted (which happens when
// container is unhealthy or over-replicated).
- if (container.hasBlocks()) {
+ if (containerLocked.hasBlocks()) {
metrics.incContainerDeleteFailedNonEmpty();
LOG.error("Received container deletion command for non-empty {}:
{}", data, data.getStatistics());
// blocks table for future debugging.
// List blocks
- logBlocksIfNonZero(container);
+ logBlocksIfNonZero(containerLocked);
// Log chunks
- logBlocksFoundOnDisk(container);
+ logBlocksFoundOnDisk(containerLocked);
throw new StorageContainerException("Non-force deletion of " +
"non-empty container is not allowed.",
DELETE_ON_NON_EMPTY_CONTAINER);
@@ -2334,9 +2356,9 @@ private void deleteInternal(Container container, boolean
force)
} else {
metrics.incContainersForceDelete();
}
- if (container.getContainerData() instanceof KeyValueContainerData) {
+ if (containerLocked.getContainerData() instanceof KeyValueContainerData)
{
KeyValueContainerData keyValueContainerData =
- (KeyValueContainerData) container.getContainerData();
+ (KeyValueContainerData) containerLocked.getContainerData();
HddsVolume hddsVolume = keyValueContainerData.getVolume();
// Steps to delete
@@ -2350,21 +2372,20 @@ private void deleteInternal(Container container,
boolean force)
if (waitTime > maxDeleteLockWaitMs) {
LOG.warn("An attempt to delete container {} took {} ms acquiring
locks and pre-checks. " +
"The delete has been skipped and should be retried
automatically by SCM.",
- container.getContainerData().getContainerID(), waitTime);
+ containerLocked.getContainerData().getContainerID(), waitTime);
return;
}
- container.markContainerForDelete();
- long containerId = container.getContainerData().getContainerID();
+ containerLocked.markContainerForDelete();
containerSet.removeContainer(containerId);
- ContainerLogger.logDeleted(container.getContainerData(), force);
+ ContainerLogger.logDeleted(containerLocked.getContainerData(),
force);
KeyValueContainerUtil.removeContainer(keyValueContainerData, conf);
} catch (IOException ioe) {
LOG.error("Failed to move container under " + hddsVolume
.getDeletedContainerDir());
String errorMsg =
- "Failed to move container" + container.getContainerData()
+ "Failed to move container" + containerLocked.getContainerData()
.getContainerID();
- triggerVolumeScanAndThrowException(container, errorMsg,
+ triggerVolumeScanAndThrowException(containerLocked, errorMsg,
CONTAINER_INTERNAL_ERROR);
}
}
@@ -2374,20 +2395,20 @@ private void deleteInternal(Container container,
boolean force)
// All other IO Exceptions should be treated as if the container is not
// empty as a defensive check.
LOG.error("Could not determine if the container {} is empty",
- container.getContainerData().getContainerID(), e);
+ containerLocked.getContainerData().getContainerID(), e);
String errorMsg =
- "Failed to read container dir" + container.getContainerData()
+ "Failed to read container dir" + containerLocked.getContainerData()
.getContainerID();
- triggerVolumeScanAndThrowException(container, errorMsg,
+ triggerVolumeScanAndThrowException(containerLocked, errorMsg,
CONTAINER_INTERNAL_ERROR);
} finally {
- container.writeUnlock();
+ containerLocked.writeUnlock();
}
// Avoid holding write locks for disk operations
- sendICR(container);
- long bytesUsed = container.getContainerData().getBytesUsed();
- HddsVolume volume = container.getContainerData().getVolume();
- container.delete();
+ sendICR(containerLocked);
+ long bytesUsed = containerLocked.getContainerData().getBytesUsed();
+ HddsVolume volume = containerLocked.getContainerData().getVolume();
+ containerLocked.delete();
volume.decrementUsedSpace(bytesUsed);
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java
index 1f66cad476f..a4ab1a7ef0e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import
org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -67,7 +68,7 @@ public class BlockDeletingTask implements BackgroundTask {
private final BlockDeletingServiceMetrics metrics;
private final int priority;
- private final KeyValueContainerData containerData;
+ private KeyValueContainerData containerData;
private long blocksToDelete;
private final OzoneContainer ozoneContainer;
private final ConfigurationSource conf;
@@ -139,24 +140,39 @@ public BackgroundTaskResult call() throws Exception {
private ContainerBackgroundTaskResult handleDeleteTask() throws Exception {
ContainerBackgroundTaskResult crr;
- final Container container = ozoneContainer.getContainerSet()
- .getContainer(containerData.getContainerID());
- container.writeLock();
- File dataDir = new File(containerData.getChunksPath());
- long startTime = Time.monotonicNow();
- // Scan container's db and get list of under deletion blocks
- try (DBHandle meta = BlockUtils.getDB(containerData, conf)) {
- if (containerData.hasSchema(SCHEMA_V1)) {
- crr = deleteViaSchema1(meta, container, dataDir, startTime);
- } else if (containerData.hasSchema(SCHEMA_V2)) {
- crr = deleteViaSchema2(meta, container, dataDir, startTime);
- } else if (containerData.hasSchema(SCHEMA_V3)) {
- crr = deleteViaSchema3(meta, container, dataDir, startTime);
- } else {
- throw new UnsupportedOperationException(
- "Only schema version 1,2,3 are supported.");
+ ContainerSet cs = ozoneContainer.getContainerSet();
+ final long containerId = containerData.getContainerID();
+
+ Container<?> container = cs.getContainerWithWriteLock(containerId);
+ if (container == null) {
+ // null means the lock-acquisition retries were exhausted;
+ // a missing container throws StorageContainerException instead.
+ LOG.info("Exceeded {} retries to lock container {}; Now DN will resend
for delete with " +
+ "the current container replica",
ContainerSet.maxContainerMapSwapRetries(),
+ containerId);
+ return new ContainerBackgroundTaskResult();
+ }
+ try {
+ // Always use ContainerData from the locked live Container so paths /
RocksDB locations match deleteViaSchema*.
+ containerData = (KeyValueContainerData) container.getContainerData();
+
+ File dataDir = new File(containerData.getChunksPath());
+ long startTime = Time.monotonicNow();
+ // Scan container's db and get list of under deletion blocks
+ try (DBHandle meta = BlockUtils.getDB(containerData, conf)) {
+ if (containerData.hasSchema(SCHEMA_V1)) {
+ crr = deleteViaSchema1(meta, container, dataDir, startTime);
+ } else if (containerData.hasSchema(SCHEMA_V2)) {
+ crr = deleteViaSchema2(meta, container, dataDir, startTime);
+ } else if (containerData.hasSchema(SCHEMA_V3)) {
+ crr = deleteViaSchema3(meta, container, dataDir, startTime);
+ } else {
+ throw new UnsupportedOperationException(
+ "Only schema version 1,2,3 are supported.");
+ }
+ return crr;
+
}
- return crr;
} finally {
container.writeUnlock();
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
index efb4be86e8d..33b87f220ed 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
@@ -23,6 +23,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -30,6 +31,10 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
@@ -55,6 +60,7 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner;
+import org.junit.jupiter.api.Test;
/**
* Class used to test ContainerSet operations.
@@ -371,6 +377,98 @@ public void
testContainerScanHandlerWithoutGap(ContainerLayoutVersion layout) th
assertEquals(1, invocationCount.get());
}
+ // -------------------------------------------------------------------------
+ // getContainerWithWriteLock tests
+ // -------------------------------------------------------------------------
+
+ /**
+ * Happy path: container is in the map and the mapping is stable.
+ * Expect the locked container to be returned, with writeLock called and
writeUnlock NOT yet called.
+ */
+ @Test
+ public void testAcquireContainerLockStableMapping() throws
StorageContainerException {
+ ContainerSet cs = spy(newContainerSet());
+ Container<?> c1 = mock(Container.class);
+ doAnswer(inv -> c1).when(cs).getContainer(1L);
+
+ Container<?> result = cs.getContainerWithWriteLock(1L);
+
+ assertSame(c1, result);
+ verify(c1).writeLock();
+ verify(c1, never()).writeUnlock();
+ }
+
+ /**
+ * Container is present when first fetched, but removed from the map after
writeLock is acquired
+ * (second getContainer check returns null).
+ * Expect StorageContainerException(CONTAINER_NOT_FOUND), and the lock
released before throwing (no lock leak).
+ */
+ @Test
+ public void testContainerRemovedAfterWriteLock() {
+ ContainerSet cs = spy(newContainerSet());
+ Container<?> c1 = mock(Container.class);
+ int[] callCount = {0};
+ // First call → candidate c1; second call (re-check after lock) → null
(container removed)
+ doAnswer(inv -> callCount[0]++ == 0 ? c1 : null).when(cs).getContainer(1L);
+
+ assertThrows(StorageContainerException.class, () ->
cs.getContainerWithWriteLock(1L));
+ verify(c1).writeLock();
+ verify(c1).writeUnlock(); // lock must be released before throwing
+ }
+
+ /**
+ * Mapping is swapped once (DiskBalancer moves container from C1 to C2)
while the lock is being
+ * acquired. The first attempt detects the mismatch (current=C2 ≠
candidate=C1), releases C1's lock,
+ * and retries. The second attempt finds C2 stable and returns it locked.
+ */
+ @Test
+ public void testRetriesOnMappingSwapThenSucceeds()
+ throws StorageContainerException {
+ ContainerSet cs = spy(newContainerSet());
+ Container<?> c1 = mock(Container.class);
+ Container<?> c2 = mock(Container.class);
+ // Sequence: c1 (candidate retry-0), c2 (current retry-0 → mismatch),
+ // c2 (candidate retry-1), c2 (current retry-1 → match)
+ int[] n = {0};
+ Container<?>[] seq = {c1, c2, c2, c2};
+ doAnswer(inv -> seq[Math.min(n[0]++, seq.length -
1)]).when(cs).getContainer(1L);
+
+ Container<?> result = cs.getContainerWithWriteLock(1L);
+
+ assertSame(c2, result);
+ // c1 was locked then released during the retry
+ verify(c1).writeLock();
+ verify(c1).writeUnlock();
+ // c2 was locked and is held by the caller
+ verify(c2).writeLock();
+ verify(c2, never()).writeUnlock();
+ }
+
+ /**
+ * The mapping keeps changing on every retry. After {@link
ContainerSet#maxContainerMapSwapRetries()}
+ * retries, null is returned. All intermediate locks on C1 must be released
(no lock leak).
+ */
+ @Test
+ public void testExhaustsMaxRetriesReturnsNull()
+ throws StorageContainerException {
+ ContainerSet cs = spy(newContainerSet());
+ Container<?> c1 = mock(Container.class);
+ Container<?> c2 = mock(Container.class);
+ // Alternate: c1 as candidate, c2 as current → always mismatched → all
retries fail
+ int[] n = {0};
+ doAnswer(inv -> n[0]++ % 2 == 0 ? c1 : c2).when(cs).getContainer(1L);
+
+ Container<?> result = cs.getContainerWithWriteLock(1L);
+
+ assertNull(result);
+ int maxRetries = ContainerSet.maxContainerMapSwapRetries();
+ // c1 is locked and released once per retry
+ verify(c1, times(maxRetries)).writeLock();
+ verify(c1, times(maxRetries)).writeUnlock();
+ // c2 is only ever seen as "current" — it is never locked
+ verify(c2, never()).writeLock();
+ }
+
/**
* Verify that {@code result} contains {@code count} containers
* with IDs in increasing order starting at {@code startId}.
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java
index 2b6b387dbe7..6ac39d9ba4f 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java
@@ -29,10 +29,12 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -53,6 +55,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -96,6 +99,7 @@ public class TestDeleteBlocksCommandHandler {
private String schemaVersion;
private HddsVolume volume1;
private BlockDeletingServiceMetrics blockDeleteMetrics;
+ private static final long STALE_CONTAINER_ID = 5L;
private void prepareTest(ContainerTestVersionInfo versionInfo)
throws Exception {
@@ -481,6 +485,89 @@ public void
testDuplicateTxFromSCMHandledByDeleteBlocksCommandHandler(
assertEquals(afterSecondPendingBytes + 768L,
containerData.getBlockPendingDeletionBytes());
}
+  /**
+   * Simulates DiskBalancer swapping the {@link ContainerSet} entry after the
+   * delete-blocks worker read the container once but before it re-checks after
+   * {@code writeLockTryLock}: the first attempt must treat {@code containerData} as stale, fail
+   * without calling the schema handler on the old replica, and succeed on the built-in retry
+   * against the live replica.
+   */
+  @Test
+  public void deleteBlocksRetriesWhenContainerDataStale() throws Exception {
+    schemaVersion = SCHEMA_V3;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+    OzoneContainer ozoneContainer = mock(OzoneContainer.class);
+    ContainerLayoutVersion layout = ContainerLayoutVersion.FILE_PER_BLOCK;
+    ContainerSet realSet = newContainerSet();
+    volume1 = mockHddsVolume("uuid-1");
+    for (int i = 0; i <= 10; i++) {
+      KeyValueContainerData data =
+          new KeyValueContainerData(i,
+              layout,
+              ContainerTestHelper.CONTAINER_MAX_SIZE,
+              UUID.randomUUID().toString(),
+              UUID.randomUUID().toString());
+      data.setSchemaVersion(schemaVersion);
+      data.setVolume(volume1);
+      KeyValueContainer container = new KeyValueContainer(data, conf);
+      data.closeContainer();
+      realSet.addContainer(container);
+    }
+
+    KeyValueContainer oldContainer =
+        (KeyValueContainer) realSet.getContainer(STALE_CONTAINER_ID);
+    KeyValueContainerData newReplicaData =
+        new KeyValueContainerData((KeyValueContainerData) oldContainer.getContainerData());
+    newReplicaData.setVolume(volume1);
+    newReplicaData.closeContainer();
+    KeyValueContainer newContainer = new KeyValueContainer(newReplicaData, conf);
+
+    AtomicInteger getContainerSequence = new AtomicInteger(0);
+    ContainerSet containerSetSpy = spy(realSet);
+    doAnswer(invocation -> {
+      long id = invocation.getArgument(0);
+      if (id != STALE_CONTAINER_ID) {
+        return invocation.callRealMethod();
+      }
+      // The first lookup hands out the source replica that is about to go
+      // stale; every later lookup (the re-check after locking and the retry)
+      // returns the replica that replaced it in the set.
+      int seq = getContainerSequence.getAndIncrement();
+      if (seq == 0) {
+        return oldContainer;
+      }
+      return newContainer;
+    }).when(containerSetSpy).getContainer(anyLong());
+
+    when(ozoneContainer.getContainerSet()).thenReturn(containerSetSpy);
+    containerSet = containerSetSpy;
+
+    DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
+    handler = spy(new DeleteBlocksCommandHandler(ozoneContainer, conf, dnConf, ""));
+    blockDeleteMetrics = handler.getBlockDeleteMetrics();
+    TestSchemaHandler testSchemaHandler1 = spy(new TestSchemaHandler());
+    TestSchemaHandler testSchemaHandler2 = spy(new TestSchemaHandler());
+    TestSchemaHandler testSchemaHandler3 = spy(new TestSchemaHandler());
+    handler.getSchemaHandlers().put(SCHEMA_V1, testSchemaHandler1);
+    handler.getSchemaHandlers().put(SCHEMA_V2, testSchemaHandler2);
+    handler.getSchemaHandlers().put(SCHEMA_V3, testSchemaHandler3);
+
+    DeletedBlocksTransaction transaction =
+        createDeletedBlocksTransaction(77L, STALE_CONTAINER_ID);
+    List<DeleteBlockTransactionResult> results =
+        handler.executeCmdWithRetry(Collections.singletonList(transaction));
+
+    assertEquals(1, results.size());
+    assertTrue(results.get(0).getSuccess());
+    verify(handler, times(2)).submitTasks(any());
+    verify(testSchemaHandler3, times(1))
+        .handle(eq(newReplicaData), eq(transaction));
+    verify(testSchemaHandler3, never())
+        .handle(eq((KeyValueContainerData) oldContainer.getContainerData()), any());
+    assertEquals(0, blockDeleteMetrics.getTotalLockTimeoutTransactionCount());
+  }
+
private DeletedBlocksTransaction createDeletedBlocksTransaction(long txID,
long containerID) {
return DeletedBlocksTransaction.newBuilder()
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerWithConcurrentBackgroundTasks.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerWithConcurrentBackgroundTasks.java
new file mode 100644
index 00000000000..2463d0f872f
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerWithConcurrentBackgroundTasks.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import static
org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
+import static
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyDataScanResult;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.fs.SpaceUsageSource;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.utils.FaultInjector;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
+import
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingTask;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * The balancer thread holds a <em>read</em> lock on the old replica while it
copies the
+ * container and then calls {@link ContainerSet#updateContainer} so the map
points at the new
+ * replica on another disk. Concurrent delete / block-deletion / unhealthy /
close paths resolve
+ * the live container by id via {@link ContainerSet#getContainerWithWriteLock}
and then operate on
+ * that instance so paths, DB, and state match the destination replica.
+ */
+@Timeout(60)
+class TestDiskBalancerWithConcurrentBackgroundTasks {
+
+ @TempDir
+ private java.nio.file.Path tmpDir;
+
+ private File testDir;
+ private final String scmId = UUID.randomUUID().toString();
+ private final String datanodeUuid = UUID.randomUUID().toString();
+ private final OzoneConfiguration conf = new OzoneConfiguration();
+
+ private OzoneContainer ozoneContainer;
+ private ContainerSet containerSet;
+ private MutableVolumeSet volumeSet;
+ private KeyValueHandler keyValueHandler;
+ private ContainerChecksumTreeManager checksumTreeManager;
+ private DiskBalancerServiceTestImpl diskBalancerService;
+
+ private HddsVolume hotVolume;
+ private HddsVolume coldVolume;
+
+ private static final long CONTAINER_ID = 42L;
+ private static final long CONTAINER_SIZE = 1024L * 1024L;
+ private static final long SEEDED_PENDING_BLOCKS = 2L;
+ private static final long SEEDED_PENDING_BYTES = 2048L;
+
+  /**
+   * Pauses disk balancer immediately after {@link ContainerSet#updateContainer} (map points at
+   * the new replica) but before readUnlock, so another thread can run while the balancer
+   * still holds readLock on the old container. {@code awaitSwapPoint} gives up before the class timeout.
+   */
+  private static final class AfterInMemoryUpdateInjector extends FaultInjector {
+    private final CountDownLatch reachedSwapPoint = new CountDownLatch(1);
+    private final CountDownLatch continueBalancer = new CountDownLatch(1);
+
+    @Override
+    public void pause() throws IOException {
+      // Signal the test thread that the race window has started, then block the balancer.
+      reachedSwapPoint.countDown();
+      try {
+        continueBalancer.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+    }
+
+    void awaitSwapPoint() throws InterruptedException, TimeoutException {
+      if (!reachedSwapPoint.await(30, TimeUnit.SECONDS)) {
+        throw new TimeoutException("balancer did not reach post-updateContainer hook");
+      }
+    }
+
+    // Unblock the balancer so it can readUnlock and finish the move.
+    void continueBalancer() {
+      continueBalancer.countDown();
+    }
+  }
+
+  @BeforeEach
+  void setup() throws Exception {
+    testDir = tmpDir.toFile();
+    final String root = testDir.getAbsolutePath();
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, root);
+
+    // Three data volumes under the temp dir; usage comes from MockSpaceUsageCheckFactory.HalfTera.
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+        root + "/vol0," + root + "/vol1," + root + "/vol2");
+    conf.setClass(SpaceUsageCheckFactory.Conf.configKeyForClassName(),
+        MockSpaceUsageCheckFactory.HalfTera.class, SpaceUsageCheckFactory.class);
+
+    volumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    createDbInstancesForTestIfNeeded(volumeSet, scmId, scmId, conf);
+
+    List<StorageVolume> dataVolumes = volumeSet.getVolumesList();
+    HddsVolume first = (HddsVolume) dataVolumes.get(0);
+    HddsVolume second = (HddsVolume) dataVolumes.get(1);
+    HddsVolume third = (HddsVolume) dataVolumes.get(2);
+
+    // Zero the used-space counters, then make the third volume much fuller than the other two.
+    for (HddsVolume each : Arrays.asList(first, second, third)) {
+      each.incrementUsedSpace(-each.getCurrentUsage().getUsedSpace());
+    }
+
+    long capacity = first.getCurrentUsage().getCapacity();
+    first.incrementUsedSpace(capacity / 20);
+    second.incrementUsedSpace(capacity / 20);
+    third.incrementUsedSpace(capacity / 2);
+
+    coldVolume = coldestVolume(first, second, third);
+    hotVolume = hottestVolume(first, second, third);
+
+    containerSet = ContainerSet.newReadOnlyContainerSet(1000);
+    ContainerMetrics metrics = ContainerMetrics.create(conf);
+    checksumTreeManager = new ContainerChecksumTreeManager(conf);
+    keyValueHandler = new KeyValueHandler(conf, datanodeUuid,
+        containerSet, volumeSet, metrics, c -> { }, checksumTreeManager);
+    keyValueHandler.setClusterID(scmId);
+
+    Map<ContainerProtos.ContainerType, Handler> handlers = new HashMap<>();
+    handlers.put(ContainerProtos.ContainerType.KeyValueContainer, keyValueHandler);
+    ContainerController controller = new ContainerController(containerSet, handlers);
+    ContainerDispatcher dispatcher = mock(ContainerDispatcher.class);
+    when(dispatcher.getHandler(ContainerProtos.ContainerType.KeyValueContainer))
+        .thenReturn(keyValueHandler);
+
+    // OzoneContainer is mocked; it only hands out the real collaborators built above.
+    ozoneContainer = mock(OzoneContainer.class);
+    when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
+    when(ozoneContainer.getVolumeSet()).thenReturn(volumeSet);
+    when(ozoneContainer.getController()).thenReturn(controller);
+    when(ozoneContainer.getDispatcher()).thenReturn(dispatcher);
+
+    DiskBalancerConfiguration balancerConf = conf.getObject(DiskBalancerConfiguration.class);
+    balancerConf.setDiskBalancerShouldRun(true);
+    conf.setFromObject(balancerConf);
+    diskBalancerService = new DiskBalancerServiceTestImpl(ozoneContainer, 100, conf, 1);
+    // Immediate cleanup of the source replica after a move
+    diskBalancerService.setReplicaDeletionDelay(0);
+  }
+
+ @AfterEach
+ void cleanup() throws IOException {
+ DiskBalancerService.setInjector(null);
+ if (diskBalancerService != null) {
+ diskBalancerService.shutdown();
+ }
+ BlockUtils.shutdownCache(conf);
+ if (volumeSet != null) {
+ volumeSet.shutdown();
+ }
+ if (testDir != null && testDir.exists()) {
+ FileUtils.deleteDirectory(testDir);
+ }
+ }
+
+  /**
+   * Force-delete with a stale {@link Container} handle still targets the container id; after a
+   * DiskBalancer swap, {@code deleteInternal} locks the live map entry (destination replica) and
+   * applies deletion there — not on the old source object passed in from the RPC.
+   */
+  @ContainerTestVersionInfo.ContainerTest
+  void containerDeleteStaleRefKeepsSwappedReplica(ContainerTestVersionInfo versionInfo)
+      throws Exception {
+    // Capture delete path logs: deleteInternal and diskBalancer
+    LogCapturer kvLogs = LogCapturer.captureLogs(KeyValueHandler.class);
+    LogCapturer diskBalancerLogs = LogCapturer.captureLogs(DiskBalancerService.class);
+
+    String schemaVersion = versionInfo.getSchemaVersion();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+
+    KeyValueContainer oldReplica = createClosedContainer(CONTAINER_ID, hotVolume, versionInfo);
+    Container<?> staleContainerRef = oldReplica;
+    String oldReplicaPathOnHot = oldReplica.getContainerData().getContainerPath();
+    assertThat(new File(oldReplicaPathOnHot)).exists();
+
+    // Install injector so balancer pauses right after ContainerSet points at the new location of replica.
+    AfterInMemoryUpdateInjector raceInjector = new AfterInMemoryUpdateInjector();
+    DiskBalancerService.setInjector(raceInjector);
+
+    DiskBalancerService.DiskBalancerTask task =
+        (DiskBalancerService.DiskBalancerTask) diskBalancerService.getTasks().poll();
+    assertNotNull(task);
+
+    // Run the move on a background thread; it will block inside the injector.
+    CompletableFuture<Void> balancerDone = CompletableFuture.runAsync(task::call);
+
+    // Wait until updateContainer(newReplica) is done; balancer still holds readLock on old replica.
+    raceInjector.awaitSwapPoint();
+
+    Container<?> currentContainerRef = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(currentContainerRef);
+    assertNotEquals(staleContainerRef, currentContainerRef,
+        "ContainerSet should reference the new replica before readUnlock");
+    assertEquals(coldVolume, currentContainerRef.getContainerData().getVolume(),
+        "Replica should already be on the destination volume");
+
+    String destinationPathBeforeDelete =
+        currentContainerRef.getContainerData().getContainerPath();
+
+    // Start RM-style force delete using the stale Container handle; deleteInternal resolves id 42,
+    // acquires writeLock on the live map entry (destination replica), not on the stale source handle.
+    CountDownLatch deleteThreadPastSchedule = new CountDownLatch(1);
+    CompletableFuture<Void> deleteDone = CompletableFuture.runAsync(() -> {
+      deleteThreadPastSchedule.countDown();
+      try {
+        keyValueHandler.deleteContainer(staleContainerRef, true);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    assertThat(deleteThreadPastSchedule.await(10, TimeUnit.SECONDS)).isTrue();
+    Thread.sleep(200);
+
+    raceInjector.continueBalancer();
+
+    CompletableFuture.allOf(balancerDone, deleteDone).get(60, TimeUnit.SECONDS);
+
+    assertThat(kvLogs.getOutput()).doesNotContain("reference is stale");
+
+    // Old replica: disk balancer marks it DELETED after the move.
+    assertEquals(State.DELETED, staleContainerRef.getContainerState());
+    assertEquals(hotVolume, staleContainerRef.getContainerData().getVolume());
+
+    // Live map entry was removed by force-delete on the destination replica.
+    assertNull(containerSet.getContainer(CONTAINER_ID));
+    assertThat(new File(destinationPathBeforeDelete)).doesNotExist();
+
+    // Disk balancer delayed cleanup removes the old replica from the source path — not RM delete.
+    // Wait for its log line first, so the file check cannot race the cleanup.
+    GenericTestUtils.waitFor(
+        () -> diskBalancerLogs.getOutput().contains(
+            "Deleted expired container " + CONTAINER_ID + " after delay"),
+        100, 10_000);
+    assertThat(new File(oldReplicaPathOnHot)).doesNotExist();
+  }
+
+  /**
+   * BlockDeletingTask queued with stale ref of KeyValueContainerData still resolves
+   * the live replica by id; after {@code getContainerWithWriteLock}, it uses the destination
+   * replica's DB and paths (updated {@code containerData} field).
+   */
+  @ContainerTestVersionInfo.ContainerTest
+  void blockTaskStaleDataKeepsPendingOnDestination(ContainerTestVersionInfo versionInfo)
+      throws Exception {
+    LogCapturer logs = LogCapturer.captureLogs(BlockDeletingTask.class);
+    LogCapturer diskBalancerLogs = LogCapturer.captureLogs(DiskBalancerService.class);
+    String schemaVersion = versionInfo.getSchemaVersion();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+
+    KeyValueContainer oldReplica = createClosedContainer(CONTAINER_ID, hotVolume, versionInfo);
+    seedPendingDeletionInMetadata(oldReplica);
+    KeyValueContainerData staleReplicaData = oldReplica.getContainerData();
+
+    // Balancer injector — pause after map swap, before readUnlock.
+    AfterInMemoryUpdateInjector raceInjector = new AfterInMemoryUpdateInjector();
+    DiskBalancerService.setInjector(raceInjector);
+
+    DiskBalancerService.DiskBalancerTask balancerTask =
+        (DiskBalancerService.DiskBalancerTask) diskBalancerService.getTasks().poll();
+    assertNotNull(balancerTask);
+    CompletableFuture<Void> balancerDone = CompletableFuture.runAsync(balancerTask::call);
+
+    // We are past updateContainer; new replica is in ContainerSet; balancer still read-locks old replica.
+    raceInjector.awaitSwapPoint();
+
+    // New ContainerData instance must differ from what the queued block task still holds.
+    Container<?> newReplica = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(newReplica);
+    assertNotEquals(staleReplicaData, newReplica.getContainerData());
+    KeyValueContainerData newData = (KeyValueContainerData) newReplica.getContainerData();
+    long pendingBefore = readPendingDeleteBlockCount(newData);
+    assertEquals(SEEDED_PENDING_BLOCKS, pendingBefore,
+        "new replica should report same pending deletions from copied metadata");
+
+    BlockDeletingService blockDeletingService = new BlockDeletingService(
+        ozoneContainer, 500, 500, TimeUnit.MILLISECONDS, 1, conf, checksumTreeManager);
+    BlockDeletingService.ContainerBlockInfo blockInfo =
+        new BlockDeletingService.ContainerBlockInfo(staleReplicaData, SEEDED_PENDING_BLOCKS + 10);
+    BlockDeletingTask blockDeletingTask =
+        new BlockDeletingTask(blockDeletingService, blockInfo, checksumTreeManager, 1);
+
+    // Run block deletion concurrently; getContainerWithWriteLock targets the live map entry on the destination.
+    CountDownLatch blockThreadStarted = new CountDownLatch(1);
+    CompletableFuture<Void> blockDone = CompletableFuture.runAsync(() -> {
+      blockThreadStarted.countDown();
+      try {
+        blockDeletingTask.call();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    assertThat(blockThreadStarted.await(10, TimeUnit.SECONDS)).isTrue();
+    Thread.sleep(200);
+
+    raceInjector.continueBalancer();
+    CompletableFuture.allOf(balancerDone, blockDone).get(60, TimeUnit.SECONDS);
+
+    assertThat(logs.getOutput()).doesNotContain("reference is stale");
+
+    // Check the map entry itself before dereferencing it.
+    Container<?> liveAfterMove = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(liveAfterMove);
+    KeyValueContainerData newContainerData = (KeyValueContainerData) liveAfterMove.getContainerData();
+    assertThat(readPendingDeleteBlockCount(newContainerData)).isLessThanOrEqualTo(pendingBefore);
+    assertThat(new File(newContainerData.getChunksPath())).exists();
+    // Disk balancer delayed cleanup removes the old replica from the source path — not RM delete.
+    // Wait for its log line first, so the file check cannot race the cleanup.
+    GenericTestUtils.waitFor(
+        () -> diskBalancerLogs.getOutput().contains(
+            "Deleted expired container " + CONTAINER_ID + " after delay"),
+        100, 10_000);
+    assertThat(new File(staleReplicaData.getContainerPath())).doesNotExist();
+  }
+
+  /**
+   * {@link KeyValueHandler#markContainerUnhealthy} with a stale container
+   * reference while DiskBalancer has already run {@link ContainerSet#updateContainer}
+   * (map = destination). Without {@link ContainerSet#getContainerWithWriteLock}, the handler would
+   * take writeLock on the old object and mark it unhealthy after {@code markContainerForDelete}
+   * turns it {@link State#DELETED}, sending a false UNHEALTHY ICR for a replica SCM no longer tracks.
+   * With the fix, the live map entry (destination) is locked and marked {@link State#UNHEALTHY}.
+   */
+  @ContainerTestVersionInfo.ContainerTest
+  void markUnhealthyAppliedOnDestVolumeContainer(
+      ContainerTestVersionInfo versionInfo) throws Exception {
+    LogCapturer diskBalancerLogs = LogCapturer.captureLogs(DiskBalancerService.class);
+    String schemaVersion = versionInfo.getSchemaVersion();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+
+    KeyValueContainer oldReplica = createClosedContainer(CONTAINER_ID, hotVolume, versionInfo);
+    Container<?> staleContainerRef = oldReplica;
+
+    AfterInMemoryUpdateInjector raceInjector = new AfterInMemoryUpdateInjector();
+    DiskBalancerService.setInjector(raceInjector);
+
+    DiskBalancerService.DiskBalancerTask balancerTask =
+        (DiskBalancerService.DiskBalancerTask) diskBalancerService.getTasks().poll();
+    assertNotNull(balancerTask);
+    CompletableFuture<Void> balancerDone =
+        CompletableFuture.runAsync(balancerTask::call);
+
+    raceInjector.awaitSwapPoint();
+
+    Container<?> liveReplica = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(liveReplica);
+    assertNotEquals(staleContainerRef, liveReplica,
+        "ContainerSet should reference the destination replica at the race hook");
+    assertEquals(State.CLOSED, liveReplica.getContainerState());
+
+    ScanResult reason = getUnhealthyDataScanResult();
+    CountDownLatch unhealthyThreadStarted = new CountDownLatch(1);
+    CompletableFuture<Void> unhealthyDone = CompletableFuture.runAsync(() -> {
+      unhealthyThreadStarted.countDown();
+      try {
+        keyValueHandler.markContainerUnhealthy(staleContainerRef, reason);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    assertThat(unhealthyThreadStarted.await(10, TimeUnit.SECONDS)).isTrue();
+    Thread.sleep(200);
+
+    raceInjector.continueBalancer();
+    CompletableFuture.allOf(balancerDone, unhealthyDone).get(60, TimeUnit.SECONDS);
+
+    Container<?> afterMove = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(afterMove);
+    assertSame(liveReplica, afterMove);
+    assertEquals(State.UNHEALTHY, afterMove.getContainerState(),
+        "UNHEALTHY must apply to the live destination replica, not the stale source handle");
+
+    assertEquals(State.DELETED, staleContainerRef.getContainerState());
+    assertEquals(hotVolume, staleContainerRef.getContainerData().getVolume());
+
+    GenericTestUtils.waitFor(
+        () -> diskBalancerLogs.getOutput().contains(
+            "Deleted expired container " + CONTAINER_ID + " after delay"),
+        100, 10_000);
+  }
+
+  /**
+   * SCM closeContainer with a stale source container while the map already references
+   * the destination after {@link ContainerSet#updateContainer}. Without resolving the live instance,
+   * the close would run on the source after it is DELETED and throw, leaving the destination
+   * {@link State#QUASI_CLOSED}. With {@link ContainerSet#getContainerWithWriteLock}, the destination
+   * is closed to {@link State#CLOSED}.
+   */
+  @ContainerTestVersionInfo.ContainerTest
+  void closeContainerAppliesOnDestVolumeContainer(
+      ContainerTestVersionInfo versionInfo) throws Exception {
+    LogCapturer diskBalancerLogs = LogCapturer.captureLogs(DiskBalancerService.class);
+    String schemaVersion = versionInfo.getSchemaVersion();
+    ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+
+    KeyValueContainer oldReplica = createClosedContainer(CONTAINER_ID, hotVolume, versionInfo);
+    persistQuasiClosedState(oldReplica);
+    Container<?> staleContainerRef = oldReplica;
+
+    AfterInMemoryUpdateInjector raceInjector = new AfterInMemoryUpdateInjector();
+    DiskBalancerService.setInjector(raceInjector);
+
+    DiskBalancerService.DiskBalancerTask balancerTask =
+        (DiskBalancerService.DiskBalancerTask) diskBalancerService.getTasks().poll();
+    assertNotNull(balancerTask);
+    CompletableFuture<Void> balancerDone =
+        CompletableFuture.runAsync(balancerTask::call);
+
+    raceInjector.awaitSwapPoint();
+
+    Container<?> liveReplica = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(liveReplica);
+    assertNotEquals(staleContainerRef, liveReplica);
+    assertEquals(State.QUASI_CLOSED, liveReplica.getContainerState());
+
+    CountDownLatch closeThreadStarted = new CountDownLatch(1);
+    CompletableFuture<Void> closeDone = CompletableFuture.runAsync(() -> {
+      closeThreadStarted.countDown();
+      try {
+        keyValueHandler.closeContainer(staleContainerRef);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    assertThat(closeThreadStarted.await(10, TimeUnit.SECONDS)).isTrue();
+    Thread.sleep(200);
+
+    raceInjector.continueBalancer();
+    CompletableFuture.allOf(balancerDone, closeDone).get(60, TimeUnit.SECONDS);
+
+    Container<?> afterMove = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(afterMove);
+    assertSame(liveReplica, afterMove);
+    assertEquals(State.CLOSED, afterMove.getContainerState(),
+        "CLOSED transition must apply to the live destination replica");
+
+    assertEquals(State.DELETED, staleContainerRef.getContainerState());
+
+    GenericTestUtils.waitFor(
+        () -> diskBalancerLogs.getOutput().contains(
+            "Deleted expired container " + CONTAINER_ID + " after delay"),
+        100, 10_000);
+  }
+
+  /**
+   * Sets the replica to {@link State#QUASI_CLOSED} in memory and rewrites its container file
+   * with that state, so a later copy/import of the replica starts from the same state.
+   */
+  private void persistQuasiClosedState(KeyValueContainer container) throws IOException {
+    KeyValueContainerData containerData = container.getContainerData();
+    containerData.setState(State.QUASI_CLOSED);
+    File containerDir = new File(containerData.getContainerPath());
+    ContainerDataYaml.createContainerFile(containerData, ContainerUtils.getContainerFile(containerDir));
+  }
+
+  // Reads the pending-delete block count stored in the replica's metadata table; a missing key is 0.
+  private long readPendingDeleteBlockCount(KeyValueContainerData data) throws IOException {
+    try (DBHandle handle = BlockUtils.getDB(data, conf)) {
+      Long stored = handle.getStore().getMetadataTable().get(data.getPendingDeleteBlockCountKey());
+      return stored != null ? stored : 0L;
+    }
+  }
+
+  /**
+   * Writes the seeded pending-delete block count and byte count into the container's metadata
+   * table, then applies the same amounts to the in-memory {@link KeyValueContainerData}.
+   */
+  private void seedPendingDeletionInMetadata(KeyValueContainer container) throws IOException {
+    KeyValueContainerData containerData = container.getContainerData();
+    try (DBHandle handle = BlockUtils.getDB(containerData, conf)) {
+      Table<String, Long> metadataTable = handle.getStore().getMetadataTable();
+      metadataTable.put(containerData.getPendingDeleteBlockCountKey(), SEEDED_PENDING_BLOCKS);
+      metadataTable.put(containerData.getPendingDeleteBlockBytesKey(), SEEDED_PENDING_BYTES);
+    }
+    containerData.incrPendingDeletionBlocks(SEEDED_PENDING_BLOCKS, SEEDED_PENDING_BYTES);
+  }
+
+  // Minimum under {@code volumePolicyOrder()}; fails with a clear message when no volume is passed.
+  private static HddsVolume coldestVolume(HddsVolume... volumes) {
+    return Arrays.stream(volumes).min(volumePolicyOrder())
+        .orElseThrow(() -> new IllegalArgumentException("at least one volume is required"));
+  }
+
+  // Maximum under {@code volumePolicyOrder()}; fails with a clear message when no volume is passed.
+  private static HddsVolume hottestVolume(HddsVolume... volumes) {
+    return Arrays.stream(volumes).max(volumePolicyOrder())
+        .orElseThrow(() -> new IllegalArgumentException("at least one volume is required"));
+  }
+
+  // Orders volumes by utilization, (capacity - available) / capacity; storage ID breaks ties.
+  private static Comparator<HddsVolume> volumePolicyOrder() {
+    Comparator<HddsVolume> byUtilization = Comparator.comparingDouble(volume -> {
+      SpaceUsageSource usage = volume.getCurrentUsage();
+      return (double) (usage.getCapacity() - usage.getAvailable()) / usage.getCapacity();
+    });
+    return byUtilization.thenComparing(HddsVolume::getStorageID);
+  }
+
+  /**
+   * Builds a CLOSED replica on {@code vol}, adds it to the container set, and adds its used bytes to the volume.
+   */
+  private KeyValueContainer createClosedContainer(long containerId, HddsVolume vol,
+      ContainerTestVersionInfo versionInfo) throws IOException {
+    KeyValueContainerData data = new KeyValueContainerData(containerId, versionInfo.getLayout(),
+        CONTAINER_SIZE, UUID.randomUUID().toString(), datanodeUuid);
+    data.setState(State.CLOSED);
+    data.getStatistics().setBlockBytesForTesting(CONTAINER_SIZE);
+    data.setSchemaVersion(versionInfo.getSchemaVersion());
+    VolumeChoosingPolicy alwaysVol = mock(VolumeChoosingPolicy.class);
+    when(alwaysVol.chooseVolume(any(List.class), anyLong())).thenReturn(vol);
+    KeyValueContainer replica = new KeyValueContainer(data, conf);
+    replica.create((VolumeSet) volumeSet, alwaysVol, scmId);
+    containerSet.addContainer(replica);
+    vol.incrementUsedSpace(data.getBytesUsed());
+    return replica;
+  }
+}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 0385564ebf7..7ba862389d1 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -370,18 +370,20 @@ private ContainerCommandRequestProto
getDummyCommandRequestProto(
@ContainerLayoutTestInfo.ContainerTest
public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion)
throws IOException {
- KeyValueHandler keyValueHandler = createKeyValueHandler(tempDir);
conf = new OzoneConfiguration();
- KeyValueContainerData kvData = new
KeyValueContainerData(DUMMY_CONTAINER_ID,
- layoutVersion,
- (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
- UUID.randomUUID().toString());
- kvData.setMetadataPath(tempDir.toString());
- kvData.setDbFile(dbFile.toFile());
- KeyValueContainer container = new KeyValueContainer(kvData, conf);
+ conf.set(OZONE_SCM_CONTAINER_LAYOUT_KEY, layoutVersion.name());
+ HandlerWithVolumeSet handlerCtx = createKeyValueHandler(tempDir);
+ KeyValueHandler keyValueHandler = handlerCtx.getHandler();
+ ContainerSet containerSet = handlerCtx.getContainerSet();
+
+ long containerId = DUMMY_CONTAINER_ID + layoutVersion.getVersion();
ContainerCommandRequestProto createContainerRequest =
- createContainerRequest(DATANODE_UUID, DUMMY_CONTAINER_ID);
- keyValueHandler.handleCreateContainer(createContainerRequest, container);
+ createContainerRequest(DATANODE_UUID, containerId);
+ keyValueHandler.handleCreateContainer(createContainerRequest, null);
+
+ KeyValueContainer container =
+ (KeyValueContainer) containerSet.getContainer(containerId);
+ KeyValueContainerData kvData = container.getContainerData();
// Make the container state as invalid.
kvData.setState(ContainerProtos.ContainerDataProto.State.INVALID);
@@ -390,12 +392,11 @@ public void
testCloseInvalidContainer(ContainerLayoutVersion layoutVersion)
ContainerCommandRequestProto closeContainerRequest =
ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.CloseContainer)
- .setContainerID(DUMMY_CONTAINER_ID)
+ .setContainerID(containerId)
.setDatanodeUuid(DATANODE_UUID)
.setCloseContainer(ContainerProtos.CloseContainerRequestProto
.getDefaultInstance())
.build();
- dispatcher.dispatch(closeContainerRequest, null);
// Closing invalid container should return error response.
ContainerProtos.ContainerCommandResponseProto response =
@@ -923,7 +924,7 @@ private static ContainerCommandRequestProto
createContainerRequest(
.build();
}
- private KeyValueHandler createKeyValueHandler(Path path) throws IOException {
+ private HandlerWithVolumeSet createKeyValueHandler(Path path) throws
IOException {
final ContainerSet containerSet = newContainerSet();
final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
@@ -950,7 +951,7 @@ private KeyValueHandler createKeyValueHandler(Path path)
throws IOException {
conf.getObject(ContainerScannerConfiguration.class), controller);
containerSet.registerOnDemandScanner(onDemandScanner);
- return kvHandler;
+ return new HandlerWithVolumeSet(kvHandler, volumeSet, containerSet);
}
private static class HandlerWithVolumeSet {
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
index 8361959e6da..358b56157fc 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -29,6 +29,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
@@ -220,7 +221,8 @@ void testNPEFromPutBlock() throws IOException {
@Test
public void testMarkContainerUnhealthyInFailedVolume() throws IOException {
- KeyValueHandler handler = getDummyHandler();
+ ContainerSet containerSet = mock(ContainerSet.class);
+ KeyValueHandler handler = getDummyHandler(containerSet);
KeyValueContainerData kvData = new KeyValueContainerData(1L,
ContainerLayoutVersion.FILE_PER_BLOCK,
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
@@ -233,6 +235,10 @@ public void testMarkContainerUnhealthyInFailedVolume()
throws IOException {
.build();
kvData.setVolume(hddsVolume);
KeyValueContainer container = new KeyValueContainer(kvData, conf);
+ when(containerSet.getContainerWithWriteLock(eq(1L))).thenAnswer(invocation
-> {
+ container.writeLock();
+ return container;
+ });
// When volume is failed, the call to mark the container unhealthy should
// be ignored.
@@ -252,6 +258,10 @@ public void testMarkContainerUnhealthyInFailedVolume()
throws IOException {
// -- Helper methods below.
private KeyValueHandler getDummyHandler() {
+ return getDummyHandler(mock(ContainerSet.class));
+ }
+
+ private KeyValueHandler getDummyHandler(ContainerSet containerSet) {
DatanodeDetails dnDetails = DatanodeDetails.newBuilder()
.setUuid(UUID.fromString(DATANODE_UUID))
.setHostName("dummyHost")
@@ -263,7 +273,7 @@ private KeyValueHandler getDummyHandler() {
return new KeyValueHandler(
conf,
stateMachine.getDatanodeDetails().getUuidString(),
- mock(ContainerSet.class),
+ containerSet,
mock(MutableVolumeSet.class),
mock(ContainerMetrics.class), mockIcrSender, new
ContainerChecksumTreeManager(conf));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]