This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 685442c HDDS-6001. EC: Create ECBlockReconstructedInputStream to wrap
ECBlockReconstructedStripeInputStream (#2848)
685442c is described below
commit 685442c6b84b1372854c2a93f1779258648ed4a8
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Mon Nov 29 10:49:46 2021 +0000
HDDS-6001. EC: Create ECBlockReconstructedInputStream to wrap
ECBlockReconstructedStripeInputStream (#2848)
---
.../client/io/ECBlockReconstructedInputStream.java | 205 ++++++++++
.../ozone/client/rpc/read/ECStreamTestUtil.java | 339 +++++++++++++++++
.../client/rpc/read/TestECBlockInputStream.java | 82 ++--
.../read/TestECBlockReconstructedInputStream.java | 310 ++++++++++++++++
.../TestECBlockReconstructedStripeInputStream.java | 413 +++++----------------
5 files changed, 972 insertions(+), 377 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
new file mode 100644
index 0000000..6538458
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Input stream which wraps a ECBlockReconstructedStripeInputStream to allow
+ * a EC Block to be read via the traditional InputStream read methods.
+ */
+public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
+
+ private ECReplicationConfig repConfig;
+ private ECBlockReconstructedStripeInputStream stripeReader;
+ private ByteBuffer[] bufs;
+ private boolean closed = false;
+
+ private long position = 0;
+
+ public ECBlockReconstructedInputStream(ECReplicationConfig repConfig,
+ ECBlockReconstructedStripeInputStream stripeReader) {
+ this.repConfig = repConfig;
+ this.stripeReader = stripeReader;
+
+ allocateBuffers();
+ }
+
+ @Override
+ public synchronized BlockID getBlockID() {
+ return stripeReader.getBlockID();
+ }
+
+ @Override
+ public synchronized long getRemaining() {
+ return getLength() - position;
+ }
+
+ @Override
+ public synchronized long getLength() {
+ return stripeReader.getLength();
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int off, int len)
+ throws IOException {
+ return read(ByteBuffer.wrap(b, off, len));
+ }
+
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ ensureNotClosed();
+ if (!hasRemaining()) {
+ return EOF;
+ }
+ int totalRead = 0;
+ while (buf.hasRemaining() && getRemaining() > 0) {
+ ByteBuffer b = selectNextBuffer();
+ if (b == null) {
+ // This should not happen, so if it does abort.
+ throw new IOException(getRemaining() + " bytes remaining but unable " +
+ "to select a buffer with data");
+ }
+ long read = readBufferToDest(b, buf);
+ totalRead += read;
+ }
+ return totalRead;
+ }
+
+ private void ensureNotClosed() throws IOException {
+ if (closed) {
+ throw new IOException("The input stream is closed");
+ }
+ }
+
+ private ByteBuffer selectNextBuffer() throws IOException {
+ for (ByteBuffer b : bufs) {
+ if (b.hasRemaining()) {
+ return b;
+ }
+ }
+ // If we get here, then no buffer has any remaining, so we need to
+ // fill them.
+ long read = readStripe();
+ if (read == EOF) {
+ return null;
+ }
+ return selectNextBuffer();
+ }
+
+ private long readBufferToDest(ByteBuffer src, ByteBuffer dest) {
+ int initialRemaining = dest.remaining();
+ while(dest.hasRemaining() && src.hasRemaining()) {
+ dest.put(src.get());
+ }
+ int read = initialRemaining - dest.remaining();
+ position += read;
+ return read;
+ }
+
+ @Override
+ protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
+ throws IOException {
+ throw new IOException("Not Implemented");
+ }
+
+ @Override
+ public synchronized void unbuffer() {
+ stripeReader.unbuffer();
+ }
+
+ @Override
+ public synchronized long getPos() throws IOException {
+ return position;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ stripeReader.close();
+ closed = true;
+ }
+
+ @Override
+ public synchronized void seek(long pos) throws IOException {
+ ensureNotClosed();
+ if (pos < 0 || pos >= getLength()) {
+ if (pos == 0) {
+ // It is possible for length and pos to be zero in which case
+ // seek should return instead of throwing exception
+ return;
+ }
+ throw new EOFException(
+ "EOF encountered at pos: " + pos + " for block: " + getBlockID());
+ }
+ long stripeSize = (long)repConfig.getEcChunkSize() * repConfig.getData();
+ long stripeNum = pos / stripeSize;
+ int partial = (int)(pos % stripeSize);
+ // Seek the stripe reader to the beginning of the new current stripe
+ stripeReader.seek(stripeNum * stripeSize);
+ // Now reload the data buffers and adjust their position to the partial
+ // stripe offset.
+ readAndSeekStripe(partial);
+ position = pos;
+ }
+
+ private void readAndSeekStripe(int offset) throws IOException {
+ readStripe();
+ if (offset == 0) {
+ return;
+ }
+ for (ByteBuffer b : bufs) {
+ int newPos = Math.min(b.remaining(), offset);
+ b.position(newPos);
+ offset -= newPos;
+ if (offset == 0) {
+ break;
+ }
+ }
+ }
+
+ private long readStripe() throws IOException {
+ clearBuffers();
+ return stripeReader.readStripe(bufs);
+ }
+
+ private void allocateBuffers() {
+ bufs = new ByteBuffer[repConfig.getData()];
+ for (int i = 0; i < repConfig.getData(); i++) {
+ bufs[i] = ByteBuffer.allocate(repConfig.getEcChunkSize());
+ // Initially set the limit to 0 so there is no remaining space.
+ bufs[i].limit(0);
+ }
+ }
+
+ private void clearBuffers() {
+ for (ByteBuffer b : bufs) {
+ b.clear();
+ }
+ }
+
+ private boolean hasRemaining() {
+ return getRemaining() > 0;
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
new file mode 100644
index 0000000..aabec31
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ozone.erasurecode.CodecRegistry;
+import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.ratis.util.Preconditions;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SplittableRandom;
+import java.util.function.Function;
+
+/**
+ * Utility class providing methods useful in EC tests.
+ */
+public final class ECStreamTestUtil {
+
+ private ECStreamTestUtil() {
+ }
+
+ public static OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+ long blockLength, Map<DatanodeDetails, Integer> dnMap) {
+
+ Pipeline pipeline = Pipeline.newBuilder()
+ .setState(Pipeline.PipelineState.CLOSED)
+ .setId(PipelineID.randomId())
+ .setNodes(new ArrayList<>(dnMap.keySet()))
+ .setReplicaIndexes(dnMap)
+ .setReplicationConfig(repConf)
+ .build();
+
+ OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
+ .setBlockID(new BlockID(1, 1))
+ .setLength(blockLength)
+ .setOffset(0)
+ .setPipeline(pipeline)
+ .setPartNumber(0)
+ .build();
+ return keyInfo;
+ }
+
+ public static OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+ int nodeCount, long blockLength) {
+ Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
+ for (int i = 0; i < nodeCount; i++) {
+ datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
+ }
+ return createKeyInfo(repConf, blockLength, datanodes);
+ }
+
+ /**
+ * Fill / Pad the remaining space in a buffer with zeros.
+ * @param buf
+ */
+ public static void zeroFill(ByteBuffer buf) {
+ byte[] a = buf.array();
+ Arrays.fill(a, buf.position(), buf.limit(), (byte)0);
+ buf.position(buf.limit());
+ }
+
+ /**
+ * Given a List of ByteBuffers, write length of random bytes from the given
+ * Random generator to the byte buffers. The data is striped across the
+ * buffers in stripeSize chunks.
+ * When the length of data has been written, the buffer limits are set to
+ * their final positions.
+ *
+ * @param bufs The list of buffers to fill with random data
+ * @param stripeSize The stripe size to use
+ * @param rand The random generator to use
+ * @param length The length of data to write.
+ */
+ public static void randomFill(ByteBuffer[] bufs, int stripeSize,
+ SplittableRandom rand, int length) {
+ Preconditions.assertTrue(totalSpaceAvailable(bufs) >= length);
+ int remaining = length;
+ while(remaining > 0) {
+ for (ByteBuffer b : bufs) {
+ int toWrite = Math.min(stripeSize, remaining);
+ for (int i = 0; i < toWrite; i++) {
+ b.put((byte) rand.nextInt(255));
+ }
+ remaining -= toWrite;
+ }
+ }
+ // Set the buffer limits to the final position
+ for (ByteBuffer b : bufs) {
+ b.limit(b.position());
+ }
+ }
+
+ private static int totalSpaceAvailable(ByteBuffer[] bufs) {
+ int space = 0;
+ for (ByteBuffer b : bufs) {
+ space += b.remaining();
+ }
+ return space;
+ }
+
+ /**
+ * Given a buffer which has data loaded, flip the buffer and ensure it
matches
+ * byte for byte the next series of bytes from the Random generator.
+ * @param b Byte Buffers containing data
+ * @param rand The random generator
+ */
+ public static void assertBufferMatches(ByteBuffer b, SplittableRandom rand) {
+ b.flip();
+ int i=0;
+ while (b.hasRemaining()) {
+ i++;
+ Assert.assertEquals("Failed on iteration " + i,
+ (byte)rand.nextInt(255), b.get());
+ }
+ }
+
+ /**
+ * Given a List of ByteBuffers and the RepConfig, encode the parity buffers
+ * from the data buffers. The data buffers should be passed "as is" after
+ * reading data. That is, the position will be at the last data byte read in
+ * or the buffer limit.
+ * The data buffers and parity will be returned "ready to read" with the
+ * position reset to zero.
+ * @param data List of data buffers
+ * @param ecConfig The ECReplicationConfig.
+ * @return List of encoded parity buffers.
+ * @throws IOException
+ */
+ public static ByteBuffer[] generateParity(ByteBuffer[] data,
+ ECReplicationConfig ecConfig) throws IOException {
+ // First data buffer dictates the size
+ int cellSize = data[0].limit();
+ data[0].flip();
+ // Store the positions of the remaining data buffers so we can restore them
+ int[] dataLimits = new int[data.length];
+ for (int i = 1; i < data.length; i++) {
+ dataLimits[i] = data[i].limit();
+ data[i].limit(cellSize);
+ zeroFill(data[i]);
+ data[i].flip();
+ }
+ ByteBuffer[] parity = new ByteBuffer[ecConfig.getParity()];
+ for (int i = 0; i < ecConfig.getParity(); i++) {
+ parity[i] = ByteBuffer.allocate(cellSize);
+ }
+ RawErasureEncoder encoder = CodecRegistry.getInstance()
+ .getCodecFactory(ecConfig.getCodec().toString())
+ .createEncoder(ecConfig);
+ encoder.encode(data, parity);
+
+ data[0].flip();
+ for (int i = 1; i < data.length; i++) {
+ data[i].limit(dataLimits[i]);
+ data[i].position(0);
+ }
+ return parity;
+ }
+
+ /**
+ * Returns a new map containing a random DatanodeDetails for each index in
+ * inputs.
+ * @param idxs A list of indexes to add to the map
+ * @return A map of DatanodeDetails to index.
+ */
+ public static Map<DatanodeDetails, Integer> createIndexMap(int... idxs) {
+ Map<DatanodeDetails, Integer> map = new HashMap<>();
+ for (int i : idxs) {
+ map.put(MockDatanodeDetails.randomDatanodeDetails(), i);
+ }
+ return map;
+ }
+
+
+ /**
+ * A stream factory which can be used in tests to provide TestBlockStream
+ * instances.
+ */
+ public static class TestBlockInputStreamFactory implements
+ BlockInputStreamFactory {
+
+ private List<TestBlockInputStream> blockStreams = new ArrayList<>();
+ private List<ByteBuffer> blockStreamData;
+
+ private Pipeline currentPipeline;
+
+ public List<ECStreamTestUtil.TestBlockInputStream> getBlockStreams() {
+ return blockStreams;
+ }
+
+ public void setBlockStreamData(List<ByteBuffer> bufs) {
+ this.blockStreamData = bufs;
+ }
+
+ public void setCurrentPipeline(Pipeline pipeline) {
+ this.currentPipeline = pipeline;
+ }
+
+ public BlockExtendedInputStream create(ReplicationConfig repConfig,
+ OmKeyLocationInfo blockInfo, Pipeline pipeline,
+ Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+ XceiverClientFactory xceiverFactory,
+ Function<BlockID, Pipeline> refreshFunction) {
+
+ int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
+ TestBlockInputStream stream = new TestBlockInputStream(
+ blockInfo.getBlockID(), blockInfo.getLength(),
+ blockStreamData.get(repInd - 1));
+ blockStreams.add(stream);
+ return stream;
+ }
+ }
+
+ /**
+ * A block stream that returns data from the provided ByteBuffer. Intended to
+ * be used in tests, rather than reading from a real block stream.
+ */
+ public static class TestBlockInputStream extends BlockExtendedInputStream {
+
+ private ByteBuffer data;
+ private boolean closed = false;
+ private BlockID blockID;
+ private long length;
+ private boolean shouldError = false;
+ private static final byte EOF = -1;
+
+ TestBlockInputStream(BlockID blockId, long blockLen, ByteBuffer data) {
+ this.blockID = blockId;
+ this.length = blockLen;
+ this.data = data;
+ data.position(0);
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void setShouldError(boolean val) {
+ shouldError = val;
+ }
+
+ @Override
+ public BlockID getBlockID() {
+ return blockID;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public long getRemaining() {
+ return data.remaining();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len)
+ throws IOException {
+ return read(ByteBuffer.wrap(b, off, len));
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ if (shouldError) {
+ throw new IOException("Simulated error reading block");
+ }
+ if (getRemaining() == 0) {
+ return EOF;
+ }
+ int toRead = Math.min(buf.remaining(), (int)getRemaining());
+ for (int i = 0; i < toRead; i++) {
+ buf.put(data.get());
+ }
+ return toRead;
+ };
+
+ @Override
+ protected int readWithStrategy(ByteReaderStrategy strategy) throws
+ IOException {
+ throw new IOException("Should not be called");
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+
+ @Override
+ public void unbuffer() {
+ }
+
+ @Override
+ public long getPos() {
+ return data.position();
+ }
+
+ @Override
+ public void seek(long pos) {
+ data.position((int)pos);
+ }
+
+ }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
index 1098eb4..5a29626 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
@@ -65,14 +64,15 @@ public class TestECBlockInputStream {
@Test
public void testSufficientLocations() {
// EC-3-2, 5MB block, so all 3 data locations are needed
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+ OmKeyLocationInfo keyInfo = ECStreamTestUtil
+ .createKeyInfo(repConfig, 5, 5 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertTrue(ecb.hasSufficientLocations());
}
// EC-3-2, very large block, so all 3 data locations are needed
- keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+ keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 5000 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertTrue(ecb.hasSufficientLocations());
@@ -82,7 +82,7 @@ public class TestECBlockInputStream {
// EC-3-2, 1 byte short of 1MB with 1 location
dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
- keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+ keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB - 1, dnMap);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertTrue(ecb.hasSufficientLocations());
@@ -92,7 +92,7 @@ public class TestECBlockInputStream {
// locations.
dnMap.clear();
dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
- keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+ keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5 * ONEMB, dnMap);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertFalse(ecb.hasSufficientLocations());
@@ -104,7 +104,7 @@ public class TestECBlockInputStream {
dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
- keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+ keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5 * ONEMB, dnMap);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertFalse(ecb.hasSufficientLocations());
@@ -115,7 +115,8 @@ public class TestECBlockInputStream {
public void testCorrectBlockSizePassedToBlockStreamLessThanCell()
throws IOException {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB - 100);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB - 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -131,7 +132,8 @@ public class TestECBlockInputStream {
public void testCorrectBlockSizePassedToBlockStreamTwoCells()
throws IOException {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB + 100);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB + 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -146,7 +148,8 @@ public class TestECBlockInputStream {
public void testCorrectBlockSizePassedToBlockStreamThreeCells()
throws IOException {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -162,7 +165,8 @@ public class TestECBlockInputStream {
public void
testCorrectBlockSizePassedToBlockStreamThreeFullAndPartialStripe()
throws IOException {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -178,7 +182,8 @@ public class TestECBlockInputStream {
public void testCorrectBlockSizePassedToBlockStreamSingleFullCell()
throws IOException {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -192,7 +197,8 @@ public class TestECBlockInputStream {
public void testCorrectBlockSizePassedToBlockStreamSeveralFullCells()
throws IOException {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 9 * ONEMB);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, 9 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -206,7 +212,8 @@ public class TestECBlockInputStream {
@Test
public void testSimpleRead() throws IOException {
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -224,7 +231,8 @@ public class TestECBlockInputStream {
@Test
public void testReadPastEOF() throws IOException {
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 50);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, 50);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -242,7 +250,8 @@ public class TestECBlockInputStream {
// EC-3-2, 5MB block, so all 3 data locations are needed
repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
100);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -276,7 +285,8 @@ public class TestECBlockInputStream {
public void testSeekPastBlockLength() throws IOException {
repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
ONEMB);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 100);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
ecb.seek(1000);
@@ -287,7 +297,8 @@ public class TestECBlockInputStream {
public void testSeekToLength() throws IOException {
repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
ONEMB);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 100);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
ecb.seek(100);
@@ -298,7 +309,8 @@ public class TestECBlockInputStream {
public void testSeekToLengthZeroLengthBlock() throws IOException {
repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
ONEMB);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 0);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, 0);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
ecb.seek(0);
@@ -311,7 +323,8 @@ public class TestECBlockInputStream {
public void testSeekToValidPosition() throws IOException {
repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
ONEMB);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
ecb.seek(ONEMB - 1);
@@ -342,37 +355,6 @@ public class TestECBlockInputStream {
}
}
- private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
- long blockLength, Map<DatanodeDetails, Integer> dnMap) {
-
- Pipeline pipeline = Pipeline.newBuilder()
- .setState(Pipeline.PipelineState.CLOSED)
- .setId(PipelineID.randomId())
- .setNodes(new ArrayList<>(dnMap.keySet()))
- .setReplicaIndexes(dnMap)
- .setReplicationConfig(repConf)
- .build();
-
- OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
- .setBlockID(new BlockID(1, 1))
- .setLength(blockLength)
- .setOffset(0)
- .setPipeline(pipeline)
- .setPartNumber(0)
- .build();
- return keyInfo;
- }
-
- private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
- int nodeCount, long blockLength) {
- Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
- for (int i = 0; i < nodeCount; i++) {
- datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
- }
-
- return createKeyInfo(repConf, blockLength, datanodes);
- }
-
private static class TestBlockInputStreamFactory implements
BlockInputStreamFactory {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
new file mode 100644
index 0000000..c1c0574
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.client.io.ECBlockReconstructedInputStream;
+import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SplittableRandom;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static
org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.generateParity;
+
+/**
+ * Test for the ECBlockReconstructedInputStream class.
+ */
+public class TestECBlockReconstructedInputStream {
+
+ private ECReplicationConfig repConfig;
+ private ECStreamTestUtil.TestBlockInputStreamFactory streamFactory;
+ private long randomSeed;
+ private ThreadLocalRandom random = ThreadLocalRandom.current();
+ private SplittableRandom dataGenerator;
+
+ @Before
+ public void setup() throws IOException {
+ repConfig = new ECReplicationConfig(3, 2);
+ streamFactory = new ECStreamTestUtil.TestBlockInputStreamFactory();
+
+ randomSeed = random.nextLong();
+ dataGenerator = new SplittableRandom(randomSeed);
+ }
+
+ private ECBlockReconstructedStripeInputStream createStripeInputStream(
+ Map<DatanodeDetails, Integer> dnMap, long blockLength) {
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+ return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+ null, null, streamFactory);
+ }
+
+ @Test
+ public void testBlockLengthReturned() throws IOException {
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(4, 5);
+ try(ECBlockReconstructedStripeInputStream stripeStream
+ = createStripeInputStream(dnMap, 12345L)) {
+ try (ECBlockReconstructedInputStream stream =
+ new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ Assert.assertEquals(12345L, stream.getLength());
+ }
+ }
+ }
+
+ @Test
+ public void testBlockIDReturned() throws IOException {
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 4, 5);
+ try(ECBlockReconstructedStripeInputStream stripeStream
+ = createStripeInputStream(dnMap, 12345L)) {
+ try (ECBlockReconstructedInputStream stream =
+ new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ Assert.assertEquals(new BlockID(1, 1), stream.getBlockID());
+ }
+ }
+ }
+
+ @Test
+ public void testReadDataByteBufferMultipleStripes() throws IOException {
+ int readBufferSize = random.nextInt(4096);
+ // 3 stripes and a partial chunk
+ int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+ + repConfig.getEcChunkSize() - 1;
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
+ repConfig.getEcChunkSize() * 4);
+ ECStreamTestUtil.randomFill(dataBufs, repConfig.getEcChunkSize(),
+ dataGenerator, blockLength);
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+ addDataStreamsToFactory(dataBufs, parity);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+ try(ECBlockReconstructedStripeInputStream stripeStream
+ = createStripeInputStream(dnMap, blockLength)) {
+ try (ECBlockReconstructedInputStream stream =
+ new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ ByteBuffer b = ByteBuffer.allocate(readBufferSize);
+ int totalRead = 0;
+ dataGenerator = new SplittableRandom(randomSeed);
+ while (totalRead < blockLength) {
+ int expectedRead = Math.min(blockLength - totalRead, readBufferSize);
+ long read = stream.read(b);
+ totalRead += read;
+ Assert.assertEquals(expectedRead, read);
+ ECStreamTestUtil.assertBufferMatches(b, dataGenerator);
+ b.clear();
+ }
+ // Next read should be EOF
+ b.clear();
+ long read = stream.read(b);
+ Assert.assertEquals(-1, read);
+ }
+ }
+ }
+
+ @Test
+ public void testReadDataByteBufferUnderBufferSize() throws IOException {
+ int readBufferSize = 4096;
+ // Small block with less data that the read size
+ int blockLength = 1024;
+
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 1024);
+ ECStreamTestUtil.randomFill(dataBufs, repConfig.getEcChunkSize(),
+ dataGenerator, blockLength);
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+ addDataStreamsToFactory(dataBufs, parity);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+ try(ECBlockReconstructedStripeInputStream stripeStream
+ = createStripeInputStream(dnMap, blockLength)) {
+ try (ECBlockReconstructedInputStream stream =
+ new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ ByteBuffer b = ByteBuffer.allocate(readBufferSize);
+ dataGenerator = new SplittableRandom(randomSeed);
+ long read = stream.read(b);
+ Assert.assertEquals(blockLength, read);
+ ECStreamTestUtil.assertBufferMatches(b, dataGenerator);
+ b.clear();
+ // Next read should be EOF
+ read = stream.read(b);
+ Assert.assertEquals(-1, read);
+ }
+ }
+ }
+
+ @Test
+ public void testReadByteAtATime() throws IOException {
+ // 3 stripes and a partial chunk
+ int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+ + repConfig.getEcChunkSize() - 1;
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
+ repConfig.getEcChunkSize() * 4);
+ ECStreamTestUtil.randomFill(dataBufs, repConfig.getEcChunkSize(),
+ dataGenerator, blockLength);
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+ addDataStreamsToFactory(dataBufs, parity);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+ try(ECBlockReconstructedStripeInputStream stripeStream
+ = createStripeInputStream(dnMap, blockLength)) {
+ try (ECBlockReconstructedInputStream stream =
+ new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+
+ dataGenerator = new SplittableRandom(randomSeed);
+ int totalRead = 0;
+ while (true) {
+ int val = stream.read();
+ if (val == -1) {
+ break;
+ }
+ Assert.assertEquals(dataGenerator.nextInt(255), val);
+ totalRead += 1;
+ }
+ Assert.assertEquals(blockLength, totalRead);
+ }
+ }
+ }
+
+ @Test
+ public void testReadByteBuffer() throws IOException {
+ // 3 stripes and a partial chunk
+ int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+ + repConfig.getEcChunkSize() - 1;
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
+ repConfig.getEcChunkSize() * 4);
+ ECStreamTestUtil.randomFill(dataBufs, repConfig.getEcChunkSize(),
+ dataGenerator, blockLength);
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+ addDataStreamsToFactory(dataBufs, parity);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+ byte[] buf = new byte[1024];
+ try(ECBlockReconstructedStripeInputStream stripeStream
+ = createStripeInputStream(dnMap, blockLength)) {
+ try (ECBlockReconstructedInputStream stream =
+ new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ int totalRead = 0;
+ dataGenerator = new SplittableRandom(randomSeed);
+ while (totalRead < blockLength) {
+ int expectedRead = Math.min(blockLength - totalRead, 1024);
+ long read = stream.read(buf, 0, buf.length);
+ totalRead += read;
+ Assert.assertEquals(expectedRead, read);
+ ECStreamTestUtil.assertBufferMatches(
+ ByteBuffer.wrap(buf, 0, (int)read), dataGenerator);
+ }
+ // Next read should be EOF
+ long read = stream.read(buf, 0, buf.length);
+ Assert.assertEquals(-1, read);
+ }
+ }
+ }
+
+ @Test
+ public void testSeek() throws IOException {
+ int readBufferSize = repConfig.getEcChunkSize() + 1024;
+ // 3 stripes and a partial chunk
+ int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+ + repConfig.getEcChunkSize() - 1;
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
+ repConfig.getEcChunkSize() * 4);
+ ECStreamTestUtil.randomFill(dataBufs, repConfig.getEcChunkSize(),
+ dataGenerator, blockLength);
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+ addDataStreamsToFactory(dataBufs, parity);
+
+ Map<DatanodeDetails, Integer> dnMap
+ = ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+ try(ECBlockReconstructedStripeInputStream stripeStream
+ = createStripeInputStream(dnMap, blockLength)) {
+ try (ECBlockReconstructedInputStream stream =
+ new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ ByteBuffer b = ByteBuffer.allocate(readBufferSize);
+
+ int seekPosition = 0;
+ for (int i = 0; i < 100; i++) {
+ resetAndAdvanceDataGenerator(seekPosition);
+ long expectedRead = Math.min(stream.getRemaining(), readBufferSize);
+ long read = stream.read(b);
+ Assert.assertEquals(expectedRead, read);
+ ECStreamTestUtil.assertBufferMatches(b, dataGenerator);
+ seekPosition = random.nextInt(blockLength);
+ stream.seek(seekPosition);
+ b.clear();
+ }
+ // Seeking beyond EOF should give an error
+ try {
+ stream.seek(blockLength + 1);
+ Assert.fail("Seek beyond EOF should error");
+ } catch (IOException e) {
+ // expected
+ }
+ }
+ }
+ }
+
+ private void resetAndAdvanceDataGenerator(long position) {
+ dataGenerator = new SplittableRandom(randomSeed);
+ for (long i = 0; i < position; i++) {
+ dataGenerator.nextInt(255);
+ }
+ }
+
+
+
+ /**
+ * Return a list of num ByteBuffers of the given size.
+ * @param num Number of buffers to create
+ * @param size The size of each buffer
+ * @return
+ */
+ private ByteBuffer[] allocateBuffers(int num, int size) {
+ ByteBuffer[] bufs = new ByteBuffer[num];
+ for (int i = 0; i < num; i++) {
+ bufs[i] = ByteBuffer.allocate(size);
+ }
+ return bufs;
+ }
+
+ private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity)
{
+ List<ByteBuffer> dataStreams = new ArrayList<>();
+ for (ByteBuffer b : data) {
+ dataStreams.add(b);
+ }
+ for (ByteBuffer b : parity) {
+ dataStreams.add(b);
+ }
+ streamFactory.setBlockStreamData(dataStreams);
+ }
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index 33fb33a..8b1d460 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -17,25 +17,15 @@
*/
package org.apache.hadoop.ozone.client.rpc.read;
-import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
-import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
+import
org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputStreamFactory;
+import
org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputStream;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.security.token.Token;
-import org.apache.ozone.erasurecode.CodecRegistry;
-import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -43,12 +33,13 @@ import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.SplittableRandom;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.Function;
+
+import static
org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.generateParity;
/**
* Test for the ECBlockReconstructedStripeInputStream.
@@ -58,19 +49,26 @@ public class TestECBlockReconstructedStripeInputStream {
private static final int ONEMB = 1024 * 1024;
private ECReplicationConfig repConfig;
- private TestBlockInputStreamFactory streamFactory;
+ private ECStreamTestUtil.TestBlockInputStreamFactory streamFactory;
+ private long randomSeed;
+ private ThreadLocalRandom random = ThreadLocalRandom.current();
+ private SplittableRandom dataGen;
@Before
public void setup() {
repConfig = new ECReplicationConfig(3, 2,
ECReplicationConfig.EcCodec.RS, ONEMB);
- streamFactory = new TestBlockInputStreamFactory();
+ streamFactory = new ECStreamTestUtil.TestBlockInputStreamFactory();
+
+ randomSeed = random.nextLong();
+ dataGen = new SplittableRandom(randomSeed);
}
@Test
public void testSufficientLocations() {
// One chunk, only 1 location.
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 1, ONEMB);
+ OmKeyLocationInfo keyInfo = ECStreamTestUtil
+ .createKeyInfo(repConfig, 1, ONEMB);
try (ECBlockInputStream ecb =
new ECBlockReconstructedStripeInputStream(repConfig,
keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
@@ -80,17 +78,18 @@ public class TestECBlockReconstructedStripeInputStream {
Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
// Two Chunks, but missing data block 2.
- dnMap = createIndexMap(1, 4, 5);
- keyInfo = createKeyInfo(repConfig, ONEMB * 2, dnMap);
+ dnMap = ECStreamTestUtil.createIndexMap(1, 4, 5);
+ keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 2, dnMap);
try (ECBlockInputStream ecb =
new ECBlockReconstructedStripeInputStream(repConfig,
- keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ keyInfo, true, null, null,
+ new ECStreamTestUtil.TestBlockInputStreamFactory())) {
Assert.assertTrue(ecb.hasSufficientLocations());
}
// Three Chunks, but missing data block 2 and 3.
- dnMap = createIndexMap(1, 4, 5);
- keyInfo = createKeyInfo(repConfig, ONEMB * 3, dnMap);
+ dnMap = ECStreamTestUtil.createIndexMap(1, 4, 5);
+ keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
try (ECBlockInputStream ecb =
new ECBlockReconstructedStripeInputStream(repConfig,
keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
@@ -98,8 +97,8 @@ public class TestECBlockReconstructedStripeInputStream {
}
// Three Chunks, but missing data block 2 and 3 and parity 1.
- dnMap = createIndexMap(1, 4);
- keyInfo = createKeyInfo(repConfig, ONEMB * 3, dnMap);
+ dnMap = ECStreamTestUtil.createIndexMap(1, 4);
+ keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
try (ECBlockInputStream ecb =
new ECBlockReconstructedStripeInputStream(repConfig,
keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
@@ -112,33 +111,32 @@ public class TestECBlockReconstructedStripeInputStream {
// Generate the input data for 3 full stripes and generate the parity.
int chunkSize = repConfig.getEcChunkSize();
int partialStripeSize = chunkSize * 2 - 1;
+ int blockLength = chunkSize * repConfig.getData() * 3 + partialStripeSize;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 *
chunkSize);
- dataBufs[1].limit(4 * chunkSize - 1);
- dataBufs[2].limit(3 * chunkSize);
- for (ByteBuffer b : dataBufs) {
- randomFill(b);
- }
+ ECStreamTestUtil.randomFill(dataBufs, chunkSize, dataGen, blockLength);
+
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
// Two data missing
- locations.add(createIndexMap(1, 4, 5));
+ locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
// One data missing
- locations.add(createIndexMap(1, 2, 4, 5));
+ locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4, 5));
// Two data missing including first
- locations.add(createIndexMap(2, 4, 5));
+ locations.add(ECStreamTestUtil.createIndexMap(2, 4, 5));
// One data and one parity missing
- locations.add(createIndexMap(2, 3, 4));
+ locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
for (Map<DatanodeDetails, Integer> dnMap : locations) {
streamFactory = new TestBlockInputStreamFactory();
addDataStreamsToFactory(dataBufs, parity);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+ OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3 + partialStripeSize, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+ dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
null, null, streamFactory)) {
@@ -146,7 +144,7 @@ public class TestECBlockReconstructedStripeInputStream {
for (int i = 0; i < 3; i++) {
int read = ecb.readStripe(bufs);
for (int j = 0; j < bufs.length; j++) {
- validateContents(dataBufs[j], bufs[j], i * chunkSize, chunkSize);
+ ECStreamTestUtil.assertBufferMatches(bufs[j], dataGen);
}
Assert.assertEquals(stripeSize(), read);
@@ -161,8 +159,8 @@ public class TestECBlockReconstructedStripeInputStream {
// The next read is a partial stripe
int read = ecb.readStripe(bufs);
Assert.assertEquals(partialStripeSize, read);
- validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
- validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
+ ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
+ ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
Assert.assertEquals(0, bufs[2].remaining());
Assert.assertEquals(0, bufs[2].position());
@@ -178,13 +176,8 @@ public class TestECBlockReconstructedStripeInputStream {
public void testReadPartialStripe() throws IOException {
int blockLength = repConfig.getEcChunkSize() - 1;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
- // First buffer has only the blockLength, the other two will have no data.
- dataBufs[0].limit(blockLength);
- dataBufs[1].limit(0);
- dataBufs[2].limit(0);
- for (ByteBuffer b : dataBufs) {
- randomFill(b);
- }
+ ECStreamTestUtil
+ .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen,
blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
addDataStreamsToFactory(dataBufs, parity);
@@ -192,16 +185,18 @@ public class TestECBlockReconstructedStripeInputStream {
// We have a length that is less than a single chunk, so blocks 2 and 3
// are padding and will not be present. Block 1 is lost and needs recovered
// from the parity and padded blocks 2 and 3.
- Map<DatanodeDetails, Integer> dnMap = createIndexMap(4, 5);
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(4, 5);
OmKeyLocationInfo keyInfo =
- createKeyInfo(repConfig, blockLength, dnMap);
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+ dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
null, null, streamFactory)) {
int read = ecb.readStripe(bufs);
Assert.assertEquals(blockLength, read);
- validateContents(dataBufs[0], bufs[0], 0, blockLength);
+ ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
Assert.assertEquals(0, bufs[1].remaining());
Assert.assertEquals(0, bufs[1].position());
Assert.assertEquals(0, bufs[2].remaining());
@@ -224,13 +219,8 @@ public class TestECBlockReconstructedStripeInputStream {
int blockLength = chunkSize * 2 - 1;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
- // First buffer has only the blockLength, the other two will have no data.
- dataBufs[0].limit(chunkSize);
- dataBufs[1].limit(chunkSize - 1);
- dataBufs[2].limit(0);
- for (ByteBuffer b : dataBufs) {
- randomFill(b);
- }
+ ECStreamTestUtil
+ .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen,
blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
addDataStreamsToFactory(dataBufs, parity);
@@ -238,17 +228,19 @@ public class TestECBlockReconstructedStripeInputStream {
// We have a length that is less than a single chunk, so blocks 2 and 3
// are padding and will not be present. Block 1 is lost and needs recovered
// from the parity and padded blocks 2 and 3.
- Map<DatanodeDetails, Integer> dnMap = createIndexMap(4, 5);
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(4, 5);
OmKeyLocationInfo keyInfo =
- createKeyInfo(repConfig, blockLength, dnMap);
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+ dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
null, null, streamFactory)) {
int read = ecb.readStripe(bufs);
Assert.assertEquals(blockLength, read);
- validateContents(dataBufs[0], bufs[0], 0, chunkSize);
- validateContents(dataBufs[1], bufs[1], 0, chunkSize - 1);
+ ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
+ ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
Assert.assertEquals(0, bufs[2].remaining());
Assert.assertEquals(0, bufs[2].position());
// Check the underlying streams have been advanced by 1 chunk:
@@ -269,13 +261,8 @@ public class TestECBlockReconstructedStripeInputStream {
int blockLength = chunkSize * 3 - 1;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
- // First buffer has only the blockLength, the other two will have no data.
- dataBufs[0].limit(chunkSize);
- dataBufs[1].limit(chunkSize);
- dataBufs[2].limit(chunkSize - 1);
- for (ByteBuffer b : dataBufs) {
- randomFill(b);
- }
+ ECStreamTestUtil
+ .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen,
blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
// We have a length that is less than a stripe, so chunks 1 and 2 are full.
@@ -284,15 +271,15 @@ public class TestECBlockReconstructedStripeInputStream {
List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
// Two data missing
- locations.add(createIndexMap(3, 4, 5));
+ locations.add(ECStreamTestUtil.createIndexMap(3, 4, 5));
// Two data missing
- locations.add(createIndexMap(1, 4, 5));
+ locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
// One data missing - the last one
- locations.add(createIndexMap(1, 2, 5));
+ locations.add(ECStreamTestUtil.createIndexMap(1, 2, 5));
// One data and one parity missing
- locations.add(createIndexMap(2, 3, 4));
+ locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
// One data and one parity missing
- locations.add(createIndexMap(1, 2, 4));
+ locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4));
for (Map<DatanodeDetails, Integer> dnMap : locations) {
streamFactory = new TestBlockInputStreamFactory();
@@ -300,16 +287,17 @@ public class TestECBlockReconstructedStripeInputStream {
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
OmKeyLocationInfo keyInfo =
- createKeyInfo(repConfig, blockLength, dnMap);
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+ dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
null, null, streamFactory)) {
int read = ecb.readStripe(bufs);
Assert.assertEquals(blockLength, read);
- validateContents(dataBufs[0], bufs[0], 0, chunkSize);
- validateContents(dataBufs[1], bufs[1], 0, chunkSize);
- validateContents(dataBufs[2], bufs[2], 0, chunkSize - 1);
+ ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
+ ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
+ ECStreamTestUtil.assertBufferMatches(bufs[2], dataGen);
// Check the underlying streams have been advanced by 1 chunk:
for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
Assert.assertEquals(0, bis.getRemaining());
@@ -327,13 +315,8 @@ public class TestECBlockReconstructedStripeInputStream {
public void testErrorThrownIfBlockNotLongEnough() throws IOException {
int blockLength = repConfig.getEcChunkSize() - 1;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
- // First buffer has only the blockLength, the other two will have no data.
- dataBufs[0].limit(blockLength);
- dataBufs[1].limit(0);
- dataBufs[2].limit(0);
- for (ByteBuffer b : dataBufs) {
- randomFill(b);
- }
+ ECStreamTestUtil
+ .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen,
blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
addDataStreamsToFactory(dataBufs, parity);
@@ -345,9 +328,10 @@ public class TestECBlockReconstructedStripeInputStream {
// We have a length that is less than a single chunk, so blocks 2 and 3
// are padding and will not be present. Block 1 is lost and needs recovered
// from the parity and padded blocks 2 and 3.
- Map<DatanodeDetails, Integer> dnMap = createIndexMap(4, 5);
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(4, 5);
OmKeyLocationInfo keyInfo =
- createKeyInfo(repConfig, blockLength, dnMap);
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
try (ECBlockReconstructedStripeInputStream ecb =
new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
true,
@@ -369,28 +353,25 @@ public class TestECBlockReconstructedStripeInputStream {
int partialStripeSize = chunkSize * 2 - 1;
int dataLength = stripeSize() * 3 + partialStripeSize;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 *
chunkSize);
- dataBufs[1].limit(4 * chunkSize - 1);
- dataBufs[2].limit(3 * chunkSize);
- for (ByteBuffer b : dataBufs) {
- randomFill(b);
- }
+ ECStreamTestUtil
+ .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, dataLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
// Two data missing
- locations.add(createIndexMap(1, 4, 5));
+ locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
// One data missing
- locations.add(createIndexMap(1, 2, 4, 5));
+ locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4, 5));
// Two data missing including first
- locations.add(createIndexMap(2, 4, 5));
+ locations.add(ECStreamTestUtil.createIndexMap(2, 4, 5));
// One data and one parity missing
- locations.add(createIndexMap(2, 3, 4));
+ locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
for (Map<DatanodeDetails, Integer> dnMap : locations) {
streamFactory = new TestBlockInputStreamFactory();
addDataStreamsToFactory(dataBufs, parity);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+ OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3 + partialStripeSize, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
@@ -429,7 +410,7 @@ public class TestECBlockReconstructedStripeInputStream {
// seek to the start of stripe 3
clearBuffers(bufs);
- ecb.seek(stripeSize() * 2);
+ ecb.seek(stripeSize() * (long)2);
read = ecb.readStripe(bufs);
for (int j = 0; j < bufs.length; j++) {
validateContents(dataBufs[j], bufs[j], 2 * chunkSize, chunkSize);
@@ -442,8 +423,9 @@ public class TestECBlockReconstructedStripeInputStream {
@Test
public void testSeekToPartialOffsetFails() {
- Map<DatanodeDetails, Integer> dnMap = createIndexMap(1, 4, 5);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 4, 5);
+ OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
@@ -465,13 +447,12 @@ public class TestECBlockReconstructedStripeInputStream {
// Generate the input data for 3 full stripes and generate the parity.
int chunkSize = repConfig.getEcChunkSize();
int partialStripeSize = chunkSize * 2 - 1;
+ int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+ + partialStripeSize;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
4 * chunkSize);
- dataBufs[1].limit(4 * chunkSize - 1);
- dataBufs[2].limit(3 * chunkSize);
- for (ByteBuffer b : dataBufs) {
- randomFill(b);
- }
+ ECStreamTestUtil
+ .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen,
blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
List<List<Integer>> failLists = new ArrayList<>();
@@ -487,8 +468,9 @@ public class TestECBlockReconstructedStripeInputStream {
addDataStreamsToFactory(dataBufs, parity);
// Data block index 3 is missing and needs recovered initially.
- Map<DatanodeDetails, Integer> dnMap = createIndexMap(1, 2, 4, 5);
- OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+ OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3 + partialStripeSize, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
@@ -572,77 +554,6 @@ public class TestECBlockReconstructedStripeInputStream {
}
/**
- * Returns a new map containing a random DatanodeDetails for each index in
- * inputs.
- * @param idxs A list of indexes to add to the map
- * @return A map of DatanodeDetails to index.
- */
- private Map<DatanodeDetails, Integer> createIndexMap(int... idxs) {
- Map<DatanodeDetails, Integer> map = new HashMap<>();
- for (int i : idxs) {
- map.put(MockDatanodeDetails.randomDatanodeDetails(), i);
- }
- return map;
- }
-
- /**
- * Given a set of data buffers, generate the parity data for the inputs.
- * @param data A set of data buffers
- * @param ecConfig The ECReplicationConfig representing the scheme
- * @return
- * @throws IOException
- */
- private ByteBuffer[] generateParity(ByteBuffer[] data,
- ECReplicationConfig ecConfig) throws IOException {
- // First data buffer dictates the size
- int cellSize = data[0].limit();
- // Store the positions of the remaining data buffers so we can restore them
- int[] dataLimits = new int[data.length];
- for (int i=1; i<data.length; i++) {
- dataLimits[i] = data[i].limit();
- data[i].limit(cellSize);
- zeroFill(data[i]);
- data[i].flip();
- }
- ByteBuffer[] parity = new ByteBuffer[ecConfig.getParity()];
- for (int i = 0; i < ecConfig.getParity(); i++) {
- parity[i] = ByteBuffer.allocate(cellSize);
- }
- RawErasureEncoder encoder = CodecRegistry.getInstance()
- .getCodecFactory(repConfig.getCodec().toString())
- .createEncoder(repConfig);
- encoder.encode(data, parity);
-
- data[0].flip();
- for (int i = 1; i < data.length; i++) {
- data[i].limit(dataLimits[i]);
- data[i].position(0);
- }
- return parity;
- }
-
- /**
- * Fill the remaining space in a buffer random bytes.
- * @param buf
- */
- private void randomFill(ByteBuffer buf) {
- while (buf.hasRemaining()) {
- buf.put((byte)ThreadLocalRandom.current().nextInt(255));
- }
- buf.flip();
- }
-
- /**
- * Fill / Pad the remaining space in a buffer with zeros.
- * @param buf
- */
- private void zeroFill(ByteBuffer buf) {
- byte[] a = buf.array();
- Arrays.fill(a, buf.position(), buf.limit(), (byte)0);
- buf.position(buf.limit());
- }
-
- /**
* Return a list of num ByteBuffers of the given size.
* @param num Number of buffers to create
* @param size The size of each buffer
@@ -678,156 +589,4 @@ public class TestECBlockReconstructedStripeInputStream {
return bufs;
}
- private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
- long blockLength, Map<DatanodeDetails, Integer> dnMap) {
-
- Pipeline pipeline = Pipeline.newBuilder()
- .setState(Pipeline.PipelineState.CLOSED)
- .setId(PipelineID.randomId())
- .setNodes(new ArrayList<>(dnMap.keySet()))
- .setReplicaIndexes(dnMap)
- .setReplicationConfig(repConf)
- .build();
-
- OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
- .setBlockID(new BlockID(1, 1))
- .setLength(blockLength)
- .setOffset(0)
- .setPipeline(pipeline)
- .setPartNumber(0)
- .build();
- return keyInfo;
- }
-
- private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
- int nodeCount, long blockLength) {
- Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
- for (int i = 0; i < nodeCount; i++) {
- datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
- }
- return createKeyInfo(repConf, blockLength, datanodes);
- }
-
- private static class TestBlockInputStreamFactory implements
- BlockInputStreamFactory {
-
- private List<TestBlockInputStream> blockStreams = new ArrayList<>();
- private List<ByteBuffer> blockStreamData;
-
- private Pipeline currentPipeline;
-
- public List<TestBlockInputStream> getBlockStreams() {
- return blockStreams;
- }
-
- public void setBlockStreamData(List<ByteBuffer> bufs) {
- this.blockStreamData = bufs;
- }
-
- public void setCurrentPipeline(Pipeline pipeline) {
- this.currentPipeline = pipeline;
- }
-
- public BlockExtendedInputStream create(ReplicationConfig repConfig,
- OmKeyLocationInfo blockInfo, Pipeline pipeline,
- Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
- XceiverClientFactory xceiverFactory,
- Function<BlockID, Pipeline> refreshFunction) {
-
- int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
- TestBlockInputStream stream = new TestBlockInputStream(
- blockInfo.getBlockID(), blockInfo.getLength(),
- blockStreamData.get(repInd - 1));
- blockStreams.add(stream);
- return stream;
- }
- }
-
- private static class TestBlockInputStream extends BlockExtendedInputStream {
-
- private ByteBuffer data;
- private boolean closed = false;
- private BlockID blockID;
- private long length;
- private boolean shouldError = false;
- private static final byte EOF = -1;
-
- TestBlockInputStream(BlockID blockId, long blockLen, ByteBuffer data) {
- this.blockID = blockId;
- this.length = blockLen;
- this.data = data;
- data.position(0);
- }
-
- public boolean isClosed() {
- return closed;
- }
-
- public void setShouldError(boolean val) {
- shouldError = val;
- }
-
- @Override
- public BlockID getBlockID() {
- return blockID;
- }
-
- @Override
- public long getLength() {
- return length;
- }
-
- @Override
- public long getRemaining() {
- return data.remaining();
- }
-
- @Override
- public int read(byte[] b, int off, int len)
- throws IOException {
- return read(ByteBuffer.wrap(b, off, len));
- }
-
- @Override
- public int read(ByteBuffer buf) throws IOException {
- if (shouldError) {
- throw new IOException("Simulated error reading block");
- }
- if (getRemaining() == 0) {
- return EOF;
- }
- int toRead = Math.min(buf.remaining(), (int)getRemaining());
- for (int i = 0; i < toRead; i++) {
- buf.put(data.get());
- }
- return toRead;
- };
-
- @Override
- protected int readWithStrategy(ByteReaderStrategy strategy) throws
- IOException {
- throw new IOException("Should not be called");
- }
-
- @Override
- public void close() {
- closed = true;
- }
-
- @Override
- public void unbuffer() {
- }
-
- @Override
- public long getPos() {
- return data.position();
- }
-
- @Override
- public void seek(long pos) {
- data.position((int)pos);
- }
-
- }
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]