This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new f5dbe00 Hdds 5491: EC: Write should handle node failures. (#2767)
f5dbe00 is described below
commit f5dbe00309aeac90417eb87a51fa64b95c71c509
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Sat Nov 6 14:59:47 2021 -0700
Hdds 5491: EC: Write should handle node failures. (#2767)
---
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 10 +-
.../hdds/scm/storage/ECBlockOutputStream.java | 25 ++++-
.../ozone/client/io/ECBlockOutputStreamEntry.java | 91 +++++++++++++++--
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 111 +++++++++++++++++----
.../hadoop/ozone/client/MockDatanodeStorage.java | 12 ++-
.../ozone/client/MockXceiverClientFactory.java | 15 +++
.../hadoop/ozone/client/MockXceiverClientSpi.java | 11 +-
.../client/MultiNodePipelineBlockAllocator.java | 51 +++++-----
.../hadoop/ozone/client/TestOzoneECClient.java | 78 +++++++++++++++
9 files changed, 339 insertions(+), 65 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index ebcd962..5ce57e1 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -598,7 +598,7 @@ public class BlockOutputStream extends OutputStream {
}
- void setIoException(Exception e) {
+ public void setIoException(Exception e) {
IOException ioe = getIoException();
if (ioe == null) {
IOException exception = new IOException(EXCEPTION_MSG + e.toString(),
e);
@@ -654,8 +654,10 @@ public class BlockOutputStream extends OutputStream {
* @throws IOException if there is an I/O error while performing the call
* @throws OzoneChecksumException if there is an error while computing
* checksum
+ * @return the response future of the write chunk request; null if the
+ *         write was interrupted before a future could be returned
*/
- void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
+ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
+ ChunkBuffer chunk) throws IOException {
int effectiveChunkSize = chunk.remaining();
final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
final ByteString data = chunk.toByteString(
@@ -693,13 +695,15 @@ public class BlockOutputStream extends OutputStream {
setIoException(ce);
throw ce;
});
+ containerBlockData.addChunks(chunkInfo);
+ return future;
} catch (IOException | ExecutionException e) {
throw new IOException(EXCEPTION_MSG + e.toString(), e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
}
- containerBlockData.addChunks(chunkInfo);
+ return null;
}
@VisibleForTesting
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 86d7058..5ee34d3 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -41,6 +41,11 @@ import static
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlock
*/
public class ECBlockOutputStream extends BlockOutputStream{
+ private CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ currentChunkRspFuture = null;
+
+ private CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ putBlkRspFuture = null;
/**
* Creates a new ECBlockOutputStream.
*
@@ -63,7 +68,8 @@ public class ECBlockOutputStream extends BlockOutputStream{
@Override
public void write(byte[] b, int off, int len) throws IOException {
- writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
+ this.currentChunkRspFuture =
+ writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
}
/**
@@ -113,6 +119,7 @@ public class ECBlockOutputStream extends BlockOutputStream{
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
}
+ this.putBlkRspFuture = flushFuture;
return flushFuture;
}
@@ -121,4 +128,20 @@ public class ECBlockOutputStream extends BlockOutputStream{
super.close();
cleanup(false);
}
+
+ /**
+ * @return The current chunk writer response future.
+ */
+ public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ getCurrentChunkResponseFuture() {
+ return this.currentChunkRspFuture;
+ }
+
+ /**
+ * @return The current chunk putBlock response future.
+ */
+ public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ getCurrentPutBlkResponseFuture() {
+ return this.putBlkRspFuture;
+ }
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 22a1906..95609b6 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -22,11 +22,14 @@ import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
@@ -37,9 +40,12 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -125,6 +131,12 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
currentStreamIdx++;
}
+ public void markFailed(Exception e) {
+ if (blockOutputStreams[currentStreamIdx] != null) {
+ blockOutputStreams[currentStreamIdx].setIoException(e);
+ }
+ }
+
public void forceToFirstParityBlock(){
currentStreamIdx = replicationConfig.getData();
}
@@ -251,21 +263,11 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
if (!isInitialized()) {
return;
}
- int failedStreams = 0;
for (ECBlockOutputStream stream : blockOutputStreams) {
if (stream == null) {
continue;
}
- if (!stream.isClosed()) {
- stream.executePutBlock(false, true);
- } else {
- failedStreams++;
- }
- if(failedStreams > replicationConfig.getParity()) {
- throw new IOException(
- "There are " + failedStreams + " block write failures,"
- + " supported tolerance: " + replicationConfig.getParity());
- }
+ stream.executePutBlock(false, true);
}
}
@@ -279,6 +281,73 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
return blockOutputStreams[0].getBlockID();
}
+ /**
+ * In EC, we will do async write calls for writing data in the scope of a
+ * stripe. After every stripe write finishes, use this method to validate the
+ * responses of current stripe data writes. This method can also be used to
+ * validate the stripe put block responses.
+ * @param forPutBlock : If true, it will validate the put block response
+ * futures. It will validates stripe data write response
+ * futures if false.
+ * @return
+ */
+ public boolean checkStreamFailures(boolean forPutBlock) {
+ final Iterator<ECBlockOutputStream> iter = blockStreams().iterator();
+ while (iter.hasNext()) {
+ final ECBlockOutputStream stream = iter.next();
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ responseFuture = null;
+ if (forPutBlock) {
+ responseFuture =
+ stream != null ? stream.getCurrentPutBlkResponseFuture() : null;
+ } else {
+ responseFuture =
+ stream != null ? stream.getCurrentChunkResponseFuture() : null;
+ }
+ if (isFailed(stream, responseFuture)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isFailed(
+ ECBlockOutputStream outputStream,
+ CompletableFuture<ContainerProtos.
+ ContainerCommandResponseProto> chunkWriteResponseFuture) {
+ ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto
+ = null;
+ try {
+ containerCommandResponseProto = chunkWriteResponseFuture != null ?
+ chunkWriteResponseFuture.get() :
+ null;
+ } catch (InterruptedException e) {
+ outputStream.setIoException(e);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ outputStream.setIoException(e);
+ }
+
+ if ((outputStream != null && containerCommandResponseProto != null)
+ && (outputStream.getIoException() != null || isStreamFailed(
+ containerCommandResponseProto, outputStream))) {
+ return true;
+ }
+ return false;
+ }
+
+ boolean isStreamFailed(
+ ContainerProtos.ContainerCommandResponseProto responseProto,
+ ECBlockOutputStream stream) {
+ try {
+ ContainerProtocolCalls.validateContainerResponse(responseProto);
+ } catch (StorageContainerException sce) {
+ stream.setIoException(sce);
+ return true;
+ }
+ return false;
+ }
+
private boolean isWritingParity() {
return currentStreamIdx >= replicationConfig.getData();
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 799e15d..8fedde1 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -65,6 +65,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
private final RawErasureEncoder encoder;
private final ECReplicationConfig.EcCodec ecCodec;
+ private enum StripeWriteStatus {
+ SUCCESS,
+ FAILED
+ }
+
private long currentBlockGroupLen = 0;
/**
* Defines stream action while calling handleFlushOrClose.
@@ -81,9 +86,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
private long offset;
// how much data has been ingested into the stream
private long writeOffset;
- // whether an exception is encountered while write and whole write could
- // not succeed
- private boolean isException;
private final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool;
@VisibleForTesting
@@ -124,7 +126,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
replicationConfig, uploadID, partNumber, isMultipart, info,
unsafeByteBufferConversion, xceiverClientManager, handler.getId());
- this.isException = false;
this.writeOffset = 0;
OzoneConfiguration conf = new OzoneConfiguration();
ECSchema schema =
@@ -217,20 +218,73 @@ public class ECKeyOutputStream extends KeyOutputStream {
writeOffset += len;
}
- private void checkAndWriteParityCells(int lastDataBuffPos,
- boolean allocateBlockIfFull)
+ private void handleStripeFailure(int chunkSize, int failedStripeDataSize)
throws IOException {
+ long[] failedDataStripeChunkLens = new long[numDataBlks];
+ long[] failedParityStripeChunkLens = new long[numParityBlks];
+ final ByteBuffer[] dataBuffers = ecChunkBufferCache.getDataBuffers();
+ for (int i = 0; i < numDataBlks; i++) {
+ failedDataStripeChunkLens[i] = dataBuffers[i].limit();
+ }
+ final ByteBuffer[] parityBuffers = ecChunkBufferCache.getParityBuffers();
+ for (int i = 0; i < numParityBlks; i++) {
+ failedParityStripeChunkLens[i] = parityBuffers[i].limit();
+ }
+
+ blockOutputStreamEntryPool.getCurrentStreamEntry().resetToFirstEntry();
+ // Rollback the length/offset updated as part of this failed stripe write.
+ offset -= failedStripeDataSize;
+ blockOutputStreamEntryPool.getCurrentStreamEntry()
+ .incCurrentPosition(-failedStripeDataSize);
+
+ // Let's close the current entry.
+ blockOutputStreamEntryPool.getCurrentStreamEntry().close();
+ currentBlockGroupLen = 0;
+
+ // Let's rewrite the last stripe, so that it will be written to new block
+ // group.
+ // TODO: we can improve this to handle partial stripe failures. In that
+ // case, we just need to write only the available buffers.
+ blockOutputStreamEntryPool.allocateBlockIfNeeded();
+ final ECBlockOutputStreamEntry currentStreamEntry =
+ blockOutputStreamEntryPool.getCurrentStreamEntry();
+ long totalLenToWrite = failedStripeDataSize;
+ for (int i = 0; i < numDataBlks; i++) {
+ long currentLen = totalLenToWrite < failedDataStripeChunkLens[i] ?
+ totalLenToWrite :
+ failedDataStripeChunkLens[i];
+ if (currentLen > 0) {
+ handleOutputStreamWrite(i, currentLen, true, false);
+ }
+ currentStreamEntry.useNextBlockStream();
+ totalLenToWrite -= currentLen;
+ }
+ for (int i = 0; i < (numParityBlks); i++) {
+ handleOutputStreamWrite(i + numDataBlks, failedParityStripeChunkLens[i],
+ true, true);
+ currentStreamEntry.useNextBlockStream();
+ }
+ currentStreamEntry.executePutBlock();
+ ecChunkBufferCache.clear(chunkSize);
+ ecChunkBufferCache.release();
+ }
+
+ private void checkAndWriteParityCells(int lastDataBuffPos,
+ boolean allocateBlockIfFull) throws IOException {
//check data blocks finished
//If index is > datanum blks
int currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
.getCurrentStreamIdx();
if (currentStreamIdx == numDataBlks && lastDataBuffPos == ecChunkSize) {
//Lets encode and write
- handleParityWrites(ecChunkSize, allocateBlockIfFull);
+ if (handleParityWrites(ecChunkSize,
+ allocateBlockIfFull) == StripeWriteStatus.FAILED) {
+ handleStripeFailure(ecChunkSize, numDataBlks * ecChunkSize);
+ }
}
}
- private void handleParityWrites(int parityCellSize,
+ private StripeWriteStatus handleParityWrites(int parityCellSize,
boolean allocateBlockIfFull)
throws IOException {
writeParityCells(parityCellSize);
@@ -239,7 +293,15 @@ public class ECKeyOutputStream extends KeyOutputStream {
// TODO: we should alter the put block calls to share CRC to each stream.
ECBlockOutputStreamEntry streamEntry =
blockOutputStreamEntryPool.getCurrentStreamEntry();
+ // Since writes are async, let's check the failures once.
+ if(streamEntry.checkStreamFailures(false)){
+ return StripeWriteStatus.FAILED;
+ }
streamEntry.executePutBlock();
+ // Since putBlock also async, let's check the failures again.
+ if(streamEntry.checkStreamFailures(true)){
+ return StripeWriteStatus.FAILED;
+ }
ecChunkBufferCache.clear(parityCellSize);
if (streamEntry.getRemaining() <= 0) {
@@ -251,6 +313,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
streamEntry.resetToFirstEntry();
}
currentBlockGroupLen = 0;
+ return StripeWriteStatus.SUCCESS;
}
void writeParityCells(int parityCellSize) throws IOException {
@@ -268,14 +331,14 @@ public class ECKeyOutputStream extends KeyOutputStream {
numDataBlks; i < (this.numDataBlks + this.numParityBlks); i++) {
// Move the stream entry cursor to parity block index
handleParityWrite(i, parityBuffers[i - numDataBlks].array(), 0,
- ecChunkSize, true);
+ parityCellSize, true);
}
}
private int handleDataWrite(int currIdx, byte[] b, int off, long len,
boolean isFullCell) throws IOException {
int pos = ecChunkBufferCache.addToDataBuffer(currIdx, b, off, (int) len);
- handleOutputStreamWrite(currIdx, b, off, len, isFullCell, false);
+ handleOutputStreamWrite(currIdx, len, isFullCell, false);
if(pos == ecChunkSize){
blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
@@ -285,11 +348,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
private void handleParityWrite(int currIdx, byte[] b, int off, long len,
boolean isFullCell) throws IOException {
- handleOutputStreamWrite(currIdx, b, off, len, isFullCell, true);
+ handleOutputStreamWrite(currIdx, len, isFullCell, true);
blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
}
- private void handleOutputStreamWrite(int currIdx, byte[] b, int off, long
len,
+ private void handleOutputStreamWrite(int currIdx, long len,
boolean isFullCell, boolean isParity) throws IOException {
BlockOutputStreamEntry current =
@@ -298,17 +361,17 @@ public class ECKeyOutputStream extends KeyOutputStream {
Math.min((int) len, (int) current.getRemaining());
currentBlockGroupLen += isParity ? 0 : writeLengthToCurrStream;
- len -= writeLengthToCurrStream;
if (isFullCell) {
ByteBuffer bytesToWrite = isParity ?
ecChunkBufferCache.getParityBuffers()[currIdx - numDataBlks] :
ecChunkBufferCache.getDataBuffers()[currIdx];
try {
+ // Since it's a full cell, let's write all the content from the buffer.
writeToOutputStream(current, len, bytesToWrite.array(),
bytesToWrite.array().length, 0, current.getWrittenDataLength(),
isParity);
} catch (Exception e) {
- markStreamClosed();
+ markStreamAsFailed(e);
}
}
}
@@ -347,7 +410,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
Throwable t = HddsClientUtils.checkForException(exception);
Preconditions.checkNotNull(t);
// In EC, we will just close the current stream.
- streamEntry.close();
+ markStreamAsFailed(exception);
}
private void markStreamClosed() {
@@ -355,6 +418,10 @@ public class ECKeyOutputStream extends KeyOutputStream {
closed = true;
}
+ private void markStreamAsFailed(Exception e) {
+ blockOutputStreamEntryPool.getCurrentStreamEntry().markFailed(e);
+ }
+
@Override
public void flush() throws IOException {
checkNotClosed();
@@ -473,7 +540,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
bytesToWrite.position(), 0, current.getWrittenDataLength(),
false);
} catch (Exception e) {
- markStreamClosed();
+ markStreamAsFailed(e);
}
}
final int lastStripeSize =
@@ -482,13 +549,15 @@ public class ECKeyOutputStream extends KeyOutputStream {
final int parityCellSize =
lastStripeSize < ecChunkSize ? lastStripeSize : ecChunkSize;
addPadding(parityCellSize);
- handleParityWrites(parityCellSize, false);
+ if (handleParityWrites(parityCellSize,
+ false) == StripeWriteStatus.FAILED) {
+ // TODO: loop this until we succeed?
+ handleStripeFailure(parityCellSize, lastStripeSize);
+ }
}
- handleFlushOrCloseAllStreams(StreamAction.CLOSE);
- if (!isException) {
- Preconditions.checkArgument(writeOffset == offset);
- }
+ handleFlushOrCloseAllStreams(StreamAction.CLOSE);
+ Preconditions.checkArgument(writeOffset == offset);
blockOutputStreamEntryPool.getCurrentStreamEntry().close();
blockOutputStreamEntryPool.commitKey(offset);
} finally {
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
index 73ab96b..ce70c15 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
@@ -23,6 +23,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -37,6 +38,12 @@ public class MockDatanodeStorage {
private final Map<String, ByteString> data = new HashMap<>();
+ private boolean failed = false;
+
+ public void setStorageFailed(){
+ this.failed = true;
+ }
+
public void putBlock(DatanodeBlockID blockID, BlockData blockData) {
blocks.put(blockID, blockData);
}
@@ -47,7 +54,10 @@ public class MockDatanodeStorage {
public void writeChunk(
DatanodeBlockID blockID,
- ChunkInfo chunkInfo, ByteString bytes) {
+ ChunkInfo chunkInfo, ByteString bytes) throws IOException {
+ if (failed) {
+ throw new IOException("This storage was marked as failed.");
+ }
data.put(createKey(blockID, chunkInfo),
ByteString.copyFrom(bytes.toByteArray()));
chunks.put(createKey(blockID, chunkInfo), chunkInfo);
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
index da72651..d4f1d23 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import java.io.IOException;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
/**
@@ -35,6 +37,19 @@ public class MockXceiverClientFactory
private final Map<DatanodeDetails, MockDatanodeStorage> storage =
new HashMap<>();
+ public void setFailedStorages(List<DatanodeDetails> failedStorages) {
+ final Iterator<Map.Entry<DatanodeDetails, MockDatanodeStorage>> iterator =
+ storage.entrySet().iterator();
+ while (iterator.hasNext()) {
+ final Map.Entry<DatanodeDetails, MockDatanodeStorage> next =
+ iterator.next();
+ if (failedStorages.contains(next.getKey())) {
+ final MockDatanodeStorage value = next.getValue();
+ value.setStorageFailed();
+ }
+ }
+ }
+
@Override
public void close() throws IOException {
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
index 4d3db44..4dfe966 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@@ -82,7 +83,13 @@ public class MockXceiverClientSpi extends XceiverClientSpi {
switch (request.getCmdType()) {
case WriteChunk:
return result(request,
- r -> r.setWriteChunk(writeChunk(request.getWriteChunk())));
+ r -> {
+ try {
+ return r.setWriteChunk(writeChunk(request.getWriteChunk()));
+ } catch (IOException e) {
+ return r.setResult(Result.IO_EXCEPTION);
+ }
+ });
case ReadChunk:
return result(request,
r -> r.setReadChunk(readChunk(request.getReadChunk())));
@@ -149,7 +156,7 @@ public class MockXceiverClientSpi extends XceiverClientSpi {
}
private WriteChunkResponseProto writeChunk(
- WriteChunkRequestProto writeChunk) {
+ WriteChunkRequestProto writeChunk) throws IOException {
datanodeStorage
.writeChunk(writeChunk.getBlockID(), writeChunk.getChunkData(),
writeChunk.getData());
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
index 2c315b7..be2f7fd 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
@@ -27,13 +27,14 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
/**
* Allocates the block with required number of nodes in the pipeline.
*/
public class MultiNodePipelineBlockAllocator implements MockBlockAllocator {
+ public static final Random RANDOM = new Random();
private long blockId;
- private HddsProtos.Pipeline pipeline;
private int requiredNodes;
private final ConfigurationSource conf;
@@ -46,35 +47,33 @@ public class MultiNodePipelineBlockAllocator implements
MockBlockAllocator {
@Override
public Iterable<? extends OzoneManagerProtocolProtos.KeyLocation>
allocateBlock(OzoneManagerProtocolProtos.KeyArgs keyArgs) {
- if (pipeline == null) {
- HddsProtos.Pipeline.Builder builder =
- HddsProtos.Pipeline.newBuilder().setFactor(keyArgs.getFactor())
- .setType(keyArgs.getType()).setId(
- HddsProtos.PipelineID.newBuilder().setUuid128(
- HddsProtos.UUID.newBuilder().setLeastSigBits(1L)
- .setMostSigBits(1L).build()).build());
-
- for (int i = 1; i <= requiredNodes; i++) {
- builder.addMembers(HddsProtos.DatanodeDetailsProto.newBuilder()
- .setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(i)
- .setMostSigBits(i).build()).setHostName("localhost")
- .setIpAddress("1.2.3.4").addPorts(
- HddsProtos.Port.newBuilder().setName("RATIS").setValue(1234 +
i)
- .build()).build());
- if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
- builder.addMemberReplicaIndexes(i);
- }
- }
+ HddsProtos.Pipeline.Builder builder =
+ HddsProtos.Pipeline.newBuilder().setFactor(keyArgs.getFactor())
+
.setType(keyArgs.getType()).setId(HddsProtos.PipelineID.newBuilder()
+ .setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(1L)
+ .setMostSigBits(1L).build()).build());
+ final int rand = RANDOM.nextInt(); // used for port and UUID combination.
+ // It's ok here to exceed the port number limit as we don't really create
+ // any socket connection.
+ for (int i = 1; i <= requiredNodes; i++) {
+ builder.addMembers(HddsProtos.DatanodeDetailsProto.newBuilder()
+ .setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(rand)
+ .setMostSigBits(i).build()).setHostName("localhost")
+ .setIpAddress("1.2.3.4").addPorts(
+ HddsProtos.Port.newBuilder().setName("RATIS").setValue(rand)
+ .build()).build());
if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
- builder.setEcReplicationConfig(keyArgs.getEcReplicationConfig());
+ builder.addMemberReplicaIndexes(i);
}
- pipeline = builder.build();
}
+ if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
+ builder.setEcReplicationConfig(keyArgs.getEcReplicationConfig());
+ }
+ final HddsProtos.Pipeline pipeline = builder.build();
- long blockSize = (long)conf.getStorageSize(
- OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
- OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT,
- StorageUnit.BYTES);
+ long blockSize = (long) conf
+ .getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
+ OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
List<OzoneManagerProtocolProtos.KeyLocation> results = new ArrayList<>();
results.add(OzoneManagerProtocolProtos.KeyLocation.newBuilder()
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index c66d0f6..0b2a8d8 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -420,6 +420,84 @@ public class TestOzoneECClient {
}
}
+ @Test
+ public void testWriteShouldFailIfMoreThanParityNodesFail()
+ throws IOException {
+ testNodeFailuresWhileWriting(3, 3);
+ }
+
+ @Test
+ public void testWriteShouldSuccessIfLessThanParityNodesFail()
+ throws IOException {
+ testNodeFailuresWhileWriting(1, 2);
+ }
+
+ @Test
+ public void testWriteShouldSuccessIf4NodesFailed()
+ throws IOException {
+ testNodeFailuresWhileWriting(4, 1);
+ }
+
+ @Test
+ public void testWriteShouldSuccessIfAllNodesFailed()
+ throws IOException {
+ testNodeFailuresWhileWriting(4, 1);
+ }
+
+ public void testNodeFailuresWhileWriting(int numFailureToInject,
+ int numChunksToWriteAfterFailure) throws IOException {
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ try (OzoneOutputStream out = bucket.createKey(keyName, 1024 * 3,
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ chunkSize), new HashMap<>())) {
+ for (int i = 0; i < dataBlocks; i++) {
+ out.write(inputChunks[i]);
+ }
+
+ List<DatanodeDetails> failedDNs = new ArrayList<>();
+ Map<DatanodeDetails, MockDatanodeStorage> storages =
+ ((MockXceiverClientFactory) factoryStub).getStorages();
+ DatanodeDetails[] dnDetails =
+ storages.keySet().toArray(new DatanodeDetails[storages.size()]);
+ for (int i = 0; i < numFailureToInject; i++) {
+ failedDNs.add(dnDetails[i]);
+ }
+
+ // First let's set storage as bad
+ ((MockXceiverClientFactory) factoryStub).setFailedStorages(failedDNs);
+
+ for (int i = 0; i < numChunksToWriteAfterFailure; i++) {
+ out.write(inputChunks[i]);
+ }
+ }
+ final OzoneKeyDetails key = bucket.getKey(keyName);
+ // Data is supposed to be stored in a single block group. Since we added
+ // the failures after the first stripe, the second stripe data should have
+ // been written into a new block group. So, we should have 2 block groups. That
+ // means two keyLocations.
+ Assert.assertEquals(2, key.getOzoneKeyLocations().size());
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ byte[] fileContent = new byte[chunkSize];
+ for (int i = 0; i < dataBlocks; i++) {
+ Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
+ Assert.assertTrue("Expected: " + new String(inputChunks[i],
+ UTF_8) + " \n " + "Actual: " + new String(fileContent, UTF_8),
+ Arrays.equals(inputChunks[i], fileContent));
+ }
+ for (int i = 0; i < numChunksToWriteAfterFailure; i++) {
+ Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
+ Assert.assertTrue("Expected: " + new String(inputChunks[i],
+ UTF_8) + " \n " + "Actual: " + new String(fileContent, UTF_8),
+ Arrays.equals(inputChunks[i], fileContent));
+ }
+ }
+ }
+
+
private OzoneBucket writeIntoECKey(byte[] data, String key,
DefaultReplicationConfig defaultReplicationConfig) throws IOException {
return writeIntoECKey(new byte[][] {data}, key, defaultReplicationConfig);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]