This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a792e8fd29 HDDS-7241. EC: Reconstruction could fail with orphan 
blocks. (#4718)
a792e8fd29 is described below

commit a792e8fd297c81e3368188a58347c8048071032e
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed May 17 09:53:41 2023 +0100

    HDDS-7241. EC: Reconstruction could fail with orphan blocks. (#4718)
---
 .../ECReconstructionCoordinator.java               |  30 ++++
 .../hdds/scm/storage/TestContainerCommandsEC.java  | 189 ++++++++++++++++++++-
 2 files changed, 214 insertions(+), 5 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 6bd0a4de96..badf999e47 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -444,6 +444,36 @@ public class ECReconstructionCoordinator implements 
Closeable {
         resultMap.put(blockID.getLocalID(), blkDataArr);
       }
     }
+    // When a stripe is written, the put block is sent to all nodes even if
+    // that node has zero bytes written to it. If the
+    // client does not get an ACK from all nodes, it will abandon the stripe,
+    // which can leave incomplete stripes on the DNs. Therefore, we should 
check
+    // that all blocks in the result map have an entry for all nodes. If they
+    // do not, it means this is an abandoned stripe and we should not attempt
+    // to reconstruct it.
+    // Note that if some nodes report different values for the block length,
+    // it also indicates garbage data at the end of the block. A different part
+    // of the code handles this and only reconstructs the valid part of the
+    // block, ie the minimum length reported by the nodes.
+    Iterator<Map.Entry<Long, BlockData[]>> resultIterator
+        = resultMap.entrySet().iterator();
+    while (resultIterator.hasNext()) {
+      Map.Entry<Long, BlockData[]> entry = resultIterator.next();
+      BlockData[] blockDataArr = entry.getValue();
+      for (Map.Entry<Integer, DatanodeDetails> e : sourceNodeMap.entrySet()) {
+        // There should be an entry in the Array for each keyset node. If there
+        // is not, this is an orphaned stripe and we should remove it from the
+        // result.
+        if (blockDataArr[e.getKey() - 1] == null) {
+          LOG.warn("In container {} block {} does not have a putBlock entry " +
+              "for index {} on datanode {} making it an orphan block / " +
+              "stripe. It will not be reconstructed", containerID,
+              entry.getKey(), e.getKey(), e.getValue());
+          resultIterator.remove();
+          break;
+        }
+      }
+    }
     return resultMap;
   }
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index 9f96448f3b..0f3cdf8d35 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -31,12 +31,14 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ListBlockResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -49,6 +51,7 @@ import 
org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
+import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -64,12 +67,15 @@ import org.apache.hadoop.ozone.common.ChunkBuffer;
 import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import 
org.apache.hadoop.ozone.container.ec.reconstruction.ECContainerOperationClient;
 import 
org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
 import 
