This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new f5dbe00  Hdds 5491: EC: Write should handle node failures. (#2767)
f5dbe00 is described below

commit f5dbe00309aeac90417eb87a51fa64b95c71c509
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Sat Nov 6 14:59:47 2021 -0700

    Hdds 5491: EC: Write should handle node failures. (#2767)
---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |  10 +-
 .../hdds/scm/storage/ECBlockOutputStream.java      |  25 ++++-
 .../ozone/client/io/ECBlockOutputStreamEntry.java  |  91 +++++++++++++++--
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 111 +++++++++++++++++----
 .../hadoop/ozone/client/MockDatanodeStorage.java   |  12 ++-
 .../ozone/client/MockXceiverClientFactory.java     |  15 +++
 .../hadoop/ozone/client/MockXceiverClientSpi.java  |  11 +-
 .../client/MultiNodePipelineBlockAllocator.java    |  51 +++++-----
 .../hadoop/ozone/client/TestOzoneECClient.java     |  78 +++++++++++++++
 9 files changed, 339 insertions(+), 65 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index ebcd962..5ce57e1 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -598,7 +598,7 @@ public class BlockOutputStream extends OutputStream {
   }
 
 
-  void setIoException(Exception e) {
+  public void setIoException(Exception e) {
     IOException ioe = getIoException();
     if (ioe == null) {
       IOException exception =  new IOException(EXCEPTION_MSG + e.toString(), 
e);
@@ -654,8 +654,10 @@ public class BlockOutputStream extends OutputStream {
    * @throws IOException if there is an I/O error while performing the call
    * @throws OzoneChecksumException if there is an error while computing
    * checksum
+   * @return the response future of the async chunk write call
    */
-  void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
+  CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
+      ChunkBuffer chunk) throws IOException {
     int effectiveChunkSize = chunk.remaining();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
     final ByteString data = chunk.toByteString(
@@ -693,13 +695,15 @@ public class BlockOutputStream extends OutputStream {
         setIoException(ce);
         throw ce;
       });
+      containerBlockData.addChunks(chunkInfo);
+      return future;
     } catch (IOException | ExecutionException e) {
       throw new IOException(EXCEPTION_MSG + e.toString(), e);
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
     }
-    containerBlockData.addChunks(chunkInfo);
+    return null;
   }
 
   @VisibleForTesting
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 86d7058..5ee34d3 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -41,6 +41,11 @@ import static 
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlock
  */
 public class ECBlockOutputStream extends BlockOutputStream{
 
+  private CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+      currentChunkRspFuture = null;
+
+  private CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+      putBlkRspFuture = null;
   /**
    * Creates a new ECBlockOutputStream.
    *
@@ -63,7 +68,8 @@ public class ECBlockOutputStream extends BlockOutputStream{
 
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
-    writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
+    this.currentChunkRspFuture =
+        writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
   }
 
   /**
@@ -113,6 +119,7 @@ public class ECBlockOutputStream extends BlockOutputStream{
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
     }
+    this.putBlkRspFuture = flushFuture;
     return flushFuture;
   }
 
@@ -121,4 +128,20 @@ public class ECBlockOutputStream extends BlockOutputStream{
     super.close();
     cleanup(false);
   }
+
+  /**
+   * @return The current chunk writer response future.
+   */
+  public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+      getCurrentChunkResponseFuture() {
+    return this.currentChunkRspFuture;
+  }
+
+  /**
+   * @return The current chunk putBlock response future.
+   */
+  public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+      getCurrentPutBlkResponseFuture() {
+    return this.putBlkRspFuture;
+  }
 }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 22a1906..95609b6 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -22,11 +22,14 @@ import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
@@ -37,9 +40,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -125,6 +131,12 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry{
     currentStreamIdx++;
   }
 
+  public void markFailed(Exception e) {
+    if (blockOutputStreams[currentStreamIdx] != null) {
+      blockOutputStreams[currentStreamIdx].setIoException(e);
+    }
+  }
+
   public void forceToFirstParityBlock(){
     currentStreamIdx = replicationConfig.getData();
   }
@@ -251,21 +263,11 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry{
     if (!isInitialized()) {
       return;
     }
-    int failedStreams = 0;
     for (ECBlockOutputStream stream : blockOutputStreams) {
       if (stream == null) {
         continue;
       }
-      if (!stream.isClosed()) {
-        stream.executePutBlock(false, true);
-      } else {
-        failedStreams++;
-      }
-      if(failedStreams > replicationConfig.getParity()) {
-        throw new IOException(
-            "There are " + failedStreams + " block write failures,"
-                + " supported tolerance: " + replicationConfig.getParity());
-      }
+      stream.executePutBlock(false, true);
     }
   }
 
@@ -279,6 +281,73 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry{
     return blockOutputStreams[0].getBlockID();
   }
 
+  /**
+   * In EC, we will do async write calls for writing data in the scope of a
+   * stripe. After every stripe write finishes, use this method to validate the
+   * responses of current stripe data writes. This method can also be used to
+   * validate the stripe put block responses.
+   * @param forPutBlock : If true, it will validate the put block response
+   *                   futures. It validates stripe data write response
+   *                   futures if false.
+   * @return
+   */
+  public boolean checkStreamFailures(boolean forPutBlock) {
+    final Iterator<ECBlockOutputStream> iter = blockStreams().iterator();
+    while (iter.hasNext()) {
+      final ECBlockOutputStream stream = iter.next();
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+          responseFuture = null;
+      if (forPutBlock) {
+        responseFuture =
+            stream != null ? stream.getCurrentPutBlkResponseFuture() : null;
+      } else {
+        responseFuture =
+            stream != null ? stream.getCurrentChunkResponseFuture() : null;
+      }
+      if (isFailed(stream, responseFuture)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean isFailed(
+      ECBlockOutputStream outputStream,
+      CompletableFuture<ContainerProtos.
+          ContainerCommandResponseProto> chunkWriteResponseFuture) {
+    ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto
+        = null;
+    try {
+      containerCommandResponseProto = chunkWriteResponseFuture != null ?
+          chunkWriteResponseFuture.get() :
+          null;
+    } catch (InterruptedException e) {
+      outputStream.setIoException(e);
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      outputStream.setIoException(e);
+    }
+
+    if ((outputStream != null && containerCommandResponseProto != null)
+        && (outputStream.getIoException() != null || isStreamFailed(
+        containerCommandResponseProto, outputStream))) {
+      return true;
+    }
+    return false;
+  }
+
+  boolean isStreamFailed(
+      ContainerProtos.ContainerCommandResponseProto responseProto,
+      ECBlockOutputStream stream) {
+    try {
+      ContainerProtocolCalls.validateContainerResponse(responseProto);
+    } catch (StorageContainerException sce) {
+      stream.setIoException(sce);
+      return true;
+    }
+    return false;
+  }
+
   private boolean isWritingParity() {
     return currentStreamIdx >= replicationConfig.getData();
   }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 799e15d..8fedde1 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -65,6 +65,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
   private final RawErasureEncoder encoder;
   private final ECReplicationConfig.EcCodec ecCodec;
 
+  private enum StripeWriteStatus {
+    SUCCESS,
+    FAILED
+  }
+
   private long currentBlockGroupLen = 0;
   /**
    * Defines stream action while calling handleFlushOrClose.
@@ -81,9 +86,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
   private long offset;
   // how much data has been ingested into the stream
   private long writeOffset;
-  // whether an exception is encountered while write and whole write could
-  // not succeed
-  private boolean isException;
   private final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool;
 
   @VisibleForTesting
@@ -124,7 +126,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
             replicationConfig, uploadID, partNumber, isMultipart, info,
             unsafeByteBufferConversion, xceiverClientManager, handler.getId());
 
-    this.isException = false;
     this.writeOffset = 0;
     OzoneConfiguration conf = new OzoneConfiguration();
     ECSchema schema =
@@ -217,20 +218,73 @@ public class ECKeyOutputStream extends KeyOutputStream {
     writeOffset += len;
   }
 
-  private void checkAndWriteParityCells(int lastDataBuffPos,
-      boolean allocateBlockIfFull)
+  private void handleStripeFailure(int chunkSize, int failedStripeDataSize)
       throws IOException {
+    long[] failedDataStripeChunkLens = new long[numDataBlks];
+    long[] failedParityStripeChunkLens = new long[numParityBlks];
+    final ByteBuffer[] dataBuffers = ecChunkBufferCache.getDataBuffers();
+    for (int i = 0; i < numDataBlks; i++) {
+      failedDataStripeChunkLens[i] = dataBuffers[i].limit();
+    }
+    final ByteBuffer[] parityBuffers = ecChunkBufferCache.getParityBuffers();
+    for (int i = 0; i <  numParityBlks; i++) {
+      failedParityStripeChunkLens[i] = parityBuffers[i].limit();
+    }
+
+    blockOutputStreamEntryPool.getCurrentStreamEntry().resetToFirstEntry();
+    // Rollback the length/offset updated as part of this failed stripe write.
+    offset -= failedStripeDataSize;
+    blockOutputStreamEntryPool.getCurrentStreamEntry()
+        .incCurrentPosition(-failedStripeDataSize);
+
+    // Let's close the current entry.
+    blockOutputStreamEntryPool.getCurrentStreamEntry().close();
+    currentBlockGroupLen = 0;
+
+    // Let's rewrite the last stripe, so that it will be written to new block
+    // group.
+    // TODO: we can improve to write partial stripe failures. In that case,
+    //  we just need to write only available buffers.
+    blockOutputStreamEntryPool.allocateBlockIfNeeded();
+    final ECBlockOutputStreamEntry currentStreamEntry =
+        blockOutputStreamEntryPool.getCurrentStreamEntry();
+    long totalLenToWrite = failedStripeDataSize;
+    for (int i = 0; i < numDataBlks; i++) {
+      long currentLen = totalLenToWrite < failedDataStripeChunkLens[i] ?
+          totalLenToWrite :
+          failedDataStripeChunkLens[i];
+      if (currentLen > 0) {
+        handleOutputStreamWrite(i, currentLen, true, false);
+      }
+      currentStreamEntry.useNextBlockStream();
+      totalLenToWrite -= currentLen;
+    }
+    for (int i = 0; i < (numParityBlks); i++) {
+      handleOutputStreamWrite(i + numDataBlks, failedParityStripeChunkLens[i],
+          true, true);
+      currentStreamEntry.useNextBlockStream();
+    }
+    currentStreamEntry.executePutBlock();
+    ecChunkBufferCache.clear(chunkSize);
+    ecChunkBufferCache.release();
+  }
+
+  private void checkAndWriteParityCells(int lastDataBuffPos,
+      boolean allocateBlockIfFull) throws IOException {
     //check data blocks finished
     //If index is > datanum blks
     int currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
         .getCurrentStreamIdx();
     if (currentStreamIdx == numDataBlks && lastDataBuffPos == ecChunkSize) {
       //Lets encode and write
-      handleParityWrites(ecChunkSize, allocateBlockIfFull);
+      if (handleParityWrites(ecChunkSize,
+          allocateBlockIfFull) == StripeWriteStatus.FAILED) {
+        handleStripeFailure(ecChunkSize, numDataBlks * ecChunkSize);
+      }
     }
   }
 
-  private void handleParityWrites(int parityCellSize,
+  private StripeWriteStatus handleParityWrites(int parityCellSize,
       boolean allocateBlockIfFull)
       throws IOException {
     writeParityCells(parityCellSize);
@@ -239,7 +293,15 @@ public class ECKeyOutputStream extends KeyOutputStream {
     // TODO: we should alter the put block calls to share CRC to each stream.
     ECBlockOutputStreamEntry streamEntry =
         blockOutputStreamEntryPool.getCurrentStreamEntry();
+    // Since writes are async, let's check the failures once.
+    if(streamEntry.checkStreamFailures(false)){
+      return StripeWriteStatus.FAILED;
+    }
     streamEntry.executePutBlock();
+    // Since putBlock is also async, let's check the failures again.
+    if(streamEntry.checkStreamFailures(true)){
+      return StripeWriteStatus.FAILED;
+    }
     ecChunkBufferCache.clear(parityCellSize);
 
     if (streamEntry.getRemaining() <= 0) {
@@ -251,6 +313,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
       streamEntry.resetToFirstEntry();
     }
     currentBlockGroupLen = 0;
+    return StripeWriteStatus.SUCCESS;
   }
 
   void writeParityCells(int parityCellSize) throws IOException {
@@ -268,14 +331,14 @@ public class ECKeyOutputStream extends KeyOutputStream {
          numDataBlks; i < (this.numDataBlks + this.numParityBlks); i++) {
       // Move the stream entry cursor to parity block index
       handleParityWrite(i, parityBuffers[i - numDataBlks].array(), 0,
-          ecChunkSize, true);
+          parityCellSize, true);
     }
   }
 
   private int handleDataWrite(int currIdx, byte[] b, int off, long len,
       boolean isFullCell) throws IOException {
     int pos = ecChunkBufferCache.addToDataBuffer(currIdx, b, off, (int) len);
-    handleOutputStreamWrite(currIdx, b, off, len, isFullCell, false);
+    handleOutputStreamWrite(currIdx, len, isFullCell, false);
 
     if(pos == ecChunkSize){
       blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
@@ -285,11 +348,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
 
   private void handleParityWrite(int currIdx, byte[] b, int off, long len,
       boolean isFullCell) throws IOException {
-    handleOutputStreamWrite(currIdx, b, off, len, isFullCell, true);
+    handleOutputStreamWrite(currIdx, len, isFullCell, true);
     blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
   }
 
-  private void handleOutputStreamWrite(int currIdx, byte[] b, int off, long 
len,
+  private void handleOutputStreamWrite(int currIdx, long len,
       boolean isFullCell, boolean isParity) throws IOException {
 
     BlockOutputStreamEntry current =
@@ -298,17 +361,17 @@ public class ECKeyOutputStream extends KeyOutputStream {
         Math.min((int) len, (int) current.getRemaining());
     currentBlockGroupLen += isParity ? 0 : writeLengthToCurrStream;
 
-    len -= writeLengthToCurrStream;
     if (isFullCell) {
       ByteBuffer bytesToWrite = isParity ?
           ecChunkBufferCache.getParityBuffers()[currIdx - numDataBlks] :
           ecChunkBufferCache.getDataBuffers()[currIdx];
       try {
+        // Since it's a full cell, let's write all content from the buffer.
         writeToOutputStream(current, len, bytesToWrite.array(),
             bytesToWrite.array().length, 0, current.getWrittenDataLength(),
             isParity);
       } catch (Exception e) {
-        markStreamClosed();
+        markStreamAsFailed(e);
       }
     }
   }
@@ -347,7 +410,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
     Throwable t = HddsClientUtils.checkForException(exception);
     Preconditions.checkNotNull(t);
     // In EC, we will just close the current stream.
-    streamEntry.close();
+    markStreamAsFailed(exception);
   }
 
   private void markStreamClosed() {
@@ -355,6 +418,10 @@ public class ECKeyOutputStream extends KeyOutputStream {
     closed = true;
   }
 
+  private void markStreamAsFailed(Exception e) {
+    blockOutputStreamEntryPool.getCurrentStreamEntry().markFailed(e);
+  }
+
   @Override
   public void flush() throws IOException {
     checkNotClosed();
@@ -473,7 +540,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
                 bytesToWrite.position(), 0, current.getWrittenDataLength(),
                 false);
           } catch (Exception e) {
-            markStreamClosed();
+            markStreamAsFailed(e);
           }
         }
         final int lastStripeSize =
@@ -482,13 +549,15 @@ public class ECKeyOutputStream extends KeyOutputStream {
         final int parityCellSize =
             lastStripeSize < ecChunkSize ? lastStripeSize : ecChunkSize;
         addPadding(parityCellSize);
-        handleParityWrites(parityCellSize, false);
+        if (handleParityWrites(parityCellSize,
+            false) == StripeWriteStatus.FAILED) {
+          // TODO: loop this until we succeed?
+          handleStripeFailure(parityCellSize, lastStripeSize);
+        }
       }
-      handleFlushOrCloseAllStreams(StreamAction.CLOSE);
 
-      if (!isException) {
-        Preconditions.checkArgument(writeOffset == offset);
-      }
+      handleFlushOrCloseAllStreams(StreamAction.CLOSE);
+      Preconditions.checkArgument(writeOffset == offset);
       blockOutputStreamEntryPool.getCurrentStreamEntry().close();
       blockOutputStreamEntryPool.commitKey(offset);
     } finally {
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
index 73ab96b..ce70c15 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
@@ -23,6 +23,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -37,6 +38,12 @@ public class MockDatanodeStorage {
 
   private final Map<String, ByteString> data = new HashMap<>();
 
+  private boolean failed = false;
+
+  public void setStorageFailed(){
+    this.failed = true;
+  }
+
   public void putBlock(DatanodeBlockID blockID, BlockData blockData) {
     blocks.put(blockID, blockData);
   }
@@ -47,7 +54,10 @@ public class MockDatanodeStorage {
 
   public void writeChunk(
       DatanodeBlockID blockID,
-      ChunkInfo chunkInfo, ByteString bytes) {
+      ChunkInfo chunkInfo, ByteString bytes) throws IOException {
+    if (failed) {
+      throw new IOException("This storage was marked as failed.");
+    }
     data.put(createKey(blockID, chunkInfo),
         ByteString.copyFrom(bytes.toByteArray()));
     chunks.put(createKey(blockID, chunkInfo), chunkInfo);
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
index da72651..d4f1d23 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -35,6 +37,19 @@ public class MockXceiverClientFactory
   private final Map<DatanodeDetails, MockDatanodeStorage> storage =
       new HashMap<>();
 
+  public void setFailedStorages(List<DatanodeDetails> failedStorages) {
+    final Iterator<Map.Entry<DatanodeDetails, MockDatanodeStorage>> iterator =
+        storage.entrySet().iterator();
+    while (iterator.hasNext()) {
+      final Map.Entry<DatanodeDetails, MockDatanodeStorage> next =
+          iterator.next();
+      if (failedStorages.contains(next.getKey())) {
+        final MockDatanodeStorage value = next.getValue();
+        value.setStorageFailed();
+      }
+    }
+  }
+
   @Override
   public void close() throws IOException {
 
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
index 4d3db44..4dfe966 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
@@ -82,7 +83,13 @@ public class MockXceiverClientSpi extends XceiverClientSpi {
     switch (request.getCmdType()) {
     case WriteChunk:
       return result(request,
-          r -> r.setWriteChunk(writeChunk(request.getWriteChunk())));
+          r -> {
+            try {
+              return r.setWriteChunk(writeChunk(request.getWriteChunk()));
+            } catch (IOException e) {
+              return r.setResult(Result.IO_EXCEPTION);
+            }
+          });
     case ReadChunk:
       return result(request,
           r -> r.setReadChunk(readChunk(request.getReadChunk())));
@@ -149,7 +156,7 @@ public class MockXceiverClientSpi extends XceiverClientSpi {
   }
 
   private WriteChunkResponseProto writeChunk(
-      WriteChunkRequestProto writeChunk) {
+      WriteChunkRequestProto writeChunk) throws IOException {
     datanodeStorage
         .writeChunk(writeChunk.getBlockID(), writeChunk.getChunkData(),
             writeChunk.getData());
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
index 2c315b7..be2f7fd 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
@@ -27,13 +27,14 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 
 /**
  * Allocates the block with required number of nodes in the pipeline.
  */
 public class MultiNodePipelineBlockAllocator implements MockBlockAllocator {
+  public static final Random RANDOM = new Random();
   private long blockId;
-  private HddsProtos.Pipeline pipeline;
   private int requiredNodes;
   private final ConfigurationSource conf;
 
@@ -46,35 +47,33 @@ public class MultiNodePipelineBlockAllocator implements 
MockBlockAllocator {
   @Override
   public Iterable<? extends OzoneManagerProtocolProtos.KeyLocation>
       allocateBlock(OzoneManagerProtocolProtos.KeyArgs keyArgs) {
-    if (pipeline == null) {
-      HddsProtos.Pipeline.Builder builder =
-          HddsProtos.Pipeline.newBuilder().setFactor(keyArgs.getFactor())
-              .setType(keyArgs.getType()).setId(
-              HddsProtos.PipelineID.newBuilder().setUuid128(
-                  HddsProtos.UUID.newBuilder().setLeastSigBits(1L)
-                      .setMostSigBits(1L).build()).build());
-
-      for (int i = 1; i <= requiredNodes; i++) {
-        builder.addMembers(HddsProtos.DatanodeDetailsProto.newBuilder()
-            .setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(i)
-                .setMostSigBits(i).build()).setHostName("localhost")
-            .setIpAddress("1.2.3.4").addPorts(
-                HddsProtos.Port.newBuilder().setName("RATIS").setValue(1234 + 
i)
-                    .build()).build());
-        if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
-          builder.addMemberReplicaIndexes(i);
-        }
-      }
+    HddsProtos.Pipeline.Builder builder =
+        HddsProtos.Pipeline.newBuilder().setFactor(keyArgs.getFactor())
+            
.setType(keyArgs.getType()).setId(HddsProtos.PipelineID.newBuilder()
+            .setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(1L)
+                .setMostSigBits(1L).build()).build());
+    final int rand = RANDOM.nextInt(); // used for port and UUID combination.
+    // It's ok to exceed the port number limit here, as we don't really
+    // create any socket connections.
+    for (int i = 1; i <= requiredNodes; i++) {
+      builder.addMembers(HddsProtos.DatanodeDetailsProto.newBuilder()
+          .setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(rand)
+              .setMostSigBits(i).build()).setHostName("localhost")
+          .setIpAddress("1.2.3.4").addPorts(
+              HddsProtos.Port.newBuilder().setName("RATIS").setValue(rand)
+                  .build()).build());
       if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
-        builder.setEcReplicationConfig(keyArgs.getEcReplicationConfig());
+        builder.addMemberReplicaIndexes(i);
       }
-      pipeline = builder.build();
     }
+    if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
+      builder.setEcReplicationConfig(keyArgs.getEcReplicationConfig());
+    }
+    final HddsProtos.Pipeline pipeline = builder.build();
 
-    long blockSize = (long)conf.getStorageSize(
-        OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
-        OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT,
-        StorageUnit.BYTES);
+    long blockSize = (long) conf
+        .getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
+            OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
 
     List<OzoneManagerProtocolProtos.KeyLocation> results = new ArrayList<>();
     results.add(OzoneManagerProtocolProtos.KeyLocation.newBuilder()
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index c66d0f6..0b2a8d8 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -420,6 +420,84 @@ public class TestOzoneECClient {
     }
   }
 
+  @Test
+  public void testWriteShouldFailIfMoreThanParityNodesFail()
+      throws IOException {
+    testNodeFailuresWhileWriting(3, 3);
+  }
+
+  @Test
+  public void testWriteShouldSuccessIfLessThanParityNodesFail()
+      throws IOException {
+    testNodeFailuresWhileWriting(1, 2);
+  }
+
+  @Test
+  public void testWriteShouldSuccessIf4NodesFailed()
+      throws IOException {
+    testNodeFailuresWhileWriting(4, 1);
+  }
+
+  @Test
+  public void testWriteShouldSuccessIfAllNodesFailed()
+      throws IOException {
+    testNodeFailuresWhileWriting(4, 1);
+  }
+
+  public void testNodeFailuresWhileWriting(int numFailureToInject,
+      int numChunksToWriteAfterFailure) throws IOException {
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, 1024 * 3,
+        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+            chunkSize), new HashMap<>())) {
+      for (int i = 0; i < dataBlocks; i++) {
+        out.write(inputChunks[i]);
+      }
+
+      List<DatanodeDetails> failedDNs = new ArrayList<>();
+      Map<DatanodeDetails, MockDatanodeStorage> storages =
+          ((MockXceiverClientFactory) factoryStub).getStorages();
+      DatanodeDetails[] dnDetails =
+          storages.keySet().toArray(new DatanodeDetails[storages.size()]);
+      for (int i = 0; i < numFailureToInject; i++) {
+        failedDNs.add(dnDetails[i]);
+      }
+
+      // First let's set storage as bad
+      ((MockXceiverClientFactory) factoryStub).setFailedStorages(failedDNs);
+
+      for (int i = 0; i < numChunksToWriteAfterFailure; i++) {
+        out.write(inputChunks[i]);
+      }
+    }
+    final OzoneKeyDetails key = bucket.getKey(keyName);
+    // Data is supposed to be stored in a single block group. Since we introduced
+    // failures after first stripe, the second stripe data should have been
+    // written into new blockgroup. So, we should have 2 block groups. That
+    // means two keyLocations.
+    Assert.assertEquals(2, key.getOzoneKeyLocations().size());
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      byte[] fileContent = new byte[chunkSize];
+      for (int i = 0; i < dataBlocks; i++) {
+        Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
+        Assert.assertTrue("Expected: " + new String(inputChunks[i],
+                UTF_8) + " \n " + "Actual: " + new String(fileContent, UTF_8),
+            Arrays.equals(inputChunks[i], fileContent));
+      }
+      for (int i = 0; i < numChunksToWriteAfterFailure; i++) {
+        Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
+        Assert.assertTrue("Expected: " + new String(inputChunks[i],
+                UTF_8) + " \n " + "Actual: " + new String(fileContent, UTF_8),
+            Arrays.equals(inputChunks[i], fileContent));
+      }
+    }
+  }
+
+
   private OzoneBucket writeIntoECKey(byte[] data, String key,
       DefaultReplicationConfig defaultReplicationConfig) throws IOException {
     return writeIntoECKey(new byte[][] {data}, key, defaultReplicationConfig);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to