This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new dc0a10403a HDDS-9322. Remove duplicate containers when loading volumes 
on a datanode (#5324)
dc0a10403a is described below

commit dc0a10403a069892aec8152a65d646eeccd43a1d
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Dec 20 20:05:47 2023 +0000

    HDDS-9322. Remove duplicate containers when loading volumes on a datanode 
(#5324)
---
 .../ozone/container/ozoneimpl/ContainerReader.java |  85 ++++++++++++-
 .../container/ozoneimpl/TestContainerReader.java   | 133 ++++++++++++++++++---
 2 files changed, 202 insertions(+), 16 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
index 5f300a446d..edbff14aca 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -35,6 +35,7 @@ import 
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.DELETED;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerDataProto.State.RECOVERING;
@@ -225,7 +226,15 @@ public class ContainerReader implements Runnable {
           cleanupContainer(hddsVolume, kvContainer);
           return;
         }
-        containerSet.addContainer(kvContainer);
+        try {
+          containerSet.addContainer(kvContainer);
+        } catch (StorageContainerException e) {
+          if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) {
+            throw e;
+          }
+          resolveDuplicate((KeyValueContainer) containerSet.getContainer(
+              kvContainer.getContainerData().getContainerID()), kvContainer);
+        }
       } else {
         throw new StorageContainerException("Container File is corrupted. " +
             "ContainerType is KeyValueContainer but cast to " +
@@ -240,6 +249,80 @@ public class ContainerReader implements Runnable {
     }
   }
 
+  private void resolveDuplicate(KeyValueContainer existing,
+      KeyValueContainer toAdd) throws IOException {
+    if (existing.getContainerData().getReplicaIndex() != 0 ||
+        toAdd.getContainerData().getReplicaIndex() != 0) {
+      // This is an EC container. As EC Containers don't have a BCSID, we can't
+      // know which one has the most recent data. Additionally, it is possible
+      // for both copies to have a different replica index for the same
+      // container. Therefore we just let whatever one is loaded first win and
+      // leave the other one on disk.
+      LOG.warn("Container {} is present at {} and at {}. Both are EC " +
+              "containers. Leaving both containers on disk.",
+          existing.getContainerData().getContainerID(),
+          existing.getContainerData().getContainerPath(),
+          toAdd.getContainerData().getContainerPath());
+      return;
+    }
+
+    long existingBCSID = existing.getBlockCommitSequenceId();
+    ContainerProtos.ContainerDataProto.State existingState
+        = existing.getContainerState();
+    long toAddBCSID = toAdd.getBlockCommitSequenceId();
+    ContainerProtos.ContainerDataProto.State toAddState
+        = toAdd.getContainerState();
+
+    if (existingState != toAddState) {
+      if (existingState == CLOSED) {
+        // If we have mis-matched states, always pick a closed one
+        LOG.warn("Container {} is present at {} with state CLOSED and at " +
+                "{} with state {}. Removing the latter container.",
+            existing.getContainerData().getContainerID(),
+            existing.getContainerData().getContainerPath(),
+            toAdd.getContainerData().getContainerPath(), toAddState);
+        KeyValueContainerUtil.removeContainer(toAdd.getContainerData(),
+            hddsVolume.getConf());
+        return;
+      } else if (toAddState == CLOSED) {
+        LOG.warn("Container {} is present at {} with state CLOSED and at " +
+                "{} with state {}. Removing the latter container.",
+            toAdd.getContainerData().getContainerID(),
+            toAdd.getContainerData().getContainerPath(),
+            existing.getContainerData().getContainerPath(), existingState);
+        swapAndRemoveContainer(existing, toAdd);
+        return;
+      }
+    }
+
+    if (existingBCSID >= toAddBCSID) {
+      // existing is newer or equal, so remove the one we have yet to load.
+      LOG.warn("Container {} is present at {} with a newer or equal BCSID " +
+          "than at {}. Removing the latter container.",
+          existing.getContainerData().getContainerID(),
+          existing.getContainerData().getContainerPath(),
+          toAdd.getContainerData().getContainerPath());
+      KeyValueContainerUtil.removeContainer(toAdd.getContainerData(),
+          hddsVolume.getConf());
+    } else {
+      LOG.warn("Container {} is present at {} with a lesser BCSID " +
+              "than at {}. Removing the former container.",
+          existing.getContainerData().getContainerID(),
+          existing.getContainerData().getContainerPath(),
+          toAdd.getContainerData().getContainerPath());
+      swapAndRemoveContainer(existing, toAdd);
+    }
+  }
+
+  private void swapAndRemoveContainer(KeyValueContainer existing,
+      KeyValueContainer toAdd) throws IOException {
+    containerSet.removeContainer(
+        existing.getContainerData().getContainerID());
+    containerSet.addContainer(toAdd);
+    KeyValueContainerUtil.removeContainer(existing.getContainerData(),
+        hddsVolume.getConf());
+  }
+
   private void cleanupContainer(
       HddsVolume volume, KeyValueContainer kvContainer) {
     try {
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
index 3e947e135b..5248caaf65 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
@@ -32,12 +32,14 @@ import 
org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -49,8 +51,10 @@ import org.junit.jupiter.api.io.TempDir;
 import org.mockito.Mockito;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -321,6 +325,10 @@ public class TestContainerReader {
     MutableVolumeSet volumeSets =
         new MutableVolumeSet(datanodeId.toString(), clusterId, conf, null,
             StorageVolume.VolumeType.DATA_VOLUME, null);
+    for (StorageVolume v : volumeSets.getVolumesList()) {
+      StorageVolumeUtil.checkVolume(v, clusterId, clusterId, conf,
+          null, null);
+    }
     createDbInstancesForTestIfNeeded(volumeSets, clusterId, clusterId, conf);
     ContainerCache cache = ContainerCache.getInstance(conf);
     cache.shutdownCache();
@@ -330,24 +338,42 @@ public class TestContainerReader {
 
     final int containerCount = 100;
     blockCount = containerCount;
-    for (int i = 0; i < containerCount; i++) {
-      KeyValueContainerData keyValueContainerData =
-          new KeyValueContainerData(i, layout,
-              (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
-              datanodeId.toString());
 
-      KeyValueContainer keyValueContainer =
-          new KeyValueContainer(keyValueContainerData,
-              conf);
-      keyValueContainer.create(volumeSets, policy, clusterId);
+    KeyValueContainer conflict01 = null;
+    KeyValueContainer conflict02 = null;
+    KeyValueContainer conflict11 = null;
+    KeyValueContainer conflict12 = null;
+    KeyValueContainer conflict21 = null;
+    KeyValueContainer conflict22 = null;
+    KeyValueContainer ec1 = null;
+    KeyValueContainer ec2 = null;
+    long baseBCSID = 10L;
 
-      List<Long> blkNames;
-      if (i % 2 == 0) {
-        blkNames = addBlocks(keyValueContainer, true);
-        markBlocksForDelete(keyValueContainer, true, blkNames, i);
+    for (int i = 0; i < containerCount; i++) {
+      if (i == 0) {
+        // Create a duplicate container with ID 0. Both have the same BCSID
+        conflict01 =
+            createContainerWithId(0, volumeSets, policy, baseBCSID, 0);
+        conflict02 =
+            createContainerWithId(0, volumeSets, policy, baseBCSID, 0);
+      } else if (i == 1) {
+        // Create a duplicate container with ID 1 so that the first one has a
+        // larger BCSID
+        conflict11 =
+            createContainerWithId(1, volumeSets, policy, baseBCSID, 0);
+        conflict12 = createContainerWithId(
+            1, volumeSets, policy, baseBCSID - 1, 0);
+      } else if (i == 2) {
+        conflict21 =
+            createContainerWithId(i, volumeSets, policy, baseBCSID, 0);
+        conflict22 =
+            createContainerWithId(i, volumeSets, policy, baseBCSID, 0);
+        conflict22.close();
+      } else if (i == 3) {
+        ec1 = createContainerWithId(i, volumeSets, policy, baseBCSID, 1);
+        ec2 = createContainerWithId(i, volumeSets, policy, baseBCSID, 1);
       } else {
-        blkNames = addBlocks(keyValueContainer, false);
-        markBlocksForDelete(keyValueContainer, false, blkNames, i);
+        createContainerWithId(i, volumeSets, policy, baseBCSID, 0);
       }
     }
     // Close the RocksDB instance for this container and remove from the cache
@@ -374,11 +400,88 @@ public class TestContainerReader {
         " costs " + (System.currentTimeMillis() - startTime) / 1000 + "s");
     Assertions.assertEquals(containerCount,
         containerSet.getContainerMap().entrySet().size());
+    Assertions.assertEquals(volumeSet.getFailedVolumesList().size(), 0);
+
+    // One of the conflict01 or conflict02 should have had its container path
+    // removed.
+    List<Path> paths = new ArrayList<>();
+    paths.add(Paths.get(conflict01.getContainerData().getContainerPath()));
+    paths.add(Paths.get(conflict02.getContainerData().getContainerPath()));
+    int exist = 0;
+    for (Path p : paths) {
+      if (Files.exists(p)) {
+        exist++;
+      }
+    }
+    Assertions.assertEquals(1, exist);
+    Assertions.assertTrue(paths.contains(Paths.get(
+        containerSet.getContainer(0).getContainerData().getContainerPath())));
+
+    // For conflict1, the one with the larger BCSID should win, which is
+    // conflict11.
+    Assertions.assertFalse(Files.exists(Paths.get(
+        conflict12.getContainerData().getContainerPath())));
+    Assertions.assertEquals(conflict11.getContainerData().getContainerPath(),
+        containerSet.getContainer(1).getContainerData().getContainerPath());
+    Assertions.assertEquals(baseBCSID, containerSet.getContainer(1)
+        .getContainerData().getBlockCommitSequenceId());
+
+    // For conflict2, the closed one (conflict22) should win.
+    Assertions.assertFalse(Files.exists(Paths.get(
+        conflict21.getContainerData().getContainerPath())));
+    Assertions.assertEquals(conflict22.getContainerData().getContainerPath(),
+        containerSet.getContainer(2).getContainerData().getContainerPath());
+    Assertions.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+        containerSet.getContainer(2).getContainerData().getState());
+
+    // For the EC conflict, both containers should be left on disk
+    Assertions.assertTrue(Files.exists(Paths.get(
+        ec1.getContainerData().getContainerPath())));
+    Assertions.assertTrue(Files.exists(Paths.get(
+        ec2.getContainerData().getContainerPath())));
+    Assertions.assertNotNull(containerSet.getContainer(3));
+
     // There should be no open containers cached by the ContainerReader as it
     // opens and closed them avoiding the cache.
     Assertions.assertEquals(0, cache.size());
   }
 
+  private KeyValueContainer createContainerWithId(int id, VolumeSet volSet,
+      VolumeChoosingPolicy policy, long bcsid, int replicaIndex)
+      throws Exception {
+    KeyValueContainerData keyValueContainerData =
+        new KeyValueContainerData(id, layout,
+            (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
+            datanodeId.toString());
+    keyValueContainerData.setReplicaIndex(replicaIndex);
+
+    KeyValueContainer keyValueContainer =
+        new KeyValueContainer(keyValueContainerData,
+            conf);
+    keyValueContainer.create(volSet, policy, clusterId);
+
+    List<Long> blkNames;
+    if (id % 2 == 0) {
+      blkNames = addBlocks(keyValueContainer, true);
+      markBlocksForDelete(keyValueContainer, true, blkNames, id);
+    } else {
+      blkNames = addBlocks(keyValueContainer, false);
+      markBlocksForDelete(keyValueContainer, false, blkNames, id);
+    }
+    setBlockCommitSequence(keyValueContainerData, bcsid);
+    return keyValueContainer;
+  }
+
+  private void setBlockCommitSequence(KeyValueContainerData cData, long val)
+      throws IOException {
+    try (DBHandle metadataStore = BlockUtils.getDB(cData, conf)) {
+      metadataStore.getStore().getMetadataTable()
+          .put(cData.getBcsIdKey(), val);
+      metadataStore.getStore().flushDB();
+    }
+    cData.updateBlockCommitSequenceId(val);
+  }
+
   @ContainerTestVersionInfo.ContainerTest
   public void testMarkedDeletedContainerCleared(
       ContainerTestVersionInfo versionInfo) throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to