This is an automated email from the ASF dual-hosted git repository.
erose pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e3e47ea41e HDDS-12235. Reserve space on DN during container import operation. (#7981)
e3e47ea41e is described below
commit e3e47ea41eb85d604b6ace6586ef6aedb9bba12d
Author: Ashish Kumar <[email protected]>
AuthorDate: Wed Mar 26 20:38:01 2025 +0530
HDDS-12235. Reserve space on DN during container import operation. (#7981)
---
.../common/volume/AvailableSpaceFilter.java | 2 +-
.../container/replication/ContainerImporter.java | 5 +
.../replication/DownloadAndImportReplicator.java | 23 +++-
.../replication/SendContainerRequestHandler.java | 69 ++++++----
.../replication/TestReplicationSupervisor.java | 139 +++++++++++++++++++++
5 files changed, 214 insertions(+), 24 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
index d584600acf..cea77fdf4b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
@@ -33,7 +33,7 @@ public class AvailableSpaceFilter implements Predicate<HddsVolume> {
new HashMap<>();
private long mostAvailableSpace = Long.MIN_VALUE;
- AvailableSpaceFilter(long requiredSpace) {
+ public AvailableSpaceFilter(long requiredSpace) {
this.requiredSpace = requiredSpace;
}
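The only change here is widening the constructor to public so that replication code outside the common.volume package can construct the filter directly. A minimal sketch of the intended call pattern, grounded in the call sites added below (targetVolume stands for whichever HddsVolume the caller reserved space on):

    // The caller has already reserved space via incCommittedBytes, so the
    // reservation is reflected in the volume's available space; passing 0
    // only verifies that the volume is not already over-committed.
    AvailableSpaceFilter filter = new AvailableSpaceFilter(0);
    if (!filter.test(targetVolume)) {
      // fail the replication/import: no space left on this volume
    }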
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index 90fc50f84a..b5ea8902c0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -129,6 +129,8 @@ public void importContainer(long containerID, Path tarFilePath,
try (InputStream input = Files.newInputStream(tarFilePath)) {
Container container = controller.importContainer(
containerData, input, packer);
+ // After a successful container import, increase the volume's used space
+ targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed());
containerSet.addContainerByOverwriteMissingContainer(container);
}
} finally {
@@ -173,4 +175,7 @@ protected TarContainerPacker getPacker(CopyContainerCompression compression) {
return new TarContainerPacker(compression);
}
+ public long getDefaultContainerSize() {
+ return containerSize;
+ }
}
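The new getDefaultContainerSize() accessor exposes the configured ozone.scm.container.size so that callers can size their reservations from it. A short usage sketch (the factor of 2 mirrors the call sites in this patch; exactly what the doubled reservation covers, e.g. tarball plus unpacked data, is an assumption rather than something the patch states):

    // Reserve twice the default container size before importing, and
    // always release the reservation when the import finishes or fails.
    long reservation = 2 * importer.getDefaultContainerSize();
    volume.incCommittedBytes(reservation);
    try {
      // download and import the container ...
    } finally {
      volume.incCommittedBytes(-reservation);
    }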
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 69f375b06c..8c44d0d078 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -22,8 +22,11 @@
import java.nio.file.Path;
import java.util.List;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
import org.slf4j.Logger;
@@ -44,6 +47,7 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
private final ContainerDownloader downloader;
private final ContainerImporter containerImporter;
private final ContainerSet containerSet;
+ private final long containerSize;
public DownloadAndImportReplicator(
ConfigurationSource conf, ContainerSet containerSet,
@@ -53,6 +57,9 @@ public DownloadAndImportReplicator(
this.containerSet = containerSet;
this.downloader = downloader;
this.containerImporter = containerImporter;
+ containerSize = (long) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
}
@Override
@@ -70,9 +77,19 @@ public void replicate(ReplicationTask task) {
LOG.info("Starting replication of container {} from {} using {}",
containerID, sourceDatanodes, compression);
+ HddsVolume targetVolume = null;
try {
- HddsVolume targetVolume = containerImporter.chooseNextVolume();
+ targetVolume = containerImporter.chooseNextVolume();
+ // Reserve space by incrementing committed bytes, then verify that the volume still has room.
+ targetVolume.incCommittedBytes(containerSize * 2);
+ // Committed bytes were already increased above, so AvailableSpaceFilter needs no extra required space here.
+ AvailableSpaceFilter filter = new AvailableSpaceFilter(0);
+ if (!filter.test(targetVolume)) {
+ LOG.warn("Container {} replication was unsuccessful, due to no space
left", containerID);
+ task.setStatus(Status.FAILED);
+ return;
+ }
// Wait for the download. This thread pool is limiting the parallel
// downloads, so it's ok to block here and wait for the full download.
Path tarFilePath =
@@ -95,6 +112,10 @@ public void replicate(ReplicationTask task) {
} catch (IOException e) {
LOG.error("Container {} replication was unsuccessful.", containerID, e);
task.setStatus(Status.FAILED);
+ } finally {
+ if (targetVolume != null) {
+ targetVolume.incCommittedBytes(-containerSize * 2);
+ }
}
}
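The replicate() change follows a reserve/verify/release pattern: commit the space up front, fail fast if the chosen volume cannot hold it, and release the reservation in a finally block on every exit path. A self-contained sketch of the pattern, with a hypothetical Volume class standing in for HddsVolume (all names and sizes here are illustrative, not Ozone's API):

    import java.util.concurrent.atomic.AtomicLong;

    public class ReserveSpaceSketch {
      // Hypothetical stand-in for HddsVolume's committed-bytes accounting.
      static class Volume {
        private final long capacity;
        private final AtomicLong committed = new AtomicLong();
        Volume(long capacity) { this.capacity = capacity; }
        void incCommittedBytes(long delta) { committed.addAndGet(delta); }
        long available() { return capacity - committed.get(); }
      }

      public static void main(String[] args) {
        long containerSize = 5L << 30;          // 5 GB, Ozone's default container size
        Volume volume = new Volume(12L << 30);  // a 12 GB volume

        long reservation = 2 * containerSize;   // same factor as the patch
        volume.incCommittedBytes(reservation);  // reserve before downloading
        try {
          if (volume.available() < 0) {
            System.out.println("no space left, task FAILED");
            return;
          }
          System.out.println("space reserved, download and import proceed");
        } finally {
          volume.incCommittedBytes(-reservation); // release on every path
        }
      }
    }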
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index a89e274f9c..e76a44e680 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -29,7 +29,9 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
@@ -50,7 +52,7 @@ class SendContainerRequestHandler
private long containerId = -1;
private long nextOffset;
private OutputStream output;
- private HddsVolume volume;
+ private HddsVolume volume = null;
private Path path;
private CopyContainerCompression compression;
private final ZeroCopyMessageMarshaller<SendContainerRequest> marshaller;
@@ -85,6 +87,17 @@ public void onNext(SendContainerRequest req) {
if (containerId == -1) {
containerId = req.getContainerID();
volume = importer.chooseNextVolume();
+ // Reserve space by incrementing committed bytes, then verify that the volume still has room.
+ volume.incCommittedBytes(importer.getDefaultContainerSize() * 2);
+ // Committed bytes were already increased above, so AvailableSpaceFilter needs no extra required space here.
+ AvailableSpaceFilter filter = new AvailableSpaceFilter(0);
+ if (!filter.test(volume)) {
+ volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+ LOG.warn("Container {} import was unsuccessful, due to no space
left", containerId);
+ volume = null;
+ throw new DiskChecker.DiskOutOfSpaceException("No more available volumes");
+ }
+
Path dir = ContainerImporter.getUntarDirectory(volume);
Files.createDirectories(dir);
path = dir.resolve(ContainerUtils.getContainerTarName(containerId));
@@ -110,32 +123,44 @@ public void onNext(SendContainerRequest req) {
@Override
public void onError(Throwable t) {
- LOG.warn("Error receiving container {} at {}", containerId, nextOffset, t);
- closeOutput();
- deleteTarball();
- responseObserver.onError(t);
+ try {
+ LOG.warn("Error receiving container {} at {}", containerId, nextOffset,
t);
+ closeOutput();
+ deleteTarball();
+ responseObserver.onError(t);
+ } finally {
+ if (volume != null) {
+ volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+ }
+ }
}
@Override
public void onCompleted() {
- if (output == null) {
- LOG.warn("Received container without any parts");
- return;
- }
-
- LOG.info("Container {} is downloaded with size {}, starting to import.",
- containerId, nextOffset);
- closeOutput();
-
try {
- importer.importContainer(containerId, path, volume, compression);
- LOG.info("Container {} is replicated successfully", containerId);
- responseObserver.onNext(SendContainerResponse.newBuilder().build());
- responseObserver.onCompleted();
- } catch (Throwable t) {
- LOG.warn("Failed to import container {}", containerId, t);
- deleteTarball();
- responseObserver.onError(t);
+ if (output == null) {
+ LOG.warn("Received container without any parts");
+ return;
+ }
+
+ LOG.info("Container {} is downloaded with size {}, starting to import.",
+ containerId, nextOffset);
+ closeOutput();
+
+ try {
+ importer.importContainer(containerId, path, volume, compression);
+ LOG.info("Container {} is replicated successfully", containerId);
+ responseObserver.onNext(SendContainerResponse.newBuilder().build());
+ responseObserver.onCompleted();
+ } catch (Throwable t) {
+ LOG.warn("Failed to import container {}", containerId, t);
+ deleteTarball();
+ responseObserver.onError(t);
+ }
+ } finally {
+ if (volume != null) {
+ volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+ }
}
}
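On the push path the reservation is taken in onNext() when the first chunk arrives, so it must be released exactly once in whichever terminal callback ends the stream; that is why both onError() and onCompleted() gain a finally block that decrements committed bytes only when a volume was actually chosen. A condensed sketch of the resulting shape (cleanup() and releaseReservation() are hypothetical helpers; the latter wraps volume.incCommittedBytes(-2 * importer.getDefaultContainerSize())):

    @Override
    public void onError(Throwable t) {
      try {
        cleanup();                  // close output, delete the partial tarball
        responseObserver.onError(t);
      } finally {
        if (volume != null) {
          releaseReservation();     // hypothetical helper, see lead-in
        }
      }
    }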
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index b6517f4fea..fb60645994 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -42,6 +42,9 @@
import jakarta.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
@@ -52,24 +55,36 @@
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
@@ -77,6 +92,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCommandInfo;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinatorTask;
@@ -100,6 +116,9 @@ public class TestReplicationSupervisor {
private static final long CURRENT_TERM = 1;
+ @TempDir
+ private File tempDir;
+
private final ContainerReplicator noopReplicator = task -> { };
private final ContainerReplicator throwingReplicator = task -> {
throw new RuntimeException("testing replication failure");
@@ -328,6 +347,126 @@ public void testDownloadAndImportReplicatorFailure(ContainerLayoutVersion layout
.contains("Container 1 replication was unsuccessful.");
}
+ @ContainerLayoutTestInfo.ContainerTest
+ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
+ throws IOException, InterruptedException, TimeoutException {
+ this.layoutVersion = layout;
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath());
+
+ long containerSize = (long) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+ ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder()
+ .stateContext(context)
+ .executor(newDirectExecutorService())
+ .clock(clock)
+ .build();
+
+ long containerId = 1;
+ // create container
+ KeyValueContainerData containerData = new KeyValueContainerData(containerId,
+ ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
+ HddsVolume vol = mock(HddsVolume.class);
+ containerData.setVolume(vol);
+ containerData.incrBytesUsed(100);
+ KeyValueContainer container = new KeyValueContainer(containerData, conf);
+ ContainerController controllerMock = mock(ContainerController.class);
+ Semaphore semaphore = new Semaphore(1);
+ when(controllerMock.importContainer(any(), any(), any()))
+ .thenAnswer((invocation) -> {
+ semaphore.acquire();
+ return container;
+ });
+ MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null,
+ StorageVolume.VolumeType.DATA_VOLUME, null);
+ File tarFile = containerTarFile(containerId, containerData);
+
+ SimpleContainerDownloader moc =
+ mock(SimpleContainerDownloader.class);
+ when(
+ moc.getContainerDataFromReplicas(anyLong(), anyList(),
+ any(Path.class), any()))
+ .thenReturn(tarFile.toPath());
+
+ ContainerImporter importer =
+ new ContainerImporter(conf, set, controllerMock, volumeSet);
+
+ HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0);
+ // Initially volume has 0 commit space
+ assertEquals(0, vol1.getCommittedBytes());
+ long usedSpace = vol1.getCurrentUsage().getUsedSpace();
+ // Initially volume has 0 used space
+ assertEquals(0, usedSpace);
+ // Increase committed bytes so that the volume has only 3x the container size remaining
+ long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - containerSize * 3;
+ vol1.incCommittedBytes(initialCommittedBytes);
+ ContainerReplicator replicator =
+ new DownloadAndImportReplicator(conf, set, importer, moc);
+ replicatorRef.set(replicator);
+
+ GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+ .captureLogs(DownloadAndImportReplicator.LOG);
+
+ // Acquire the semaphore so that the container import pauses after reserving space.
+ semaphore.acquire();
+ CompletableFuture.runAsync(() -> {
+ try {
+ supervisor.addTask(createTask(containerId));
+ } catch (Exception ex) {
+ }
+ });
+
+ // Wait until the first container import has reserved space
+ GenericTestUtils.waitFor(() ->
+ vol1.getCommittedBytes() > vol1.getCurrentUsage().getCapacity() - containerSize * 3,
+ 1000, 50000);
+
+ // Volume has reserved space of 2 * containerSize
+ assertEquals(vol1.getCommittedBytes(), initialCommittedBytes + 2 * containerSize);
+ // Container 2 import will fail: container 1 holds the reservation, leaving no space for a new container
+ // (a new container import requires at least 2 * container size)
+ long containerId2 = 2;
+ supervisor.addTask(createTask(containerId2));
+ GenericTestUtils.waitFor(() -> 1 == supervisor.getReplicationFailureCount(),
+ 1000, 50000);
+ assertThat(logCapturer.getOutput()).contains("No volumes have enough space for a new container");
+ // Release the semaphore so that the first container import can complete
+ semaphore.release();
+ GenericTestUtils.waitFor(() ->
+ 1 == supervisor.getReplicationSuccessCount(), 1000, 50000);
+
+ usedSpace = vol1.getCurrentUsage().getUsedSpace();
+ // After replication, the volume's used space should increase by the container's used bytes
+ assertEquals(100, usedSpace);
+
+ // Committed bytes should return to their pre-replication value
+ assertEquals(initialCommittedBytes, vol1.getCommittedBytes());
+
+ }
+
+
+ private File containerTarFile(
+ long containerId, ContainerData containerData) throws IOException {
+ File yamlFile = new File(tempDir, "container.yaml");
+ ContainerDataYaml.createContainerFile(containerData,
+ yamlFile);
+ File tarFile = new File(tempDir,
+ ContainerUtils.getContainerTarName(containerId));
+ try (OutputStream output = Files.newOutputStream(tarFile.toPath())) {
+ ArchiveOutputStream<TarArchiveEntry> archive = new TarArchiveOutputStream(output);
+ TarArchiveEntry entry = archive.createArchiveEntry(yamlFile,
+ "container.yaml");
+ archive.putArchiveEntry(entry);
+ try (InputStream input = Files.newInputStream(yamlFile.toPath())) {
+ IOUtils.copy(input, archive);
+ }
+ archive.closeArchiveEntry();
+ }
+ return tarFile;
+ }
+
@ContainerLayoutTestInfo.ContainerTest
public void testTaskBeyondDeadline(ContainerLayoutVersion layout) {
this.layoutVersion = layout;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]