This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a792e8fd29 HDDS-7241. EC: Reconstruction could fail with orphan
blocks. (#4718)
a792e8fd29 is described below
commit a792e8fd297c81e3368188a58347c8048071032e
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed May 17 09:53:41 2023 +0100
HDDS-7241. EC: Reconstruction could fail with orphan blocks. (#4718)
---
.../ECReconstructionCoordinator.java | 30 ++++
.../hdds/scm/storage/TestContainerCommandsEC.java | 189 ++++++++++++++++++++-
2 files changed, 214 insertions(+), 5 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 6bd0a4de96..badf999e47 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -444,6 +444,36 @@ public class ECReconstructionCoordinator implements
Closeable {
resultMap.put(blockID.getLocalID(), blkDataArr);
}
}
+ // When a stripe is written, the put block is sent to all nodes even if
+ // that node has zero bytes written to it. If the
+ // client does not get an ACK from all nodes, it will abandon the stripe,
+ // which can leave incomplete stripes on the DNs. Therefore, we should
check
+ // that all blocks in the result map have an entry for all nodes. If they
+ // do not, it means this is an abandoned stripe and we should not attempt
+ // to reconstruct it.
+ // Note that if some nodes report different values for the block length,
+ // it also indicates garbage data at the end of the block. A different part
+ // of the code handles this and only reconstructs the valid part of the
+ // block, ie the minimum length reported by the nodes.
+ Iterator<Map.Entry<Long, BlockData[]>> resultIterator
+ = resultMap.entrySet().iterator();
+ while (resultIterator.hasNext()) {
+ Map.Entry<Long, BlockData[]> entry = resultIterator.next();
+ BlockData[] blockDataArr = entry.getValue();
+ for (Map.Entry<Integer, DatanodeDetails> e : sourceNodeMap.entrySet()) {
+ // There should be an entry in the Array for each keyset node. If there
+ // is not, this is an orphaned stripe and we should remove it from the
+ // result.
+ if (blockDataArr[e.getKey() - 1] == null) {
+ LOG.warn("In container {} block {} does not have a putBlock entry " +
+ "for index {} on datanode {} making it an orphan block / " +
+ "stripe. It will not be reconstructed", containerID,
+ entry.getKey(), e.getKey(), e.getValue());
+ resultIterator.remove();
+ break;
+ }
+ }
+ }
return resultMap;
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index 9f96448f3b..0f3cdf8d35 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -31,12 +31,14 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ListBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -49,6 +51,7 @@ import
org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
+import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -64,12 +67,15 @@ import org.apache.hadoop.ozone.common.ChunkBuffer;
import
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import
org.apache.hadoop.ozone.container.ec.reconstruction.ECContainerOperationClient;
import
org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
import
org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -83,6 +89,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@@ -148,6 +155,10 @@ public class TestContainerCommandsEC {
private static OzoneConfiguration config;
private static CertificateClient certClient;
+ private static OzoneBucket classBucket;
+ private static OzoneVolume classVolume;
+ private static ReplicationConfig repConfig;
+
@BeforeAll
public static void init() throws Exception {
config = new OzoneConfiguration();
@@ -157,6 +168,10 @@ public class TestContainerCommandsEC {
OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
config.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
config.setBoolean(HDDS_CONTAINER_TOKEN_ENABLED, true);
+ DatanodeConfiguration dnConf = config.getObject(
+ DatanodeConfiguration.class);
+ dnConf.setBlockDeletionInterval(Duration.ofSeconds(1));
+ config.setFromObject(dnConf);
startCluster(config);
prepareData(KEY_SIZE_RANGES);
}
@@ -211,6 +226,170 @@ public class TestContainerCommandsEC {
}
}
+ private void closeAllPipelines(ReplicationConfig replicationConfig) {
+ scm.getPipelineManager().getPipelines(replicationConfig,
+ Pipeline.PipelineState.OPEN)
+ .forEach(p -> {
+ try {
+ scm.getPipelineManager().closePipeline(p, false);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ public void testOrphanBlock() throws Exception {
+ // Close all pipelines so we are guaranteed to get a new one
+ closeAllPipelines(repConfig);
+ // First write a full stripe, which is chunksize * dataNum
+ int keyLen = EC_CHUNK_SIZE * EC_DATA;
+ String keyName = UUID.randomUUID().toString();
+ try (OutputStream out = classBucket
+ .createKey(keyName, keyLen, repConfig, new HashMap<>())) {
+ out.write(RandomUtils.nextBytes(keyLen));
+ }
+ long orphanContainerID = classBucket.getKey(keyName)
+ .getOzoneKeyLocations().get(0).getContainerID();
+
+ PipelineID orphanPipelineID = scm.getContainerManager()
+ .getContainer(ContainerID.valueOf(orphanContainerID)).getPipelineID();
+
+ Pipeline orphanPipeline = scm.getPipelineManager()
+ .getPipeline(orphanPipelineID);
+
+ Token<ContainerTokenIdentifier> orphanContainerToken =
+ containerTokenGenerator.generateToken(
+ ANY_USER, new ContainerID(orphanContainerID));
+
+ // Close the container by closing the pipeline
+ scm.getPipelineManager().closePipeline(orphanPipeline, false);
+
+ // Find the datanode hosting Replica index = 2
+ DatanodeDetails dn2 = null;
+ HddsDatanodeService dn2Service = null;
+ List<DatanodeDetails> pipelineNodes = orphanPipeline.getNodes();
+ for (DatanodeDetails node : pipelineNodes) {
+ if (orphanPipeline.getReplicaIndex(node) == 2) {
+ dn2 = node;
+ break;
+ }
+ }
+ // Find the Cluster node corresponding to the datanode hosting index = 2
+ for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+ if (dn.getDatanodeDetails().equals(dn2)) {
+ dn2Service = dn;
+ break;
+ }
+ }
+
+ if (dn2 == null || dn2Service == null) {
+ throw new RuntimeException("Could not find datanode hosting index 2");
+ }
+
+ // Wait for all replicas in the pipeline to report as closed.
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return scm.getContainerManager().getContainerReplicas(
+ ContainerID.valueOf(orphanContainerID)).stream()
+ .allMatch(cr -> cr.getState() ==
+ StorageContainerDatanodeProtocolProtos.
+ ContainerReplicaProto.State.CLOSED);
+ } catch (ContainerNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }, 500, 10000);
+
+ // Get the block ID of the key we have just written. This will be used to
+ // delete the block from one of the datanodes to make the stripe look like
+ // an orphan block.
+ long localID = classBucket.getKey(keyName)
+ .getOzoneKeyLocations().get(0).getLocalID();
+
+ // Create a delete command for the block and send it.
+ DeleteBlocksCommand deleteBlocksCommand =
+ new DeleteBlocksCommand(ImmutableList.of(
+ StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
+ .newBuilder()
+ .setContainerID(orphanContainerID)
+ .addLocalID(localID)
+ .setTxID(1L)
+ .setCount(10)
+ .build()));
+ dn2Service.getDatanodeStateMachine().getContext()
+ .addCommand(deleteBlocksCommand);
+
+ try (XceiverClientGrpc client = new XceiverClientGrpc(
+ createSingleNodePipeline(orphanPipeline, dn2, 1), cluster.getConf())) {
+ // Wait for the block to be actually deleted
+ GenericTestUtils.waitFor(() -> {
+ try {
+ ListBlockResponseProto response = ContainerProtocolCalls
+ .listBlock(client, orphanContainerID, null, Integer.MAX_VALUE,
+ orphanContainerToken);
+ for (BlockData bd : response.getBlockDataList()) {
+ if (bd.getBlockID().getLocalID() == localID) {
+ return false;
+ }
+ }
+ return true;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 500, 30000);
+ }
+
+ ECReconstructionCoordinator coordinator = new ECReconstructionCoordinator(
+ config, certClient, null, ECReconstructionMetrics.create());
+
+ // Create a reconstruction command to create a new copy of indexes 4 and 5
+ // which means 1 to 3 must be available. However we know the block
+ // information is missing for index 2. As all containers in the stripe must
+ // have the block information, this makes the stripe look like an orphan
+ // block, where the write went to some nodes but not all.
+ SortedMap<Integer, DatanodeDetails> sourceNodeMap = new TreeMap<>();
+ for (DatanodeDetails node : orphanPipeline.getNodes()) {
+ if (orphanPipeline.getReplicaIndex(node) <= EC_DATA) {
+ sourceNodeMap.put(orphanPipeline.getReplicaIndex(node), node);
+ }
+ }
+ // Here we find some spare nodes - ie nodes in the cluster that are not in
+ // the original pipeline.
+ List<DatanodeDetails> targets = cluster.getHddsDatanodes().stream()
+ .map(HddsDatanodeService::getDatanodeDetails)
+ .filter(d -> !orphanPipeline.getNodes().contains(d))
+ .limit(2)
+ .collect(Collectors.toList());
+ SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
+ for (int j = 0; j < targets.size(); j++) {
+ targetNodeMap.put(EC_DATA + j + 1, targets.get(j));
+ }
+
+ // Attempt to reconstruct the container.
+ coordinator.reconstructECContainerGroup(orphanContainerID,
+ (ECReplicationConfig) repConfig,
+ sourceNodeMap, targetNodeMap);
+
+ // Check the block listing for the recovered containers 4 or 5 and they
+ // should be present but with no blocks as the only block in the container
+ // was an orphan block.
+ try (XceiverClientGrpc reconClient = new XceiverClientGrpc(
+ createSingleNodePipeline(orphanPipeline, targetNodeMap.get(4), 4),
+ cluster.getConf())) {
+ ListBlockResponseProto response = ContainerProtocolCalls
+ .listBlock(reconClient, orphanContainerID, null, Integer.MAX_VALUE,
+ orphanContainerToken);
+ long count = response.getBlockDataList().stream()
+ .filter(bd -> bd.getBlockID().getLocalID() == localID)
+ .count();
+
+ Assert.assertEquals(0L, count);
+ Assert.assertEquals(0, response.getBlockDataList().size());
+ }
+ }
+
@Test
public void testListBlock() throws Exception {
for (int i = 0; i < datanodeDetails.size(); i++) {
@@ -677,17 +856,17 @@ public class TestContainerCommandsEC {
final String volumeName = UUID.randomUUID().toString();
final String bucketName = UUID.randomUUID().toString();
store.createVolume(volumeName);
- OzoneVolume volume = store.getVolume(volumeName);
- volume.createBucket(bucketName);
- OzoneBucket bucket = volume.getBucket(bucketName);
- final ReplicationConfig repConfig =
+ classVolume = store.getVolume(volumeName);
+ classVolume.createBucket(bucketName);
+ classBucket = classVolume.getBucket(bucketName);
+ repConfig =
new ECReplicationConfig(EC_DATA, EC_PARITY, EC_CODEC, EC_CHUNK_SIZE);
values = new byte[ranges.length][];
for (int i = 0; i < ranges.length; i++) {
int keySize = RandomUtils.nextInt(ranges[i][0], ranges[i][1]);
values[i] = RandomUtils.nextBytes(keySize);
final String keyName = UUID.randomUUID().toString();
- try (OutputStream out = bucket
+ try (OutputStream out = classBucket
.createKey(keyName, values[i].length, repConfig, new HashMap<>())) {
out.write(values[i]);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]