This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 86f7354  HDDS-6036: EC: handleStripeFailure should retry (#2910)
86f7354 is described below

commit 86f735418a0ded94d18a5729a18af6e6e16a48ca
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Thu Dec 16 09:28:35 2021 -0800

    HDDS-6036: EC: handleStripeFailure should retry (#2910)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  |  11 ++
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   5 +
 .../common/src/main/resources/ozone-default.xml    |  13 ++
 .../ozone/client/io/ECBlockOutputStreamEntry.java  |  10 +-
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 115 ++++++++++----
 .../hadoop/ozone/client/MockBlockAllocator.java    |   5 +-
 .../hadoop/ozone/client/MockOmTransport.java       |  21 ++-
 .../ozone/client/MockXceiverClientFactory.java     |  39 +++--
 .../client/MultiNodePipelineBlockAllocator.java    |  97 +++++++++---
 .../ozone/client/SinglePipelineBlockAllocator.java |   5 +-
 .../hadoop/ozone/client/TestOzoneClient.java       |   4 +-
 .../hadoop/ozone/client/TestOzoneECClient.java     | 173 ++++++++++++++++++++-
 12 files changed, 424 insertions(+), 74 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index f39ec86..8861f4b 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -124,6 +124,13 @@ public class OzoneClientConfig {
       tags = ConfigTag.CLIENT)
   private boolean checksumVerify = true;
 
+  @Config(key = "max.ec.stripe.write.retries",
+      defaultValue = "10",
+      description = "Ozone EC client to retry stripe to new block group on" +
+          " failures.",
+      tags = ConfigTag.CLIENT)
+  private int maxECStripeWriteRetries = 10;
+
   @PostConstruct
   private void validate() {
     Preconditions.checkState(streamBufferSize > 0);
@@ -224,6 +231,10 @@ public class OzoneClientConfig {
     this.checksumVerify = checksumVerify;
   }
 
+  public int getMaxECStripeWriteRetries() {
+    return this.maxECStripeWriteRetries;
+  }
+
   public int getBufferIncrement() {
     return bufferIncrement;
   }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 9c7d769..f719571 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -115,6 +115,11 @@ public final class OzoneConfigKeys {
       "ozone.scm.block.size";
   public static final String OZONE_SCM_BLOCK_SIZE_DEFAULT = "256MB";
 
+  public static final String OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES =
+      "ozone.client.max.ec.stripe.write.retries";
+  public static final String OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES_DEFAULT =
+      "10";
+
   /**
    * Ozone administrator users delimited by comma.
    * If not set, only the user who launches an ozone service will be the
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index e236d1e..3e2b5e9 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1178,6 +1178,7 @@
       Supported values: RATIS, STAND_ALONE, CHAINED and EC.
     </description>
   </property>
+
   <property>
     <name>hdds.container.close.threshold</name>
     <value>0.9f</value>
@@ -3044,4 +3045,16 @@
       will create intermediate directories.
     </description>
   </property>
+
+  <property>
+    <name>ozone.client.max.ec.stripe.write.retries</name>
+    <value>10</value>
+    <tag>CLIENT</tag>
+    <description>
+      When an EC stripe write fails, the client will request allocation of a
+      new block group and write the failed stripe into the new block group. If
+      the same stripe failure continues in the newly acquired block group as
+      well, it will retry by requesting allocation of another new block group.
+      This configuration is used to limit the number of such retries. By
+      default the number of retries is 10.
+    </description>
+  </property>
 </configuration>
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 0cb7b0f..3670231 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -290,6 +290,14 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry{
     return blockOutputStreams[0].getBlockID();
   }
 
+  public List<ECBlockOutputStream> streamsWithWriteFailure() {
+    return getFailedStreams(false);
+  }
+
+  public List<ECBlockOutputStream> streamsWithPutBlockFailure() {
+    return getFailedStreams(true);
+  }
+
   /**
    * In EC, we will do async write calls for writing data in the scope of a
    * stripe. After every stripe write finishes, use this method to validate the
@@ -300,7 +308,7 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry{
    *                   futures if false.
    * @return
    */
-  public List<ECBlockOutputStream> getFailedStreams(boolean forPutBlock) {
+  private List<ECBlockOutputStream> getFailedStreams(boolean forPutBlock) {
     final Iterator<ECBlockOutputStream> iter = blockStreams().iterator();
     List<ECBlockOutputStream> failedStreams = new ArrayList<>();
     while (iter.hasNext()) {
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index f5480c7..fd333e2 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -174,7 +174,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
     int pos = handleDataWrite(currentStreamIdx, b, off,
         currentWriterChunkLenToWrite,
         currentChunkBufferLen + currentWriterChunkLenToWrite == ecChunkSize);
-    //TODO: do we really need this call?
     checkAndWriteParityCells(pos, false);
     int remLen = len - currentWriterChunkLenToWrite;
     int iters = remLen / ecChunkSize;
@@ -200,7 +199,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
     writeOffset += len;
   }
 
-  private void handleStripeFailure(int chunkSize, int failedStripeDataSize)
+  private StripeWriteStatus rewriteStripeToNewBlockGroup(int chunkSize,
+      int failedStripeDataSize, boolean allocateBlockIfFull)
       throws IOException {
     long[] failedDataStripeChunkLens = new long[numDataBlks];
     long[] failedParityStripeChunkLens = new long[numParityBlks];
@@ -246,9 +246,35 @@ public class ECKeyOutputStream extends KeyOutputStream {
           true, true);
       currentStreamEntry.useNextBlockStream();
     }
+
+    if (hasWriteFailure()) {
+      return StripeWriteStatus.FAILED;
+    }
     currentStreamEntry.executePutBlock();
+
+    if (hasPutBlockFailure()) {
+      return StripeWriteStatus.FAILED;
+    }
+    ECBlockOutputStreamEntry newBlockGroupStreamEntry =
+        blockOutputStreamEntryPool.getCurrentStreamEntry();
+    newBlockGroupStreamEntry
+        .updateBlockGroupToAckedPosition(failedStripeDataSize);
     ecChunkBufferCache.clear(chunkSize);
     ecChunkBufferCache.release();
+
+    if (newBlockGroupStreamEntry.getRemaining() <= 0) {
+      // In most cases this should not happen, except when the stripe size
+      // and the block size are the same.
+      newBlockGroupStreamEntry.close();
+      if (allocateBlockIfFull) {
+        blockOutputStreamEntryPool.allocateBlockIfNeeded();
+      }
+      currentBlockGroupLen = 0;
+    } else {
+      newBlockGroupStreamEntry.resetToFirstEntry();
+    }
+
+    return StripeWriteStatus.SUCCESS;
   }
 
   private void checkAndWriteParityCells(int lastDataBuffPos,
@@ -262,12 +288,13 @@ public class ECKeyOutputStream extends KeyOutputStream {
       //Lets encode and write
       if (handleParityWrites(ecChunkSize,
           allocateBlockIfFull) == StripeWriteStatus.FAILED) {
-        // TODO: This should make sure to retry until it's success. (HDDS-6036)
-        handleStripeFailure(ecChunkSize, numDataBlks * ecChunkSize);
+        handleStripeFailure(numDataBlks * ecChunkSize, ecChunkSize,
+            allocateBlockIfFull);
+      } else {
+        // At this stage stripe write is successful.
+        currentStreamEntry.updateBlockGroupToAckedPosition(
+            currentStreamEntry.getCurrentPosition());
       }
-      // At this stage stripe write is successful.
-      currentStreamEntry.updateBlockGroupToAckedPosition(
-          currentStreamEntry.getCurrentPosition());
 
     }
   }
@@ -276,24 +303,18 @@ public class ECKeyOutputStream extends KeyOutputStream {
       boolean allocateBlockIfFull)
       throws IOException {
     writeParityCells(parityCellSize);
+    if (hasWriteFailure()) {
+      return StripeWriteStatus.FAILED;
+    }
+
     // By this time, we should have finished full stripe. So, lets call
     // executePutBlock for all.
     // TODO: we should alter the put block calls to share CRC to each stream.
     ECBlockOutputStreamEntry streamEntry =
         blockOutputStreamEntryPool.getCurrentStreamEntry();
-    List<ECBlockOutputStream> failedStreams =
-        streamEntry.getFailedStreams(false);
-    // Since writes are async, let's check the failures once.
-    if (failedStreams.size() > 0) {
-      addToExcludeNodesList(failedStreams);
-      return StripeWriteStatus.FAILED;
-    }
     streamEntry.executePutBlock();
 
-    failedStreams = streamEntry.getFailedStreams(true);
-    // Since putBlock also async, let's check the failures again.
-    if (failedStreams.size() > 0) {
-      addToExcludeNodesList(failedStreams);
+    if (hasPutBlockFailure()) {
       return StripeWriteStatus.FAILED;
     }
     ecChunkBufferCache.clear(parityCellSize);
@@ -303,13 +324,38 @@ public class ECKeyOutputStream extends KeyOutputStream {
       if (allocateBlockIfFull) {
         blockOutputStreamEntryPool.allocateBlockIfNeeded();
       }
+      currentBlockGroupLen = 0;
     } else {
       streamEntry.resetToFirstEntry();
     }
-    currentBlockGroupLen = 0;
+
     return StripeWriteStatus.SUCCESS;
   }
 
+  private boolean hasWriteFailure() {
+    List<ECBlockOutputStream> failedStreams =
+        blockOutputStreamEntryPool.getCurrentStreamEntry()
+            .streamsWithWriteFailure();
+    // Since writes are async, let's check the failures once.
+    if (failedStreams.size() > 0) {
+      addToExcludeNodesList(failedStreams);
+      return true;
+    }
+    return false;
+  }
+
+  private boolean hasPutBlockFailure() {
+    List<ECBlockOutputStream> failedStreams =
+        blockOutputStreamEntryPool.getCurrentStreamEntry()
+            .streamsWithPutBlockFailure();
+    // Since putBlock is also async, let's check the failures once.
+    if (failedStreams.size() > 0) {
+      addToExcludeNodesList(failedStreams);
+      return true;
+    }
+    return false;
+  }
+
   private void addToExcludeNodesList(List<ECBlockOutputStream> failedStreams) {
     for (ECBlockOutputStream failedStream : failedStreams) {
       blockOutputStreamEntryPool.getExcludeList()
@@ -354,7 +400,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
   }
 
   private void handleOutputStreamWrite(int currIdx, long len,
-      boolean isFullCell, boolean isParity) throws IOException {
+      boolean isFullCell, boolean isParity) {
 
     BlockOutputStreamEntry current =
         blockOutputStreamEntryPool.getCurrentStreamEntry();
@@ -482,13 +528,13 @@ public class ECKeyOutputStream extends KeyOutputStream {
         addPadding(parityCellSize);
         if (handleParityWrites(parityCellSize,
             false) == StripeWriteStatus.FAILED) {
-          // TODO: loop this until we succeed?
-          handleStripeFailure(parityCellSize, lastStripeSize);
+          handleStripeFailure(lastStripeSize, parityCellSize, false);
+        } else {
+          blockOutputStreamEntryPool.getCurrentStreamEntry()
+              .updateBlockGroupToAckedPosition(
+                  blockOutputStreamEntryPool.getCurrentStreamEntry()
+                      .getCurrentPosition());
         }
-        blockOutputStreamEntryPool.getCurrentStreamEntry()
-            .updateBlockGroupToAckedPosition(
-                blockOutputStreamEntryPool.getCurrentStreamEntry()
-                    .getCurrentPosition());
 
       }
 
@@ -502,6 +548,23 @@ public class ECKeyOutputStream extends KeyOutputStream {
     ecChunkBufferCache.release();
   }
 
+  private void handleStripeFailure(int lastStripeSize, int parityCellSize,
+      boolean allocateBlockIfFull)
+      throws IOException {
+    StripeWriteStatus stripeWriteStatus;
+    for (int i = 0; i < this.config.getMaxECStripeWriteRetries(); i++) {
+      stripeWriteStatus =
+          rewriteStripeToNewBlockGroup(parityCellSize, lastStripeSize,
+              allocateBlockIfFull);
+      if (stripeWriteStatus == StripeWriteStatus.SUCCESS) {
+        return;
+      }
+    }
+    throw new IOException("Completed max allowed retries " + this.config
+        .getMaxECStripeWriteRetries() + " on stripe failures.");
+
+  }
+
   private void addPadding(int parityCellSize) {
     ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
 
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockBlockAllocator.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockBlockAllocator.java
index 747b39b..0d5e1a2 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockBlockAllocator.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockBlockAllocator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.client;
 
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
 
@@ -27,7 +28,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLoca
  */
 public interface MockBlockAllocator {
 
-  Iterable<? extends KeyLocation> allocateBlock(
-      KeyArgs createKeyRequest);
+  Iterable<? extends KeyLocation> allocateBlock(KeyArgs createKeyRequest,
+      ExcludeList excludeList);
 
 }
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
index be16e1f..9bc11e2 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -57,6 +58,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeI
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.function.Function;
 
@@ -132,10 +134,18 @@ public class MockOmTransport implements OmTransport {
 
   private OzoneManagerProtocolProtos.AllocateBlockResponse allocateBlock(
       OzoneManagerProtocolProtos.AllocateBlockRequest allocateBlockRequest) {
-    return OzoneManagerProtocolProtos.AllocateBlockResponse.newBuilder()
-        .setKeyLocation(
-            blockAllocator.allocateBlock(allocateBlockRequest.getKeyArgs())
-                .iterator().next()).build();
+    Iterator<? extends OzoneManagerProtocolProtos.KeyLocation> iterator =
+        blockAllocator.allocateBlock(allocateBlockRequest.getKeyArgs(),
+            ExcludeList.getFromProtoBuf(allocateBlockRequest.getExcludeList()))
+            .iterator();
+    OzoneManagerProtocolProtos.AllocateBlockResponse.Builder builder =
+        OzoneManagerProtocolProtos.AllocateBlockResponse.newBuilder()
+            .setKeyLocation(iterator.next());
+    while (iterator.hasNext()) {
+      builder.mergeKeyLocation(iterator.next());
+    }
+    return builder.build();
+
   }
 
   private DeleteVolumeResponse deleteVolume(
@@ -195,7 +205,8 @@ public class MockOmTransport implements OmTransport {
             .setModificationTime(now).setDataSize(keyArgs.getDataSize())
             .setLatestVersion(0L).addKeyLocationList(
             KeyLocationList.newBuilder().addAllKeyLocations(
-                blockAllocator.allocateBlock(createKeyRequest.getKeyArgs()))
+                blockAllocator.allocateBlock(createKeyRequest.getKeyArgs(),
+                    new ExcludeList()))
                 .build());
 
     if (keyArgs.getType() == HddsProtos.ReplicationType.NONE) {
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
index d4f1d23..ef2fd43 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -36,18 +37,30 @@ public class MockXceiverClientFactory
 
   private final Map<DatanodeDetails, MockDatanodeStorage> storage =
       new HashMap<>();
+  private List<DatanodeDetails> pendingToFailNodes = new ArrayList<>();
 
   public void setFailedStorages(List<DatanodeDetails> failedStorages) {
-    final Iterator<Map.Entry<DatanodeDetails, MockDatanodeStorage>> iterator =
-        storage.entrySet().iterator();
-    while (iterator.hasNext()) {
-      final Map.Entry<DatanodeDetails, MockDatanodeStorage> next =
-          iterator.next();
-      if (failedStorages.contains(next.getKey())) {
-        final MockDatanodeStorage value = next.getValue();
-        value.setStorageFailed();
+    List<DatanodeDetails> remainingFailNodes = new ArrayList<>();
+    for(int i=0; i< failedStorages.size(); i++){
+      DatanodeDetails failedDN = failedStorages.get(i);
+      boolean isCurrentNodeMarked = false;
+      final Iterator<Map.Entry<DatanodeDetails, MockDatanodeStorage>> iterator 
=
+          storage.entrySet().iterator();
+      while (iterator.hasNext()) {
+        final Map.Entry<DatanodeDetails, MockDatanodeStorage> next =
+            iterator.next();
+        if (next.getKey().equals(failedDN)) {
+          final MockDatanodeStorage value = next.getValue();
+          value.setStorageFailed();
+          isCurrentNodeMarked = true;
+        }
+      }
+      if(!isCurrentNodeMarked){
+        // This node has not been initialized by the client yet.
+        remainingFailNodes.add(failedDN);
       }
     }
+    this.pendingToFailNodes = remainingFailNodes;
   }
 
   @Override
@@ -58,9 +71,13 @@ public class MockXceiverClientFactory
   @Override
   public XceiverClientSpi acquireClient(Pipeline pipeline)
       throws IOException {
-    return new MockXceiverClientSpi(pipeline, storage
-        .computeIfAbsent(pipeline.getFirstNode(),
-            r -> new MockDatanodeStorage()));
+    MockXceiverClientSpi mockXceiverClientSpi =
+        new MockXceiverClientSpi(pipeline, storage
+            .computeIfAbsent(pipeline.getFirstNode(),
+                r -> new MockDatanodeStorage()));
+    // In case this node was already set to be marked as failed.
+    setFailedStorages(this.pendingToFailNodes);
+    return mockXceiverClientSpi;
   }
 
   @Override
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
index be2f7fd..ee16d6d 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
@@ -21,61 +21,86 @@ package org.apache.hadoop.ozone.client;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
+import java.util.Set;
 
 /**
  * Allocates the block with required number of nodes in the pipeline.
+ * The nodes are pre-created with port numbers starting from 0 to
+ * (given cluster size - 1).
  */
 public class MultiNodePipelineBlockAllocator implements MockBlockAllocator {
-  public static final Random RANDOM = new Random();
   private long blockId;
   private int requiredNodes;
   private final ConfigurationSource conf;
+  private List<HddsProtos.DatanodeDetailsProto> clusterDns = new ArrayList<>();
+  private int start = 0;
 
   public MultiNodePipelineBlockAllocator(OzoneConfiguration conf,
-      int requiredNodes) {
+      int requiredNodes, int clusterSize) {
     this.requiredNodes = requiredNodes;
     this.conf = conf;
+    // Pre-initializing the datanodes. Later the allocateBlock API will use
+    // these nodes to add the required number of nodes to the block pipelines.
+    for (int i = 0; i < clusterSize; i++) {
+      clusterDns.add(HddsProtos.DatanodeDetailsProto.newBuilder().setUuid128(
+          HddsProtos.UUID.newBuilder().setLeastSigBits(i).setMostSigBits(i)
+              .build()).setHostName("localhost").setIpAddress("1.2.3.4")
+          .addPorts(HddsProtos.Port.newBuilder().setName("RATIS").setValue(i)
+              .build()).build());
+    }
+  }
+
+  public List<HddsProtos.DatanodeDetailsProto> getClusterDns(){
+    return this.clusterDns;
   }
 
+  /**
+   * This method selects the block pipeline nodes from the pre-created cluster
+   * nodes (clusterDns). It uses the requiredNodes field to decide how many
+   * nodes should be chosen for the pipeline. To make node allocation easy to
+   * predict in tests, it chooses block pipeline nodes in a sliding-window
+   * fashion, starting from the 0th index in clusterDns and incrementing until
+   * the given requiredNodes number is reached. Similarly, the next block
+   * pipeline will start from the index one past the last node index of the
+   * previously chosen pipeline. For example, if the cluster size was
+   * initialized as 10 and the required nodes are 5, the first block pipeline
+   * will have nodes 0 to 4 and the second block will be assigned the nodes at
+   * index locations 5 to 9. Once a full round of allocations is finished, it
+   * will start from 0 again for the next block. It also supports an exclude
+   * list: if the client passes an exclude list, any node present in that list
+   * is simply skipped and the next node is taken instead. If not enough nodes
+   * are left due to a grown exclude list, it throws IllegalStateException.
+   *
+   * @param keyArgs
+   * @param excludeList
+   * @return KeyLocation
+   */
   @Override
   public Iterable<? extends OzoneManagerProtocolProtos.KeyLocation>
-      allocateBlock(OzoneManagerProtocolProtos.KeyArgs keyArgs) {
+      allocateBlock(OzoneManagerProtocolProtos.KeyArgs keyArgs,
+      ExcludeList excludeList) {
     HddsProtos.Pipeline.Builder builder =
         HddsProtos.Pipeline.newBuilder().setFactor(keyArgs.getFactor())
             
.setType(keyArgs.getType()).setId(HddsProtos.PipelineID.newBuilder()
             .setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(1L)
                 .setMostSigBits(1L).build()).build());
-    final int rand = RANDOM.nextInt(); // used for port and UUID combination.
-    // It's ok here for port number limit as don't really create any socket
-    // connection.
-    for (int i = 1; i <= requiredNodes; i++) {
-      builder.addMembers(HddsProtos.DatanodeDetailsProto.newBuilder()
-          .setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(rand)
-              .setMostSigBits(i).build()).setHostName("localhost")
-          .setIpAddress("1.2.3.4").addPorts(
-              HddsProtos.Port.newBuilder().setName("RATIS").setValue(rand)
-                  .build()).build());
-      if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
-        builder.addMemberReplicaIndexes(i);
-      }
-    }
+    addMembers(builder, requiredNodes, excludeList.getDatanodes(), keyArgs);
     if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
       builder.setEcReplicationConfig(keyArgs.getEcReplicationConfig());
     }
     final HddsProtos.Pipeline pipeline = builder.build();
-
+    List<OzoneManagerProtocolProtos.KeyLocation> results = new ArrayList<>();
     long blockSize = (long) conf
         .getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
             OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
-
-    List<OzoneManagerProtocolProtos.KeyLocation> results = new ArrayList<>();
     results.add(OzoneManagerProtocolProtos.KeyLocation.newBuilder()
         .setPipeline(pipeline).setBlockID(
             HddsProtos.BlockID.newBuilder().setBlockCommitSequenceId(1L)
@@ -85,4 +110,34 @@ public class MultiNodePipelineBlockAllocator implements 
MockBlockAllocator {
         .setLength(blockSize).build());
     return results;
   }
+
+  private void addMembers(HddsProtos.Pipeline.Builder builder, int nodesNeeded,
+      Set<DatanodeDetails> excludedDataNodes,
+      OzoneManagerProtocolProtos.KeyArgs keyArgs) {
+    int clusterSize = clusterDns.size();
+    int counter = nodesNeeded;
+    int j = 0;
+    for (int i = 0; i < clusterDns.size(); i++) {
+      HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
+          clusterDns.get(start % clusterSize);
+      start++;
+      if (excludedDataNodes
+          .contains(DatanodeDetails.getFromProtoBuf(datanodeDetailsProto))) {
+        continue;
+      } else {
+        builder.addMembers(datanodeDetailsProto);
+        if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
+          builder.addMemberReplicaIndexes(++j);
+        }
+        if (--counter == 0) {
+          break;
+        }
+      }
+    }
+    if (counter > 0) {
+      throw new IllegalStateException(
+          "MockedImpl: Could not find enough nodes.");
+    }
+  }
+
 }
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/SinglePipelineBlockAllocator.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/SinglePipelineBlockAllocator.java
index 9d88e8c..20fbc03 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/SinglePipelineBlockAllocator.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/SinglePipelineBlockAllocator.java
@@ -27,6 +27,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.Pipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.Port;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.UUID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
@@ -49,8 +50,8 @@ public class SinglePipelineBlockAllocator
   }
 
   @Override
-  public Iterable<? extends KeyLocation> allocateBlock(
-      KeyArgs keyArgs) {
+  public Iterable<? extends KeyLocation> allocateBlock(KeyArgs keyArgs,
+      ExcludeList excludeList) {
 
     if (pipeline == null) {
       Pipeline.Builder bldr = Pipeline.newBuilder()
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
index e7bc9d7..23b181f 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
@@ -198,8 +198,8 @@ public class TestOzoneClient {
     int data = 3;
     int parity = 2;
     int chunkSize = 1024;
-    createNewClient(config, new MultiNodePipelineBlockAllocator(
-        config, data + parity));
+    createNewClient(config,
+        new MultiNodePipelineBlockAllocator(config, data + parity, 15));
     String value = new String(new byte[chunkSize], UTF_8);
     OzoneBucket bucket = getOzoneBucket();
 
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 8cd4365..c3c8c6d 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -23,8 +23,10 @@ import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
@@ -72,8 +74,8 @@ public class TestOzoneECClient {
   private final XceiverClientFactory factoryStub =
       new MockXceiverClientFactory();
   private OzoneConfiguration conf = new OzoneConfiguration();
-  private MockOmTransport transportStub = new MockOmTransport(
-      new MultiNodePipelineBlockAllocator(conf, dataBlocks + parityBlocks));
+  private final MockOmTransport transportStub = new MockOmTransport(
+      new MultiNodePipelineBlockAllocator(conf, dataBlocks + parityBlocks, 
15));
   private final RawErasureEncoder encoder =
       new RSRawErasureCoderFactory().createEncoder(
           new ECReplicationConfig(dataBlocks, parityBlocks));
@@ -82,13 +84,22 @@ public class TestOzoneECClient {
   public void init() throws IOException {
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2,
         StorageUnit.KB);
+    createNewClient(conf, transportStub);
+  }
+
+  private void createNewClient(ConfigurationSource config,
+      MockBlockAllocator blkAllocator) throws IOException {
+    createNewClient(config, new MockOmTransport(blkAllocator));
+  }
 
-    client = new OzoneClient(conf, new RpcClient(conf, null) {
+  private void createNewClient(ConfigurationSource config,
+      final MockOmTransport transport) throws IOException {
+    client = new OzoneClient(config, new RpcClient(config, null) {
 
       @Override
       protected OmTransport createOmTransport(String omServiceId)
           throws IOException {
-        return transportStub;
+        return transport;
       }
 
       @Override
@@ -436,6 +447,160 @@ public class TestOzoneECClient {
     testNodeFailuresWhileWriting(4, 1);
   }
 
+  @Test
+  public void testStripeWriteRetriesOn2Failures() throws IOException {
+    OzoneConfiguration con = new OzoneConfiguration();
+    con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2, 
StorageUnit.KB);
+    // Cluster has 15 nodes. So, first we will create 3 block groups with
+    // distinct nodes in each. Block Group 1:  0-4, Block Group 2: 5-9, Block
+    // Group 3: 10-14
+    // To mark the node failed in the first block group.
+    int[] nodesIndexesToMarkFailure = new int[2];
+    nodesIndexesToMarkFailure[0] = 0;
+    // To mark the node failed in the second block group.
+    nodesIndexesToMarkFailure[1] = 5;
+    // Mocked MultiNodePipelineBlockAllocator#allocateBlock implementation
+    // should pick next good block group as we have 15 nodes.
+    int clusterSize = 15;
+    testStripeWriteRetriesOnFailures(con, clusterSize,
+        nodesIndexesToMarkFailure);
+    // It should have used 3rd block group also. So, total initialized nodes
+    // count should be clusterSize.
+    Assert.assertTrue(((MockXceiverClientFactory) factoryStub).getStorages()
+        .size() == clusterSize);
+  }
+
+  @Test
+  public void testStripeWriteRetriesOn3Failures() throws IOException {
+    OzoneConfiguration con = new OzoneConfiguration();
+    con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2, 
StorageUnit.KB);
+
+    int[] nodesIndexesToMarkFailure = new int[3];
+    nodesIndexesToMarkFailure[0] = 0;
+    // To mark the node failed in the second block group.
+    nodesIndexesToMarkFailure[1] = 5;
+    // To mark the node failed in the third block group.
+    nodesIndexesToMarkFailure[2] = 10;
+    // Mocked MultiNodePipelineBlockAllocator#allocateBlock implementation will
+    // pick the remaining good nodes for the next block group.
+    int clusterSize = 15;
+    testStripeWriteRetriesOnFailures(con, clusterSize,
+        nodesIndexesToMarkFailure);
+    // It should have used 3rd block group also. So, total initialized nodes
+    // count should be clusterSize.
+    Assert.assertTrue(((MockXceiverClientFactory) factoryStub).getStorages()
+        .size() == clusterSize);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  // The mocked impl throws IllegalStateException when there are not enough
+  // nodes in allocateBlock request.
+  public void testStripeWriteRetriesOnAllNodeFailures() throws IOException {
+    OzoneConfiguration con = new OzoneConfiguration();
+    con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2, 
StorageUnit.KB);
+
+    // After writing first stripe, we will mark all nodes as bad in the 
cluster.
+    int clusterSize = 5;
+    int[] nodesIndexesToMarkFailure = new int[clusterSize];
+    for (int i = 0; i < nodesIndexesToMarkFailure.length; i++) {
+      nodesIndexesToMarkFailure[i] = i;
+    }
+    // Mocked MultiNodePipelineBlockAllocator#allocateBlock implementation can
+    // not pick new block group as all nodes in cluster marked as bad.
+    testStripeWriteRetriesOnFailures(con, clusterSize,
+        nodesIndexesToMarkFailure);
+  }
+
+  @Test
+  public void testStripeWriteRetriesOn4FailuresWith3RetriesAllowed()
+      throws IOException {
+    OzoneConfiguration con = new OzoneConfiguration();
+    con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2, 
StorageUnit.KB);
+    con.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES, 3);
+
+    int[] nodesIndexesToMarkFailure = new int[4];
+    nodesIndexesToMarkFailure[0] = 0;
+    //To mark node failed in second block group.
+    nodesIndexesToMarkFailure[1] = 5;
+    //To mark node failed in third block group.
+    nodesIndexesToMarkFailure[2] = 10;
+    //To mark node failed in fourth block group.
+    nodesIndexesToMarkFailure[3] = 15;
+    try {
+      // Mocked MultiNodePipelineBlockAllocator#allocateBlock implementation
+      // can pick a good block group, but client retries are limited by
+      // OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES (here it was configured
+      // as 3). So, it should fail as we have marked 4 nodes as bad.
+      testStripeWriteRetriesOnFailures(con, 20, nodesIndexesToMarkFailure);
+      Assert.fail(
+          "Expecting it to fail as retries should exceed the max allowed 
times:"
+              + " " + 3);
+    } catch (IOException e) {
+      Assert.assertEquals("Completed max allowed retries 3 on stripe 
failures.",
+          e.getMessage());
+    }
+  }
+
+  public void testStripeWriteRetriesOnFailures(OzoneConfiguration con,
+      int clusterSize, int[] nodesIndexesToMarkFailure) throws IOException {
+    close();
+    MultiNodePipelineBlockAllocator blkAllocator =
+        new MultiNodePipelineBlockAllocator(con, dataBlocks + parityBlocks,
+            clusterSize);
+    createNewClient(con, blkAllocator);
+    int numChunksToWriteAfterFailure = 3;
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, 1024 * 3,
+        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+            chunkSize), new HashMap<>())) {
+      for (int i = 0; i < dataBlocks; i++) {
+        out.write(inputChunks[i]);
+      }
+      Assert.assertTrue(
+          ((MockXceiverClientFactory) factoryStub).getStorages().size() == 5);
+      List<DatanodeDetails> failedDNs = new ArrayList<>();
+      List<HddsProtos.DatanodeDetailsProto> dns = blkAllocator.getClusterDns();
+
+      for (int j = 0; j < nodesIndexesToMarkFailure.length; j++) {
+        failedDNs.add(DatanodeDetails
+            .getFromProtoBuf(dns.get(nodesIndexesToMarkFailure[j])));
+      }
+
+      // First let's set storage as bad
+      ((MockXceiverClientFactory) factoryStub).setFailedStorages(failedDNs);
+
+      // Writer should be able to write by using a new block group.
+      for (int i = 0; i < numChunksToWriteAfterFailure; i++) {
+        out.write(inputChunks[i]);
+      }
+    }
+    final OzoneKeyDetails key = bucket.getKey(keyName);
+    // The data would normally be stored in a single block group. Since we
+    // introduced the failures after the first stripe, the second stripe data
+    // should have been written into a new block group. So, we should have
+    // 2 block groups. That means two keyLocations.
+    Assert.assertEquals(2, key.getOzoneKeyLocations().size());
+    Assert.assertEquals(2, key.getOzoneKeyLocations().size());
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      byte[] fileContent = new byte[chunkSize];
+      for (int i = 0; i < dataBlocks; i++) {
+        Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
+        Assert.assertTrue("Expected: " + new String(inputChunks[i],
+                UTF_8) + " \n " + "Actual: " + new String(fileContent, UTF_8),
+            Arrays.equals(inputChunks[i], fileContent));
+      }
+      for (int i = 0; i < numChunksToWriteAfterFailure; i++) {
+        Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
+        Assert.assertTrue("Expected: " + new String(inputChunks[i],
+                UTF_8) + " \n " + "Actual: " + new String(fileContent, UTF_8),
+            Arrays.equals(inputChunks[i], fileContent));
+      }
+    }
+  }
+
   public void testNodeFailuresWhileWriting(int numFailureToInject,
       int numChunksToWriteAfterFailure) throws IOException {
     store.createVolume(volumeName);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to