This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new dc0a10403a HDDS-9322. Remove duplicate containers when loading volumes
on a datanode (#5324)
dc0a10403a is described below
commit dc0a10403a069892aec8152a65d646eeccd43a1d
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Dec 20 20:05:47 2023 +0000
HDDS-9322. Remove duplicate containers when loading volumes on a datanode
(#5324)
---
.../ozone/container/ozoneimpl/ContainerReader.java | 85 ++++++++++++-
.../container/ozoneimpl/TestContainerReader.java | 133 ++++++++++++++++++---
2 files changed, 202 insertions(+), 16 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
index 5f300a446d..edbff14aca 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -35,6 +35,7 @@ import
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.DELETED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerDataProto.State.RECOVERING;
@@ -225,7 +226,15 @@ public class ContainerReader implements Runnable {
cleanupContainer(hddsVolume, kvContainer);
return;
}
- containerSet.addContainer(kvContainer);
+ try {
+ containerSet.addContainer(kvContainer);
+ } catch (StorageContainerException e) {
+ if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) {
+ throw e;
+ }
+ resolveDuplicate((KeyValueContainer) containerSet.getContainer(
+ kvContainer.getContainerData().getContainerID()), kvContainer);
+ }
} else {
throw new StorageContainerException("Container File is corrupted. " +
"ContainerType is KeyValueContainer but cast to " +
@@ -240,6 +249,80 @@ public class ContainerReader implements Runnable {
}
}
+ private void resolveDuplicate(KeyValueContainer existing,
+ KeyValueContainer toAdd) throws IOException {
+ if (existing.getContainerData().getReplicaIndex() != 0 ||
+ toAdd.getContainerData().getReplicaIndex() != 0) {
+ // This is an EC container. As EC containers don't have a BCSID, we can't
+ // know which one has the most recent data. Additionally, it is possible
+ // for both copies to have a different replica index for the same
+ // container. Therefore we just let whatever one is loaded first win and
+ // leave the other one on disk.
+ LOG.warn("Container {} is present at {} and at {}. Both are EC " +
+ "containers. Leaving both containers on disk.",
+ existing.getContainerData().getContainerID(),
+ existing.getContainerData().getContainerPath(),
+ toAdd.getContainerData().getContainerPath());
+ return;
+ }
+
+ long existingBCSID = existing.getBlockCommitSequenceId();
+ ContainerProtos.ContainerDataProto.State existingState
+ = existing.getContainerState();
+ long toAddBCSID = toAdd.getBlockCommitSequenceId();
+ ContainerProtos.ContainerDataProto.State toAddState
+ = toAdd.getContainerState();
+
+ if (existingState != toAddState) {
+ if (existingState == CLOSED) {
+ // If we have mis-matched states, always pick a closed one
+ LOG.warn("Container {} is present at {} with state CLOSED and at " +
+ "{} with state {}. Removing the latter container.",
+ existing.getContainerData().getContainerID(),
+ existing.getContainerData().getContainerPath(),
+ toAdd.getContainerData().getContainerPath(), toAddState);
+ KeyValueContainerUtil.removeContainer(toAdd.getContainerData(),
+ hddsVolume.getConf());
+ return;
+ } else if (toAddState == CLOSED) {
+ LOG.warn("Container {} is present at {} with state CLOSED and at " +
+ "{} with state {}. Removing the latter container.",
+ toAdd.getContainerData().getContainerID(),
+ toAdd.getContainerData().getContainerPath(),
+ existing.getContainerData().getContainerPath(), existingState);
+ swapAndRemoveContainer(existing, toAdd);
+ return;
+ }
+ }
+
+ if (existingBCSID >= toAddBCSID) {
+ // existing is newer or equal, so remove the one we have yet to load.
+ LOG.warn("Container {} is present at {} with a newer or equal BCSID " +
+ "than at {}. Removing the latter container.",
+ existing.getContainerData().getContainerID(),
+ existing.getContainerData().getContainerPath(),
+ toAdd.getContainerData().getContainerPath());
+ KeyValueContainerUtil.removeContainer(toAdd.getContainerData(),
+ hddsVolume.getConf());
+ } else {
+ LOG.warn("Container {} is present at {} with a lesser BCSID " +
+ "than at {}. Removing the former container.",
+ existing.getContainerData().getContainerID(),
+ existing.getContainerData().getContainerPath(),
+ toAdd.getContainerData().getContainerPath());
+ swapAndRemoveContainer(existing, toAdd);
+ }
+ }
+
+ private void swapAndRemoveContainer(KeyValueContainer existing,
+ KeyValueContainer toAdd) throws IOException {
+ containerSet.removeContainer(
+ existing.getContainerData().getContainerID());
+ containerSet.addContainer(toAdd);
+ KeyValueContainerUtil.removeContainer(existing.getContainerData(),
+ hddsVolume.getConf());
+ }
+
private void cleanupContainer(
HddsVolume volume, KeyValueContainer kvContainer) {
try {
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
index 3e947e135b..5248caaf65 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
@@ -32,12 +32,14 @@ import
org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -49,8 +51,10 @@ import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -321,6 +325,10 @@ public class TestContainerReader {
MutableVolumeSet volumeSets =
new MutableVolumeSet(datanodeId.toString(), clusterId, conf, null,
StorageVolume.VolumeType.DATA_VOLUME, null);
+ for (StorageVolume v : volumeSets.getVolumesList()) {
+ StorageVolumeUtil.checkVolume(v, clusterId, clusterId, conf,
+ null, null);
+ }
createDbInstancesForTestIfNeeded(volumeSets, clusterId, clusterId, conf);
ContainerCache cache = ContainerCache.getInstance(conf);
cache.shutdownCache();
@@ -330,24 +338,42 @@ public class TestContainerReader {
final int containerCount = 100;
blockCount = containerCount;
- for (int i = 0; i < containerCount; i++) {
- KeyValueContainerData keyValueContainerData =
- new KeyValueContainerData(i, layout,
- (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
- datanodeId.toString());
- KeyValueContainer keyValueContainer =
- new KeyValueContainer(keyValueContainerData,
- conf);
- keyValueContainer.create(volumeSets, policy, clusterId);
+ KeyValueContainer conflict01 = null;
+ KeyValueContainer conflict02 = null;
+ KeyValueContainer conflict11 = null;
+ KeyValueContainer conflict12 = null;
+ KeyValueContainer conflict21 = null;
+ KeyValueContainer conflict22 = null;
+ KeyValueContainer ec1 = null;
+ KeyValueContainer ec2 = null;
+ long baseBCSID = 10L;
- List<Long> blkNames;
- if (i % 2 == 0) {
- blkNames = addBlocks(keyValueContainer, true);
- markBlocksForDelete(keyValueContainer, true, blkNames, i);
+ for (int i = 0; i < containerCount; i++) {
+ if (i == 0) {
+ // Create a duplicate container with ID 0. Both have the same BCSID
+ conflict01 =
+ createContainerWithId(0, volumeSets, policy, baseBCSID, 0);
+ conflict02 =
+ createContainerWithId(0, volumeSets, policy, baseBCSID, 0);
+ } else if (i == 1) {
+ // Create a duplicate container with ID 1 so that one copy has a
+ // larger BCSID
+ conflict11 =
+ createContainerWithId(1, volumeSets, policy, baseBCSID, 0);
+ conflict12 = createContainerWithId(
+ 1, volumeSets, policy, baseBCSID - 1, 0);
+ } else if (i == 2) {
+ conflict21 =
+ createContainerWithId(i, volumeSets, policy, baseBCSID, 0);
+ conflict22 =
+ createContainerWithId(i, volumeSets, policy, baseBCSID, 0);
+ conflict22.close();
+ } else if (i == 3) {
+ ec1 = createContainerWithId(i, volumeSets, policy, baseBCSID, 1);
+ ec2 = createContainerWithId(i, volumeSets, policy, baseBCSID, 1);
} else {
- blkNames = addBlocks(keyValueContainer, false);
- markBlocksForDelete(keyValueContainer, false, blkNames, i);
+ createContainerWithId(i, volumeSets, policy, baseBCSID, 0);
}
}
// Close the RocksDB instance for this container and remove from the cache
@@ -374,11 +400,88 @@ public class TestContainerReader {
" costs " + (System.currentTimeMillis() - startTime) / 1000 + "s");
Assertions.assertEquals(containerCount,
containerSet.getContainerMap().entrySet().size());
+ Assertions.assertEquals(volumeSet.getFailedVolumesList().size(), 0);
+
+ // One of the conflict01 or conflict02 should have had its container path
+ // removed.
+ List<Path> paths = new ArrayList<>();
+ paths.add(Paths.get(conflict01.getContainerData().getContainerPath()));
+ paths.add(Paths.get(conflict02.getContainerData().getContainerPath()));
+ int exist = 0;
+ for (Path p : paths) {
+ if (Files.exists(p)) {
+ exist++;
+ }
+ }
+ Assertions.assertEquals(1, exist);
+ Assertions.assertTrue(paths.contains(Paths.get(
+ containerSet.getContainer(0).getContainerData().getContainerPath())));
+
+ // For conflict1, the one with the larger BCSID should win, which is
+ // conflict11.
+ Assertions.assertFalse(Files.exists(Paths.get(
+ conflict12.getContainerData().getContainerPath())));
+ Assertions.assertEquals(conflict11.getContainerData().getContainerPath(),
+ containerSet.getContainer(1).getContainerData().getContainerPath());
+ Assertions.assertEquals(baseBCSID, containerSet.getContainer(1)
+ .getContainerData().getBlockCommitSequenceId());
+
+ // For conflict2, the closed one (conflict22) should win.
+ Assertions.assertFalse(Files.exists(Paths.get(
+ conflict21.getContainerData().getContainerPath())));
+ Assertions.assertEquals(conflict22.getContainerData().getContainerPath(),
+ containerSet.getContainer(2).getContainerData().getContainerPath());
+ Assertions.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+ containerSet.getContainer(2).getContainerData().getState());
+
+ // For the EC conflict, both containers should be left on disk
+ Assertions.assertTrue(Files.exists(Paths.get(
+ ec1.getContainerData().getContainerPath())));
+ Assertions.assertTrue(Files.exists(Paths.get(
+ ec2.getContainerData().getContainerPath())));
+ Assertions.assertNotNull(containerSet.getContainer(3));
+
// There should be no open containers cached by the ContainerReader as it
// opens and closed them avoiding the cache.
Assertions.assertEquals(0, cache.size());
}
+ private KeyValueContainer createContainerWithId(int id, VolumeSet volSet,
+ VolumeChoosingPolicy policy, long bcsid, int replicaIndex)
+ throws Exception {
+ KeyValueContainerData keyValueContainerData =
+ new KeyValueContainerData(id, layout,
+ (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
+ datanodeId.toString());
+ keyValueContainerData.setReplicaIndex(replicaIndex);
+
+ KeyValueContainer keyValueContainer =
+ new KeyValueContainer(keyValueContainerData,
+ conf);
+ keyValueContainer.create(volSet, policy, clusterId);
+
+ List<Long> blkNames;
+ if (id % 2 == 0) {
+ blkNames = addBlocks(keyValueContainer, true);
+ markBlocksForDelete(keyValueContainer, true, blkNames, id);
+ } else {
+ blkNames = addBlocks(keyValueContainer, false);
+ markBlocksForDelete(keyValueContainer, false, blkNames, id);
+ }
+ setBlockCommitSequence(keyValueContainerData, bcsid);
+ return keyValueContainer;
+ }
+
+ private void setBlockCommitSequence(KeyValueContainerData cData, long val)
+ throws IOException {
+ try (DBHandle metadataStore = BlockUtils.getDB(cData, conf)) {
+ metadataStore.getStore().getMetadataTable()
+ .put(cData.getBcsIdKey(), val);
+ metadataStore.getStore().flushDB();
+ }
+ cData.updateBlockCommitSequenceId(val);
+ }
+
@ContainerTestVersionInfo.ContainerTest
public void testMarkedDeletedContainerCleared(
ContainerTestVersionInfo versionInfo) throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]