This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-5713 by this push:
     new 575ca9ead66 HDDS-12434. [DiskBalancer] Refactor DiskBalancerTask#call to improve the atomicity of container move (#8693)
575ca9ead66 is described below

commit 575ca9ead66eca2b71cb9b273a25d60a50b3c1ce
Author: Gargi Jaiswal <134698352+gargi-jai...@users.noreply.github.com>
AuthorDate: Wed Jul 30 13:51:01 2025 +0530

    HDDS-12434. [DiskBalancer] Refactor DiskBalancerTask#call to improve the atomicity of container move (#8693)
---
 .../diskbalancer/DiskBalancerService.java          |  71 ++++-
 .../container/keyvalue/KeyValueContainer.java      |  44 +--
 .../ozone/container/keyvalue/KeyValueHandler.java  |  38 +--
 .../diskbalancer/DiskBalancerServiceTestImpl.java  |   4 +-
 .../diskbalancer/TestDiskBalancerTask.java         | 344 +++++++++++++++++++++
 5 files changed, 444 insertions(+), 57 deletions(-)
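
For readers skimming the diff: the refactor stages the container move so that a partially copied replica is never visible as live data. The sequence is: copy to a temp dir on the destination volume, verify the container file checksum, mark the staged copy RECOVERING, atomically move it into its final location, restore the original state, swap the in-memory container reference, and only then delete the old copy (best effort). The sketch below illustrates that staging pattern with plain JDK file APIs only; every name in it is a hypothetical stand-in, not one of the Ozone classes changed in this patch.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical sketch of a staged, checksum-verified, atomic file move. */
public final class StagedMoveSketch {

  private static String checksum(Path file) throws IOException {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      return Base64.getEncoder().encodeToString(digest.digest(Files.readAllBytes(file)));
    } catch (NoSuchAlgorithmException e) {
      throw new IOException(e);
    }
  }

  /** Moves source to finalDest, staging the copy in tmpDir on the destination volume. */
  public static void move(Path source, Path tmpDir, Path finalDest) throws IOException {
    Path staged = tmpDir.resolve(source.getFileName());
    try {
      // Step 1: copy into a temp directory that readers never look at.
      Files.createDirectories(tmpDir);
      Files.copy(source, staged, StandardCopyOption.REPLACE_EXISTING);

      // Step 2: verify the copy before publishing it.
      if (!checksum(source).equals(checksum(staged))) {
        throw new IOException("Checksum mismatch after copying " + source);
      }

      // Step 3: publish atomically so no half-written file is ever visible.
      Files.move(staged, finalDest,
          StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);

      // Step 4: only after the destination is live, remove the source (best effort).
      try {
        Files.delete(source);
      } catch (IOException e) {
        System.err.println("Source cleanup deferred: " + e.getMessage());
      }
    } catch (IOException e) {
      // Roll back the staged copy so a failed move leaves no debris behind.
      Files.deleteIfExists(staged);
      throw e;
    }
  }

  public static void main(String[] args) throws IOException {
    Path src = Files.createTempFile("container", ".data");
    Files.write(src, "demo".getBytes());
    Path tmp = Files.createTempDirectory("disk-balancer-tmp");
    Path dest = Files.createTempDirectory("dest-volume").resolve("container.data");
    move(src, tmp, dest);
    System.out.println("Moved: " + Files.exists(dest) + ", source gone: " + !Files.exists(src));
  }
}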

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index 52a6f74aca4..efc06463d61 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -36,6 +36,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
@@ -55,7 +56,9 @@
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
 import org.apache.hadoop.ozone.container.diskbalancer.policy.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import org.apache.hadoop.util.Time;
@@ -447,7 +450,7 @@ private boolean shouldDelay() {
     return false;
   }
 
-  private class DiskBalancerTask implements BackgroundTask {
+  protected class DiskBalancerTask implements BackgroundTask {
 
     private HddsVolume sourceVolume;
     private HddsVolume destVolume;
@@ -468,15 +471,36 @@ public BackgroundTaskResult call() {
       boolean destVolumeIncreased = false;
       Path diskBalancerTmpDir = null, diskBalancerDestDir = null;
       long containerSize = containerData.getBytesUsed();
+      String originalContainerChecksum = containerData.getContainerFileChecksum();
       try {
+        // Step 1: Copy container to new Volume's tmp Dir
         diskBalancerTmpDir = destVolume.getTmpDir().toPath()
             .resolve(DISK_BALANCER_DIR).resolve(String.valueOf(containerId));
-
-        // Copy container to new Volume's tmp Dir
         ozoneContainer.getController().copyContainer(containerData,
             diskBalancerTmpDir);
 
-        // Move container directory to final place on new volume
+        // Step 2: verify checksum and Transition Temp container to Temp C1-RECOVERING
+        File tempContainerFile = ContainerUtils.getContainerFile(
+            diskBalancerTmpDir.toFile());
+        if (!tempContainerFile.exists()) {
+          throw new IOException("ContainerFile for container " + containerId
+              + " doesn't exist in temp directory "
+              + tempContainerFile.getAbsolutePath());
+        }
+        ContainerData tempContainerData = ContainerDataYaml
+            .readContainerFile(tempContainerFile);
+        String copiedContainerChecksum = tempContainerData.getContainerFileChecksum();
+        if (!originalContainerChecksum.equals(copiedContainerChecksum)) {
+          throw new IOException("Container checksum mismatch for container "
+              + containerId + ". Original: " + originalContainerChecksum
+              + ", Copied: " + copiedContainerChecksum);
+        }
+        tempContainerData.setState(ContainerProtos.ContainerDataProto.State.RECOVERING);
+
+        // overwrite the .container file with the new state.
+        ContainerDataYaml.createContainerFile(tempContainerData, tempContainerFile);
+
+        // Step 3: Move container directory to final place on new volume
         String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
             destVolume, destVolume.getClusterID());
         diskBalancerDestDir =
@@ -491,7 +515,7 @@ public BackgroundTaskResult call() {
             StandardCopyOption.ATOMIC_MOVE,
             StandardCopyOption.REPLACE_EXISTING);
 
-        // Generate a new Container based on destDir
+        // Generate a new Container based on destDir which is in C1-RECOVERING state.
         File containerFile = ContainerUtils.getContainerFile(
             diskBalancerDestDir.toFile());
         if (!containerFile.exists()) {
@@ -506,16 +530,40 @@ public BackgroundTaskResult call() {
             .incrementUsedSpace(containerSize);
         destVolumeIncreased = true;
 
-        // Update container for containerID
+        // The import process loaded the temporary RECOVERING state from disk.
+        // Now, restore the original state and persist it to the .container file.
+        newContainer.getContainerData().setState(containerData.getState());
+        newContainer.update(newContainer.getContainerData().getMetadata(), true);
+
+        // Step 5: Update container for containerID and delete old container.
         Container oldContainer = ozoneContainer.getContainerSet()
             .getContainer(containerId);
         oldContainer.writeLock();
         try {
+          // First, update the in-memory set to point to the new replica.
           ozoneContainer.getContainerSet().updateContainer(newContainer);
-          oldContainer.delete();
+
+          // Mark old container as DELETED and persist state.
+          oldContainer.getContainerData().setState(
+              ContainerProtos.ContainerDataProto.State.DELETED);
+          oldContainer.update(oldContainer.getContainerData().getMetadata(),
+              true);
+
+          // Remove the old container's on-disk data using KeyValueContainerUtil.
+          try {
+            KeyValueContainerUtil.removeContainer(
+                (KeyValueContainerData) oldContainer.getContainerData(), conf);
+            oldContainer.delete();
+          } catch (IOException ex) {
+            LOG.warn("Failed to cleanup old container {} after move. It is " +
+                    "marked DELETED and will be handled by background scanners.",
+                containerId, ex);
+          }
         } finally {
           oldContainer.writeUnlock();
         }
+
+        // The move is now successful.
         oldContainer.getContainerData().getVolume()
             .decrementUsedSpace(containerSize);
         balancedBytesInLastWindow.addAndGet(containerSize);
@@ -642,6 +690,15 @@ public VolumeChoosingPolicy getVolumeChoosingPolicy() {
     return volumeChoosingPolicy;
   }
 
+  @VisibleForTesting
+  public DiskBalancerTask createDiskBalancerTask(ContainerData containerData, HddsVolume source, HddsVolume dest) {
+    inProgressContainers.add(containerData.getContainerID());
+    deltaSizes.put(source, deltaSizes.getOrDefault(source, 0L)
+        - containerData.getBytesUsed());
+    dest.incCommittedBytes(containerData.getBytesUsed());
+    return new DiskBalancerTask(containerData, source, dest);
+  }
+
   @VisibleForTesting
   public void setVolumeChoosingPolicy(VolumeChoosingPolicy volumeChoosingPolicy) {
     this.volumeChoosingPolicy = volumeChoosingPolicy;
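
One detail worth noting from the DiskBalancerService.java hunks above: the old replica is retired in a deliberate order. The in-memory set is pointed at the new replica, the old replica is persistently marked DELETED, and only then is physical deletion attempted, with failures tolerated and left to background scanners. Below is a generic sketch of that handoff; the replica map, lock, and Replica type are hypothetical stand-ins, not Ozone's ContainerSet API.

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of swapping in a new replica and retiring the old one. */
final class ReplicaSwapSketch {

  enum State { CLOSED, DELETED }

  static final class Replica {
    final long id;
    volatile State state = State.CLOSED;
    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    Replica(long id) {
      this.id = id;
    }

    void deleteFromDisk() throws IOException {
      // Placeholder for removing the replica's directories and DB entries.
    }
  }

  private final Map<Long, Replica> replicas = new ConcurrentHashMap<>();

  void swap(Replica oldReplica, Replica newReplica) {
    oldReplica.lock.writeLock().lock();
    try {
      // 1. Point readers at the new replica first.
      replicas.put(newReplica.id, newReplica);

      // 2. Mark the old replica DELETED so it can never be treated as live again.
      oldReplica.state = State.DELETED;

      // 3. Physical cleanup is best effort; on failure the DELETED marker lets
      //    a background scanner finish the job later.
      try {
        oldReplica.deleteFromDisk();
      } catch (IOException e) {
        System.err.println("Cleanup of replica " + oldReplica.id + " deferred: " + e.getMessage());
      }
    } finally {
      oldReplica.lock.writeLock().unlock();
    }
  }
}
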
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index d1de21c32e4..ad287989536 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -631,19 +631,7 @@ public void importContainerData(InputStream input,
       }
 
       // delete all other temporary data in case of any exception.
-      try {
-        if (containerData.hasSchema(OzoneConsts.SCHEMA_V3)) {
-          BlockUtils.removeContainerFromDB(containerData, config);
-        }
-        FileUtils.deleteDirectory(new File(containerData.getMetadataPath()));
-        FileUtils.deleteDirectory(new File(containerData.getChunksPath()));
-        FileUtils.deleteDirectory(
-            new File(getContainerData().getContainerPath()));
-      } catch (Exception deleteex) {
-        LOG.error(
-            "Can not cleanup destination directories after a container import"
-                + " error (cid: {}", containerId, deleteex);
-      }
+      cleanupFailedImport();
       throw ex;
     } finally {
       writeUnlock();
@@ -695,27 +683,27 @@ public void importContainerData(Path containerPath) throws IOException {
         throw ex;
       }
       //delete all the temporary data in case of any exception.
-      try {
-        if (containerData.getSchemaVersion() != null &&
-            containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
-          BlockUtils.removeContainerFromDB(containerData, config);
-        }
-        FileUtils.deleteDirectory(new File(containerData.getMetadataPath()));
-        FileUtils.deleteDirectory(new File(containerData.getChunksPath()));
-        FileUtils.deleteDirectory(
-            new File(getContainerData().getContainerPath()));
-      } catch (Exception deleteex) {
-        LOG.error(
-            "Can not cleanup destination directories after a container load"
-                + " error (cid" +
-                containerData.getContainerID() + ")", deleteex);
-      }
+      cleanupFailedImport();
       throw ex;
     } finally {
       writeUnlock();
     }
   }
 
+  private void cleanupFailedImport() {
+    try {
+      if (containerData.hasSchema(OzoneConsts.SCHEMA_V3)) {
+        BlockUtils.removeContainerFromDB(containerData, config);
+      }
+      FileUtils.deleteDirectory(new File(containerData.getMetadataPath()));
+      FileUtils.deleteDirectory(new File(containerData.getChunksPath()));
+      FileUtils.deleteDirectory(new File(getContainerData().getContainerPath()));
+    } catch (Exception ex) {
+      LOG.error("Failed to cleanup destination directories for container {}",
+          containerData.getContainerID(), ex);
+    }
+  }
+
   @Override
   public void exportContainerData(OutputStream destination,
       ContainerPacker<KeyValueContainerData> packer) throws IOException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 30090ef1b05..622515d0f87 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1320,21 +1320,13 @@ private boolean checkContainerClose(KeyValueContainer kvContainer) {
   @Override
   public Container importContainer(ContainerData originalContainerData,
       final InputStream rawContainerStream,
-      final TarContainerPacker packer)
-      throws IOException {
-    Preconditions.checkState(originalContainerData instanceof
-        KeyValueContainerData, "Should be KeyValueContainerData instance");
-
-    KeyValueContainerData containerData = new KeyValueContainerData(
-        (KeyValueContainerData) originalContainerData);
-
-    KeyValueContainer container = new KeyValueContainer(containerData,
-        conf);
+      final TarContainerPacker packer) throws IOException {
+    KeyValueContainer container = createNewContainer(originalContainerData);
 
     HddsVolume targetVolume = originalContainerData.getVolume();
     populateContainerPathFields(container, targetVolume);
     container.importContainerData(rawContainerStream, packer);
-    ContainerLogger.logImported(containerData);
+    ContainerLogger.logImported(container.getContainerData());
     sendICR(container);
     return container;
 
@@ -1342,8 +1334,8 @@ public Container importContainer(ContainerData originalContainerData,
 
   @Override
   public void exportContainer(final Container container,
-                              final OutputStream outputStream,
-                              final TarContainerPacker packer)
+      final OutputStream outputStream,
+      final TarContainerPacker packer)
       throws IOException {
     final KeyValueContainer kvc = (KeyValueContainer) container;
     kvc.exportContainerData(outputStream, packer);
@@ -1568,14 +1560,7 @@ public void copyContainer(final Container container, Path destinationPath)
   @Override
   public Container importContainer(ContainerData originalContainerData,
       final Path containerPath) throws IOException {
-    Preconditions.checkState(originalContainerData instanceof
-        KeyValueContainerData, "Should be KeyValueContainerData instance");
-
-    KeyValueContainerData containerData = new KeyValueContainerData(
-        (KeyValueContainerData) originalContainerData);
-
-    KeyValueContainer container = new KeyValueContainer(containerData,
-        conf);
+    KeyValueContainer container = createNewContainer(originalContainerData);
 
     HddsVolume volume = HddsVolumeUtil.matchHddsVolume(
         StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
@@ -1590,6 +1575,17 @@ public Container importContainer(ContainerData originalContainerData,
     return container;
   }
 
+  private KeyValueContainer createNewContainer(
+      ContainerData originalContainerData) {
+    Preconditions.checkState(originalContainerData instanceof
+        KeyValueContainerData, "Should be KeyValueContainerData instance");
+
+    KeyValueContainerData containerData = new KeyValueContainerData(
+        (KeyValueContainerData) originalContainerData);
+
+    return new KeyValueContainer(containerData, conf);
+  }
+
   @Override
   public void deleteContainer(Container container, boolean force)
       throws IOException {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java
index b0594b7846a..ef150321907 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java
@@ -96,7 +96,9 @@ public void start() {
 
   @Override
   public void shutdown() {
-    testingThread.interrupt();
+    if (testingThread != null) {
+      testingThread.interrupt();
+    }
     super.shutdown();
   }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
new file mode 100644
index 00000000000..9208eacf09b
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
+import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+/**
+ * Tests the container move logic within DiskBalancerTask.
+ */
+@Timeout(60)
+public class TestDiskBalancerTask {
+  @TempDir
+  private Path tmpDir;
+
+  private File testRoot;
+  private final String scmId = UUID.randomUUID().toString();
+  private final String datanodeUuid = UUID.randomUUID().toString();
+  private final OzoneConfiguration conf = new OzoneConfiguration();
+
+  private OzoneContainer ozoneContainer;
+  private ContainerSet containerSet;
+  private ContainerController controller;
+  private MutableVolumeSet volumeSet;
+  private HddsVolume sourceVolume;
+  private HddsVolume destVolume;
+  private DiskBalancerServiceTestImpl diskBalancerService;
+
+  private static final long CONTAINER_ID = 1L;
+  private static final long CONTAINER_SIZE = 1024L * 1024L; // 1 MB
+
+  @BeforeEach
+  public void setup() throws Exception {
+    testRoot = tmpDir.toFile();
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
+
+    // Setup with 2 volumes
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+        testRoot.getAbsolutePath() + "/vol1," + testRoot.getAbsolutePath()
+            + "/vol2");
+    volumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    createDbInstancesForTestIfNeeded(volumeSet, scmId, scmId, conf);
+
+    containerSet = ContainerSet.newReadOnlyContainerSet(1000);
+    ContainerMetrics containerMetrics = ContainerMetrics.create(conf);
+    KeyValueHandler keyValueHandler = new KeyValueHandler(conf, datanodeUuid,
+        containerSet, volumeSet, containerMetrics, c -> {
+    }, new ContainerChecksumTreeManager(conf));
+
+    Map<ContainerProtos.ContainerType, Handler> handlers = new HashMap<>();
+    handlers.put(ContainerProtos.ContainerType.KeyValueContainer, keyValueHandler);
+    controller = new ContainerController(containerSet, handlers);
+    ozoneContainer = mock(OzoneContainer.class);
+    when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
+    when(ozoneContainer.getVolumeSet()).thenReturn(volumeSet);
+    when(ozoneContainer.getController()).thenReturn(controller);
+    when(ozoneContainer.getDispatcher())
+        .thenReturn(mock(ContainerDispatcher.class));
+
+    diskBalancerService = new DiskBalancerServiceTestImpl(ozoneContainer,
+        100, conf, 1);
+
+    List<StorageVolume> volumes = volumeSet.getVolumesList();
+    sourceVolume = (HddsVolume) volumes.get(0);
+    destVolume = (HddsVolume) volumes.get(1);
+  }
+
+  @AfterEach
+  public void cleanup() throws IOException {
+    if (diskBalancerService != null) {
+      diskBalancerService.shutdown();
+    }
+
+    BlockUtils.shutdownCache(conf);
+    if (volumeSet != null) {
+      volumeSet.shutdown();
+    }
+    if (testRoot.exists()) {
+      FileUtils.deleteDirectory(testRoot);
+    }
+  }
+
+  @Test
+  public void moveSuccess() throws IOException {
+    Container container = createContainer(CONTAINER_ID, sourceVolume);
+    long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
+    long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+    String oldContainerPath = container.getContainerData().getContainerPath();
+
+    DiskBalancerService.DiskBalancerTask task = getTask(container.getContainerData());
+    task.call();
+
+    Container newContainer = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(newContainer);
+    assertNotEquals(container, newContainer);
+    assertEquals(destVolume, newContainer.getContainerData().getVolume());
+    assertEquals(initialSourceUsed - CONTAINER_SIZE,
+        sourceVolume.getCurrentUsage().getUsedSpace());
+    assertEquals(initialDestUsed + CONTAINER_SIZE,
+        destVolume.getCurrentUsage().getUsedSpace());
+    assertFalse(new File(oldContainerPath).exists());
+    assertTrue(
+        new File(newContainer.getContainerData().getContainerPath()).exists());
+    assertEquals(1,
+        diskBalancerService.getMetrics().getSuccessCount());
+    assertEquals(CONTAINER_SIZE,
+        diskBalancerService.getMetrics().getSuccessBytes());
+  }
+
+  @Test
+  public void moveFailsOnCopy() throws IOException {
+    Container container = createContainer(CONTAINER_ID, sourceVolume);
+    long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
+    long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+    String oldContainerPath = container.getContainerData().getContainerPath();
+
+    // Use spy ContainerController to inject failure during copy
+    ContainerController spyController = spy(controller);
+    doThrow(new IOException("Mockito spy: copy failed"))
+        .when(spyController).copyContainer(any(ContainerData.class), any(Path.class));
+    when(ozoneContainer.getController()).thenReturn(spyController);
+
+    DiskBalancerService.DiskBalancerTask task = getTask(container.getContainerData());
+    task.call();
+
+    Container originalContainer = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(originalContainer);
+    assertEquals(container, originalContainer);
+    assertEquals(sourceVolume,
+        originalContainer.getContainerData().getVolume());
+    assertEquals(initialSourceUsed, sourceVolume.getCurrentUsage().getUsedSpace());
+    assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
+    assertTrue(new File(oldContainerPath).exists());
+    Path tempDir = destVolume.getTmpDir().toPath()
+        .resolve(DiskBalancerService.DISK_BALANCER_DIR);
+    assertFalse(Files.exists(tempDir),
+        "Temp directory should be cleaned up");
+    assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
+  }
+
+  @Test
+  public void moveFailsOnImportContainer() throws IOException {
+    Container container = createContainer(CONTAINER_ID, sourceVolume);
+    long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
+    long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+    String oldContainerPath = container.getContainerData().getContainerPath();
+
+    // Use spy to inject failure during the atomic move
+    ContainerController spyController = spy(controller);
+    doThrow(new IOException("Mockito spy: container import failed"))
+        .when(spyController).importContainer(any(ContainerData.class), any(Path.class));
+    when(ozoneContainer.getController()).thenReturn(spyController);
+
+    DiskBalancerService.DiskBalancerTask task = getTask(
+        container.getContainerData());
+    task.call();
+
+    Container originalContainer = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(originalContainer);
+    assertEquals(container, originalContainer);
+    assertEquals(sourceVolume, originalContainer.getContainerData().getVolume());
+    assertEquals(initialSourceUsed, sourceVolume.getCurrentUsage().getUsedSpace());
+    assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
+    assertTrue(new File(oldContainerPath).exists());
+    Path tempDir = destVolume.getTmpDir().toPath()
+        .resolve(DiskBalancerService.DISK_BALANCER_DIR)
+        .resolve(String.valueOf(CONTAINER_ID));
+    assertFalse(Files.exists(tempDir), "Temp copy should be cleaned up");
+    assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
+  }
+
+  @Test
+  public void moveFailsDuringInMemoryUpdate() throws IOException {
+    Container container = createContainer(CONTAINER_ID, sourceVolume);
+    long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
+    long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+    String oldContainerPath = container.getContainerData().getContainerPath();
+
+    ContainerSet spyContainerSet = spy(containerSet);
+    doThrow(new StorageContainerException("Mockito spy: updateContainer failed",
+        CONTAINER_INTERNAL_ERROR))
+        .when(spyContainerSet).updateContainer(any(Container.class));
+    when(ozoneContainer.getContainerSet()).thenReturn(spyContainerSet);
+
+
+    DiskBalancerService.DiskBalancerTask task = getTask(
+        container.getContainerData());
+    task.call();
+
+    // Asserts for rollback
+    // The move succeeded on disk but should be reverted by the catch block
+    Container originalContainer = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(originalContainer);
+    assertEquals(container, originalContainer);
+    assertEquals(sourceVolume, originalContainer.getContainerData().getVolume());
+    assertEquals(initialSourceUsed, sourceVolume.getCurrentUsage().getUsedSpace());
+    assertEquals(initialDestUsed, destVolume.getCurrentUsage().getUsedSpace());
+    assertTrue(new File(oldContainerPath).exists());
+
+    // Verify the partially moved container at destination is cleaned up
+    String idDir = container.getContainerData().getOriginNodeId();
+    Path finalDestPath = Paths.get(
+        KeyValueContainerLocationUtil.getBaseContainerLocation(
+            destVolume.getHddsRootDir().toString(), idDir,
+            container.getContainerData().getContainerID()));
+    assertFalse(Files.exists(finalDestPath),
+        "Moved container at destination should be cleaned up on failure");
+    assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
+  }
+
+  @Test
+  public void moveFailsDuringOldContainerRemove() throws IOException {
+    Container container = createContainer(CONTAINER_ID, sourceVolume);
+    long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
+    long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
+
+    // Use a static mock for the KeyValueContainer utility class
+    try (MockedStatic<KeyValueContainerUtil> mockedUtil =
+             mockStatic(KeyValueContainerUtil.class, Mockito.CALLS_REAL_METHODS)) {
+      // Stub the static method to throw an exception
+      mockedUtil.when(() -> KeyValueContainerUtil.removeContainer(
+              any(KeyValueContainerData.class), any(OzoneConfiguration.class)))
+          .thenThrow(new IOException("Mockito: old container delete() failed"));
+
+      DiskBalancerService.DiskBalancerTask task = getTask(
+          container.getContainerData());
+      task.call();
+    }
+
+    // Assertions for successful move despite old container cleanup failure
+    assertEquals(1, diskBalancerService.getMetrics().getSuccessCount());
+    assertEquals(0, diskBalancerService.getMetrics().getFailureCount());
+    assertEquals(CONTAINER_SIZE, diskBalancerService.getMetrics().getSuccessBytes());
+
+    // Verify new container is active on the destination volume
+    Container newContainer = containerSet.getContainer(CONTAINER_ID);
+    assertNotNull(newContainer);
+    assertEquals(destVolume, newContainer.getContainerData().getVolume());
+    assertTrue(new File(newContainer.getContainerData().getContainerPath()).exists());
+
+    // Verify volume usage is updated correctly
+    assertEquals(initialSourceUsed - CONTAINER_SIZE,
+        sourceVolume.getCurrentUsage().getUsedSpace());
+    assertEquals(initialDestUsed + CONTAINER_SIZE,
+        destVolume.getCurrentUsage().getUsedSpace());
+  }
+
+  private KeyValueContainer createContainer(long containerId, HddsVolume vol)
+      throws IOException {
+    KeyValueContainerData containerData = new KeyValueContainerData(
+        containerId, ContainerLayoutVersion.FILE_PER_BLOCK, CONTAINER_SIZE,
+        UUID.randomUUID().toString(), datanodeUuid);
+    containerData.setState(State.CLOSED);
+    containerData.getStatistics().setBlockBytesForTesting(CONTAINER_SIZE);
+
+    KeyValueContainer container = new KeyValueContainer(containerData, conf);
+    VolumeChoosingPolicy policy = mock(VolumeChoosingPolicy.class);
+    when(policy.chooseVolume(any(List.class), any(Long.class)))
+        .thenReturn(vol);
+    container.create((VolumeSet) volumeSet, policy, scmId);
+    containerSet.addContainer(container);
+
+    // Manually update volume usage for test purposes
+    vol.incrementUsedSpace(containerData.getBytesUsed());
+    return container;
+  }
+
+  private DiskBalancerService.DiskBalancerTask getTask(ContainerData data) {
+    return diskBalancerService.createDiskBalancerTask(data, sourceVolume,
+        destVolume);
+  }
+}
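
The tests above inject failures with Mockito spies and, for the old-container cleanup path, a MockedStatic over KeyValueContainerUtil. Below is a minimal standalone example of that static fault-injection style. DiskUtil and retireOldCopy are hypothetical stand-ins for the production utility and caller, and it assumes a Mockito version with inline static mocking (mockito-inline or Mockito 5+).

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mockStatic;

import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

/** Hypothetical example of static-method fault injection in the style used above. */
class StaticFaultInjectionExample {

  /** Stand-in for a static utility such as KeyValueContainerUtil. */
  static class DiskUtil {
    static void removeDirectory(String path) throws IOException {
      // A real implementation would delete the directory tree at 'path'.
    }

    private DiskUtil() { }
  }

  /** Stand-in for the production code path that calls the utility. */
  static void retireOldCopy(String path) throws IOException {
    DiskUtil.removeDirectory(path);
  }

  @Test
  void removeFailureIsSurfaced() {
    try (MockedStatic<DiskUtil> mocked =
             mockStatic(DiskUtil.class, Mockito.CALLS_REAL_METHODS)) {
      // Stub only this call; unstubbed static methods keep their real behavior.
      mocked.when(() -> DiskUtil.removeDirectory("/vol1/old-container"))
          .thenThrow(new IOException("injected delete failure"));

      assertThrows(IOException.class, () -> retireOldCopy("/vol1/old-container"));
    }
  }
}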


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@ozone.apache.org
For additional commands, e-mail: commits-h...@ozone.apache.org
