This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 0915f0b1b8 HDDS-10985. EC Reconstruction failed because the size of currentChunks was not equal to checksumBlockDataChunks. (#7009)
0915f0b1b8 is described below

commit 0915f0b1b83c0d354d1844d92861711c62489df5
Author: slfan1989 <[email protected]>
AuthorDate: Wed Sep 11 17:49:10 2024 +0800

    HDDS-10985. EC Reconstruction failed because the size of currentChunks was not equal to checksumBlockDataChunks. (#7009)
---
 .../hdds/scm/storage/ECBlockOutputStream.java      |  34 ++++++-
 .../ozone/container/common/helpers/BlockData.java  |  11 +++
 .../hdds/scm/storage/TestContainerCommandsEC.java  | 104 ++++++++++++++++-----
 3 files changed, 126 insertions(+), 23 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 12ca9978c6..7776e245be 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -38,9 +38,13 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -142,8 +146,34 @@ public class ECBlockOutputStream extends BlockOutputStream {
     }
 
     if (checksumBlockData != null) {
-      List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();
+
+      // For the same BlockGroupLength, we need to find the larger value of Block DataSize.
+      // This is because we do not send empty chunks to the DataNode, so the larger value is more accurate.
+      Map<Long, Optional<BlockData>> maxDataSizeByGroup = 
Arrays.stream(blockData)
+          .filter(Objects::nonNull)
+          .collect(Collectors.groupingBy(BlockData::getBlockGroupLength,
+          Collectors.maxBy(Comparator.comparingLong(BlockData::getSize))));
+      BlockData maxBlockData = maxDataSizeByGroup.get(blockGroupLength).get();
+
+      // When calculating the checksum size,
+      // We need to consider both blockGroupLength and the actual size of blockData.
+      //
+      // We use the smaller value to determine the size of the ChunkList.
+      //
+      // 1. In most cases, blockGroupLength is equal to the size of blockData.
+      // 2. Occasionally, blockData is not fully filled; if a chunk is empty,
+      // it is not sent to the DN, resulting in blockData size being smaller than blockGroupLength.
+      // 3. In cases with 'dirty data',
+      // if an error occurs when writing to the EC-Stripe (e.g., DN reports Container Closed),
+      // and the length confirmed with OM is smaller, blockGroupLength may be smaller than blockData size.
+      long blockDataSize = Math.min(maxBlockData.getSize(), blockGroupLength);
+      int chunkSize = (int) Math.ceil(((double) blockDataSize / 
repConfig.getEcChunkSize()));
       List<ChunkInfo> checksumBlockDataChunks = checksumBlockData.getChunks();
+      if (chunkSize > 0) {
+        checksumBlockDataChunks = checksumBlockData.getChunks().subList(0, 
chunkSize);
+      }
+
+      List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();
 
       Preconditions.checkArgument(
           currentChunks.size() == checksumBlockDataChunks.size(),
@@ -269,7 +299,7 @@ public class ECBlockOutputStream extends BlockOutputStream {
         throw ce;
       });
     } catch (IOException | ExecutionException e) {
-      throw new IOException(EXCEPTION_MSG + e.toString(), e);
+      throw new IOException(EXCEPTION_MSG + e, e);
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java
index 4bd170df8e..ea5c5453f3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.utils.db.Codec;
 import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
 import org.apache.hadoop.hdds.utils.db.Proto3Codec;
+import org.apache.hadoop.ozone.OzoneConsts;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -280,4 +281,14 @@ public class BlockData {
     sb.append(", size=").append(size);
     sb.append("]");
   }
+
+  public long getBlockGroupLength() {
+    String lenStr = getMetadata()
+        .get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
+    // If we don't have the length, then it indicates a problem with the stripe.
+    // All replica should carry the length, so if it is not there, we return 0,
+    // which will cause us to set the length of the block to zero and not
+    // attempt to reconstruct it.
+    return (lenStr == null) ? 0 : Long.parseLong(lenStr);
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index c274d8fea3..6f79839cd0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.SecretKeyTestClient;
+import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
 import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -83,6 +84,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
@@ -99,6 +101,7 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -117,6 +120,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /**
  * This class tests container commands on EC containers.
@@ -613,30 +617,33 @@ public class TestContainerCommandsEC {
 
   @ParameterizedTest
   @MethodSource("recoverableMissingIndexes")
-  void testECReconstructionCoordinatorWith(List<Integer> missingIndexes)
+  void testECReconstructionCoordinatorWith(List<Integer> missingIndexes, 
boolean triggerRetry)
       throws Exception {
-    testECReconstructionCoordinator(missingIndexes, 3);
+    testECReconstructionCoordinator(missingIndexes, 3, triggerRetry);
   }
 
   @ParameterizedTest
   @MethodSource("recoverableMissingIndexes")
-  void testECReconstructionCoordinatorWithPartialStripe(List<Integer> 
missingIndexes)
-      throws Exception {
-    testECReconstructionCoordinator(missingIndexes, 1);
+  void testECReconstructionCoordinatorWithPartialStripe(List<Integer> 
missingIndexes,
+      boolean triggerRetry) throws Exception {
+    testECReconstructionCoordinator(missingIndexes, 1, triggerRetry);
   }
 
   @ParameterizedTest
   @MethodSource("recoverableMissingIndexes")
-  void testECReconstructionCoordinatorWithFullAndPartialStripe(List<Integer> 
missingIndexes)
-      throws Exception {
-    testECReconstructionCoordinator(missingIndexes, 4);
+  void testECReconstructionCoordinatorWithFullAndPartialStripe(List<Integer> 
missingIndexes,
+      boolean triggerRetry) throws Exception {
+    testECReconstructionCoordinator(missingIndexes, 4, triggerRetry);
   }
 
-  static Stream<List<Integer>> recoverableMissingIndexes() {
-    return Stream
-        .concat(IntStream.rangeClosed(1, 5).mapToObj(ImmutableList::of), Stream
-            .of(ImmutableList.of(2, 3), ImmutableList.of(2, 4),
-                ImmutableList.of(3, 5), ImmutableList.of(4, 5)));
+  static Stream<Arguments> recoverableMissingIndexes() {
+    Stream<Arguments> args = IntStream.rangeClosed(1, 5).mapToObj(i -> 
arguments(ImmutableList.of(i), true));
+    Stream<Arguments> args1 = IntStream.rangeClosed(1, 5).mapToObj(i -> 
arguments(ImmutableList.of(i), false));
+    Stream<Arguments> args2 =  Stream.of(arguments(ImmutableList.of(2, 3), 
true),
+        arguments(ImmutableList.of(2, 4), true), arguments(ImmutableList.of(3, 
5), true));
+    Stream<Arguments> args3 =  Stream.of(arguments(ImmutableList.of(2, 3), 
false),
+        arguments(ImmutableList.of(2, 4), false), 
arguments(ImmutableList.of(3, 5), false));
+    return Stream.concat(Stream.concat(args, args1), Stream.concat(args2, 
args3));
   }
 
   /**
@@ -647,7 +654,7 @@ public class TestContainerCommandsEC {
   public void testECReconstructionCoordinatorWithMissingIndexes135() {
     InsufficientLocationsException exception =
         assertThrows(InsufficientLocationsException.class, () -> {
-          testECReconstructionCoordinator(ImmutableList.of(1, 3, 5), 3);
+          testECReconstructionCoordinator(ImmutableList.of(1, 3, 5), 3, false);
         });
 
     String expectedMessage =
@@ -658,7 +665,7 @@ public class TestContainerCommandsEC {
   }
 
   private void testECReconstructionCoordinator(List<Integer> missingIndexes,
-      int numInputChunks) throws Exception {
+      int numInputChunks, boolean triggerRetry) throws Exception {
     ObjectStore objectStore = rpcClient.getObjectStore();
     String keyString = UUID.randomUUID().toString();
     String volumeName = UUID.randomUUID().toString();
@@ -667,7 +674,7 @@ public class TestContainerCommandsEC {
     objectStore.getVolume(volumeName).createBucket(bucketName);
     OzoneVolume volume = objectStore.getVolume(volumeName);
     OzoneBucket bucket = volume.getBucket(bucketName);
-    createKeyAndWriteData(keyString, bucket, numInputChunks);
+    createKeyAndWriteData(keyString, bucket, numInputChunks, triggerRetry);
 
     try (
         XceiverClientManager xceiverClientManager =
@@ -779,7 +786,7 @@ public class TestContainerCommandsEC {
                           .getReplicationConfig(), cToken);
           assertEquals(blockDataArrList.get(i).length,
               reconstructedBlockData.length);
-          checkBlockData(blockDataArrList.get(i), reconstructedBlockData);
+          checkBlockDataWithRetry(blockDataArrList.get(i), 
reconstructedBlockData, triggerRetry);
           XceiverClientSpi client = xceiverClientManager.acquireClient(
               newTargetPipeline);
           try {
@@ -800,7 +807,7 @@ public class TestContainerCommandsEC {
   }
 
   private void createKeyAndWriteData(String keyString, OzoneBucket bucket,
-      int numChunks) throws IOException {
+      int numChunks, boolean triggerRetry) throws IOException {
     for (int i = 0; i < numChunks; i++) {
       inputChunks[i] = getBytesWith(i + 1, EC_CHUNK_SIZE);
     }
@@ -809,11 +816,48 @@ public class TestContainerCommandsEC {
         new HashMap<>())) {
       assertInstanceOf(KeyOutputStream.class, out.getOutputStream());
       for (int i = 0; i < numChunks; i++) {
+        // We generally wait until the data is written to the last chunk
+        // before attempting to trigger CloseContainer.
+        // We use an asynchronous approach for this trigger,
+        // aiming to ensure that closing the container does not interfere with the write operation.
+        // However, this process often needs to be executed multiple times before it takes effect.
+        if (i == numChunks - 1 && triggerRetry) {
+          triggerRetryByCloseContainer(out);
+        }
         out.write(inputChunks[i]);
       }
     }
   }
 
+  private void triggerRetryByCloseContainer(OzoneOutputStream out) {
+    CompletableFuture.runAsync(() -> {
+      BlockOutputStreamEntry blockOutputStreamEntry = 
out.getKeyOutputStream().getStreamEntries().get(0);
+      BlockID entryBlockID = blockOutputStreamEntry.getBlockID();
+      long entryContainerID = entryBlockID.getContainerID();
+      Pipeline entryPipeline = blockOutputStreamEntry.getPipeline();
+      Map<DatanodeDetails, Integer> replicaIndexes = 
entryPipeline.getReplicaIndexes();
+      try {
+        for (Map.Entry<DatanodeDetails, Integer> entry : 
replicaIndexes.entrySet()) {
+          DatanodeDetails key = entry.getKey();
+          Integer value = entry.getValue();
+          XceiverClientManager xceiverClientManager = new 
XceiverClientManager(config);
+          Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
+              .generateToken(ANY_USER, ContainerID.valueOf(entryContainerID));
+          XceiverClientSpi client = xceiverClientManager.acquireClient(
+              createSingleNodePipeline(entryPipeline, key, value));
+          try {
+            ContainerProtocolCalls.closeContainer(client, entryContainerID, 
cToken.encodeToUrlString());
+          } finally {
+            xceiverClientManager.releaseClient(client, false);
+          }
+          break;
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
   @Test
   public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure()
       throws Exception {
@@ -826,7 +870,7 @@ public class TestContainerCommandsEC {
     objectStore.getVolume(volumeName).createBucket(bucketName);
     OzoneVolume volume = objectStore.getVolume(volumeName);
     OzoneBucket bucket = volume.getBucket(bucketName);
-    createKeyAndWriteData(keyString, bucket, 3);
+    createKeyAndWriteData(keyString, bucket, 3, false);
 
     OzoneKeyDetails key = bucket.getKey(keyString);
     long conID = key.getOzoneKeyLocations().get(0).getContainerID();
@@ -900,6 +944,25 @@ public class TestContainerCommandsEC {
         HddsProtos.LifeCycleEvent.CLOSE);
   }
 
+  private void checkBlockDataWithRetry(
+      org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
+      org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+      reconstructedBlockData, boolean triggerRetry) {
+    if (triggerRetry) {
+      for (int i = 0; i < reconstructedBlockData.length; i++) {
+        assertEquals(blockData[i].getBlockID(), 
reconstructedBlockData[i].getBlockID());
+        List<ContainerProtos.ChunkInfo> oldBlockDataChunks = 
blockData[i].getChunks();
+        List<ContainerProtos.ChunkInfo> newBlockDataChunks = 
reconstructedBlockData[i].getChunks();
+        for (int j = 0; j < newBlockDataChunks.size(); j++) {
+          ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j);
+          assertEquals(chunkInfo, newBlockDataChunks.get(j));
+        }
+      }
+      return;
+    }
+    checkBlockData(blockData, reconstructedBlockData);
+  }
+
   private void checkBlockData(
       org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
       org.apache.hadoop.ozone.container.common.helpers.BlockData[]
@@ -967,8 +1030,7 @@ public class TestContainerCommandsEC {
         out.write(values[i]);
       }
     }
-//    List<ContainerID> containerIDs =
-//        new ArrayList<>(scm.getContainerManager().getContainerIDs());
+
     List<ContainerID> containerIDs =
             scm.getContainerManager().getContainers()
                     .stream()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to