This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 86f7354 HDDS-6036: EC: handleStripeFailure should retry (#2910)
86f7354 is described below
commit 86f735418a0ded94d18a5729a18af6e6e16a48ca
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Thu Dec 16 09:28:35 2021 -0800
HDDS-6036: EC: handleStripeFailure should retry (#2910)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 11 ++
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 5 +
.../common/src/main/resources/ozone-default.xml | 13 ++
.../ozone/client/io/ECBlockOutputStreamEntry.java | 10 +-
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 115 ++++++++++----
.../hadoop/ozone/client/MockBlockAllocator.java | 5 +-
.../hadoop/ozone/client/MockOmTransport.java | 21 ++-
.../ozone/client/MockXceiverClientFactory.java | 39 +++--
.../client/MultiNodePipelineBlockAllocator.java | 97 +++++++++---
.../ozone/client/SinglePipelineBlockAllocator.java | 5 +-
.../hadoop/ozone/client/TestOzoneClient.java | 4 +-
.../hadoop/ozone/client/TestOzoneECClient.java | 173 ++++++++++++++++++++-
12 files changed, 424 insertions(+), 74 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index f39ec86..8861f4b 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -124,6 +124,13 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private boolean checksumVerify = true;
+ @Config(key = "max.ec.stripe.write.retries",
+ defaultValue = "10",
+ description = "Ozone EC client to retry stripe to new block group on" +
+ " failures.",
+ tags = ConfigTag.CLIENT)
+ private int maxECStripeWriteRetries = 10;
+
@PostConstruct
private void validate() {
Preconditions.checkState(streamBufferSize > 0);
@@ -224,6 +231,10 @@ public class OzoneClientConfig {
this.checksumVerify = checksumVerify;
}
+ public int getMaxECStripeWriteRetries() {
+ return this.maxECStripeWriteRetries;
+ }
+
public int getBufferIncrement() {
return bufferIncrement;
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 9c7d769..f719571 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -115,6 +115,11 @@ public final class OzoneConfigKeys {
"ozone.scm.block.size";
public static final String OZONE_SCM_BLOCK_SIZE_DEFAULT = "256MB";
+ public static final String OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES =
+ "ozone.client.max.ec.stripe.write.retries";
+ public static final String OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES_DEFAULT =
+ "10";
+
/**
* Ozone administrator users delimited by comma.
* If not set, only the user who launches an ozone service will be the
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index e236d1e..3e2b5e9 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1178,6 +1178,7 @@
Supported values: RATIS, STAND_ALONE, CHAINED and EC.
</description>
</property>
+
<property>
<name>hdds.container.close.threshold</name>
<value>0.9f</value>
@@ -3044,4 +3045,16 @@
will create intermediate directories.
</description>
</property>
+
+ <property>
+ <name>ozone.client.max.ec.stripe.write.retries</name>
+ <value>10</value>
+ <tag>CLIENT</tag>
+ <description>
+      When an EC stripe write fails, the client will request allocation of a
new block group and write the failed stripe into the new
+      block group. If the same stripe failure continues in the newly acquired
block group as well, then it will retry by
+      requesting allocation of another new block group. This configuration is
used to limit the number of such retries. By
+      default the number of retries is 10.
+ </description>
+ </property>
</configuration>
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 0cb7b0f..3670231 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -290,6 +290,14 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
return blockOutputStreams[0].getBlockID();
}
+ public List<ECBlockOutputStream> streamsWithWriteFailure() {
+ return getFailedStreams(false);
+ }
+
+ public List<ECBlockOutputStream> streamsWithPutBlockFailure() {
+ return getFailedStreams(true);
+ }
+
/**
* In EC, we will do async write calls for writing data in the scope of a
* stripe. After every stripe write finishes, use this method to validate the
@@ -300,7 +308,7 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
* futures if false.
* @return
*/
- public List<ECBlockOutputStream> getFailedStreams(boolean forPutBlock) {
+ private List<ECBlockOutputStream> getFailedStreams(boolean forPutBlock) {
final Iterator<ECBlockOutputStream> iter = blockStreams().iterator();
List<ECBlockOutputStream> failedStreams = new ArrayList<>();
while (iter.hasNext()) {
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index f5480c7..fd333e2 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -174,7 +174,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
int pos = handleDataWrite(currentStreamIdx, b, off,
currentWriterChunkLenToWrite,
currentChunkBufferLen + currentWriterChunkLenToWrite == ecChunkSize);
- //TODO: do we really need this call?
checkAndWriteParityCells(pos, false);
int remLen = len - currentWriterChunkLenToWrite;
int iters = remLen / ecChunkSize;
@@ -200,7 +199,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
writeOffset += len;
}
- private void handleStripeFailure(int chunkSize, int failedStripeDataSize)
+ private StripeWriteStatus rewriteStripeToNewBlockGroup(int chunkSize,
+ int failedStripeDataSize, boolean allocateBlockIfFull)
throws IOException {
long[] failedDataStripeChunkLens = new long[numDataBlks];
long[] failedParityStripeChunkLens = new long[numParityBlks];
@@ -246,9 +246,35 @@ public class ECKeyOutputStream extends KeyOutputStream {
true, true);
currentStreamEntry.useNextBlockStream();
}
+
+ if (hasWriteFailure()) {
+ return StripeWriteStatus.FAILED;
+ }
currentStreamEntry.executePutBlock();
+
+ if (hasPutBlockFailure()) {
+ return StripeWriteStatus.FAILED;
+ }
+ ECBlockOutputStreamEntry newBlockGroupStreamEntry =
+ blockOutputStreamEntryPool.getCurrentStreamEntry();
+ newBlockGroupStreamEntry
+ .updateBlockGroupToAckedPosition(failedStripeDataSize);
ecChunkBufferCache.clear(chunkSize);
ecChunkBufferCache.release();
+
+ if (newBlockGroupStreamEntry.getRemaining() <= 0) {
+ // In most cases this should not happen except in the case stripe size
and
+ // block size same.
+ newBlockGroupStreamEntry.close();
+ if (allocateBlockIfFull) {
+ blockOutputStreamEntryPool.allocateBlockIfNeeded();
+ }
+ currentBlockGroupLen = 0;
+ } else {
+ newBlockGroupStreamEntry.resetToFirstEntry();
+ }
+
+ return StripeWriteStatus.SUCCESS;
}
private void checkAndWriteParityCells(int lastDataBuffPos,
@@ -262,12 +288,13 @@ public class ECKeyOutputStream extends KeyOutputStream {
//Lets encode and write
if (handleParityWrites(ecChunkSize,
allocateBlockIfFull) == StripeWriteStatus.FAILED) {
- // TODO: This should make sure to retry until it's success. (HDDS-6036)
- handleStripeFailure(ecChunkSize, numDataBlks * ecChunkSize);
+ handleStripeFailure(numDataBlks * ecChunkSize, ecChunkSize,
+ allocateBlockIfFull);
+ } else {
+ // At this stage stripe write is successful.
+ currentStreamEntry.updateBlockGroupToAckedPosition(
+ currentStreamEntry.getCurrentPosition());
}
- // At this stage stripe write is successful.
- currentStreamEntry.updateBlockGroupToAckedPosition(
- currentStreamEntry.getCurrentPosition());
}
}
@@ -276,24 +303,18 @@ public class ECKeyOutputStream extends KeyOutputStream {
boolean allocateBlockIfFull)
throws IOException {
writeParityCells(parityCellSize);
+ if (hasWriteFailure()) {
+ return StripeWriteStatus.FAILED;
+ }
+
// By this time, we should have finished full stripe. So, lets call
// executePutBlock for all.
// TODO: we should alter the put block calls to share CRC to each stream.
ECBlockOutputStreamEntry streamEntry =
blockOutputStreamEntryPool.getCurrentStreamEntry();
- List<ECBlockOutputStream> failedStreams =
- streamEntry.getFailedStreams(false);
- // Since writes are async, let's check the failures once.
- if (failedStreams.size() > 0) {
- addToExcludeNodesList(failedStreams);
- return StripeWriteStatus.FAILED;
- }
streamEntry.executePutBlock();
- failedStreams = streamEntry.getFailedStreams(true);
- // Since putBlock also async, let's check the failures again.
- if (failedStreams.size() > 0) {
- addToExcludeNodesList(failedStreams);
+ if (hasPutBlockFailure()) {
return StripeWriteStatus.FAILED;
}
ecChunkBufferCache.clear(parityCellSize);
@@ -303,13 +324,38 @@ public class ECKeyOutputStream extends KeyOutputStream {
if (allocateBlockIfFull) {
blockOutputStreamEntryPool.allocateBlockIfNeeded();
}
+ currentBlockGroupLen = 0;
} else {
streamEntry.resetToFirstEntry();
}
- currentBlockGroupLen = 0;
+
return StripeWriteStatus.SUCCESS;
}
+ private boolean hasWriteFailure() {
+ List<ECBlockOutputStream> failedStreams =
+ blockOutputStreamEntryPool.getCurrentStreamEntry()
+ .streamsWithWriteFailure();
+ // Since writes are async, let's check the failures once.
+ if (failedStreams.size() > 0) {
+ addToExcludeNodesList(failedStreams);
+ return true;
+ }
+ return false;
+ }
+
+ private boolean hasPutBlockFailure() {
+ List<ECBlockOutputStream> failedStreams =
+ blockOutputStreamEntryPool.getCurrentStreamEntry()
+ .streamsWithPutBlockFailure();
+ // Since writes are async, let's check the failures once.
+ if (failedStreams.size() > 0) {
+ addToExcludeNodesList(failedStreams);
+ return true;
+ }
+ return false;
+ }
+
private void addToExcludeNodesList(List<ECBlockOutputStream> failedStreams) {
for (ECBlockOutputStream failedStream : failedStreams) {
blockOutputStreamEntryPool.getExcludeList()
@@ -354,7 +400,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
private void handleOutputStreamWrite(int currIdx, long len,
- boolean isFullCell, boolean isParity) throws IOException {
+ boolean isFullCell, boolean isParity) {
BlockOutputStreamEntry current =
blockOutputStreamEntryPool.getCurrentStreamEntry();
@@ -482,13 +528,13 @@ public class ECKeyOutputStream extends KeyOutputStream {
addPadding(parityCellSize);
if (handleParityWrites(parityCellSize,
false) == StripeWriteStatus.FAILED) {
- // TODO: loop this until we succeed?
- handleStripeFailure(parityCellSize, lastStripeSize);
+ handleStripeFailure(lastStripeSize, parityCellSize, false);
+ } else {
+ blockOutputStreamEntryPool.getCurrentStreamEntry()
+ .updateBlockGroupToAckedPosition(
+ blockOutputStreamEntryPool.getCurrentStreamEntry()
+ .getCurrentPosition());
}
- blockOutputStreamEntryPool.getCurrentStreamEntry()
- .updateBlockGroupToAckedPosition(
- blockOutputStreamEntryPool.getCurrentStreamEntry()
- .getCurrentPosition());
}
@@ -502,6 +548,23 @@ public class ECKeyOutputStream extends KeyOutputStream {
ecChunkBufferCache.release();
}
+ private void handleStripeFailure(int lastStripeSize, int parityCellSize,
+ boolean allocateBlockIfFull)
+ throws IOException {
+ StripeWriteStatus stripeWriteStatus;
+ for (int i = 0; i < this.config.getMaxECStripeWriteRetries(); i++) {
+ stripeWriteStatus =
+ rewriteStripeToNewBlockGroup(parityCellSize, lastStripeSize,
+ allocateBlockIfFull);
+ if (stripeWriteStatus == StripeWriteStatus.SUCCESS) {
+ return;
+ }
+ }
+ throw new IOException("Completed max allowed retries " + this.config
+ .getMaxECStripeWriteRetries() + " on stripe failures.");
+
+ }
+
private void addPadding(int parityCellSize) {
ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockBlockAllocator.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockBlockAllocator.java
index 747b39b..0d5e1a2 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockBlockAllocator.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockBlockAllocator.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.client;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
@@ -27,7 +28,7 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLoca
*/
public interface MockBlockAllocator {
- Iterable<? extends KeyLocation> allocateBlock(
- KeyArgs createKeyRequest);
+ Iterable<? extends KeyLocation> allocateBlock(KeyArgs createKeyRequest,
+ ExcludeList excludeList);
}
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
index be16e1f..9bc11e2 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -57,6 +58,7 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeI
import java.io.IOException;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;
@@ -132,10 +134,18 @@ public class MockOmTransport implements OmTransport {
private OzoneManagerProtocolProtos.AllocateBlockResponse allocateBlock(
OzoneManagerProtocolProtos.AllocateBlockRequest allocateBlockRequest) {
- return OzoneManagerProtocolProtos.AllocateBlockResponse.newBuilder()
- .setKeyLocation(
- blockAllocator.allocateBlock(allocateBlockRequest.getKeyArgs())
- .iterator().next()).build();
+ Iterator<? extends OzoneManagerProtocolProtos.KeyLocation> iterator =
+ blockAllocator.allocateBlock(allocateBlockRequest.getKeyArgs(),
+ ExcludeList.getFromProtoBuf(allocateBlockRequest.getExcludeList()))
+ .iterator();
+ OzoneManagerProtocolProtos.AllocateBlockResponse.Builder builder =
+ OzoneManagerProtocolProtos.AllocateBlockResponse.newBuilder()
+ .setKeyLocation(iterator.next());
+ while (iterator.hasNext()) {
+ builder.mergeKeyLocation(iterator.next());
+ }
+ return builder.build();
+
}
private DeleteVolumeResponse deleteVolume(
@@ -195,7 +205,8 @@ public class MockOmTransport implements OmTransport {
.setModificationTime(now).setDataSize(keyArgs.getDataSize())
.setLatestVersion(0L).addKeyLocationList(
KeyLocationList.newBuilder().addAllKeyLocations(
- blockAllocator.allocateBlock(createKeyRequest.getKeyArgs()))
+ blockAllocator.allocateBlock(createKeyRequest.getKeyArgs(),
+ new ExcludeList()))
.build());
if (keyArgs.getType() == HddsProtos.ReplicationType.NONE) {
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
index d4f1d23..ef2fd43 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -36,18 +37,30 @@ public class MockXceiverClientFactory
private final Map<DatanodeDetails, MockDatanodeStorage> storage =
new HashMap<>();
+ private List<DatanodeDetails> pendingToFailNodes = new ArrayList<>();
public void setFailedStorages(List<DatanodeDetails> failedStorages) {
- final Iterator<Map.Entry<DatanodeDetails, MockDatanodeStorage>> iterator =
- storage.entrySet().iterator();
- while (iterator.hasNext()) {
- final Map.Entry<DatanodeDetails, MockDatanodeStorage> next =
- iterator.next();
- if (failedStorages.contains(next.getKey())) {
- final MockDatanodeStorage value = next.getValue();
- value.setStorageFailed();
+ List<DatanodeDetails> remainingFailNodes = new ArrayList<>();
+ for(int i=0; i< failedStorages.size(); i++){
+ DatanodeDetails failedDN = failedStorages.get(i);
+ boolean isCurrentNodeMarked = false;
+ final Iterator<Map.Entry<DatanodeDetails, MockDatanodeStorage>> iterator
=
+ storage.entrySet().iterator();
+ while (iterator.hasNext()) {
+ final Map.Entry<DatanodeDetails, MockDatanodeStorage> next =
+ iterator.next();
+ if (next.getKey().equals(failedDN)) {
+ final MockDatanodeStorage value = next.getValue();
+ value.setStorageFailed();
+ isCurrentNodeMarked = true;
+ }
+ }
+ if(!isCurrentNodeMarked){
+        //This node has not been initialized by the client yet.
+ remainingFailNodes.add(failedDN);
}
}
+ this.pendingToFailNodes = remainingFailNodes;
}
@Override
@@ -58,9 +71,13 @@ public class MockXceiverClientFactory
@Override
public XceiverClientSpi acquireClient(Pipeline pipeline)
throws IOException {
- return new MockXceiverClientSpi(pipeline, storage
- .computeIfAbsent(pipeline.getFirstNode(),
- r -> new MockDatanodeStorage()));
+ MockXceiverClientSpi mockXceiverClientSpi =
+ new MockXceiverClientSpi(pipeline, storage
+ .computeIfAbsent(pipeline.getFirstNode(),
+ r -> new MockDatanodeStorage()));
+    // In case this node was already set to be marked as failed.
+ setFailedStorages(this.pendingToFailNodes);
+ return mockXceiverClientSpi;
}
@Override
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
index be2f7fd..ee16d6d 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
@@ -21,61 +21,86 @@ package org.apache.hadoop.ozone.client;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import java.util.ArrayList;
import java.util.List;
-import java.util.Random;
+import java.util.Set;
/**
* Allocates the block with required number of nodes in the pipeline.
+ * The nodes are pre-created with port numbers starting from 0 to
+ * ( given cluster size -1).
*/
public class MultiNodePipelineBlockAllocator implements MockBlockAllocator {
- public static final Random RANDOM = new Random();
private long blockId;
private int requiredNodes;
private final ConfigurationSource conf;
+ private List<HddsProtos.DatanodeDetailsProto> clusterDns = new ArrayList<>();
+ private int start = 0;
public MultiNodePipelineBlockAllocator(OzoneConfiguration conf,
- int requiredNodes) {
+ int requiredNodes, int clusterSize) {
this.requiredNodes = requiredNodes;
this.conf = conf;
+    // Pre-initializing the datanodes. Later the allocateBlock API will use
+    // these nodes to add the required number of nodes to the block pipelines.
+ for (int i = 0; i < clusterSize; i++) {
+ clusterDns.add(HddsProtos.DatanodeDetailsProto.newBuilder().setUuid128(
+ HddsProtos.UUID.newBuilder().setLeastSigBits(i).setMostSigBits(i)
+ .build()).setHostName("localhost").setIpAddress("1.2.3.4")
+ .addPorts(HddsProtos.Port.newBuilder().setName("RATIS").setValue(i)
+ .build()).build());
+ }
+ }
+
+ public List<HddsProtos.DatanodeDetailsProto> getClusterDns(){
+ return this.clusterDns;
}
+ /**
+ * This method selects the block pipeline nodes from the pre-created cluster
+ * nodes(clusterDns). It will use requiredNodes field to decide how many
nodes
+   * to be chosen for the pipeline. To make the node allocations easy to
+   * predict in tests, it will choose block pipeline nodes in a sliding window
+ * fashion starting from 0th index in clusterDns in incrementing order until
+ * given requireNodes number. Similarly for the next block pipeline, it will
+ * start from the index location of previous chosen pipeline's last node
index
+ * + 1. Let's say cluster size was initialized with 10 and required nodes are
+ * 5, the first block pipeline will have nodes from 0 to 4 and the second
+ * block will be assigned with the index locations of 5th to 9th nodes. Once
+ * we finish round of allocations, then it will start from 0 again for next
+   * block. It also supports an exclude list. If the client passes an exclude
list, it
+   * will simply skip a node if it is present in the exclude list, and instead
it will
+   * simply take the next node. If not enough nodes are left due to a grown
+   * exclude list, it will throw IllegalStateException.
+ *
+ * @param keyArgs
+ * @param excludeList
+ * @return KeyLocation
+ */
@Override
public Iterable<? extends OzoneManagerProtocolProtos.KeyLocation>
- allocateBlock(OzoneManagerProtocolProtos.KeyArgs keyArgs) {
+ allocateBlock(OzoneManagerProtocolProtos.KeyArgs keyArgs,
+ ExcludeList excludeList) {
HddsProtos.Pipeline.Builder builder =
HddsProtos.Pipeline.newBuilder().setFactor(keyArgs.getFactor())
.setType(keyArgs.getType()).setId(HddsProtos.PipelineID.newBuilder()
.setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(1L)
.setMostSigBits(1L).build()).build());
- final int rand = RANDOM.nextInt(); // used for port and UUID combination.
- // It's ok here for port number limit as don't really create any socket
- // connection.
- for (int i = 1; i <= requiredNodes; i++) {
- builder.addMembers(HddsProtos.DatanodeDetailsProto.newBuilder()
- .setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(rand)
- .setMostSigBits(i).build()).setHostName("localhost")
- .setIpAddress("1.2.3.4").addPorts(
- HddsProtos.Port.newBuilder().setName("RATIS").setValue(rand)
- .build()).build());
- if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
- builder.addMemberReplicaIndexes(i);
- }
- }
+ addMembers(builder, requiredNodes, excludeList.getDatanodes(), keyArgs);
if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
builder.setEcReplicationConfig(keyArgs.getEcReplicationConfig());
}
final HddsProtos.Pipeline pipeline = builder.build();
-
+ List<OzoneManagerProtocolProtos.KeyLocation> results = new ArrayList<>();
long blockSize = (long) conf
.getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
-
- List<OzoneManagerProtocolProtos.KeyLocation> results = new ArrayList<>();
results.add(OzoneManagerProtocolProtos.KeyLocation.newBuilder()
.setPipeline(pipeline).setBlockID(
HddsProtos.BlockID.newBuilder().setBlockCommitSequenceId(1L)
@@ -85,4 +110,34 @@ public class MultiNodePipelineBlockAllocator implements
MockBlockAllocator {
.setLength(blockSize).build());
return results;
}
+
+ private void addMembers(HddsProtos.Pipeline.Builder builder, int nodesNeeded,
+ Set<DatanodeDetails> excludedDataNodes,
+ OzoneManagerProtocolProtos.KeyArgs keyArgs) {
+ int clusterSize = clusterDns.size();
+ int counter = nodesNeeded;
+ int j = 0;
+ for (int i = 0; i < clusterDns.size(); i++) {
+ HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
+ clusterDns.get(start % clusterSize);
+ start++;
+ if (excludedDataNodes
+ .contains(DatanodeDetails.getFromProtoBuf(datanodeDetailsProto))) {
+ continue;
+ } else {
+ builder.addMembers(datanodeDetailsProto);
+ if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
+ builder.addMemberReplicaIndexes(++j);
+ }
+ if (--counter == 0) {
+ break;
+ }
+ }
+ }
+ if (counter > 0) {
+ throw new IllegalStateException(
+ "MockedImpl: Could not find enough nodes.");
+ }
+ }
+
}
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/SinglePipelineBlockAllocator.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/SinglePipelineBlockAllocator.java
index 9d88e8c..20fbc03 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/SinglePipelineBlockAllocator.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/SinglePipelineBlockAllocator.java
@@ -27,6 +27,7 @@ import
org.apache.hadoop.hdds.protocol.proto.HddsProtos.Pipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.Port;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.UUID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
@@ -49,8 +50,8 @@ public class SinglePipelineBlockAllocator
}
@Override
- public Iterable<? extends KeyLocation> allocateBlock(
- KeyArgs keyArgs) {
+ public Iterable<? extends KeyLocation> allocateBlock(KeyArgs keyArgs,
+ ExcludeList excludeList) {
if (pipeline == null) {
Pipeline.Builder bldr = Pipeline.newBuilder()
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
index e7bc9d7..23b181f 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
@@ -198,8 +198,8 @@ public class TestOzoneClient {
int data = 3;
int parity = 2;
int chunkSize = 1024;
- createNewClient(config, new MultiNodePipelineBlockAllocator(
- config, data + parity));
+ createNewClient(config,
+ new MultiNodePipelineBlockAllocator(config, data + parity, 15));
String value = new String(new byte[chunkSize], UTF_8);
OzoneBucket bucket = getOzoneBucket();
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 8cd4365..c3c8c6d 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -23,8 +23,10 @@ import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
@@ -72,8 +74,8 @@ public class TestOzoneECClient {
private final XceiverClientFactory factoryStub =
new MockXceiverClientFactory();
private OzoneConfiguration conf = new OzoneConfiguration();
- private MockOmTransport transportStub = new MockOmTransport(
- new MultiNodePipelineBlockAllocator(conf, dataBlocks + parityBlocks));
+ private final MockOmTransport transportStub = new MockOmTransport(
+ new MultiNodePipelineBlockAllocator(conf, dataBlocks + parityBlocks,
15));
private final RawErasureEncoder encoder =
new RSRawErasureCoderFactory().createEncoder(
new ECReplicationConfig(dataBlocks, parityBlocks));
@@ -82,13 +84,22 @@ public class TestOzoneECClient {
public void init() throws IOException {
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2,
StorageUnit.KB);
+ createNewClient(conf, transportStub);
+ }
+
+ private void createNewClient(ConfigurationSource config,
+ MockBlockAllocator blkAllocator) throws IOException {
+ createNewClient(config, new MockOmTransport(blkAllocator));
+ }
- client = new OzoneClient(conf, new RpcClient(conf, null) {
+ private void createNewClient(ConfigurationSource config,
+ final MockOmTransport transport) throws IOException {
+ client = new OzoneClient(config, new RpcClient(config, null) {
@Override
protected OmTransport createOmTransport(String omServiceId)
throws IOException {
- return transportStub;
+ return transport;
}
@Override
@@ -436,6 +447,160 @@ public class TestOzoneECClient {
testNodeFailuresWhileWriting(4, 1);
}
+ @Test
+ public void testStripeWriteRetriesOn2Failures() throws IOException {
+ OzoneConfiguration con = new OzoneConfiguration();
+ con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2,
StorageUnit.KB);
+ // Cluster has 15 nodes. So, first we will create 3 block groups with
+ // distinct nodes in each. Block Group 1: 0-4, Block Group 2: 5-9, Block
+ // Group 3: 10-14
+ // To mark the node failed in the second block group.
+ int[] nodesIndexesToMarkFailure = new int[2];
+ nodesIndexesToMarkFailure[0] = 0;
+ // To mark the node failed in the second block group also.
+ nodesIndexesToMarkFailure[1] = 5;
+ // Mocked MultiNodePipelineBlockAllocator#allocateBlock implementation
+ // should pick next good block group as we have 15 nodes.
+ int clusterSize = 15;
+ testStripeWriteRetriesOnFailures(con, clusterSize,
+ nodesIndexesToMarkFailure);
+ // It should have used 3rd block group also. So, total initialized nodes
+ // count should be clusterSize.
+ Assert.assertTrue(((MockXceiverClientFactory) factoryStub).getStorages()
+ .size() == clusterSize);
+ }
+
+ @Test
+ public void testStripeWriteRetriesOn3Failures() throws IOException {
+ OzoneConfiguration con = new OzoneConfiguration();
+ con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2,
StorageUnit.KB);
+
+ int[] nodesIndexesToMarkFailure = new int[3];
+ nodesIndexesToMarkFailure[0] = 0;
+ // To mark the node failed in the second block group.
+ nodesIndexesToMarkFailure[1] = 5;
+ // To mark the node failed in the third block group.
+ nodesIndexesToMarkFailure[2] = 10;
+ // Mocked MultiNodePipelineBlockAllocator#allocateBlock implementation will
+ // pick the remaining goods for the next block group.
+ int clusterSize = 15;
+ testStripeWriteRetriesOnFailures(con, clusterSize,
+ nodesIndexesToMarkFailure);
+ // It should have used 3rd block group also. So, total initialized nodes
+ // count should be clusterSize.
+ Assert.assertTrue(((MockXceiverClientFactory) factoryStub).getStorages()
+ .size() == clusterSize);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ // The mocked impl throws IllegalStateException when there are not enough
+ // nodes in allocateBlock request.
+ public void testStripeWriteRetriesOnAllNodeFailures() throws IOException {
+ OzoneConfiguration con = new OzoneConfiguration();
+ con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2,
StorageUnit.KB);
+
+ // After writing first stripe, we will mark all nodes as bad in the
cluster.
+ int clusterSize = 5;
+ int[] nodesIndexesToMarkFailure = new int[clusterSize];
+ for (int i = 0; i < nodesIndexesToMarkFailure.length; i++) {
+ nodesIndexesToMarkFailure[i] = i;
+ }
+ // Mocked MultiNodePipelineBlockAllocator#allocateBlock implementation can
+ // not pick new block group as all nodes in cluster marked as bad.
+ testStripeWriteRetriesOnFailures(con, clusterSize,
+ nodesIndexesToMarkFailure);
+ }
+
+ @Test
+ public void testStripeWriteRetriesOn4FailuresWith3RetriesAllowed()
+ throws IOException {
+ OzoneConfiguration con = new OzoneConfiguration();
+ con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2,
StorageUnit.KB);
+ con.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES, 3);
+
+ int[] nodesIndexesToMarkFailure = new int[4];
+ nodesIndexesToMarkFailure[0] = 0;
+ //To mark node failed in second block group.
+ nodesIndexesToMarkFailure[1] = 5;
+ //To mark node failed in third block group.
+ nodesIndexesToMarkFailure[2] = 10;
+ //To mark node failed in fourth block group.
+ nodesIndexesToMarkFailure[3] = 15;
+ try {
+ // Mocked MultiNodePipelineBlockAllocator#allocateBlock implementation
can
+ // pick good block group, but client retries should be limited
+ // OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES_ON_FAILURE(here it was
+ // configured as 3). So, it should fail as we have marked 3 nodes as bad.
+ testStripeWriteRetriesOnFailures(con, 20, nodesIndexesToMarkFailure);
+ Assert.fail(
+ "Expecting it to fail as retries should exceed the max allowed
times:"
+ + " " + 3);
+ } catch (IOException e) {
+ Assert.assertEquals("Completed max allowed retries 3 on stripe
failures.",
+ e.getMessage());
+ }
+ }
+
+ public void testStripeWriteRetriesOnFailures(OzoneConfiguration con,
+ int clusterSize, int[] nodesIndexesToMarkFailure) throws IOException {
+ close();
+ MultiNodePipelineBlockAllocator blkAllocator =
+ new MultiNodePipelineBlockAllocator(con, dataBlocks + parityBlocks,
+ clusterSize);
+ createNewClient(con, blkAllocator);
+ int numChunksToWriteAfterFailure = 3;
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ try (OzoneOutputStream out = bucket.createKey(keyName, 1024 * 3,
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ chunkSize), new HashMap<>())) {
+ for (int i = 0; i < dataBlocks; i++) {
+ out.write(inputChunks[i]);
+ }
+ Assert.assertTrue(
+ ((MockXceiverClientFactory) factoryStub).getStorages().size() == 5);
+ List<DatanodeDetails> failedDNs = new ArrayList<>();
+ List<HddsProtos.DatanodeDetailsProto> dns = blkAllocator.getClusterDns();
+
+ for (int j = 0; j < nodesIndexesToMarkFailure.length; j++) {
+ failedDNs.add(DatanodeDetails
+ .getFromProtoBuf(dns.get(nodesIndexesToMarkFailure[j])));
+ }
+
+ // First let's set storage as bad
+ ((MockXceiverClientFactory) factoryStub).setFailedStorages(failedDNs);
+
+ // Writer should be able to write by using 3rd block group.
+ for (int i = 0; i < numChunksToWriteAfterFailure; i++) {
+ out.write(inputChunks[i]);
+ }
+ }
+ final OzoneKeyDetails key = bucket.getKey(keyName);
+ // Data supposed to store in single block group. Since we introduced the
+ // failures after first stripe, the second stripe data should have been
+ // written into new blockgroup. So, we should have 2 block groups. That
+ // means two keyLocations.
+ Assert.assertEquals(2, key.getOzoneKeyLocations().size());
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ byte[] fileContent = new byte[chunkSize];
+ for (int i = 0; i < dataBlocks; i++) {
+ Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
+ Assert.assertTrue("Expected: " + new String(inputChunks[i],
+ UTF_8) + " \n " + "Actual: " + new String(fileContent, UTF_8),
+ Arrays.equals(inputChunks[i], fileContent));
+ }
+ for (int i = 0; i < numChunksToWriteAfterFailure; i++) {
+ Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
+ Assert.assertTrue("Expected: " + new String(inputChunks[i],
+ UTF_8) + " \n " + "Actual: " + new String(fileContent, UTF_8),
+ Arrays.equals(inputChunks[i], fileContent));
+ }
+ }
+ }
+
public void testNodeFailuresWhileWriting(int numFailureToInject,
int numChunksToWriteAfterFailure) throws IOException {
store.createVolume(volumeName);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]