This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new dfa8141  HDDS-6237: EC: putBlock should pass close flag true on end of block group/close file. (#3026)
dfa8141 is described below

commit dfa814193706de44a18c1eb1764aecd3e1de0ed9
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Wed Feb 2 10:22:39 2022 -0800

    HDDS-6237: EC: putBlock should pass close flag true on end of block group/close file. (#3026)
    
    Co-authored-by: Uma Maheswara Rao G <[email protected]>
---
 .../ozone/client/io/ECBlockOutputStreamEntry.java  |  4 +-
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 32 +++++++------
 .../ozone/client/rpc/TestECKeyOutputStream.java    | 54 +++++++++++++++++++++-
 3 files changed, 72 insertions(+), 18 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 69c3735..1f9d82c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -265,7 +265,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
         .build();
   }
 
-  void executePutBlock() throws IOException {
+  void executePutBlock(boolean isClose) {
     if (!isInitialized()) {
       return;
     }
@@ -274,7 +274,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
         continue;
       }
       try {
-        stream.executePutBlock(false, true);
+        stream.executePutBlock(isClose, true);
       } catch (Exception e) {
         stream.setIoException(e);
       }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 1e99210..db51a65 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -199,8 +199,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
     writeOffset += len;
   }
 
-  private StripeWriteStatus rewriteStripeToNewBlockGroup(int chunkSize,
-      int failedStripeDataSize, boolean allocateBlockIfFull)
+  private StripeWriteStatus rewriteStripeToNewBlockGroup(
+      int failedStripeDataSize, boolean allocateBlockIfFull, boolean close)
       throws IOException {
     long[] failedDataStripeChunkLens = new long[numDataBlks];
     long[] failedParityStripeChunkLens = new long[numParityBlks];
@@ -250,7 +250,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
     if (hasWriteFailure()) {
       return StripeWriteStatus.FAILED;
     }
-    currentStreamEntry.executePutBlock();
+    currentStreamEntry.executePutBlock(close);
 
     if (hasPutBlockFailure()) {
       return StripeWriteStatus.FAILED;
@@ -285,10 +285,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
     int currentStreamIdx = currentStreamEntry.getCurrentStreamIdx();
     if (currentStreamIdx == numDataBlks && lastDataBuffPos == ecChunkSize) {
       //Lets encode and write
-      if (handleParityWrites(ecChunkSize,
-          allocateBlockIfFull) == StripeWriteStatus.FAILED) {
-        handleStripeFailure(numDataBlks * ecChunkSize, ecChunkSize,
-            allocateBlockIfFull);
+      boolean shouldClose = currentStreamEntry.getRemaining() <= 0;
+      if (handleParityWrites(ecChunkSize, allocateBlockIfFull,
+          shouldClose) == StripeWriteStatus.FAILED) {
+        handleStripeFailure(numDataBlks * ecChunkSize, allocateBlockIfFull,
+            shouldClose);
       } else {
         // At this stage stripe write is successful.
         currentStreamEntry.updateBlockGroupToAckedPosition(
@@ -299,7 +300,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
   }
 
   private StripeWriteStatus handleParityWrites(int parityCellSize,
-      boolean allocateBlockIfFull)
+      boolean allocateBlockIfFull, boolean isLastStripe)
       throws IOException {
     writeParityCells(parityCellSize);
     if (hasWriteFailure()) {
@@ -311,7 +312,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
     // TODO: we should alter the put block calls to share CRC to each stream.
     ECBlockOutputStreamEntry streamEntry =
         blockOutputStreamEntryPool.getCurrentStreamEntry();
-    streamEntry.executePutBlock();
+    streamEntry
+        .executePutBlock(isLastStripe);
 
     if (hasPutBlockFailure()) {
       return StripeWriteStatus.FAILED;
@@ -528,8 +530,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
             lastStripeSize < ecChunkSize ? lastStripeSize : ecChunkSize;
         addPadding(parityCellSize);
         if (handleParityWrites(parityCellSize,
-            false) == StripeWriteStatus.FAILED) {
-          handleStripeFailure(lastStripeSize, parityCellSize, false);
+            false, true) == StripeWriteStatus.FAILED) {
+          handleStripeFailure(lastStripeSize, false, true);
         } else {
           blockOutputStreamEntryPool.getCurrentStreamEntry()
               .updateBlockGroupToAckedPosition(
@@ -549,14 +551,14 @@ public class ECKeyOutputStream extends KeyOutputStream {
     ecChunkBufferCache.release();
   }
 
-  private void handleStripeFailure(int lastStripeSize, int parityCellSize,
-      boolean allocateBlockIfFull)
+  private void handleStripeFailure(int lastStripeSize,
+      boolean allocateBlockIfFull, boolean isClose)
       throws IOException {
     StripeWriteStatus stripeWriteStatus;
     for (int i = 0; i < this.config.getMaxECStripeWriteRetries(); i++) {
       stripeWriteStatus =
-          rewriteStripeToNewBlockGroup(parityCellSize, lastStripeSize,
-              allocateBlockIfFull);
+          rewriteStripeToNewBlockGroup(lastStripeSize,
+              allocateBlockIfFull, isClose);
       if (stripeWriteStatus == StripeWriteStatus.SUCCESS) {
         return;
       }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 73e02a5..cc4c0ae 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.client.rpc;
 
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
@@ -26,6 +27,9 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.BucketArgs;
@@ -34,12 +38,14 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -48,8 +54,11 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
@@ -103,7 +112,10 @@ public class TestECKeyOutputStream {
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
         StorageUnit.MB);
-
+    conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
+        TimeUnit.SECONDS);
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10)
         .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
         .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
@@ -229,6 +241,46 @@ public class TestECKeyOutputStream {
     validateContent(inputData, bucket, bucket.getKey(keyName));
   }
 
+  @Test
+  public void testECContainerKeysCount()
+      throws IOException, InterruptedException, TimeoutException {
+    byte[] inputData = getInputBytes(1);
+    final OzoneBucket bucket = getOzoneBucket();
+    ContainerOperationClient containerOperationClient =
+        new ContainerOperationClient(conf);
+    List<ContainerInfo> containerInfos =
+        containerOperationClient.listContainer(1, 100);
+    Map<ContainerID, Long> containerKeys = new HashMap<>();
+    for (ContainerInfo info : containerInfos) {
+      containerKeys.put(info.containerID(),
+          containerKeys.getOrDefault(info.containerID(), 0L) + 1);
+    }
+
+    String keyName = UUID.randomUUID().toString();
+    try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
+        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+            chunkSize), new HashMap<>())) {
+      out.write(inputData);
+    }
+    OzoneKeyDetails key = bucket.getKey(keyName);
+    long currentKeyContainerID =
+        key.getOzoneKeyLocations().get(0).getContainerID();
+    Long priorKeys = containerKeys.get(new ContainerID(currentKeyContainerID));
+    long expectedKeys = (priorKeys != null ? priorKeys : 0) + 1L;
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return containerOperationClient
+            .listContainer(currentKeyContainerID, 100).get(0)
+            .getNumberOfKeys() == expectedKeys;
+      } catch (IOException exception) {
+        Assert.fail("Unexpected exception " + exception);
+        return false;
+      }
+    }, 100, 10000);
+    validateContent(inputData, bucket, key);
+  }
+
   private void validateContent(byte[] inputData, OzoneBucket bucket,
       OzoneKey key) throws IOException {
     try (OzoneInputStream is = bucket.readKey(key.getName())) {
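
For readers skimming the diff above, the essence of the change is that the close flag on putBlock is no longer hard-coded to false: ECKeyOutputStream now decides per stripe whether the block group is ending (or the key is being closed), and ECBlockOutputStreamEntry forwards that decision to every underlying block stream. Below is a minimal, self-contained sketch of that flow using simplified stand-in types; BlockStream, StreamEntry, and finishStripe are hypothetical names for illustration only, not the actual Ozone classes or the patch itself.

import java.util.List;

public class CloseFlagSketch {

  /** Hypothetical stand-in for a single EC block output stream. */
  interface BlockStream {
    void executePutBlock(boolean close);
  }

  /** Stand-in for ECBlockOutputStreamEntry: forwards the caller's flag. */
  static class StreamEntry {
    private final List<BlockStream> streams;
    private final long remaining; // bytes still writable in this block group

    StreamEntry(List<BlockStream> streams, long remaining) {
      this.streams = streams;
      this.remaining = remaining;
    }

    long getRemaining() {
      return remaining;
    }

    // Before the patch the equivalent call always passed close=false;
    // now the caller's decision is passed through to every stream.
    void executePutBlock(boolean isClose) {
      for (BlockStream stream : streams) {
        stream.executePutBlock(isClose);
      }
    }
  }

  /** Caller side, mirroring the per-stripe decision in ECKeyOutputStream. */
  static void finishStripe(StreamEntry entry, boolean closingKey) {
    // close=true when the block group has no space left or the key itself
    // is being closed, so the datanode can finalize the block on this
    // putBlock instead of waiting for a later close.
    boolean shouldClose = closingKey || entry.getRemaining() <= 0;
    entry.executePutBlock(shouldClose);
  }

  public static void main(String[] args) {
    BlockStream s = close -> System.out.println("putBlock close=" + close);
    finishStripe(new StreamEntry(List.of(s, s, s), 0), false);    // group full
    finishStripe(new StreamEntry(List.of(s, s, s), 1024), true);  // key close
    finishStripe(new StreamEntry(List.of(s, s, s), 1024), false); // mid-write
  }
}

In the actual patch, the full-group case is derived from currentStreamEntry.getRemaining() <= 0 before the parity write, and the key-close path passes true explicitly when the final (padded) stripe's parity is written; combining both into a single boolean here is a simplification for illustration.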

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
