This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a5129d6953 HDDS-12810. Check and reserve space atomically in VolumeChoosingPolicy (#8360)
7a5129d6953 is described below

commit 7a5129d6953302430c4b7813b33902b29379de99
Author: Peter Lee <[email protected]>
AuthorDate: Wed May 14 16:43:05 2025 +0800

    HDDS-12810. Check and reserve space atomically in VolumeChoosingPolicy (#8360)
---
 .../ozone/container/common/impl/ContainerData.java |  20 +---
 .../ozone/container/common/impl/ContainerSet.java  |   2 -
 .../volume/CapacityVolumeChoosingPolicy.java       |  20 ++--
 .../volume/RoundRobinVolumeChoosingPolicy.java     |  11 +-
 .../container/keyvalue/KeyValueContainer.java      |  13 ++-
 .../ozone/container/keyvalue/KeyValueHandler.java  |   1 +
 .../ozone/container/ozoneimpl/ContainerReader.java | 100 ++++++++++-------
 .../container/replication/ContainerImporter.java   |  13 +--
 .../replication/DownloadAndImportReplicator.java   |  18 +--
 .../replication/SendContainerRequestHandler.java   |  16 +--
 .../common/impl/TestContainerPersistence.java      |   5 +-
 .../volume/TestCapacityVolumeChoosingPolicy.java   |  11 ++
 .../volume/TestRoundRobinVolumeChoosingPolicy.java |  13 +++
 .../container/keyvalue/TestKeyValueContainer.java  |  17 +++
 .../container/keyvalue/TestKeyValueHandler.java    |  80 ++++++++++++-
 .../container/ozoneimpl/TestContainerReader.java   |  31 +++++-
 .../TestDownloadAndImportReplicator.java           | 123 ++++++++++++++++++++
 .../replication/TestReplicationSupervisor.java     |  31 +++---
 .../TestSendContainerRequestHandler.java           | 124 +++++++++++++++++++--
 19 files changed, 494 insertions(+), 155 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index d431b494d78..7bb59247ca5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -29,7 +29,6 @@
 import static org.apache.hadoop.ozone.OzoneConsts.STATE;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import jakarta.annotation.Nullable;
@@ -214,21 +213,14 @@ public synchronized void setState(ContainerDataProto.State state) {
         (state != oldState)) {
       releaseCommitSpace();
     }
+  }
 
-    /**
-     * commit space when container transitions (back) to Open.
-     * when? perhaps closing a container threw an exception
-     */
-    if ((state == ContainerDataProto.State.OPEN) &&
-        (state != oldState)) {
-      Preconditions.checkState(getMaxSize() > 0);
-      commitSpace();
-    }
+  public boolean isCommittedSpace() {
+    return committedSpace;
   }
 
-  @VisibleForTesting
-  void setCommittedSpace(boolean committedSpace) {
-    this.committedSpace = committedSpace;
+  public void setCommittedSpace(boolean committed) {
+    committedSpace = committed;
   }
 
   /**
@@ -356,7 +348,7 @@ public synchronized void closeContainer() {
     setState(ContainerDataProto.State.CLOSED);
   }
 
-  private void releaseCommitSpace() {
+  public void releaseCommitSpace() {
     long unused = getMaxSize() - getBytesUsed();
 
     // only if container size < max size
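
Side note on the bookkeeping above: committed space is a reservation taken
against a volume when a container is created, and handed back (minus what was
actually written) when the container closes or creation fails. A minimal
stand-alone sketch of that lifecycle, with Volume and ContainerSpace as
simplified stand-ins for HddsVolume and ContainerData rather than the real
API surface:

    import java.util.concurrent.atomic.AtomicLong;

    // Simplified stand-ins sketching the reservation lifecycle this patch
    // tightens: reserve on create, release the unused remainder later.
    class Volume {
      private final AtomicLong committedBytes = new AtomicLong();

      void incCommittedBytes(long delta) {
        committedBytes.addAndGet(delta);
      }

      long getCommittedBytes() {
        return committedBytes.get();
      }
    }

    class ContainerSpace {
      private final Volume volume;
      private final long maxSize;
      private long bytesUsed;
      private boolean committedSpace;

      ContainerSpace(Volume volume, long maxSize) {
        this.volume = volume;
        this.maxSize = maxSize;
      }

      // After this patch the choosing policy reserves the bytes; the
      // container only records that a reservation exists.
      void setCommittedSpace(boolean committed) {
        committedSpace = committed;
      }

      // Mirrors releaseCommitSpace above: hand back only the unused part,
      // since bytes already written have become real usage on the volume.
      void releaseCommitSpace() {
        if (committedSpace) {
          long unused = maxSize - bytesUsed;
          if (unused > 0) {
            volume.incCommittedBytes(-unused);
          }
          committedSpace = false;
        }
      }
    }
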
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 9b5c89e1f73..0f5c19fd336 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -154,8 +154,6 @@ private boolean addContainer(Container<?> container, boolean overwrite) throws
         throw new StorageContainerException(e, ContainerProtos.Result.IO_EXCEPTION);
       }
       missingContainerSet.remove(containerId);
-      // wish we could have done this from ContainerData.setState
-      container.getContainerData().commitSpace();
       if (container.getContainerData().getState() == RECOVERING) {
         recoveringContainerMap.put(
             clock.millis() + recoveringTimeout, containerId);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
index 89c686645ea..e323eeb4b17 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java
@@ -22,7 +22,7 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -44,11 +44,8 @@ public class CapacityVolumeChoosingPolicy implements VolumeChoosingPolicy {
   private static final Logger LOG = LoggerFactory.getLogger(
       CapacityVolumeChoosingPolicy.class);
 
-  // Stores the index of the next volume to be returned.
-  private final Random random = new Random();
-
   @Override
-  public HddsVolume chooseVolume(List<HddsVolume> volumes,
+  public synchronized HddsVolume chooseVolume(List<HddsVolume> volumes,
       long maxContainerSize) throws IOException {
 
     // No volumes available to choose from
@@ -69,9 +66,8 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
     }
 
     int count = volumesWithEnoughSpace.size();
-    if (count == 1) {
-      return volumesWithEnoughSpace.get(0);
-    } else {
+    HddsVolume selectedVolume = volumesWithEnoughSpace.get(0);
+    if (count > 1) {
       // Even if we don't have too many volumes in volumesWithEnoughSpace, this
       // algorithm will still help us choose the volume with larger
       // available space than other volumes.
@@ -83,8 +79,8 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
       // 4. vol2 + vol2: 25%, result is vol2
       // So we have a total of 75% chances to choose vol1, which meets our
       // expectation.
-      int firstIndex = random.nextInt(count);
-      int secondIndex = random.nextInt(count);
+      int firstIndex = ThreadLocalRandom.current().nextInt(count);
+      int secondIndex = ThreadLocalRandom.current().nextInt(count);
 
       HddsVolume firstVolume = volumesWithEnoughSpace.get(firstIndex);
       HddsVolume secondVolume = volumesWithEnoughSpace.get(secondIndex);
@@ -93,7 +89,9 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
           - firstVolume.getCommittedBytes();
       long secondAvailable = secondVolume.getCurrentUsage().getAvailable()
           - secondVolume.getCommittedBytes();
-      return firstAvailable < secondAvailable ? secondVolume : firstVolume;
+      selectedVolume = firstAvailable < secondAvailable ? secondVolume : firstVolume;
     }
+    selectedVolume.incCommittedBytes(maxContainerSize);
+    return selectedVolume;
   }
 }
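
The comment in the hunk above walks through the two-random-choices math. A
throwaway simulation confirms the 75% figure; the free-space numbers below
are hypothetical, chosen only so that vol1 has more available space than
vol2:

    import java.util.concurrent.ThreadLocalRandom;

    // Standalone check of the selection math: pick two indices at random,
    // keep the one with more available space (ties go to the first pick).
    public class TwoChoicesDemo {
      public static void main(String[] args) {
        long[] available = {200, 100};  // hypothetical free bytes: vol1, vol2
        int trials = 1_000_000;
        int pickedVol1 = 0;
        for (int i = 0; i < trials; i++) {
          int first = ThreadLocalRandom.current().nextInt(available.length);
          int second = ThreadLocalRandom.current().nextInt(available.length);
          int chosen = available[first] < available[second] ? second : first;
          if (chosen == 0) {
            pickedVol1++;
          }
        }
        // Three of the four equally likely pairs select vol1, so this
        // prints a ratio close to 0.75.
        System.out.printf("vol1 chosen %.3f of the time%n",
            (double) pickedVol1 / trials);
      }
    }
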
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
index 9945a3256b3..52c8c599703 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
@@ -22,7 +22,6 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.slf4j.Logger;
@@ -38,10 +37,10 @@ public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy {
       RoundRobinVolumeChoosingPolicy.class);
 
   // Stores the index of the next volume to be returned.
-  private AtomicInteger nextVolumeIndex = new AtomicInteger(0);
+  private int nextVolumeIndex = 0;
 
   @Override
-  public HddsVolume chooseVolume(List<HddsVolume> volumes,
+  public synchronized HddsVolume chooseVolume(List<HddsVolume> volumes,
       long maxContainerSize) throws IOException {
 
     // No volumes available to choose from
@@ -53,8 +52,7 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
 
     // since volumes could've been removed because of the failure
     // make sure we are not out of bounds
-    int nextIndex = nextVolumeIndex.get();
-    int currentVolumeIndex = nextIndex < volumes.size() ? nextIndex : 0;
+    int currentVolumeIndex = nextVolumeIndex < volumes.size() ? nextVolumeIndex : 0;
 
     int startVolumeIndex = currentVolumeIndex;
 
@@ -67,7 +65,8 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
 
       if (hasEnoughSpace) {
         logIfSomeVolumesOutOfSpace(filter, LOG);
-        nextVolumeIndex.compareAndSet(nextIndex, currentVolumeIndex);
+        nextVolumeIndex = currentVolumeIndex;
+        volume.incCommittedBytes(maxContainerSize);
         return volume;
       }
 
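
Both policies trade their lock-free counters for a synchronized chooseVolume
because the bug being fixed is a check-then-act race, not a torn counter: two
threads could each pass the space check for the same nearly-full volume
before either reservation landed. A condensed sketch of the now-atomic
sequence, with Volume as a simplified stand-in for HddsVolume:

    import java.util.List;

    // Condensed sketch of the synchronized check-and-reserve.
    class RoundRobinSketch {
      static final class Volume {
        long available;       // free bytes reported by the disk
        long committedBytes;  // bytes already promised to other containers

        void incCommittedBytes(long delta) {
          committedBytes += delta;
        }
      }

      private int nextVolumeIndex = 0;  // plain int: only read under the lock

      synchronized Volume chooseVolume(List<Volume> volumes,
          long maxContainerSize) {
        int index = nextVolumeIndex < volumes.size() ? nextVolumeIndex : 0;
        for (int attempts = 0; attempts < volumes.size(); attempts++) {
          Volume volume = volumes.get(index);
          index = (index + 1) % volumes.size();
          if (volume.available - volume.committedBytes >= maxContainerSize) {
            nextVolumeIndex = index;
            // Reserving inside the same critical section is the point of
            // the patch: no other caller can see this volume as free
            // between the space check above and this increment.
            volume.incCommittedBytes(maxContainerSize);
            return volume;
          }
        }
        throw new IllegalStateException("no volume with enough space");
      }
    }
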
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 09cadd5d13f..030392045d5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -157,8 +157,15 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
           = StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
       while (true) {
         HddsVolume containerVolume;
+        String hddsVolumeDir;
         try {
           containerVolume = volumeChoosingPolicy.chooseVolume(volumes, maxSize);
+          hddsVolumeDir = containerVolume.getHddsRootDir().toString();
+          // Set volume before getContainerDBFile(), because we may need the
+          // volume to deduce the db file.
+          containerData.setVolume(containerVolume);
+          // commit bytes have been reserved in volumeChoosingPolicy#chooseVolume
+          containerData.setCommittedSpace(true);
         } catch (DiskOutOfSpaceException ex) {
           throw new StorageContainerException("Container creation failed, " +
               "due to disk out of space", ex, DISK_OUT_OF_SPACE);
@@ -169,11 +176,6 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
         }
 
         try {
-          String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
-          // Set volume before getContainerDBFile(), because we may need the
-          // volume to deduce the db file.
-          containerData.setVolume(containerVolume);
-
           long containerID = containerData.getContainerID();
           String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
               containerVolume, clusterId);
@@ -206,7 +208,6 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
           // Create .container file
           File containerFile = getContainerFile();
           createContainerFile(containerFile);
-
           return;
         } catch (StorageContainerException ex) {
           if (containerMetaDataPath != null
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index c933dc76cef..7f192afc29e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -434,6 +434,7 @@ ContainerCommandResponseProto handleCreateContainer(
         LOG.debug("Container already exists. container Id {}", containerID);
       }
     } catch (StorageContainerException ex) {
+      newContainerData.releaseCommitSpace();
       return ContainerUtils.logAndReturnError(LOG, ex, request);
     } finally {
       containerIdLock.unlock();
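
Because chooseVolume now reserves space as a side effect, every failure path
after it must hand the reservation back; the new releaseCommitSpace() call in
the catch block above is exactly that hand-back. The contract reduces to the
sketch below, reusing the simplified Volume/ContainerSpace stand-ins from the
earlier note (only the ordering mirrors KeyValueContainer.create and
handleCreateContainer, not the real signatures):

    import java.io.IOException;
    import java.util.List;

    class CreateContainerSketch {
      interface VolumeChooser {
        Volume chooseVolume(List<Volume> volumes, long maxSize)
            throws IOException;
      }

      static void create(VolumeChooser policy, List<Volume> volumes,
          ContainerSpace data, long maxSize) throws IOException {
        // chooseVolume reserves maxSize atomically before returning, so
        // record the reservation on the container right away.
        Volume volume = policy.chooseVolume(volumes, maxSize);
        data.setCommittedSpace(true);
        try {
          // ... create metadata directories and the .container file ...
        } catch (RuntimeException e) {
          // Any failure after the reservation must release it, otherwise
          // the committed-bytes counter on the volume leaks.
          data.releaseCommitSpace();
          throw e;
        }
      }
    }
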
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
index 90bbb3186ad..f3b39333e08 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -202,49 +202,56 @@ public void verifyAndFixupContainerData(ContainerData containerData)
       throws IOException {
     switch (containerData.getContainerType()) {
     case KeyValueContainer:
-      if (containerData instanceof KeyValueContainerData) {
-        KeyValueContainerData kvContainerData = (KeyValueContainerData)
-            containerData;
-        containerData.setVolume(hddsVolume);
-        KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
-        KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData,
-            config);
-        if (kvContainer.getContainerState() == RECOVERING) {
-          if (shouldDelete) {
-            // delete Ratis replicated RECOVERING containers
-            if (kvContainer.getContainerData().getReplicaIndex() == 0) {
-              cleanupContainer(hddsVolume, kvContainer);
-            } else {
-              kvContainer.markContainerUnhealthy();
-              LOG.info("Stale recovering container {} marked UNHEALTHY",
-                  kvContainerData.getContainerID());
-              containerSet.addContainer(kvContainer);
-            }
-          }
-          return;
-        }
-        if (kvContainer.getContainerState() == DELETED) {
-          if (shouldDelete) {
+      if (!(containerData instanceof KeyValueContainerData)) {
+        throw new StorageContainerException("Container File is corrupted. " +
+            "ContainerType is KeyValueContainer but cast to " +
+            "KeyValueContainerData failed. ",
+            ContainerProtos.Result.CONTAINER_METADATA_ERROR);
+      }
+
+      KeyValueContainerData kvContainerData = (KeyValueContainerData)
+          containerData;
+      containerData.setVolume(hddsVolume);
+      KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
+      KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData,
+          config);
+      if (kvContainer.getContainerState() == RECOVERING) {
+        if (shouldDelete) {
+          // delete Ratis replicated RECOVERING containers
+          if (kvContainer.getContainerData().getReplicaIndex() == 0) {
             cleanupContainer(hddsVolume, kvContainer);
+          } else {
+            kvContainer.markContainerUnhealthy();
+            LOG.info("Stale recovering container {} marked UNHEALTHY",
+                kvContainerData.getContainerID());
+            containerSet.addContainer(kvContainer);
           }
-          return;
         }
-        try {
-          containerSet.addContainer(kvContainer);
-        } catch (StorageContainerException e) {
-          if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) {
-            throw e;
-          }
-          if (shouldDelete) {
-            resolveDuplicate((KeyValueContainer) containerSet.getContainer(
-                kvContainer.getContainerData().getContainerID()), kvContainer);
+        return;
+      } else if (kvContainer.getContainerState() == DELETED) {
+        if (shouldDelete) {
+          cleanupContainer(hddsVolume, kvContainer);
+        }
+        return;
+      }
+
+      try {
+        containerSet.addContainer(kvContainer);
+        // this should be the last step of this block
+        containerData.commitSpace();
+      } catch (StorageContainerException e) {
+        if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) {
+          throw e;
+        }
+        if (shouldDelete) {
+        KeyValueContainer existing = (KeyValueContainer) containerSet.getContainer(
+              kvContainer.getContainerData().getContainerID());
+          boolean swapped = resolveDuplicate(existing, kvContainer);
+          if (swapped) {
+            existing.getContainerData().releaseCommitSpace();
+            kvContainer.getContainerData().commitSpace();
           }
         }
-      } else {
-        throw new StorageContainerException("Container File is corrupted. " +
-            "ContainerType is KeyValueContainer but cast to " +
-            "KeyValueContainerData failed. ",
-            ContainerProtos.Result.CONTAINER_METADATA_ERROR);
       }
       break;
     default:
@@ -254,7 +261,14 @@ public void verifyAndFixupContainerData(ContainerData containerData)
     }
   }
 
-  private void resolveDuplicate(KeyValueContainer existing,
+  /**
+   * Resolve duplicate containers.
+   * @param existing the container already tracked by the container set
+   * @param toAdd the newly scanned replica of the same container
+   * @return true if the container was swapped, false otherwise
+   * @throws IOException if removing or swapping a container fails
+   */
+  private boolean resolveDuplicate(KeyValueContainer existing,
       KeyValueContainer toAdd) throws IOException {
     if (existing.getContainerData().getReplicaIndex() != 0 ||
         toAdd.getContainerData().getReplicaIndex() != 0) {
@@ -268,7 +282,7 @@ private void resolveDuplicate(KeyValueContainer existing,
           existing.getContainerData().getContainerID(),
           existing.getContainerData().getContainerPath(),
           toAdd.getContainerData().getContainerPath());
-      return;
+      return false;
     }
 
     long existingBCSID = existing.getBlockCommitSequenceId();
@@ -288,7 +302,7 @@ private void resolveDuplicate(KeyValueContainer existing,
             toAdd.getContainerData().getContainerPath(), toAddState);
         KeyValueContainerUtil.removeContainer(toAdd.getContainerData(),
             hddsVolume.getConf());
-        return;
+        return false;
       } else if (toAddState == CLOSED) {
         LOG.warn("Container {} is present at {} with state CLOSED and at " +
                 "{} with state {}. Removing the latter container.",
@@ -296,7 +310,7 @@ private void resolveDuplicate(KeyValueContainer existing,
             toAdd.getContainerData().getContainerPath(),
             existing.getContainerData().getContainerPath(), existingState);
         swapAndRemoveContainer(existing, toAdd);
-        return;
+        return true;
       }
     }
 
@@ -309,6 +323,7 @@ private void resolveDuplicate(KeyValueContainer existing,
           toAdd.getContainerData().getContainerPath());
       KeyValueContainerUtil.removeContainer(toAdd.getContainerData(),
           hddsVolume.getConf());
+      return false;
     } else {
       LOG.warn("Container {} is present at {} with a lesser BCSID " +
               "than at {}. Removing the former container.",
@@ -316,6 +331,7 @@ private void resolveDuplicate(KeyValueContainer existing,
           existing.getContainerData().getContainerPath(),
           toAdd.getContainerData().getContainerPath());
       swapAndRemoveContainer(existing, toAdd);
+      return true;
     }
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index f69516f94e1..ff7b1d3b732 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -88,7 +88,7 @@ public boolean isAllowedContainerImport(long containerID) {
   }
 
   public void importContainer(long containerID, Path tarFilePath,
-      HddsVolume hddsVolume, CopyContainerCompression compression)
+      HddsVolume targetVolume, CopyContainerCompression compression)
       throws IOException {
     if (!importContainerProgress.add(containerID)) {
       deleteFileQuietely(tarFilePath);
@@ -106,11 +106,6 @@ public void importContainer(long containerID, Path tarFilePath,
             ContainerProtos.Result.CONTAINER_EXISTS);
       }
 
-      HddsVolume targetVolume = hddsVolume;
-      if (targetVolume == null) {
-        targetVolume = chooseNextVolume();
-      }
-
       KeyValueContainerData containerData;
       TarContainerPacker packer = getPacker(compression);
 
@@ -148,7 +143,7 @@ HddsVolume chooseNextVolume() throws IOException {
     // Choose volume that can hold both container in tmp and dest directory
     return volumeChoosingPolicy.chooseVolume(
         StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
-        HddsServerUtil.requiredReplicationSpace(containerSize));
+        getDefaultReplicationSpace());
   }
 
   public static Path getUntarDirectory(HddsVolume hddsVolume)
@@ -171,7 +166,7 @@ protected TarContainerPacker getPacker(CopyContainerCompression compression) {
     return new TarContainerPacker(compression);
   }
 
-  public long getDefaultContainerSize() {
-    return containerSize;
+  public long getDefaultReplicationSpace() {
+    return HddsServerUtil.requiredReplicationSpace(containerSize);
   }
 }
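
getDefaultReplicationSpace() delegates to
HddsServerUtil.requiredReplicationSpace, which the reserve and release sides
now share. Judging by the test assertions later in this patch, which expect
committed bytes to grow by twice the container size per import, the helper
presumably amounts to the sketch below; the factor of two is inferred from
those assertions, not copied from HddsServerUtil:

    // Hedged sketch: an imported replica needs room for the unpacked copy
    // in the tmp directory plus the final destination, i.e. roughly two
    // container sizes.
    class ReplicationSpaceSketch {
      static long requiredReplicationSpace(long containerMaxSize) {
        return 2 * containerMaxSize;
      }
    }
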
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 9a943c63338..240ba9473d3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -22,11 +22,8 @@
 import java.nio.file.Path;
 import java.util.List;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
 import org.slf4j.Logger;
@@ -47,7 +44,6 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
   private final ContainerDownloader downloader;
   private final ContainerImporter containerImporter;
   private final ContainerSet containerSet;
-  private final long containerSize;
 
   public DownloadAndImportReplicator(
       ConfigurationSource conf, ContainerSet containerSet,
@@ -57,9 +53,6 @@ public DownloadAndImportReplicator(
     this.containerSet = containerSet;
     this.downloader = downloader;
     this.containerImporter = containerImporter;
-    containerSize = (long) conf.getStorageSize(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
   }
 
   @Override
@@ -81,15 +74,6 @@ public void replicate(ReplicationTask task) {
 
     try {
       targetVolume = containerImporter.chooseNextVolume();
-      // Increment committed bytes and verify if it doesn't cross the space left.
-      targetVolume.incCommittedBytes(containerSize * 2);
-      StorageLocationReport volumeReport = targetVolume.getReport();
-      // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter
-      if (volumeReport.getUsableSpace() <= 0) {
-        LOG.warn("Container {} replication was unsuccessful, no space left on volume {}", containerID, volumeReport);
-        task.setStatus(Status.FAILED);
-        return;
-      }
       // Wait for the download. This thread pool is limiting the parallel
       // downloads, so it's ok to block here and wait for the full download.
       Path tarFilePath =
@@ -114,7 +98,7 @@ public void replicate(ReplicationTask task) {
       task.setStatus(Status.FAILED);
     } finally {
       if (targetVolume != null) {
-        targetVolume.incCommittedBytes(-containerSize * 2);
+        targetVolume.incCommittedBytes(-containerImporter.getDefaultReplicationSpace());
       }
     }
   }
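
Net effect of the two hunks above: the manual incCommittedBytes/usable-space
dance is gone, the reservation happens inside chooseNextVolume(), and the
finally block returns exactly what chooseNextVolume() took.
SendContainerRequestHandler below gets the same symmetric treatment. The
skeleton, with Importer and Volume as simplified stand-ins for
ContainerImporter and HddsVolume:

    import java.io.IOException;

    class ReplicateSketch {
      interface Volume {
        void incCommittedBytes(long delta);
      }

      interface Importer {
        Volume chooseNextVolume() throws IOException;  // reserves space
        long getDefaultReplicationSpace();
      }

      void replicate(Importer importer, long containerID) {
        Volume targetVolume = null;
        try {
          // Reservation happens here, atomically with the space check.
          targetVolume = importer.chooseNextVolume();
          // ... download the tarball, then import the container ...
        } catch (IOException e) {
          // ... mark the task FAILED ...
        } finally {
          if (targetVolume != null) {
            // Always release what chooseNextVolume reserved; a successful
            // import shows up as real usage on the volume instead.
            targetVolume.incCommittedBytes(
                -importer.getDefaultReplicationSpace());
          }
        }
      }
    }
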
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index 5224498e727..9cb07a21c5d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -29,9 +29,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.util.DiskChecker;
 import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
@@ -87,16 +85,6 @@ public void onNext(SendContainerRequest req) {
       if (containerId == -1) {
         containerId = req.getContainerID();
         volume = importer.chooseNextVolume();
-        // Increment committed bytes and verify if it doesn't cross the space left.
-        volume.incCommittedBytes(importer.getDefaultContainerSize() * 2);
-        StorageLocationReport volumeReport = volume.getReport();
-        // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter
-        if (volumeReport.getUsableSpace() <= 0) {
-          volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
-          LOG.warn("Container {} import was unsuccessful, no space left on volume {}", containerId, volumeReport);
-          volume = null;
-          throw new DiskChecker.DiskOutOfSpaceException("No more available volumes");
-        }
 
         Path dir = ContainerImporter.getUntarDirectory(volume);
         Files.createDirectories(dir);
@@ -130,7 +118,7 @@ public void onError(Throwable t) {
       responseObserver.onError(t);
     } finally {
       if (volume != null) {
-        volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+        volume.incCommittedBytes(-importer.getDefaultReplicationSpace());
       }
     }
   }
@@ -159,7 +147,7 @@ public void onCompleted() {
       }
     } finally {
       if (volume != null) {
-        volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+        volume.incCommittedBytes(-importer.getDefaultReplicationSpace());
       }
     }
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index cf75342efdf..02f999013e6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -202,10 +202,11 @@ private KeyValueContainer addContainer(ContainerSet cSet, long cID)
     data.addMetadata("VOLUME", "shire");
     data.addMetadata("owner)", "bilbo");
     KeyValueContainer container = new KeyValueContainer(data, conf);
+    commitBytesBefore = StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()).get(0).getCommittedBytes();
+
     container.create(volumeSet, volumeChoosingPolicy, SCM_ID);
-    commitBytesBefore = container.getContainerData()
-        .getVolume().getCommittedBytes();
     cSet.addContainer(container);
+
     commitBytesAfter = container.getContainerData()
         .getVolume().getCommittedBytes();
     commitIncrement = commitBytesAfter - commitBytesBefore;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java
index d6c97c5f1a3..07ae372a4cd 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java
@@ -149,4 +149,15 @@ public void testVolumeChoosingPolicyFactory()
         VolumeChoosingPolicyFactory.getPolicy(CONF).getClass());
   }
 
+  @Test
+  public void testVolumeCommittedSpace() throws Exception {
+    Map<HddsVolume, Long> initialCommittedSpace = new HashMap<>();
+    volumes.forEach(vol ->
+        initialCommittedSpace.put(vol, vol.getCommittedBytes()));
+
+    HddsVolume selectedVolume = policy.chooseVolume(volumes, 50);
+
+    assertEquals(initialCommittedSpace.get(selectedVolume) + 50,
+        selectedVolume.getCommittedBytes());
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java
index 1c07fe7ab7b..2406011a3d1 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java
@@ -25,7 +25,9 @@
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
 import org.apache.hadoop.hdds.fs.MockSpaceUsageSource;
@@ -115,4 +117,15 @@ public void throwsDiskOutOfSpaceIfRequestMoreThanAvailable() {
         "Most available space: 150 bytes");
   }
 
+  @Test
+  public void testVolumeCommittedSpace() throws Exception {
+    Map<HddsVolume, Long> initialCommittedSpace = new HashMap<>();
+    volumes.forEach(vol ->
+        initialCommittedSpace.put(vol, vol.getCommittedBytes()));
+
+    HddsVolume selectedVolume = policy.chooseVolume(volumes, 50);
+
+    assertEquals(initialCommittedSpace.get(selectedVolume) + 50,
+        selectedVolume.getCommittedBytes());
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index c700b235faf..51a949e496f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -38,6 +38,8 @@
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
@@ -1097,4 +1099,19 @@ private void testMixedSchemaImport(String dir,
     assertEquals(pendingDeleteBlockCount,
         importedContainer.getContainerData().getNumPendingDeletionBlocks());
   }
+
+  @ContainerTestVersionInfo.ContainerTest
+  public void testContainerCreationCommitSpaceReserve(
+      ContainerTestVersionInfo versionInfo) throws Exception {
+    init(versionInfo);
+    keyValueContainerData = spy(keyValueContainerData);
+    keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF);
+    keyValueContainer = spy(keyValueContainer);
+
+    keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
+
+    // verify that
+    verify(volumeChoosingPolicy).chooseVolume(anyList(), anyLong()); // this would reserve commit space
+    assertTrue(keyValueContainerData.isCommittedSpace());
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 256ca20e938..7927864861b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -25,11 +25,13 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -43,6 +45,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.StorageUnit;
@@ -53,6 +57,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
@@ -70,17 +75,22 @@
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.util.Time;
+import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.GenericTestUtils.LogCapturer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Unit tests for {@link KeyValueHandler}.
  */
 public class TestKeyValueHandler {
 
+  private static final Logger LOG = LoggerFactory.getLogger(TestKeyValueHandler.class);
+
   @TempDir
   private Path tempDir;
 
@@ -91,9 +101,11 @@ public class TestKeyValueHandler {
 
   private HddsDispatcher dispatcher;
   private KeyValueHandler handler;
+  private long maxContainerSize;
 
   @BeforeEach
   public void setup() throws StorageContainerException {
+    OzoneConfiguration conf = new OzoneConfiguration();
     // Create mock HddsDispatcher and KeyValueHandler.
     handler = mock(KeyValueHandler.class);
 
@@ -109,6 +121,10 @@ public void setup() throws StorageContainerException {
         mock(ContainerMetrics.class),
         mock(TokenVerifier.class)
     );
+
+    maxContainerSize = (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
   }
 
   /**
@@ -337,6 +353,68 @@ public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion)
         "Close container should return Invalid container error");
   }
 
+  @Test
+  public void testCreateContainerWithFailure() throws Exception {
+    final String testDir = tempDir.toString();
+    final long containerID = 1L;
+    final String clusterId = UUID.randomUUID().toString();
+    final String datanodeId = UUID.randomUUID().toString();
+    final ConfigurationSource conf = new OzoneConfiguration();
+    final ContainerSet containerSet = spy(newContainerSet());
+    final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
+
+    HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(conf)
+        .clusterID(clusterId).datanodeUuid(datanodeId)
+        .volumeSet(volumeSet)
+        .build();
+
+    hddsVolume.format(clusterId);
+    hddsVolume.createWorkingDir(clusterId, null);
+    hddsVolume.createTmpDirs(clusterId);
+
+    when(volumeSet.getVolumesList())
+        .thenReturn(Collections.singletonList(hddsVolume));
+
+    List<HddsVolume> hddsVolumeList = StorageVolumeUtil
+        .getHddsVolumesList(volumeSet.getVolumesList());
+
+    assertEquals(1, hddsVolumeList.size());
+
+    final ContainerMetrics metrics = ContainerMetrics.create(conf);
+    
+    final AtomicInteger icrReceived = new AtomicInteger(0);
+
+    final KeyValueHandler kvHandler = new KeyValueHandler(conf,
+        datanodeId, containerSet, volumeSet, metrics,
+        c -> icrReceived.incrementAndGet());
+    kvHandler.setClusterID(clusterId);
+    
+    final ContainerCommandRequestProto createContainer =
+        createContainerRequest(datanodeId, containerID);
+
+    Semaphore semaphore = new Semaphore(1);
+    doAnswer(invocation -> {
+      semaphore.acquire();
+      throw new StorageContainerException(ContainerProtos.Result.IO_EXCEPTION);
+    }).when(containerSet).addContainer(any());
+
+    semaphore.acquire();
+    CompletableFuture.runAsync(() -> kvHandler.handleCreateContainer(createContainer, null)
+    );
+
+    // commit bytes have been allocated by volumeChoosingPolicy which is called in KeyValueContainer#create
+    GenericTestUtils.waitFor(() -> hddsVolume.getCommittedBytes() == maxContainerSize,
+            1000, 50000);
+    semaphore.release();
+
+    LOG.info("Committed bytes: {}", hddsVolume.getCommittedBytes());
+    
+    // release committed bytes as exception is thrown
+    GenericTestUtils.waitFor(() -> hddsVolume.getCommittedBytes() == 0,
+            1000, 50000);
+  }
+
   @Test
   public void testDeleteContainer() throws IOException {
     final String testDir = tempDir.toString();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
index 6a48765c1a9..ec5c6743e72 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
@@ -217,8 +217,18 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo)
       throws Exception {
     setLayoutAndSchemaVersion(versionInfo);
     setup(versionInfo);
+
+    ContainerReader containerReader = new ContainerReader(volumeSet,
+        hddsVolume, containerSet, conf, true);
+    Thread thread = new Thread(containerReader);
+    thread.start();
+    thread.join();
+    long originalCommittedBytes = hddsVolume.getCommittedBytes();
+    ContainerCache.getInstance(conf).shutdownCache();
+
+    long recoveringContainerId = 10;
     KeyValueContainerData recoveringContainerData = new KeyValueContainerData(
-        10, layout, (long) StorageUnit.GB.toBytes(5),
+        recoveringContainerId, layout, (long) StorageUnit.GB.toBytes(5),
         UUID.randomUUID().toString(), datanodeId.toString());
     //create a container with recovering state
     recoveringContainerData.setState(RECOVERING);
@@ -229,13 +239,13 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo)
     recoveringKeyValueContainer.create(
         volumeSet, volumeChoosingPolicy, clusterId);
 
-    ContainerReader containerReader = new ContainerReader(volumeSet,
-        hddsVolume, containerSet, conf, true);
-
-    Thread thread = new Thread(containerReader);
+    thread = new Thread(containerReader);
     thread.start();
     thread.join();
 
+    // no change, only open containers have committed space
+    assertEquals(originalCommittedBytes, hddsVolume.getCommittedBytes());
+
     // Ratis replicated recovering containers are deleted upon datanode startup
     if (recoveringKeyValueContainer.getContainerData().getReplicaIndex() == 0) {
       assertNull(containerSet.getContainer(recoveringContainerData.getContainerID()));
@@ -262,6 +272,8 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo)
 
       assertEquals(i,
           keyValueContainerData.getNumPendingDeletionBlocks());
+
+      assertTrue(keyValueContainerData.isCommittedSpace());
     }
   }
 
@@ -313,6 +325,14 @@ public void testContainerReaderWithLoadException(
         hddsVolume1, containerSet1, conf, true);
     containerReader.readVolume(hddsVolume1.getHddsRootDir());
     assertEquals(containerCount - 1, containerSet1.containerCount());
+    for (Container c : containerSet1.getContainerMap().values()) {
+      if (c.getContainerData().getContainerID() == 0) {
+        assertFalse(c.getContainerData().isCommittedSpace());
+      } else {
+        assertTrue(c.getContainerData().isCommittedSpace());
+      }
+    }
+    assertEquals(hddsVolume1.getCommittedBytes(), (containerCount - 1) * StorageUnit.GB.toBytes(5));
   }
 
   @ContainerTestVersionInfo.ContainerTest
@@ -361,6 +381,7 @@ public void testContainerReaderWithInvalidDbPath(
         hddsVolume1, containerSet1, conf, true);
     containerReader.readVolume(hddsVolume1.getHddsRootDir());
     assertEquals(0, containerSet1.containerCount());
+    assertEquals(0, hddsVolume1.getCommittedBytes());
     assertThat(dnLogs.getOutput()).contains("Container DB file is missing");
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java
new file mode 100644
index 00000000000..5993e43e661
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Test for DownloadAndImportReplicator.
+ */
+@Timeout(300)
+public class TestDownloadAndImportReplicator {
+
+  @TempDir
+  private File tempDir;
+
+  private OzoneConfiguration conf;
+  private VolumeChoosingPolicy volumeChoosingPolicy;
+  private ContainerSet containerSet;
+  private MutableVolumeSet volumeSet;
+  private ContainerImporter importer;
+  private SimpleContainerDownloader downloader;
+  private DownloadAndImportReplicator replicator;
+  private long containerMaxSize;
+
+  @BeforeEach
+  void setup() throws IOException {
+    conf = new OzoneConfiguration();
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath());
+    volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
+    containerSet = newContainerSet(0);
+    volumeSet = new MutableVolumeSet("test", conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    importer = new ContainerImporter(conf, containerSet,
+        mock(ContainerController.class), volumeSet, volumeChoosingPolicy);
+    downloader = mock(SimpleContainerDownloader.class);
+    replicator = new DownloadAndImportReplicator(conf, containerSet, importer,
+        downloader);
+    containerMaxSize = (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+  }
+
+  @Test
+  public void testCommitSpaceReleasedOnReplicationFailure() throws Exception {
+    long containerId = 1;
+    HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0);
+    long initialCommittedBytes = volume.getCommittedBytes();
+
+    // Mock downloader to throw exception
+    Semaphore semaphore = new Semaphore(1);
+    when(downloader.getContainerDataFromReplicas(anyLong(), any(), any(), any()))
+        .thenAnswer(invocation -> {
+          semaphore.acquire();
+          throw new IOException("Download failed");
+        });
+
+    ReplicationTask task = new ReplicationTask(containerId,
+        Collections.singletonList(mock(DatanodeDetails.class)), replicator);
+
+    // Acquire semaphore so that container import will pause before downloading.
+    semaphore.acquire();
+    CompletableFuture.runAsync(() -> {
+      assertThrows(IOException.class, () -> replicator.replicate(task));
+    });
+
+    // Wait such that the first container import reserves space
+    GenericTestUtils.waitFor(() ->
+        volume.getCommittedBytes() > initialCommittedBytes,
+        1000, 50000);
+    assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize);
+    semaphore.release();
+
+    GenericTestUtils.waitFor(() ->
+        volume.getCommittedBytes() == initialCommittedBytes,
+        1000, 50000);
+
+    // Verify commit space is released
+    assertEquals(initialCommittedBytes, volume.getCommittedBytes());
+  }
+}
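
The new tests share one concurrency idiom: a Semaphore held by the test parks
the mocked step, so committed bytes can be asserted while the operation is in
flight; releasing the permit then lets the failure path run so the release
can be asserted too. Stripped to a skeleton, with step() standing in for the
mocked download/import/addContainer:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Semaphore;

    class PauseAssertReleaseSketch {
      private final Semaphore gate = new Semaphore(1);

      // Stand-in for the mocked operation: parks until the test releases
      // the permit, then fails like the stubbed download/import does.
      private void step() throws InterruptedException {
        gate.acquire();
        throw new IllegalStateException("simulated failure");
      }

      void run() throws Exception {
        gate.acquire();  // hold the permit so step() parks mid-operation
        CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
          try {
            step();
          } catch (Exception ignored) {
            // production code releases the reserved space on this path
          }
        });
        // ... assert here that the space reservation is visible ...
        gate.release();  // unpark step() and let the failure path run
        task.join();
        // ... assert here that the reservation has been released ...
      }
    }
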
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index 9acb73486a0..1e69eac2ea9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -355,11 +355,12 @@ public void testDownloadAndImportReplicatorFailure(ContainerLayoutVersion layout
   @ContainerLayoutTestInfo.ContainerTest
   public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
       throws IOException, InterruptedException, TimeoutException {
+    final long containerUsedSize = 100;
     this.layoutVersion = layout;
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath());
 
-    long containerSize = (long) conf.getStorageSize(
+    long containerMaxSize = (long) conf.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
 
@@ -369,13 +370,16 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
         .clock(clock)
         .build();
 
+    MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+
     long containerId = 1;
     // create container
     KeyValueContainerData containerData = new KeyValueContainerData(containerId,
-        ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
-    HddsVolume vol = mock(HddsVolume.class);
-    containerData.setVolume(vol);
-    containerData.incrBytesUsed(100);
+        ContainerLayoutVersion.FILE_PER_BLOCK, containerMaxSize, "test", "test");
+    HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0);
+    containerData.setVolume(vol1);
+    containerData.incrBytesUsed(containerUsedSize);
     KeyValueContainer container = new KeyValueContainer(containerData, conf);
     ContainerController controllerMock = mock(ContainerController.class);
     Semaphore semaphore = new Semaphore(1);
@@ -384,8 +388,7 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
           semaphore.acquire();
           return container;
         });
-    MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null,
-        StorageVolume.VolumeType.DATA_VOLUME, null);
+    
     File tarFile = containerTarFile(containerId, containerData);
 
     SimpleContainerDownloader moc =
@@ -398,14 +401,13 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
     ContainerImporter importer =
         new ContainerImporter(conf, set, controllerMock, volumeSet, 
volumeChoosingPolicy);
 
-    HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0);
     // Initially volume has 0 commit space
     assertEquals(0, vol1.getCommittedBytes());
     long usedSpace = vol1.getCurrentUsage().getUsedSpace();
     // Initially volume has 0 used space
     assertEquals(0, usedSpace);
     // Increase committed bytes so that volume has only remaining 3 times container size space
-    long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - containerSize * 3;
+    long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - containerMaxSize * 3;
     vol1.incCommittedBytes(initialCommittedBytes);
     ContainerReplicator replicator =
         new DownloadAndImportReplicator(conf, set, importer, moc);
@@ -424,11 +426,11 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
 
     // Wait such that first container import reserve space
     GenericTestUtils.waitFor(() ->
-        vol1.getCommittedBytes() > vol1.getCurrentUsage().getCapacity() - containerSize * 3,
+        vol1.getCommittedBytes() > initialCommittedBytes,
         1000, 50000);
 
     // Volume has reserved space of 2 * containerSize
-    assertEquals(vol1.getCommittedBytes(), initialCommittedBytes + 2 * containerSize);
+    assertEquals(vol1.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize);
     // Container 2 import will fail as container 1 has reserved space and no space left to import new container
     // New container import requires at least (2 * container size)
     long containerId2 = 2;
@@ -443,10 +445,11 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
 
     usedSpace = vol1.getCurrentUsage().getUsedSpace();
     // After replication, volume used space should be increased by container used bytes
-    assertEquals(100, usedSpace);
+    assertEquals(containerUsedSize, usedSpace);
 
-    // Volume committed bytes should become initial committed bytes which was before replication
-    assertEquals(initialCommittedBytes, vol1.getCommittedBytes());
+    // Volume committed bytes used for replication have been released; no need to reserve space for the imported container
+    // only closed containers get replicated, so no new data will be written to it
+    assertEquals(vol1.getCommittedBytes(), initialCommittedBytes);
 
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java
index 0d15e265ad9..441bc7890b6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java
@@ -21,18 +21,25 @@
 import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.mockito.Mockito.any;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
+import java.io.IOException;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
@@ -48,7 +55,7 @@
 /**
  * Test for {@link SendContainerRequestHandler}.
  */
-class TestSendContainerRequestHandler {
+public class TestSendContainerRequestHandler {
 
   @TempDir
   private File tempDir;
@@ -57,38 +64,48 @@ class TestSendContainerRequestHandler {
 
   private VolumeChoosingPolicy volumeChoosingPolicy;
 
+  private ContainerSet containerSet;
+  private MutableVolumeSet volumeSet;
+  private ContainerImporter importer;
+  private StreamObserver<ContainerProtos.SendContainerResponse> responseObserver;
+  private SendContainerRequestHandler sendContainerRequestHandler;
+  private long containerMaxSize;
+
   @BeforeEach
-  void setup() {
+  void setup() throws IOException {
     conf = new OzoneConfiguration();
     conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath());
     volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
+    containerSet = newContainerSet(0);
+    volumeSet = new MutableVolumeSet("test", conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    importer = new ContainerImporter(conf, containerSet,
+        mock(ContainerController.class), volumeSet, volumeChoosingPolicy);
+    importer = spy(importer);
+    responseObserver = mock(StreamObserver.class);
+    sendContainerRequestHandler = new SendContainerRequestHandler(importer, responseObserver, null);
+    containerMaxSize = (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
   }
 
   @Test
   void testReceiveDataForExistingContainer() throws Exception {
     long containerId = 1;
     // create containerImporter
-    ContainerSet containerSet = newContainerSet(0);
-    MutableVolumeSet volumeSet = new MutableVolumeSet("test", conf, null,
-        StorageVolume.VolumeType.DATA_VOLUME, null);
-    ContainerImporter containerImporter = new ContainerImporter(conf,
-        newContainerSet(0), mock(ContainerController.class), volumeSet, volumeChoosingPolicy);
     KeyValueContainerData containerData = new KeyValueContainerData(containerId,
         ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
     // add container to container set
     KeyValueContainer container = new KeyValueContainer(containerData, conf);
     containerSet.addContainer(container);
 
-    StreamObserver observer = mock(StreamObserver.class);
     doAnswer(invocation -> {
       Object arg = invocation.getArgument(0);
       assertInstanceOf(StorageContainerException.class, arg);
       assertEquals(ContainerProtos.Result.CONTAINER_EXISTS,
           ((StorageContainerException) arg).getResult());
       return null;
-    }).when(observer).onError(any());
-    SendContainerRequestHandler sendContainerRequestHandler
-        = new SendContainerRequestHandler(containerImporter, observer, null);
+    }).when(responseObserver).onError(any());
     ByteString data = ByteString.copyFromUtf8("test");
     ContainerProtos.SendContainerRequest request
         = ContainerProtos.SendContainerRequest.newBuilder()
@@ -99,4 +116,87 @@ void testReceiveDataForExistingContainer() throws Exception {
         .build();
     sendContainerRequestHandler.onNext(request);
   }
+
+  @Test
+  public void testSpaceReservedAndReleasedWhenRequestCompleted() throws Exception {
+    long containerId = 1;
+    HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0);
+    long initialCommittedBytes = volume.getCommittedBytes();
+
+    // Create request
+    ContainerProtos.SendContainerRequest request = ContainerProtos.SendContainerRequest.newBuilder()
+        .setContainerID(containerId)
+        .setData(ByteString.EMPTY)
+        .setOffset(0)
+        .setCompression(CopyContainerCompression.NO_COMPRESSION.toProto())
+        .build();
+
+    // Execute request
+    sendContainerRequestHandler.onNext(request);
+
+    // Verify commit space is reserved
+    assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize);
+
+    // complete the request
+    sendContainerRequestHandler.onCompleted();
+
+    // Verify commit space is released
+    assertEquals(volume.getCommittedBytes(), initialCommittedBytes);
+  }
+
+  @Test
+  public void testSpaceReservedAndReleasedWhenOnNextFails() throws Exception {
+    long containerId = 1;
+    HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0);
+    long initialCommittedBytes = volume.getCommittedBytes();
+
+    // Create request
+    ContainerProtos.SendContainerRequest request = createRequest(containerId, ByteString.copyFromUtf8("test"), 0);
+
+    // Execute request
+    sendContainerRequestHandler.onNext(request);
+
+    // Verify commit space is reserved
+    assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize);
+
+    // mock the importer is not allowed to import this container
+    when(importer.isAllowedContainerImport(containerId)).thenReturn(false);
+    
+    sendContainerRequestHandler.onNext(request);
+
+    // Verify commit space is released
+    assertEquals(volume.getCommittedBytes(), initialCommittedBytes);
+  }
+
+  @Test
+  public void testSpaceReservedAndReleasedWhenOnCompletedFails() throws Exception {
+    long containerId = 1;
+    HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0);
+    long initialCommittedBytes = volume.getCommittedBytes();
+
+    // Create request
+    ContainerProtos.SendContainerRequest request = createRequest(containerId, ByteString.copyFromUtf8("test"), 0);
+
+    // Execute request
+    sendContainerRequestHandler.onNext(request);
+
+    // Verify commit space is reserved
+    assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize);
+
+    doThrow(new IOException("Failed")).when(importer).importContainer(anyLong(), any(), any(), any());
+
+    sendContainerRequestHandler.onCompleted();
+
+    // Verify commit space is released
+    assertEquals(volume.getCommittedBytes(), initialCommittedBytes);
+  }
+
+  private ContainerProtos.SendContainerRequest createRequest(long containerId, ByteString data, int offset) {
+    return ContainerProtos.SendContainerRequest.newBuilder()
+        .setContainerID(containerId)
+        .setData(data)
+        .setOffset(offset)
+        .setCompression(CopyContainerCompression.NO_COMPRESSION.toProto())
+        .build();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