org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -83,6 +89,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -148,6 +155,10 @@ public class TestContainerCommandsEC {
   private static OzoneConfiguration config;
   private static CertificateClient certClient;
 
+  private static OzoneBucket classBucket;
+  private static OzoneVolume classVolume;
+  private static ReplicationConfig repConfig;
+
   @BeforeAll
   public static void init() throws Exception {
     config = new OzoneConfiguration();
@@ -157,6 +168,10 @@ public class TestContainerCommandsEC {
         OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
     config.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
     config.setBoolean(HDDS_CONTAINER_TOKEN_ENABLED, true);
+    DatanodeConfiguration dnConf = config.getObject(
+        DatanodeConfiguration.class);
+    dnConf.setBlockDeletionInterval(Duration.ofSeconds(1));
+    config.setFromObject(dnConf);
     startCluster(config);
     prepareData(KEY_SIZE_RANGES);
   }
@@ -211,6 +226,170 @@ public class TestContainerCommandsEC {
     }
   }
 
+  private void closeAllPipelines(ReplicationConfig replicationConfig) {
+    scm.getPipelineManager().getPipelines(replicationConfig,
+            Pipeline.PipelineState.OPEN)
+        .forEach(p -> {
+          try {
+            scm.getPipelineManager().closePipeline(p, false);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          } catch (TimeoutException e) {
+            throw new RuntimeException(e);
+          }
+        });
+  }
+
+  @Test
+  public void testOrphanBlock() throws Exception {
+    // Close all pipelines so we are guaranteed to get a new one
+    closeAllPipelines(repConfig);
+    // First write a full stripe, which is chunksize * dataNum
+    int keyLen = EC_CHUNK_SIZE * EC_DATA;
+    String keyName = UUID.randomUUID().toString();
+    try (OutputStream out = classBucket
+        .createKey(keyName, keyLen, repConfig, new HashMap<>())) {
+      out.write(RandomUtils.nextBytes(keyLen));
+    }
+    long orphanContainerID = classBucket.getKey(keyName)
+        .getOzoneKeyLocations().get(0).getContainerID();
+
+    PipelineID orphanPipelineID = scm.getContainerManager()
+        .getContainer(ContainerID.valueOf(orphanContainerID)).getPipelineID();
+
+    Pipeline orphanPipeline = scm.getPipelineManager()
+        .getPipeline(orphanPipelineID);
+
+    Token<ContainerTokenIdentifier> orphanContainerToken =
+        containerTokenGenerator.generateToken(
+            ANY_USER, new ContainerID(orphanContainerID));
+
+    // Close the container by closing the pipeline
+    scm.getPipelineManager().closePipeline(orphanPipeline, false);
+
+    // Find the datanode hosting Replica index = 2
+    DatanodeDetails dn2 = null;
+    HddsDatanodeService dn2Service = null;
+    List<DatanodeDetails> pipelineNodes = orphanPipeline.getNodes();
+    for (DatanodeDetails node : pipelineNodes) {
+      if (orphanPipeline.getReplicaIndex(node) == 2) {
+        dn2 = node;
+        break;
+      }
+    }
+    // Find the Cluster node corresponding to the datanode hosting index = 2
+    for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+      if (dn.getDatanodeDetails().equals(dn2)) {
+        dn2Service = dn;
+        break;
+      }
+    }
+
+    if (dn2 == null || dn2Service == null) {
+      throw new RuntimeException("Could not find datanode hosting index 2");
+    }
+
+    // Wait for all replicas in the pipeline to report as closed.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return scm.getContainerManager().getContainerReplicas(
+            ContainerID.valueOf(orphanContainerID)).stream()
+            .allMatch(cr -> cr.getState() ==
+                StorageContainerDatanodeProtocolProtos.
+                    ContainerReplicaProto.State.CLOSED);
+      } catch (ContainerNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+    }, 500, 10000);
+
+    // Get the block ID of the key we have just written. This will be used to
+    // delete the block from one of the datanodes to make the stripe look like
+    // an orphan block.
+    long localID = classBucket.getKey(keyName)
+        .getOzoneKeyLocations().get(0).getLocalID();
+
+    // Create a delete command for the block and send it.
+    DeleteBlocksCommand deleteBlocksCommand =
+        new DeleteBlocksCommand(ImmutableList.of(
+            StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
+                .newBuilder()
+                .setContainerID(orphanContainerID)
+                .addLocalID(localID)
+                .setTxID(1L)
+                .setCount(10)
+                .build()));
+    dn2Service.getDatanodeStateMachine().getContext()
+        .addCommand(deleteBlocksCommand);
+
+    try (XceiverClientGrpc client = new XceiverClientGrpc(
+        createSingleNodePipeline(orphanPipeline, dn2, 1), cluster.getConf())) {
+      // Wait for the block to be actually deleted
+      GenericTestUtils.waitFor(() -> {
+        try {
+          ListBlockResponseProto response = ContainerProtocolCalls
+              .listBlock(client, orphanContainerID, null, Integer.MAX_VALUE,
+                  orphanContainerToken);
+          for (BlockData bd : response.getBlockDataList()) {
+            if (bd.getBlockID().getLocalID() == localID) {
+              return false;
+            }
+          }
+          return true;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }, 500, 30000);
+    }
+
+    ECReconstructionCoordinator coordinator = new ECReconstructionCoordinator(
+        config, certClient, null, ECReconstructionMetrics.create());
+
+    // Create a reconstruction command to create a new copy of indexes 4 and 5
+    // which means 1 to 3 must be available. However we know the block
+    // information is missing for index 2. As all containers in the stripe must
+    // have the block information, this makes the stripe look like an orphan
+    // block, where the write went to some nodes but not all.
+    SortedMap<Integer, DatanodeDetails> sourceNodeMap = new TreeMap<>();
+    for (DatanodeDetails node : orphanPipeline.getNodes()) {
+      if (orphanPipeline.getReplicaIndex(node) <= EC_DATA) {
+        sourceNodeMap.put(orphanPipeline.getReplicaIndex(node), node);
+      }
+    }
+    // Here we find some spare nodes - ie nodes in the cluster that are not in
+    // the original pipeline.
+    List<DatanodeDetails> targets = cluster.getHddsDatanodes().stream()
+        .map(HddsDatanodeService::getDatanodeDetails)
+        .filter(d -> !orphanPipeline.getNodes().contains(d))
+        .limit(2)
+        .collect(Collectors.toList());
+    SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
+    for (int j = 0; j < targets.size(); j++) {
+      targetNodeMap.put(EC_DATA + j + 1, targets.get(j));
+    }
+
+    // Attempt to reconstruct the container.
+    coordinator.reconstructECContainerGroup(orphanContainerID,
+        (ECReplicationConfig) repConfig,
+        sourceNodeMap, targetNodeMap);
+
+    // Check the block listing for the recovered containers 4 or 5 and they
+    // should be present but with no blocks as the only block in the container
+    // was an orphan block.
+    try (XceiverClientGrpc reconClient = new XceiverClientGrpc(
+        createSingleNodePipeline(orphanPipeline, targetNodeMap.get(4), 4),
+        cluster.getConf())) {
+      ListBlockResponseProto response = ContainerProtocolCalls
+          .listBlock(reconClient, orphanContainerID, null, Integer.MAX_VALUE,
+              orphanContainerToken);
+      long count = response.getBlockDataList().stream()
+          .filter(bd -> bd.getBlockID().getLocalID() == localID)
+          .count();
+
+      Assert.assertEquals(0L, count);
+      Assert.assertEquals(0, response.getBlockDataList().size());
+    }
+  }
+
   @Test
   public void testListBlock() throws Exception {
     for (int i = 0; i < datanodeDetails.size(); i++) {
@@ -677,17 +856,17 @@ public class TestContainerCommandsEC {
     final String volumeName = UUID.randomUUID().toString();
     final String bucketName = UUID.randomUUID().toString();
     store.createVolume(volumeName);
-    OzoneVolume volume = store.getVolume(volumeName);
-    volume.createBucket(bucketName);
-    OzoneBucket bucket = volume.getBucket(bucketName);
-    final ReplicationConfig repConfig =
+    classVolume = store.getVolume(volumeName);
+    classVolume.createBucket(bucketName);
+    classBucket = classVolume.getBucket(bucketName);
+    repConfig =
         new ECReplicationConfig(EC_DATA, EC_PARITY, EC_CODEC, EC_CHUNK_SIZE);
     values = new byte[ranges.length][];
     for (int i = 0; i < ranges.length; i++) {
       int keySize = RandomUtils.nextInt(ranges[i][0], ranges[i][1]);
       values[i] = RandomUtils.nextBytes(keySize);
       final String keyName = UUID.randomUUID().toString();
-      try (OutputStream out = bucket
+      try (OutputStream out = classBucket
           .createKey(keyName, values[i].length, repConfig, new HashMap<>())) {
         out.write(values[i]);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to