This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new e82682f HDDS-5551. EC: Implement an Input Stream to reconstruct EC blocks on demand (#2797)
e82682f is described below
commit e82682fe8b4431b565201165278b96f24ec28303
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Tue Nov 9 18:15:50 2021 +0000
HDDS-5551. EC: Implement an Input Stream to reconstruct EC blocks on demand (#2797)
---
.../hadoop/ozone/client/io/ECBlockInputStream.java | 107 ++--
.../io/ECBlockReconstructedStripeInputStream.java | 415 ++++++++++++++
.../client/io/InsufficientLocationsException.java | 43 ++
.../client/rpc/read/TestECBlockInputStream.java | 10 -
.../TestECBlockReconstructedStripeInputStream.java | 601 +++++++++++++++++++++
5 files changed, 1133 insertions(+), 43 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index ae030b5..52b370b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -55,7 +55,6 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
private final Function<BlockID, Pipeline> refreshFunction;
private final OmKeyLocationInfo blockInfo;
private final DatanodeDetails[] dataLocations;
- private final DatanodeDetails[] parityLocations;
private final BlockExtendedInputStream[] blockStreams;
private final int maxLocations;
@@ -63,6 +62,43 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
private boolean closed = false;
private boolean seeked = false;
+ protected OmKeyLocationInfo getBlockInfo() {
+ return blockInfo;
+ }
+
+ protected ECReplicationConfig getRepConfig() {
+ return repConfig;
+ }
+
+ protected DatanodeDetails[] getDataLocations() {
+ return dataLocations;
+ }
+
+ protected long getStripeSize() {
+ return stripeSize;
+ }
+
+ protected int availableDataLocations() {
+ int count = 0;
+ for (int i = 0; i < repConfig.getData(); i++) {
+ if (dataLocations[i] != null) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ protected int availableParityLocations() {
+ int count = 0;
+ for (int i = repConfig.getData();
+ i < repConfig.getData() + repConfig.getParity(); i++) {
+ if (dataLocations[i] != null) {
+ count++;
+ }
+ }
+ return count;
+ }
+
public ECBlockInputStream(ECReplicationConfig repConfig,
OmKeyLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
@@ -75,33 +111,29 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
this.xceiverClientFactory = xceiverClientFactory;
this.refreshFunction = refreshFunction;
this.maxLocations = repConfig.getData() + repConfig.getParity();
- this.dataLocations = new DatanodeDetails[repConfig.getData()];
- this.parityLocations = new DatanodeDetails[repConfig.getParity()];
+ this.dataLocations =
+ new DatanodeDetails[repConfig.getData() + repConfig.getParity()];
this.blockStreams = new BlockExtendedInputStream[repConfig.getData()];
- stripeSize = ecChunkSize * repConfig.getData();
+ this.stripeSize = (long)ecChunkSize * repConfig.getData();
setBlockLocations(this.blockInfo.getPipeline());
}
public synchronized boolean hasSufficientLocations() {
- // Until we implement "on the fly" recovery, all data location must be
- // present and we have enough locations if that is the case.
- //
// The number of locations needed is a function of the EC Chunk size. If the
// block length is <= the chunk size, we should only have one location. If it
// is greater than the chunk size but less than chunk_size * 2, then we must
// have two locations. If it is greater than chunk_size * data_num, then we
// must have all data_num locations.
- int expectedDataBlocks =
- (int)Math.min(
- Math.ceil((double)blockInfo.getLength() / ecChunkSize),
- repConfig.getData());
- for (int i=0; i<expectedDataBlocks; i++) {
- if (dataLocations[i] == null) {
- return false;
- }
- }
- return true;
+ // We only consider data locations here.
+ int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
+ return expectedDataBlocks == availableDataLocations();
+ }
+
+ protected int calculateExpectedDataBlocks(ECReplicationConfig rConfig) {
+ return (int)Math.min(Math.ceil(
+ (double)getBlockInfo().getLength() / rConfig.getEcChunkSize()),
+ rConfig.getData());
}
/**
@@ -110,7 +142,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
* stream reference. The block group index will be one greater than this.
* @return
*/
- private int currentStreamIndex() {
+ protected int currentStreamIndex() {
return (int)((position / ecChunkSize) % repConfig.getData());
}
@@ -120,9 +152,9 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
* stream if it has not been opened already.
* @return BlockInput stream to read from.
*/
- private BlockExtendedInputStream getOrOpenStream() {
- int ind = currentStreamIndex();
- BlockExtendedInputStream stream = blockStreams[ind];
+ protected BlockExtendedInputStream getOrOpenStream(
+ int streamIndex, int locationIndex) {
+ BlockExtendedInputStream stream = blockStreams[streamIndex];
if (stream == null) {
// To read an EC block, we create a STANDALONE pipeline that contains the
// single location for the block index we want to read. The EC blocks are
@@ -131,14 +163,14 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
Pipeline pipeline = Pipeline.newBuilder()
.setReplicationConfig(new StandaloneReplicationConfig(
HddsProtos.ReplicationFactor.ONE))
- .setNodes(Arrays.asList(dataLocations[ind]))
+ .setNodes(Arrays.asList(dataLocations[locationIndex]))
.setId(PipelineID.randomId())
.setState(Pipeline.PipelineState.CLOSED)
.build();
OmKeyLocationInfo blkInfo = new OmKeyLocationInfo.Builder()
.setBlockID(blockInfo.getBlockID())
- .setLength(internalBlockLength(ind+1))
+ .setLength(internalBlockLength(locationIndex+1))
.setPipeline(blockInfo.getPipeline())
.setToken(blockInfo.getToken())
.setPartNumber(blockInfo.getPartNumber())
@@ -148,7 +180,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
blkInfo, pipeline,
blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
refreshFunction);
- blockStreams[ind] = stream;
+ blockStreams[streamIndex] = stream;
}
return stream;
}
@@ -160,12 +192,19 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
* @param index - Index number of the internal block, starting from 1
* @return
*/
- private long internalBlockLength(int index) {
+ protected long internalBlockLength(int index) {
long lastStripe = blockInfo.getLength() % stripeSize;
long blockSize = (blockInfo.getLength() - lastStripe) /
repConfig.getData();
long lastCell = lastStripe / ecChunkSize + 1;
long lastCellLength = lastStripe % ecChunkSize;
+ if (index > repConfig.getData()) {
+ // It's a parity block and its size is driven by the size of the
+ // first block of the block group. All parity blocks have the same size
+ // as block_1.
+ index = 1;
+ }
+
if (index < lastCell) {
return blockSize + ecChunkSize;
} else if (index == lastCell) {
@@ -187,18 +226,14 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
throw new IndexOutOfBoundsException("The index " + index + " is greater "
+ "than the EC Replication Config (" + repConfig + ")");
}
- if (index <= repConfig.getData()) {
- dataLocations[index - 1] = location;
- } else {
- parityLocations[index - repConfig.getData() - 1] = location;
- }
+ dataLocations[index - 1] = location;
}
- private long blockLength() {
+ protected long blockLength() {
return blockInfo.getLength();
}
- private long remaining() {
+ protected long remaining() {
return blockLength() - position;
}
@@ -222,7 +257,9 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
int totalRead = 0;
while(strategy.getTargetLength() > 0 && remaining() > 0) {
- BlockExtendedInputStream stream = getOrOpenStream();
+ int currentIndex = currentStreamIndex();
+ BlockExtendedInputStream stream =
+ getOrOpenStream(currentIndex, currentIndex);
int read = readFromStream(stream, strategy);
totalRead += read;
position += read;
@@ -349,6 +386,10 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
return position;
}
+ protected synchronized void setPos(long pos) {
+ position = pos;
+ }
+
@Override
public synchronized boolean seekToNewSource(long l) throws IOException {
return false;
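
To make the internalBlockLength() change above concrete, here is a small
standalone sketch of the same calculation for an illustrative EC-3-2 layout
with a 1MB chunk size. The class, constants and values below are hypothetical
and are not part of the commit; they only show how a 2.5MB block group maps
onto internal block lengths, and how a parity index (greater than the data
count) is remapped to the size of block 1:

// Illustrative only: mirrors the internalBlockLength() logic in the patch
// for EC-3-2 with a 1MB chunk size. All names and values are hypothetical.
public final class InternalBlockLengthSketch {

  private static final int CHUNK = 1024 * 1024; // ecChunkSize
  private static final int DATA = 3;            // repConfig.getData()

  // The index starts from 1, as in the patch.
  static long internalBlockLength(long blockGroupLen, int index) {
    long stripeSize = (long) CHUNK * DATA;
    long lastStripe = blockGroupLen % stripeSize;
    long blockSize = (blockGroupLen - lastStripe) / DATA;
    long lastCell = lastStripe / CHUNK + 1;
    long lastCellLength = lastStripe % CHUNK;
    if (index > DATA) {
      // Parity blocks are always sized the same as block 1.
      index = 1;
    }
    if (index < lastCell) {
      return blockSize + CHUNK;
    } else if (index == lastCell) {
      return blockSize + lastCellLength;
    } else {
      return blockSize;
    }
  }

  public static void main(String[] args) {
    // A 2.5MB block group: blocks 1 and 2 each hold a full 1MB chunk,
    // block 3 holds the final 0.5MB, and the parity blocks (indexes 4
    // and 5) match block 1 at 1MB.
    long len = CHUNK * 2L + CHUNK / 2;
    for (int i = 1; i <= 5; i++) {
      System.out.println("block " + i + " -> " + internalBlockLength(len, i));
    }
  }
}
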
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
new file mode 100644
index 0000000..674c1c6
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.ozone.erasurecode.CodecRegistry;
+import org.apache.ozone.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Function;
+
+/**
+ * Class to read EC encoded data from blocks a stripe at a time, when some of
+ * the data blocks are not available. The public API for this class is:
+ *
+ * readStripe(ByteBuffer[] bufs)
+ *
+ * The other inherited public APIs will throw a NotImplementedException. This
+ * is because this class is intended to only read full stripes into a
+ * reusable set of ByteBuffers, and the traditional read APIs do not
+ * facilitate this.
+ *
+ * The caller should pass an array of ByteBuffers to readStripe() which:
+ *
+ * 1. Have EC DataNum buffers in the array.
+ * 2. Each buffer should have its position set to zero
+ * 3. Each buffer should have ecChunkSize remaining
+ *
+ * These buffers are either read into directly from the data blocks on the
+ * datanodes, or they will be reconstructed from parity data using the EC
+ * decoder.
+ *
+ * The EC Decoder expects to receive an array of elements matching EC Data +
+ * EC Parity elements long. Missing or not needed elements should be set to
+ * null in the array. The elements should be assigned to the array in EC
+ * index order.
+ *
+ * Assuming we have n missing data locations, where n <= parity locations, the
+ * ByteBuffers passed in from the client are either assigned to the decoder
+ * input array, or they are assigned to the decoder output array, where
+ * reconstructed data is written. The required number of parity buffers will be
+ * assigned and added to the decoder input so it has sufficient locations to
+ * reconstruct the data. After reconstruction the byte buffers received will
+ * have the data for a full stripe populated, either by reading directly from
+ * the block or by reconstructing the data.
+ *
+ * The buffers are returned "ready to read" with the position at zero and
+ * remaining() indicating how much data was read. If the remaining data is
+ * less than a full stripe, the client can simply read up to remaining() from
+ * each buffer in turn. If there is a full stripe, each buffer should have
+ * ecChunkSize remaining.
+ */
+public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ECBlockReconstructedStripeInputStream.class);
+
+ // List of buffers, data + parity long, needed by the EC decoder. Missing
+ // or not-needed locations will be null.
+ private ByteBuffer[] decoderInputBuffers;
+ // Missing chunks are recovered into these buffers.
+ private ByteBuffer[] decoderOutputBuffers;
+ // Missing indexes to be recovered into the recovered buffers. Required by
+ // the EC decoder.
+ private int[] missingIndexes;
+ // The blockLocation indexes to use to read data into the dataBuffers.
+ private List<Integer> dataIndexes = new ArrayList<>();
+
+ private final RawErasureDecoder decoder;
+
+ private boolean initialized = false;
+
+ public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
+ OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+ XceiverClientFactory xceiverClientFactory, Function<BlockID,
+ Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
+ super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
+ refreshFunction, streamFactory);
+
+ decoder = CodecRegistry.getInstance()
+ .getCodecFactory(repConfig.getCodec().toString())
+ .createDecoder(repConfig);
+ }
+
+ protected void init() throws InsufficientLocationsException {
+ if (!hasSufficientLocations()) {
+ throw new InsufficientLocationsException("There are not enough " +
+ "datanodes to read the EC block");
+ }
+
+ ECReplicationConfig repConfig = getRepConfig();
+ // The EC decoder needs an array data+parity long, with missing or not
+ // needed indexes set to null.
+ decoderInputBuffers = new ByteBuffer[
+ getRepConfig().getData() + getRepConfig().getParity()];
+ DatanodeDetails[] locations = getDataLocations();
+ setMissingIndexesAndDataLocations(locations);
+ List<Integer> parityIndexes =
+ selectParityIndexes(locations, missingIndexes.length);
+ // We read from the selected parity blocks, so add them to the data
+ // indexes.
+ dataIndexes.addAll(parityIndexes);
+ // The decoder inputs originally start as all nulls. Then we populate the
+ // pieces we have data for. The parity buffers are reused for the block
+ // so we can allocate them now.
+ for (Integer i : parityIndexes) {
+ decoderInputBuffers[i] = allocateBuffer(repConfig);
+ }
+ decoderOutputBuffers = new ByteBuffer[missingIndexes.length];
+ initialized = true;
+ }
+
+ /**
+ * Determine which indexes are missing, taking into account the length of the
+ * block. For a block shorter than a full EC stripe, it is expected that
+ * some of the data locations will not be present.
+ * Populates the missingIndexes and dataIndexes instance variables.
+ * @param locations Available locations for the block group
+ */
+ private void setMissingIndexesAndDataLocations(DatanodeDetails[] locations) {
+ ECReplicationConfig repConfig = getRepConfig();
+ int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
+ List<Integer> missingInd = new ArrayList<>();
+ for (int i = 0; i < repConfig.getData(); i++) {
+ if (locations[i] == null && i < expectedDataBlocks) {
+ missingInd.add(i);
+ } else if (locations[i] != null) {
+ dataIndexes.add(i);
+ }
+ }
+ missingIndexes = missingInd.stream().mapToInt(Integer::valueOf).toArray();
+ }
+
+ private void assignBuffers(ByteBuffer[] bufs) {
+ ECReplicationConfig repConfig = getRepConfig();
+ Preconditions.assertTrue(bufs.length == repConfig.getData());
+ int recoveryIndex = 0;
+ // Here bufs come from the caller and will be filled with data read from
+ // the blocks or recovered. Therefore, if the index is missing, we assign
+ // the buffer to the decoder outputs, where data is recovered via EC
+ // decoding. Otherwise the buffer is set to the input. Note, it may be a
+ // buffer which needs padding.
+ for (int i = 0; i < repConfig.getData(); i++) {
+ if (isMissingIndex(i)) {
+ decoderOutputBuffers[recoveryIndex++] = bufs[i];
+ } else {
+ decoderInputBuffers[i] = bufs[i];
+ }
+ }
+ }
+
+ private boolean isMissingIndex(int ind) {
+ for (int i : missingIndexes) {
+ if (i == ind) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * This method should be passed an array of ByteBuffers which must contain
+ * EC Data Number entries. Each ByteBuffer should be at position 0 and have EC
+ * ChunkSize bytes remaining. After returning, the buffers will contain the
+ * data for the next stripe in the block. The buffers will be returned
+ * "ready to read" with their position set to zero and the limit set
+ * according to how much data they contain.
+ *
+ * @param bufs An array of ByteBuffers which must contain EC Data Number
+ * entries. Each ByteBuffer should be at position 0 and have
+ * EC ChunkSize bytes remaining.
+ *
+ * @return The number of bytes read
+ * @throws IOException
+ */
+ public synchronized int readStripe(ByteBuffer[] bufs) throws IOException {
+ if (!initialized) {
+ init();
+ }
+ int toRead = (int)Math.min(getRemaining(), getStripeSize());
+ if (toRead == 0) {
+ return EOF;
+ }
+ validateBuffers(bufs);
+ assignBuffers(bufs);
+ clearParityBuffers();
+ loadDataBuffersFromStream();
+ padBuffers(toRead);
+ flipInputs();
+ decodeStripe();
+ unPadBuffers(bufs, toRead);
+ setPos(getPos() + toRead);
+ return toRead;
+ }
+
+ private void validateBuffers(ByteBuffer[] bufs) {
+ Preconditions.assertTrue(bufs.length == getRepConfig().getData());
+ int chunkSize = getRepConfig().getEcChunkSize();
+ for (ByteBuffer b : bufs) {
+ Preconditions.assertTrue(b.remaining() == chunkSize);
+ }
+ }
+
+ private void padBuffers(int toRead) {
+ int dataNum = getRepConfig().getData();
+ int parityNum = getRepConfig().getParity();
+ int chunkSize = getRepConfig().getEcChunkSize();
+ int fullChunks = toRead / chunkSize;
+ if (fullChunks == dataNum) {
+ // There is no padding to do - we are reading a full stripe.
+ return;
+ }
+ // The size of each chunk is governed by the size of the first chunk.
+ // The parity always matches the first chunk size.
+ int paritySize = Math.min(toRead, chunkSize);
+ // We never need to pad the first chunk - its length dictates the length
+ // of all others.
+ fullChunks = Math.max(1, fullChunks);
+ for (int i = fullChunks; i < dataNum; i++) {
+ ByteBuffer buf = decoderInputBuffers[i];
+ if (buf != null) {
+ buf.limit(paritySize);
+ zeroFill(buf);
+ }
+ }
+ // Ensure the available parity buffers are the expected length
+ for (int i = dataNum; i < dataNum + parityNum; i++) {
+ ByteBuffer b = decoderInputBuffers[i];
+ if (b != null) {
+ Preconditions.assertTrue(b.position() == paritySize);
+ }
+ }
+ // The output buffers need their limit set to the parity size
+ for (ByteBuffer b : decoderOutputBuffers) {
+ b.limit(paritySize);
+ }
+ }
+
+ private void unPadBuffers(ByteBuffer[] bufs, int toRead) {
+ int chunkSize = getRepConfig().getEcChunkSize();
+ int fullChunks = toRead / chunkSize;
+ int remainingLength = toRead % chunkSize;
+ if (fullChunks == getRepConfig().getData()) {
+ // We are reading a full stripe, no concerns over padding.
+ return;
+ }
+
+ if (fullChunks == 0) {
+ // All buffers except the first contain no data.
+ for (int i = 1; i < bufs.length; i++) {
+ bufs[i].position(0);
+ bufs[i].limit(0);
+ }
+ } else {
+ // The first partial has the remaining length
+ bufs[fullChunks].limit(remainingLength);
+ // All others have a zero limit
+ for (int i = fullChunks + 1; i < bufs.length; i++) {
+ bufs[i].position(0);
+ bufs[i].limit(0);
+ }
+ }
+ }
+
+ private void zeroFill(ByteBuffer buf) {
+ // fill with zeros from pos to limit.
+ if (buf.hasArray()) {
+ byte[] a = buf.array();
+ Arrays.fill(a, buf.position(), buf.limit(), (byte)0);
+ buf.position(buf.limit());
+ } else {
+ while (buf.hasRemaining()) {
+ buf.put((byte)0);
+ }
+ }
+ }
+
+ /**
+ * Take the parity indexes which are available, shuffle them and truncate the
+ * list to the number of required parity chunks.
+ * @param locations The list of locations for all blocks in the block group.
+ * @param numRequired The number of parity chunks needed for reconstruction
+ * @return A list of indexes indicating which parity locations to read.
+ */
+ private List<Integer> selectParityIndexes(
+ DatanodeDetails[] locations, int numRequired) {
+ List<Integer> indexes = new ArrayList<>();
+ ECReplicationConfig repConfig = getRepConfig();
+ for (int i = repConfig.getData();
+ i < repConfig.getParity() + repConfig.getData(); i++) {
+ if (locations[i] != null) {
+ indexes.add(i);
+ }
+ }
+ Preconditions.assertTrue(indexes.size() >= numRequired);
+ Random rand = new Random();
+ while (indexes.size() > numRequired) {
+ indexes.remove(rand.nextInt(indexes.size()));
+ }
+ return indexes;
+ }
+
+ private ByteBuffer allocateBuffer(ECReplicationConfig repConfig) {
+ ByteBuffer buf = ByteBuffer.allocate(repConfig.getEcChunkSize());
+ return buf;
+ }
+
+ private void flipInputs() {
+ for (ByteBuffer b : decoderInputBuffers) {
+ if (b != null) {
+ b.flip();
+ }
+ }
+ }
+
+ private void clearParityBuffers() {
+ for (int i = getRepConfig().getData();
+ i < getRepConfig().getRequiredNodes(); i++) {
+ if (decoderInputBuffers[i] != null) {
+ decoderInputBuffers[i].clear();
+ }
+ }
+ }
+
+ protected void loadDataBuffersFromStream() throws IOException {
+ for (int i = 0; i < dataIndexes.size(); i++) {
+ BlockExtendedInputStream stream =
+ getOrOpenStream(i, dataIndexes.get(i));
+ ByteBuffer b = decoderInputBuffers[dataIndexes.get(i)];
+ while (b.hasRemaining()) {
+ int read = stream.read(b);
+ if (read == EOF) {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Take the populated input buffers and missing indexes and create the
+ * outputs. Note that the input buffers have to be "ready for read", i.e. they
+ * need to have been flipped after their data was loaded. The created outputs
+ * are "ready to read" by the underlying decoder API, so there is no need to
+ * flip them after the call. The decoder reads all the inputs leaving the
+ * buffer position at the end, so the inputs are flipped after the decode so
+ * we have a complete set of "outputs" for the EC Stripe which are ready to
+ * read.
+ * @throws IOException
+ */
+ private void decodeStripe() throws IOException {
+ decoder.decode(decoderInputBuffers, missingIndexes, decoderOutputBuffers);
+ flipInputs();
+ }
+
+ @Override
+ public synchronized boolean hasSufficientLocations() {
+ // The number of locations needed is a function of the EC Chunk size. If
+ // the block length is <= the chunk size, we should only have one data
+ // location. If it is greater than the chunk size but less than
+ // chunk_size * 2, then we must have two locations. If it is greater than
+ // chunk_size * data_num, then we must have all data_num locations.
+ // The remaining data locations (for small block lengths) can be assumed to
+ // be all zeros.
+ // Then we need a total of dataNum blocks available across the available
+ // data, parity and padding blocks.
+ ECReplicationConfig repConfig = getRepConfig();
+ int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
+ int availableLocations =
+ availableDataLocations() + availableParityLocations();
+ int paddedLocations = repConfig.getData() - expectedDataBlocks;
+
+ if (availableLocations + paddedLocations >= repConfig.getData()) {
+ return true;
+ } else {
+ LOG.warn("There are insufficient locations. {} available {} padded {} " +
+ "expected", availableLocations, paddedLocations, expectedDataBlocks);
+ return false;
+ }
+ }
+
+ @Override
+ protected int readWithStrategy(ByteReaderStrategy strategy) {
+ throw new NotImplementedException("readWithStrategy is not implemented. " +
+ "Use readStripe() instead");
+ }
+
+}
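
Based on the Javadoc contract above, a caller would drive readStripe()
roughly as follows. This is a minimal usage sketch, not code from the commit:
the stream is assumed to be constructed elsewhere, and processStripe() is a
hypothetical placeholder for whatever the caller does with each decoded
stripe.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;

/** Illustrative readStripe() consumer; not part of the commit. */
final class StripeReaderSketch {

  /** Reads every stripe from an already constructed stream. */
  static void readAllStripes(ECBlockReconstructedStripeInputStream ecb,
      ECReplicationConfig conf) throws IOException {
    // One buffer per data block, each at position 0 with ecChunkSize
    // remaining, as the class Javadoc requires.
    ByteBuffer[] bufs = new ByteBuffer[conf.getData()];
    for (int i = 0; i < bufs.length; i++) {
      bufs[i] = ByteBuffer.allocate(conf.getEcChunkSize());
    }
    try {
      int read;
      while ((read = ecb.readStripe(bufs)) != -1) {
        // Buffers come back "ready to read": position 0 and the limit set
        // to how much data each holds. A short final stripe leaves the
        // trailing buffers with a limit of 0.
        processStripe(bufs, read);
        for (ByteBuffer b : bufs) {
          b.clear();   // reset position and limit for the next stripe
        }
      }
    } catch (InsufficientLocationsException e) {
      // Raised when too few data + parity locations remain to read or
      // reconstruct the stripe.
      throw new IOException("Unable to reconstruct EC block group", e);
    }
  }

  // Hypothetical placeholder for whatever the caller does with the data.
  private static void processStripe(ByteBuffer[] bufs, int bytesRead) {
  }
}
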
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java
new file mode 100644
index 0000000..956ed90
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown by EC Input Streams if there are not enough locations to
+ * read the EC data successfully.
+ */
+public class InsufficientLocationsException extends IOException {
+
+ public InsufficientLocationsException() {
+ super();
+ }
+
+ public InsufficientLocationsException(String message) {
+ super(message);
+ }
+
+ public InsufficientLocationsException(String message, Throwable ex) {
+ super(message, ex);
+ }
+
+ public InsufficientLocationsException(Throwable ex) {
+ super(ex);
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
index aa22a0b..1098eb4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -63,7 +63,6 @@ public class TestECBlockInputStream {
}
@Test
- // TODO - this test will need changed when we can do recovery reads.
public void testSufficientLocations() {
// EC-3-2, 5MB block, so all 3 data locations are needed
OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
@@ -89,15 +88,6 @@ public class TestECBlockInputStream {
Assert.assertTrue(ecb.hasSufficientLocations());
}
- // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
- dnMap.clear();
- dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
- keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
- try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
- Assert.assertFalse(ecb.hasSufficientLocations());
- }
-
// EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
// locations.
dnMap.clear();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
new file mode 100644
index 0000000..172aafe
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ozone.erasurecode.CodecRegistry;
+import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+
+/**
+ * Test for the ECBlockReconstructedStripeInputStream.
+ */
+public class TestECBlockReconstructedStripeInputStream {
+
+ private static final int ONEMB = 1024 * 1024;
+
+ private ECReplicationConfig repConfig;
+ private TestBlockInputStreamFactory streamFactory;
+
+ @Before
+ public void setup() {
+ repConfig = new ECReplicationConfig(3, 2,
+ ECReplicationConfig.EcCodec.RS, ONEMB);
+ streamFactory = new TestBlockInputStreamFactory();
+ }
+
+ @Test
+ public void testSufficientLocations() {
+ // One chunk, only 1 location.
+ OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 1, ONEMB);
+ try (ECBlockInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig,
+ keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ Assert.assertTrue(ecb.hasSufficientLocations());
+ }
+
+ Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+ // Two Chunks, but missing data block 2.
+ dnMap = createIndexMap(1, 4, 5);
+ keyInfo = createKeyInfo(repConfig, ONEMB * 2, dnMap);
+ try (ECBlockInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig,
+ keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ Assert.assertTrue(ecb.hasSufficientLocations());
+ }
+
+ // Three Chunks, but missing data blocks 2 and 3.
+ dnMap = createIndexMap(1, 4, 5);
+ keyInfo = createKeyInfo(repConfig, ONEMB * 3, dnMap);
+ try (ECBlockInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig,
+ keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ Assert.assertTrue(ecb.hasSufficientLocations());
+ }
+
+ // Three Chunks, but missing data blocks 2 and 3 and parity 1.
+ dnMap = createIndexMap(1, 4);
+ keyInfo = createKeyInfo(repConfig, ONEMB * 3, dnMap);
+ try (ECBlockInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig,
+ keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ Assert.assertFalse(ecb.hasSufficientLocations());
+ }
+ }
+
+ @Test
+ public void testReadFullStripesWithPartial() throws IOException {
+ // Generate the input data for 3 full stripes plus a partial stripe and
+ // generate the parity.
+ int chunkSize = repConfig.getEcChunkSize();
+ int partialStripeSize = chunkSize * 2 - 1;
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 * chunkSize);
+ dataBufs[1].limit(4 * chunkSize - 1);
+ dataBufs[2].limit(3 * chunkSize);
+ for (ByteBuffer b : dataBufs) {
+ randomFill(b);
+ }
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+ List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
+ // Two data missing
+ locations.add(createIndexMap(1, 4, 5));
+ // One data missing
+ locations.add(createIndexMap(1, 2, 4, 5));
+ // Two data missing including first
+ locations.add(createIndexMap(2, 4, 5));
+ // One data and one parity missing
+ locations.add(createIndexMap(2, 3, 4));
+
+ for (Map<DatanodeDetails, Integer> dnMap : locations) {
+ streamFactory = new TestBlockInputStreamFactory();
+ addDataStreamsToFactory(dataBufs, parity);
+
+ OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+ stripeSize() * 3 + partialStripeSize, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+ ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+ try (ECBlockReconstructedStripeInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+ null, null, streamFactory)) {
+ // Read 3 full stripes
+ for (int i = 0; i < 3; i++) {
+ int read = ecb.readStripe(bufs);
+ for (int j = 0; j < bufs.length; j++) {
+ validateContents(dataBufs[j], bufs[j], i * chunkSize, chunkSize);
+ }
+ Assert.assertEquals(stripeSize(), read);
+
+ // Check the underlying streams have read 1 chunk per read:
+ for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+ Assert.assertEquals(chunkSize * (i + 1),
+ bis.getPos());
+ }
+ Assert.assertEquals(stripeSize() * (i + 1), ecb.getPos());
+ clearBuffers(bufs);
+ }
+ // The next read is a partial stripe
+ int read = ecb.readStripe(bufs);
+ Assert.assertEquals(partialStripeSize, read);
+ validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
+ validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
+ Assert.assertEquals(0, bufs[2].remaining());
+ Assert.assertEquals(0, bufs[2].position());
+
+ // A further read should give EOF
+ clearBuffers(bufs);
+ read = ecb.readStripe(bufs);
+ Assert.assertEquals(-1, read);
+ }
+ }
+ }
+
+ @Test
+ public void testReadPartialStripe() throws IOException {
+ int blockLength = repConfig.getEcChunkSize() - 1;
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+ // First buffer has only the blockLength, the other two will have no data.
+ dataBufs[0].limit(blockLength);
+ dataBufs[1].limit(0);
+ dataBufs[2].limit(0);
+ for (ByteBuffer b : dataBufs) {
+ randomFill(b);
+ }
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+ addDataStreamsToFactory(dataBufs, parity);
+
+ ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+ // We have a length that is less than a single chunk, so blocks 2 and 3
+ // are padding and will not be present. Block 1 is lost and needs to be
+ // recovered from the parity and padded blocks 2 and 3.
+ Map<DatanodeDetails, Integer> dnMap = createIndexMap(4, 5);
+ OmKeyLocationInfo keyInfo =
+ createKeyInfo(repConfig, blockLength, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+ try (ECBlockReconstructedStripeInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+ null, null, streamFactory)) {
+ int read = ecb.readStripe(bufs);
+ Assert.assertEquals(blockLength, read);
+ validateContents(dataBufs[0], bufs[0], 0, blockLength);
+ Assert.assertEquals(0, bufs[1].remaining());
+ Assert.assertEquals(0, bufs[1].position());
+ Assert.assertEquals(0, bufs[2].remaining());
+ Assert.assertEquals(0, bufs[2].position());
+ // Check the underlying streams have been advanced by 1 blockLength:
+ for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+ Assert.assertEquals(blockLength, bis.getPos());
+ }
+ Assert.assertEquals(ecb.getPos(), blockLength);
+ clearBuffers(bufs);
+ // A further read should give EOF
+ read = ecb.readStripe(bufs);
+ Assert.assertEquals(-1, read);
+ }
+ }
+
+ @Test
+ public void testReadPartialStripeTwoChunks() throws IOException {
+ int chunkSize = repConfig.getEcChunkSize();
+ int blockLength = chunkSize * 2 - 1;
+
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+ // First buffer is a full chunk, the second has all but one byte and the
+ // third has no data.
+ dataBufs[0].limit(chunkSize);
+ dataBufs[1].limit(chunkSize - 1);
+ dataBufs[2].limit(0);
+ for (ByteBuffer b : dataBufs) {
+ randomFill(b);
+ }
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+ addDataStreamsToFactory(dataBufs, parity);
+
+ ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+ // We have a length of two chunks minus one byte, so block 3 is padding
+ // and will not be present. Blocks 1 and 2 are lost and need to be
+ // recovered from the parity and the padded block 3.
+ Map<DatanodeDetails, Integer> dnMap = createIndexMap(4, 5);
+ OmKeyLocationInfo keyInfo =
+ createKeyInfo(repConfig, blockLength, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+ try (ECBlockReconstructedStripeInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+ null, null, streamFactory)) {
+ int read = ecb.readStripe(bufs);
+ Assert.assertEquals(blockLength, read);
+ validateContents(dataBufs[0], bufs[0], 0, chunkSize);
+ validateContents(dataBufs[1], bufs[1], 0, chunkSize - 1);
+ Assert.assertEquals(0, bufs[2].remaining());
+ Assert.assertEquals(0, bufs[2].position());
+ // Check the underlying streams have been advanced by 1 chunk:
+ for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+ Assert.assertEquals(chunkSize, bis.getPos());
+ }
+ Assert.assertEquals(ecb.getPos(), blockLength);
+ clearBuffers(bufs);
+ // A further read should give EOF
+ read = ecb.readStripe(bufs);
+ Assert.assertEquals(-1, read);
+ }
+ }
+
+ @Test
+ public void testReadPartialStripeThreeChunks() throws IOException {
+ int chunkSize = repConfig.getEcChunkSize();
+ int blockLength = chunkSize * 3 - 1;
+
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+ // The first two buffers are full chunks and the third has all but one
+ // byte.
+ dataBufs[0].limit(chunkSize);
+ dataBufs[1].limit(chunkSize);
+ dataBufs[2].limit(chunkSize - 1);
+ for (ByteBuffer b : dataBufs) {
+ randomFill(b);
+ }
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+ // We have a length just short of a full stripe, so chunks 1 and 2 are
+ // full and chunk 3 holds the remainder. Various combinations of missing
+ // data and parity locations are tested below.
+
+ List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
+ // Two data missing
+ locations.add(createIndexMap(3, 4, 5));
+ // Two data missing
+ locations.add(createIndexMap(1, 4, 5));
+ // One data missing - the last one
+ locations.add(createIndexMap(1, 2, 5));
+ // One data and one parity missing
+ locations.add(createIndexMap(2, 3, 4));
+ // One data and one parity missing
+ locations.add(createIndexMap(1, 2, 4));
+
+ for (Map<DatanodeDetails, Integer> dnMap : locations) {
+ streamFactory = new TestBlockInputStreamFactory();
+ addDataStreamsToFactory(dataBufs, parity);
+ ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+
+ OmKeyLocationInfo keyInfo =
+ createKeyInfo(repConfig, blockLength, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+ try (ECBlockReconstructedStripeInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+ null, null, streamFactory)) {
+ int read = ecb.readStripe(bufs);
+ Assert.assertEquals(blockLength, read);
+ validateContents(dataBufs[0], bufs[0], 0, chunkSize);
+ validateContents(dataBufs[1], bufs[1], 0, chunkSize);
+ validateContents(dataBufs[2], bufs[2], 0, chunkSize - 1);
+ // Check the underlying streams have been fully consumed:
+ for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+ Assert.assertEquals(0, bis.getRemaining());
+ }
+ Assert.assertEquals(ecb.getPos(), blockLength);
+ clearBuffers(bufs);
+ // A further read should give EOF
+ read = ecb.readStripe(bufs);
+ Assert.assertEquals(-1, read);
+ }
+ }
+ }
+
+ private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) {
+ List<ByteBuffer> dataStreams = new ArrayList<>();
+ for (ByteBuffer b : data) {
+ dataStreams.add(b);
+ }
+ for (ByteBuffer b : parity) {
+ dataStreams.add(b);
+ }
+ streamFactory.setBlockStreamData(dataStreams);
+ }
+
+ /**
+ * Validates that the data buffer has the same contents as the source buffer,
+ * starting the checks in the src at offset and for count bytes.
+ * @param src The source of the data
+ * @param data The data which should be checked against the source
+ * @param offset The starting point in the src buffer
+ * @param count How many bytes to check.
+ */
+ private void validateContents(ByteBuffer src, ByteBuffer data, int offset,
+ int count) {
+ byte[] srcArray = src.array();
+ Assert.assertEquals(count, data.remaining());
+ for (int i = offset; i < offset + count; i++) {
+ Assert.assertEquals("Element " + i, srcArray[i], data.get());
+ }
+ data.flip();
+ }
+
+ /**
+ * Returns a new map containing a random DatanodeDetails for each index in
+ * idxs.
+ * @param idxs A list of indexes to add to the map
+ * @return A map of DatanodeDetails to index.
+ */
+ private Map<DatanodeDetails, Integer> createIndexMap(int... idxs) {
+ Map<DatanodeDetails, Integer> map = new HashMap<>();
+ for (int i : idxs) {
+ map.put(MockDatanodeDetails.randomDatanodeDetails(), i);
+ }
+ return map;
+ }
+
+ /**
+ * Given a set of data buffers, generate the parity data for the inputs.
+ * @param data A set of data buffers
+ * @param ecConfig The ECReplicationConfig representing the scheme
+ * @return
+ * @throws IOException
+ */
+ private ByteBuffer[] generateParity(ByteBuffer[] data,
+ ECReplicationConfig ecConfig) throws IOException {
+ // First data buffer dictates the size
+ int cellSize = data[0].limit();
+ // Store the limits of the remaining data buffers so we can restore them
+ int[] dataLimits = new int[data.length];
+ for (int i = 1; i < data.length; i++) {
+ dataLimits[i] = data[i].limit();
+ data[i].limit(cellSize);
+ zeroFill(data[i]);
+ data[i].flip();
+ }
+ ByteBuffer[] parity = new ByteBuffer[ecConfig.getParity()];
+ for (int i = 0; i < ecConfig.getParity(); i++) {
+ parity[i] = ByteBuffer.allocate(cellSize);
+ }
+ RawErasureEncoder encoder = CodecRegistry.getInstance()
+ .getCodecFactory(repConfig.getCodec().toString())
+ .createEncoder(repConfig);
+ encoder.encode(data, parity);
+
+ data[0].flip();
+ for (int i = 1; i < data.length; i++) {
+ data[i].limit(dataLimits[i]);
+ data[i].position(0);
+ }
+ return parity;
+ }
+
+ /**
+ * Fill the remaining space in a buffer with random bytes.
+ * @param buf
+ */
+ private void randomFill(ByteBuffer buf) {
+ while (buf.hasRemaining()) {
+ buf.put((byte)ThreadLocalRandom.current().nextInt(255));
+ }
+ buf.flip();
+ }
+
+ /**
+ * Fill / Pad the remaining space in a buffer with zeros.
+ * @param buf
+ */
+ private void zeroFill(ByteBuffer buf) {
+ byte[] a = buf.array();
+ Arrays.fill(a, buf.position(), buf.limit(), (byte)0);
+ buf.position(buf.limit());
+ }
+
+ /**
+ * Return an array of num ByteBuffers of the given size.
+ * @param num Number of buffers to create
+ * @param size The size of each buffer
+ * @return
+ */
+ private ByteBuffer[] allocateBuffers(int num, int size) {
+ ByteBuffer[] bufs = new ByteBuffer[num];
+ for (int i = 0; i < num; i++) {
+ bufs[i] = ByteBuffer.allocate(size);
+ }
+ return bufs;
+ }
+
+ private int stripeSize() {
+ return stripeSize(repConfig);
+ }
+
+ private int stripeSize(ECReplicationConfig rconfig) {
+ return rconfig.getEcChunkSize() * rconfig.getData();
+ }
+
+ private void clearBuffers(ByteBuffer[] bufs) {
+ for (ByteBuffer b : bufs) {
+ b.clear();
+ }
+ }
+
+ private ByteBuffer[] allocateByteBuffers(ECReplicationConfig rConfig) {
+ ByteBuffer[] bufs = new ByteBuffer[rConfig.getData()];
+ for (int i = 0; i < bufs.length; i++) {
+ bufs[i] = ByteBuffer.allocate(rConfig.getEcChunkSize());
+ }
+ return bufs;
+ }
+
+ private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+ long blockLength, Map<DatanodeDetails, Integer> dnMap) {
+
+ Pipeline pipeline = Pipeline.newBuilder()
+ .setState(Pipeline.PipelineState.CLOSED)
+ .setId(PipelineID.randomId())
+ .setNodes(new ArrayList<>(dnMap.keySet()))
+ .setReplicaIndexes(dnMap)
+ .setReplicationConfig(repConf)
+ .build();
+
+ OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
+ .setBlockID(new BlockID(1, 1))
+ .setLength(blockLength)
+ .setOffset(0)
+ .setPipeline(pipeline)
+ .setPartNumber(0)
+ .build();
+ return keyInfo;
+ }
+
+ private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+ int nodeCount, long blockLength) {
+ Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
+ for (int i = 0; i < nodeCount; i++) {
+ datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
+ }
+ return createKeyInfo(repConf, blockLength, datanodes);
+ }
+
+ private static class TestBlockInputStreamFactory implements
+ BlockInputStreamFactory {
+
+ private List<TestBlockInputStream> blockStreams = new ArrayList<>();
+ private List<ByteBuffer> blockStreamData;
+
+ private Pipeline currentPipeline;
+
+ public List<TestBlockInputStream> getBlockStreams() {
+ return blockStreams;
+ }
+
+ public void setBlockStreamData(List<ByteBuffer> bufs) {
+ this.blockStreamData = bufs;
+ }
+
+ public void setCurrentPipeline(Pipeline pipeline) {
+ this.currentPipeline = pipeline;
+ }
+
+ public BlockExtendedInputStream create(ReplicationConfig repConfig,
+ OmKeyLocationInfo blockInfo, Pipeline pipeline,
+ Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+ XceiverClientFactory xceiverFactory,
+ Function<BlockID, Pipeline> refreshFunction) {
+
+ int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
+ TestBlockInputStream stream = new TestBlockInputStream(
+ blockInfo.getBlockID(), blockInfo.getLength(),
+ blockStreamData.get(repInd - 1));
+ blockStreams.add(stream);
+ return stream;
+ }
+ }
+
+ private static class TestBlockInputStream extends BlockExtendedInputStream {
+
+ private ByteBuffer data;
+ private boolean closed = false;
+ private BlockID blockID;
+ private long length;
+ private static final byte EOF = -1;
+
+ TestBlockInputStream(BlockID blockId, long blockLen, ByteBuffer data) {
+ this.blockID = blockId;
+ this.length = blockLen;
+ this.data = data;
+ data.position(0);
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public BlockID getBlockID() {
+ return blockID;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public long getRemaining() {
+ return data.remaining();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len)
+ throws IOException {
+ return read(ByteBuffer.wrap(b, off, len));
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ if (getRemaining() == 0) {
+ return EOF;
+ }
+ int toRead = Math.min(buf.remaining(), (int)getRemaining());
+ for (int i = 0; i < toRead; i++) {
+ buf.put(data.get());
+ }
+ return toRead;
+ }
+
+ @Override
+ protected int readWithStrategy(ByteReaderStrategy strategy) throws
+ IOException {
+ throw new IOException("Should not be called");
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+
+ @Override
+ public void unbuffer() {
+ }
+
+ @Override
+ public long getPos() {
+ return data.position();
+ }
+
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]