This is an automated email from the ASF dual-hosted git repository.

erose pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e3e47ea41e HDDS-12235. Reserve space on DN during container import operation. (#7981)
e3e47ea41e is described below

commit e3e47ea41eb85d604b6ace6586ef6aedb9bba12d
Author: Ashish Kumar <[email protected]>
AuthorDate: Wed Mar 26 20:38:01 2025 +0530

    HDDS-12235. Reserve space on DN during container import operation. (#7981)
---
 .../common/volume/AvailableSpaceFilter.java        |   2 +-
 .../container/replication/ContainerImporter.java   |   5 +
 .../replication/DownloadAndImportReplicator.java   |  23 +++-
 .../replication/SendContainerRequestHandler.java   |  69 ++++++----
 .../replication/TestReplicationSupervisor.java     | 139 +++++++++++++++++++++
 5 files changed, 214 insertions(+), 24 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
index d584600acf..cea77fdf4b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
@@ -33,7 +33,7 @@ public class AvailableSpaceFilter implements 
Predicate<HddsVolume> {
       new HashMap<>();
   private long mostAvailableSpace = Long.MIN_VALUE;
 
-  AvailableSpaceFilter(long requiredSpace) {
+  public AvailableSpaceFilter(long requiredSpace) {
     this.requiredSpace = requiredSpace;
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index 90fc50f84a..b5ea8902c0 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -129,6 +129,8 @@ public void importContainer(long containerID, Path 
tarFilePath,
       try (InputStream input = Files.newInputStream(tarFilePath)) {
         Container container = controller.importContainer(
             containerData, input, packer);
+        // After container import is successful, increase used space for the 
volume
+        
targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed());
         containerSet.addContainerByOverwriteMissingContainer(container);
       }
     } finally {
@@ -173,4 +175,7 @@ protected TarContainerPacker 
getPacker(CopyContainerCompression compression) {
     return new TarContainerPacker(compression);
   }
 
+  public long getDefaultContainerSize() {
+    return containerSize;
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 69f375b06c..8c44d0d078 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -22,8 +22,11 @@
 import java.nio.file.Path;
 import java.util.List;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import 
org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
 import org.slf4j.Logger;
@@ -44,6 +47,7 @@ public class DownloadAndImportReplicator implements 
ContainerReplicator {
   private final ContainerDownloader downloader;
   private final ContainerImporter containerImporter;
   private final ContainerSet containerSet;
+  private final long containerSize;
 
   public DownloadAndImportReplicator(
       ConfigurationSource conf, ContainerSet containerSet,
@@ -53,6 +57,9 @@ public DownloadAndImportReplicator(
     this.containerSet = containerSet;
     this.downloader = downloader;
     this.containerImporter = containerImporter;
+    containerSize = (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
   }
 
   @Override
@@ -70,9 +77,19 @@ public void replicate(ReplicationTask task) {
 
     LOG.info("Starting replication of container {} from {} using {}",
         containerID, sourceDatanodes, compression);
+    HddsVolume targetVolume = null;
 
     try {
-      HddsVolume targetVolume = containerImporter.chooseNextVolume();
+      targetVolume = containerImporter.chooseNextVolume();
+      // Increment committed bytes and verify if it doesn't cross the space 
left.
+      targetVolume.incCommittedBytes(containerSize * 2);
+      // Already committed bytes increased above, so required space is not 
required here in AvailableSpaceFilter
+      AvailableSpaceFilter filter = new AvailableSpaceFilter(0);
+      if (!filter.test(targetVolume)) {
+        LOG.warn("Container {} replication was unsuccessful, due to no space 
left", containerID);
+        task.setStatus(Status.FAILED);
+        return;
+      }
       // Wait for the download. This thread pool is limiting the parallel
       // downloads, so it's ok to block here and wait for the full download.
       Path tarFilePath =
@@ -95,6 +112,10 @@ public void replicate(ReplicationTask task) {
     } catch (IOException e) {
       LOG.error("Container {} replication was unsuccessful.", containerID, e);
       task.setStatus(Status.FAILED);
+    } finally {
+      if (targetVolume != null) {
+        targetVolume.incCommittedBytes(-containerSize * 2);
+      }
     }
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index a89e274f9c..e76a44e680 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -29,7 +29,9 @@
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
@@ -50,7 +52,7 @@ class SendContainerRequestHandler
   private long containerId = -1;
   private long nextOffset;
   private OutputStream output;
-  private HddsVolume volume;
+  private HddsVolume volume = null;
   private Path path;
   private CopyContainerCompression compression;
   private final ZeroCopyMessageMarshaller<SendContainerRequest> marshaller;
@@ -85,6 +87,17 @@ public void onNext(SendContainerRequest req) {
       if (containerId == -1) {
         containerId = req.getContainerID();
         volume = importer.chooseNextVolume();
+        // Increment committed bytes and verify if it doesn't cross the space 
left.
+        volume.incCommittedBytes(importer.getDefaultContainerSize() * 2);
+        // Already committed bytes increased above, so required space is not 
required here in AvailableSpaceFilter
+        AvailableSpaceFilter filter = new AvailableSpaceFilter(0);
+        if (!filter.test(volume)) {
+          volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+          LOG.warn("Container {} import was unsuccessful, due to no space 
left", containerId);
+          volume = null;
+          throw new DiskChecker.DiskOutOfSpaceException("No more available 
volumes");
+        }
+
         Path dir = ContainerImporter.getUntarDirectory(volume);
         Files.createDirectories(dir);
         path = dir.resolve(ContainerUtils.getContainerTarName(containerId));
@@ -110,32 +123,44 @@ public void onNext(SendContainerRequest req) {
 
   @Override
   public void onError(Throwable t) {
-    LOG.warn("Error receiving container {} at {}", containerId, nextOffset, t);
-    closeOutput();
-    deleteTarball();
-    responseObserver.onError(t);
+    try {
+      LOG.warn("Error receiving container {} at {}", containerId, nextOffset, 
t);
+      closeOutput();
+      deleteTarball();
+      responseObserver.onError(t);
+    } finally {
+      if (volume != null) {
+        volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+      }
+    }
   }
 
   @Override
   public void onCompleted() {
-    if (output == null) {
-      LOG.warn("Received container without any parts");
-      return;
-    }
-
-    LOG.info("Container {} is downloaded with size {}, starting to import.",
-        containerId, nextOffset);
-    closeOutput();
-
     try {
-      importer.importContainer(containerId, path, volume, compression);
-      LOG.info("Container {} is replicated successfully", containerId);
-      responseObserver.onNext(SendContainerResponse.newBuilder().build());
-      responseObserver.onCompleted();
-    } catch (Throwable t) {
-      LOG.warn("Failed to import container {}", containerId, t);
-      deleteTarball();
-      responseObserver.onError(t);
+      if (output == null) {
+        LOG.warn("Received container without any parts");
+        return;
+      }
+
+      LOG.info("Container {} is downloaded with size {}, starting to import.",
+          containerId, nextOffset);
+      closeOutput();
+
+      try {
+        importer.importContainer(containerId, path, volume, compression);
+        LOG.info("Container {} is replicated successfully", containerId);
+        responseObserver.onNext(SendContainerResponse.newBuilder().build());
+        responseObserver.onCompleted();
+      } catch (Throwable t) {
+        LOG.warn("Failed to import container {}", containerId, t);
+        deleteTarball();
+        responseObserver.onError(t);
+      }
+    } finally {
+      if (volume != null) {
+        volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+      }
     }
   }
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index b6517f4fea..fb60645994 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -42,6 +42,9 @@
 import jakarta.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Clock;
@@ -52,24 +55,36 @@
 import java.util.SortedMap;
 import java.util.UUID;
 import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
@@ -77,6 +92,7 @@
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import 
org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCommandInfo;
 import 
org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
 import 
org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinatorTask;
@@ -100,6 +116,9 @@ public class TestReplicationSupervisor {
 
   private static final long CURRENT_TERM = 1;
 
+  @TempDir
+  private File tempDir;
+
   private final ContainerReplicator noopReplicator = task -> { };
   private final ContainerReplicator throwingReplicator = task -> {
     throw new RuntimeException("testing replication failure");
@@ -328,6 +347,126 @@ public void 
testDownloadAndImportReplicatorFailure(ContainerLayoutVersion layout
         .contains("Container 1 replication was unsuccessful.");
   }
 
+  @ContainerLayoutTestInfo.ContainerTest
+  public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
+      throws IOException, InterruptedException, TimeoutException {
+    this.layoutVersion = layout;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath());
+
+    long containerSize = (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+    ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder()
+        .stateContext(context)
+        .executor(newDirectExecutorService())
+        .clock(clock)
+        .build();
+
+    long containerId = 1;
+    // create container
+    KeyValueContainerData containerData = new 
KeyValueContainerData(containerId,
+        ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
+    HddsVolume vol = mock(HddsVolume.class);
+    containerData.setVolume(vol);
+    containerData.incrBytesUsed(100);
+    KeyValueContainer container = new KeyValueContainer(containerData, conf);
+    ContainerController controllerMock = mock(ContainerController.class);
+    Semaphore semaphore = new Semaphore(1);
+    when(controllerMock.importContainer(any(), any(), any()))
+        .thenAnswer((invocation) -> {
+          semaphore.acquire();
+          return container;
+        });
+    MutableVolumeSet volumeSet = new 
MutableVolumeSet(datanode.getUuidString(), conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    File tarFile = containerTarFile(containerId, containerData);
+
+    SimpleContainerDownloader moc =
+        mock(SimpleContainerDownloader.class);
+    when(
+        moc.getContainerDataFromReplicas(anyLong(), anyList(),
+            any(Path.class), any()))
+        .thenReturn(tarFile.toPath());
+
+    ContainerImporter importer =
+        new ContainerImporter(conf, set, controllerMock, volumeSet);
+
+    HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0);
+    // Initially volume has 0 commit space
+    assertEquals(0, vol1.getCommittedBytes());
+    long usedSpace = vol1.getCurrentUsage().getUsedSpace();
+    // Initially volume has 0 used space
+    assertEquals(0, usedSpace);
+    // Increase committed bytes so that volume has only remaining 3 times 
container size space
+    long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - 
containerSize * 3;
+    vol1.incCommittedBytes(initialCommittedBytes);
+    ContainerReplicator replicator =
+        new DownloadAndImportReplicator(conf, set, importer, moc);
+    replicatorRef.set(replicator);
+
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(DownloadAndImportReplicator.LOG);
+
+    // Acquire semaphore so that container import will pause after reserving 
space.
+    semaphore.acquire();
+    CompletableFuture.runAsync(() -> {
+      try {
+        supervisor.addTask(createTask(containerId));
+      } catch (Exception ex) {
+      }
+    });
+
+    // Wait such that first container import reserve space
+    GenericTestUtils.waitFor(() ->
+        vol1.getCommittedBytes() > vol1.getCurrentUsage().getCapacity() - 
containerSize * 3,
+        1000, 50000);
+
+    // Volume has reserved space of 2 * containerSize
+    assertEquals(vol1.getCommittedBytes(), initialCommittedBytes + 2 * 
containerSize);
+    // Container 2 import will fail as container 1 has reserved space and no 
space left to import new container
+    // New container import requires at least (2 * container size)
+    long containerId2 = 2;
+    supervisor.addTask(createTask(containerId2));
+    GenericTestUtils.waitFor(() -> 1 == 
supervisor.getReplicationFailureCount(),
+        1000, 50000);
+    assertThat(logCapturer.getOutput()).contains("No volumes have enough space 
for a new container");
+    // Release semaphore so that first container import will pass
+    semaphore.release();
+    GenericTestUtils.waitFor(() ->
+        1 == supervisor.getReplicationSuccessCount(), 1000, 50000);
+
+    usedSpace = vol1.getCurrentUsage().getUsedSpace();
+    // After replication, volume used space should be increased by container 
used bytes
+    assertEquals(100, usedSpace);
+
+    // Volume committed bytes should become initial committed bytes which was 
before replication
+    assertEquals(initialCommittedBytes, vol1.getCommittedBytes());
+
+  }
+
+
+  private File containerTarFile(
+      long containerId, ContainerData containerData) throws IOException {
+    File yamlFile = new File(tempDir, "container.yaml");
+    ContainerDataYaml.createContainerFile(containerData,
+        yamlFile);
+    File tarFile = new File(tempDir,
+        ContainerUtils.getContainerTarName(containerId));
+    try (OutputStream output = Files.newOutputStream(tarFile.toPath())) {
+      ArchiveOutputStream<TarArchiveEntry> archive = new 
TarArchiveOutputStream(output);
+      TarArchiveEntry entry = archive.createArchiveEntry(yamlFile,
+          "container.yaml");
+      archive.putArchiveEntry(entry);
+      try (InputStream input = Files.newInputStream(yamlFile.toPath())) {
+        IOUtils.copy(input, archive);
+      }
+      archive.closeArchiveEntry();
+    }
+    return tarFile;
+  }
+
   @ContainerLayoutTestInfo.ContainerTest
   public void testTaskBeyondDeadline(ContainerLayoutVersion layout) {
     this.layoutVersion = layout;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to