This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new db35f8c HDDS-4942. EC: Implement ECBlockInputStream to read a single
EC BlockGroup (#2507)
db35f8c is described below
commit db35f8c003b34fe790694a11d2a49ef50100e12f
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu Aug 26 08:16:39 2021 +0100
HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup
(#2507)
---
.../ozone/client/io/BlockInputStreamProvider.java | 34 ++
.../client/io/BlockInputStreamProviderImpl.java | 51 +++
.../hadoop/ozone/client/io/ECBlockInputStream.java | 322 +++++++++++++++++
.../client/rpc/read/TestECBlockInputStream.java | 384 +++++++++++++++++++++
4 files changed, 791 insertions(+)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProvider.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProvider.java
new file mode 100644
index 0000000..d1cc74a
--- /dev/null
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProvider.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Interface used to create BlockInputStreams.
+ */
public interface BlockInputStreamProvider {

  /**
   * Create a BlockInputStream to read data from the given block.
   * @param blockId ID of the block to read
   * @param blockLen Length of the block in bytes
   * @param pipeline Pipeline giving the datanode location(s) of the block
   * @param token Security token granting access to the block
   * @param verifyChecksum Whether data read should be checksum verified
   * @return A BlockInputStream positioned at the start of the block
   */
  BlockInputStream provide(BlockID blockId, long blockLen, Pipeline pipeline,
      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum);

}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProviderImpl.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProviderImpl.java
new file mode 100644
index 0000000..507dadc
--- /dev/null
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProviderImpl.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.function.Function;
+
+/**
+ * Concrete implementation of a BlockInputStreamProvider to create
+ * BlockInputStreams in a real cluster.
+ */
+public class BlockInputStreamProviderImpl implements BlockInputStreamProvider {
+
+ private final XceiverClientFactory xceiverClientFactory;
+ private final Function<BlockID, Pipeline> refreshFunction;
+
+ public BlockInputStreamProviderImpl(XceiverClientFactory xceiverFactory,
+ Function<BlockID, Pipeline> refreshFunction) {
+ this.xceiverClientFactory = xceiverFactory;
+ this.refreshFunction = refreshFunction;
+ }
+
+ @Override
+ public BlockInputStream provide(BlockID blockId, long blockLen,
+ Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
+ boolean verifyChecksum) {
+ return new BlockInputStream(blockId, blockLen, pipeline, token,
+ verifyChecksum, xceiverClientFactory, refreshFunction);
+ }
+}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
new file mode 100644
index 0000000..9c9d0a6
--- /dev/null
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends InputStream
+ implements Seekable, CanUnbuffer, ByteBufferReadable {
+
+ private static final int EOF = -1;
+
+ private final ECReplicationConfig repConfig;
+ private final int ecChunkSize;
+ private final BlockInputStreamProvider streamProvider;
+ private final boolean verifyChecksum;
+ private final OmKeyLocationInfo blockInfo;
+ private final DatanodeDetails[] dataLocations;
+ private final DatanodeDetails[] parityLocations;
+ private final BlockInputStream[] blockStreams;
+ private final int maxLocations;
+
+ private int position = 0;
+ private boolean closed = false;
+
+ public ECBlockInputStream(ECReplicationConfig repConfig, int ecChunkSize,
+ OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+ BlockInputStreamProvider streamProvider) {
+ this.repConfig = repConfig;
+ this.ecChunkSize = ecChunkSize;
+ this.verifyChecksum = verifyChecksum;
+ this.blockInfo = blockInfo;
+ this.streamProvider = streamProvider;
+ this.maxLocations = repConfig.getData() + repConfig.getParity();
+ this.dataLocations = new DatanodeDetails[repConfig.getData()];
+ this.parityLocations = new DatanodeDetails[repConfig.getParity()];
+ this.blockStreams = new BlockInputStream[repConfig.getData()];
+
+ setBlockLocations(this.blockInfo.getPipeline());
+ }
+
+ public synchronized boolean hasSufficientLocations() {
+ // Until we implement "on the fly" recovery, all data location must be
+ // present and we have enough locations if that is the case.
+ //
+ // The number of locations needed is a function of the EC Chunk size. If
the
+ // block length is <= the chunk size, we should only have location 1. If it
+ // is greater than the chunk size but less than chunk_size * 2, then we
must
+ // have two locations. If it is greater than chunk_size * data_num, then we
+ // must have all data_num locations.
+ int expectedDataBlocks =
+ (int)Math.min(
+ Math.ceil((double)blockInfo.getLength() / ecChunkSize),
+ repConfig.getData());
+ for (int i=0; i<expectedDataBlocks; i++) {
+ if (dataLocations[i] == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Using the current position, returns the index of the blockStream we should
+ * be reading from. This is the index in the internal array holding the
+ * stream reference. The block group index will be one greater than this.
+ * @return
+ */
+ private int currentStreamIndex() {
+ return ((position / ecChunkSize) % repConfig.getData());
+ }
+
+ /**
+ * Uses the current position and ecChunkSize to determine which of the
+ * internal block streams the next read should come from. Also opens the
+ * stream if it has not been opened already.
+ * @return BlockInput stream to read from.
+ */
+ private BlockInputStream getOrOpenStream() {
+ int ind = currentStreamIndex();
+ BlockInputStream stream = blockStreams[ind];
+ if (stream == null) {
+ // To read an EC block, we create a STANDALONE pipeline that contains the
+ // single location for the block index we want to read. The EC blocks are
+ // indexed from 1 to N, however the data locations are stored in the
+ // dataLocations array indexed from zero.
+ Pipeline pipeline = Pipeline.newBuilder()
+ .setReplicationConfig(new StandaloneReplicationConfig(
+ HddsProtos.ReplicationFactor.ONE))
+ .setNodes(Arrays.asList(dataLocations[ind]))
+ .setId(PipelineID.randomId())
+ .setState(Pipeline.PipelineState.CLOSED)
+ .build();
+
+ stream = streamProvider.provide(blockInfo.getBlockID(),
+ internalBlockLength(ind+1), pipeline, blockInfo.getToken(),
+ verifyChecksum);
+ blockStreams[ind] = stream;
+ }
+ return stream;
+ }
+
+ /**
+ * Returns the length of the Nth block in the block group, taking account of
a
+ * potentially partial last stripe. Note that the internal block index is
+ * numbered starting from 1.
+ * @param index - Index number of the internal block, starting from 1
+ * @return
+ */
+ private long internalBlockLength(int index) {
+ long lastStripe = blockInfo.getLength()
+ % ((long)ecChunkSize * repConfig.getData());
+ long blockSize = (blockInfo.getLength() - lastStripe) /
repConfig.getData();
+ long lastCell = lastStripe / ecChunkSize + 1;
+ long lastCellLength = lastStripe % ecChunkSize;
+
+ if (index < lastCell) {
+ return blockSize + ecChunkSize;
+ } else if (index == lastCell) {
+ return blockSize + lastCellLength;
+ } else {
+ return blockSize;
+ }
+ }
+
+ private void setBlockLocations(Pipeline pipeline) {
+ for (DatanodeDetails node : pipeline.getNodes()) {
+ int index = pipeline.getReplicaIndex(node);
+ addBlockLocation(index, node);
+ }
+ }
+
+ private void addBlockLocation(int index, DatanodeDetails location) {
+ if (index > maxLocations) {
+ throw new IndexOutOfBoundsException("The index " + index + " is greater "
+ + "than the EC Replication Config (" + repConfig + ")");
+ }
+ if (index <= repConfig.getData()) {
+ dataLocations[index - 1] = location;
+ } else {
+ parityLocations[index - repConfig.getData() - 1] = location;
+ }
+ }
+
+ private long blockLength() {
+ return blockInfo.getLength();
+ }
+
+ private long remaining() {
+ return blockLength() - position;
+ }
+
+ @Override
+ public synchronized int read() throws IOException {
+ byte[] buf = new byte[1];
+ if (read(buf, 0, 1) == EOF) {
+ return EOF;
+ }
+ return Byte.toUnsignedInt(buf[0]);
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int off, int len) throws IOException {
+ ByteReaderStrategy strategy = new ByteArrayReader(b, off, len);
+ int bufferLen = strategy.getTargetLength();
+ if (bufferLen == 0) {
+ return 0;
+ }
+ return readWithStrategy(strategy);
+ }
+
+ @Override
+ public synchronized int read(ByteBuffer byteBuffer) throws IOException {
+ ByteReaderStrategy strategy = new ByteBufferReader(byteBuffer);
+ int bufferLen = strategy.getTargetLength();
+ if (bufferLen == 0) {
+ return 0;
+ }
+ return readWithStrategy(strategy);
+ }
+
+ /**
+ * Read from the internal BlockInputStreams one EC cell at a time into the
+ * strategy buffer. This call may read from several internal
BlockInputStreams
+ * if there is sufficient space in the buffer.
+ * @param strategy
+ * @return
+ * @throws IOException
+ */
+ private synchronized int readWithStrategy(ByteReaderStrategy strategy) throws
+ IOException {
+ Preconditions.checkArgument(strategy != null);
+ checkOpen();
+
+ if (remaining() == 0) {
+ return EOF;
+ }
+
+ int totalRead = 0;
+ while(strategy.getTargetLength() > 0 && remaining() > 0) {
+ BlockInputStream stream = getOrOpenStream();
+ int read = readFromStream(stream, strategy);
+ totalRead += read;
+ position += read;
+ }
+ return totalRead;
+ }
+
+ /**
+ * Read the most allowable amount of data from the current stream. This
+ * ensures we don't read past the end of an EC cell or the overall block
+ * group length.
+ * @param stream Stream to read from
+ * @param strategy The ReaderStrategy to read data into
+ * @return
+ * @throws IOException
+ */
+ private int readFromStream(BlockInputStream stream,
+ ByteReaderStrategy strategy)
+ throws IOException {
+ // Number of bytes left to read from this streams EC cell.
+ long ecLimit = ecChunkSize - (position % ecChunkSize);
+ // Free space in the buffer to read into
+ long bufLimit = strategy.getTargetLength();
+ // How much we can read, the lower of the EC Cell, buffer and overall block
+ // remaining.
+ int expectedRead = (int)Math.min(Math.min(ecLimit, bufLimit), remaining());
+ int actualRead = strategy.readFromBlock(stream, expectedRead);
+ if (actualRead == -1) {
+ // The Block Stream reached EOF, but we did not expect it to, so the
block
+ // might be corrupt.
+ throw new IOException("Expected to read " + expectedRead + " but got EOF"
+ + " from blockGroup " + stream.getBlockID() + " index "
+ + currentStreamIndex()+1);
+ }
+ return actualRead;
+ }
+
+ /**
+ * Verify that the input stream is open.
+ * @throws IOException if the connection is closed.
+ */
+ private void checkOpen() throws IOException {
+ if (closed) {
+ throw new IOException(
+ ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Block: "
+ + blockInfo.getBlockID());
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ for (BlockInputStream stream : blockStreams) {
+ if (stream != null) {
+ stream.close();
+ }
+ }
+ closed = true;
+ }
+
+ @Override
+ public synchronized void unbuffer() {
+ for (BlockInputStream stream : blockStreams) {
+ if (stream != null) {
+ stream.unbuffer();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void seek(long l) throws IOException {
+ throw new NotImplementedException("Seek is not implements for EC");
+ }
+
+ @Override
+ public synchronized long getPos() throws IOException {
+ return position;
+ }
+
+ @Override
+ public synchronized boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+}
\ No newline at end of file
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
new file mode 100644
index 0000000..c5f9d0b
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamProvider;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
/**
 * Tests for ECBlockInputStream.
 */
public class TestECBlockInputStream {

  // One megabyte, used as the EC chunk size in most tests.
  private static final int ONEMB = 1024 * 1024;

  private ECReplicationConfig repConfig;
  private TestBlockInputStreamProvider streamProvider;

  @Before
  public void setup() {
    // EC-3-2: three data blocks, two parity blocks.
    repConfig = new ECReplicationConfig(3, 2);
    streamProvider = new TestBlockInputStreamProvider();
  }

  @Test
  // TODO - this test will need to be changed when we can do recovery reads.
  public void testSufficientLocations() {
    // EC-3-2, 5MB block, so all 3 data locations are needed
    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, new TestBlockInputStreamProvider())) {
      Assert.assertTrue(ecb.hasSufficientLocations());
    }

    // EC-3-2, very large block, so all 3 data locations are needed
    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, new TestBlockInputStreamProvider())) {
      Assert.assertTrue(ecb.hasSufficientLocations());
    }

    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();

    // EC-3-2, 1 byte short of 1MB with 1 location
    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, new TestBlockInputStreamProvider())) {
      Assert.assertTrue(ecb.hasSufficientLocations());
    }

    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
    dnMap.clear();
    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, new TestBlockInputStreamProvider())) {
      Assert.assertFalse(ecb.hasSufficientLocations());
    }

    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
    // locations.
    dnMap.clear();
    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, new TestBlockInputStreamProvider())) {
      Assert.assertFalse(ecb.hasSufficientLocations());
    }

    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
    // this will fail as we don't support reconstruction reads yet.
    dnMap.clear();
    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, new TestBlockInputStreamProvider())) {
      Assert.assertFalse(ecb.hasSufficientLocations());
    }
  }

  @Test
  public void testCorrectBlockSizePassedToBlockStreamLessThanCell()
      throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB - 100);

    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, streamProvider)) {
      ecb.read(buf);
      // We expect only 1 block stream and it should have a length passed of
      // ONEMB - 100.
      List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
      Assert.assertEquals(ONEMB - 100, streams.get(0).getLength());
    }
  }

  @Test
  public void testCorrectBlockSizePassedToBlockStreamTwoCells()
      throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB + 100);

    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, streamProvider)) {
      ecb.read(buf);
      // First cell is full; the second holds only the 100 byte remainder.
      List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
      Assert.assertEquals(ONEMB, streams.get(0).getLength());
      Assert.assertEquals(100, streams.get(1).getLength());
    }
  }

  @Test
  public void testCorrectBlockSizePassedToBlockStreamThreeCells()
      throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 2 * ONEMB + 100);

    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, streamProvider)) {
      ecb.read(buf);
      // Two full cells, with the 100 byte remainder in the third block.
      List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
      Assert.assertEquals(ONEMB, streams.get(0).getLength());
      Assert.assertEquals(ONEMB, streams.get(1).getLength());
      Assert.assertEquals(100, streams.get(2).getLength());
    }
  }

  @Test
  public void testCorrectBlockSizePassedToBlockStreamThreeFullAndPartialStripe()
      throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 10 * ONEMB + 100);

    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, streamProvider)) {
      ecb.read(buf);
      // 10MB + 100 bytes over EC-3-2 gives block 1 four cells, block 2 three
      // full cells plus the 100 byte remainder, and block 3 three full cells.
      List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
      Assert.assertEquals(4 * ONEMB, streams.get(0).getLength());
      Assert.assertEquals(3 * ONEMB + 100, streams.get(1).getLength());
      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
    }
  }

  @Test
  public void testCorrectBlockSizePassedToBlockStreamSingleFullCell()
      throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB);

    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, streamProvider)) {
      ecb.read(buf);
      // Exactly one full cell, so a single stream of length ONEMB.
      List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
      Assert.assertEquals(ONEMB, streams.get(0).getLength());
    }
  }

  @Test
  public void testCorrectBlockSizePassedToBlockStreamSeveralFullCells()
      throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 9 * ONEMB);

    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, streamProvider)) {
      ecb.read(buf);
      // 9MB spreads evenly: three full cells in each of the 3 data blocks.
      List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
      Assert.assertEquals(3 * ONEMB, streams.get(0).getLength());
      Assert.assertEquals(3 * ONEMB, streams.get(1).getLength());
      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
    }
  }

  @Test
  public void testSimpleRead() throws IOException {
    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, streamProvider)) {

      ByteBuffer buf = ByteBuffer.allocate(100);

      int read = ecb.read(buf);
      Assert.assertEquals(100, read);
      // The first stream fills the buffer with its dataVal byte (0).
      validateBufferContents(buf, 0, 100, (byte) 0);
      Assert.assertEquals(100, ecb.getPos());
    }
    // Closing the ECBlockInputStream must close all opened streams.
    for (TestBlockInputStream s : streamProvider.getBlockStreams()) {
      Assert.assertTrue(s.isClosed());
    }
  }

  @Test
  public void testReadPastEOF() throws IOException {
    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 50);
    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
        keyInfo, true, streamProvider)) {

      ByteBuffer buf = ByteBuffer.allocate(100);

      int read = ecb.read(buf);
      Assert.assertEquals(50, read);
      // A second read after the block is exhausted must return EOF (-1).
      read = ecb.read(buf);
      Assert.assertEquals(read, -1);
    }
  }

  @Test
  public void testReadCrossingMultipleECChunkBounds() throws IOException {
    // EC-3-2, 5MB block, so all 3 data locations are needed
    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, 100,
        keyInfo, true, streamProvider)) {

      // EC Chunk size is 100 and 3-2. Create a byte buffer to read 3.5 chunks,
      // so 350
      ByteBuffer buf = ByteBuffer.allocate(350);
      int read = ecb.read(buf);
      Assert.assertEquals(350, read);

      // Each 100 byte cell comes from the next stream in turn; each test
      // stream writes its own index value as the data byte.
      validateBufferContents(buf, 0, 100, (byte) 0);
      validateBufferContents(buf, 100, 200, (byte) 1);
      validateBufferContents(buf, 200, 300, (byte) 2);
      validateBufferContents(buf, 300, 350, (byte) 0);

      buf.clear();
      read = ecb.read(buf);
      Assert.assertEquals(350, read);

      // The second read starts mid-cell in stream 0 and continues the
      // round-robin from there.
      validateBufferContents(buf, 0, 50, (byte) 0);
      validateBufferContents(buf, 50, 150, (byte) 1);
      validateBufferContents(buf, 150, 250, (byte) 2);
      validateBufferContents(buf, 250, 350, (byte) 0);

    }
    for (TestBlockInputStream s : streamProvider.getBlockStreams()) {
      Assert.assertTrue(s.isClosed());
    }
  }

  /**
   * Assert every byte in buf between [from, to) equals val.
   */
  private void validateBufferContents(ByteBuffer buf, int from, int to,
      byte val) {
    for (int i=from; i<to; i++){
      Assert.assertEquals(val, buf.get(i));
    }
  }

  /**
   * Build an OmKeyLocationInfo for a block of the given length, whose
   * pipeline contains exactly the datanode -> replicaIndex entries in dnMap.
   */
  private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
      long blockLength, Map<DatanodeDetails, Integer> dnMap) {

    Pipeline pipeline = Pipeline.newBuilder()
        .setState(Pipeline.PipelineState.CLOSED)
        .setId(PipelineID.randomId())
        .setNodes(new ArrayList<>(dnMap.keySet()))
        .setReplicaIndexes(dnMap)
        .setReplicationConfig(repConf)
        .build();

    OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
        .setBlockID(new BlockID(1, 1))
        .setLength(blockLength)
        .setOffset(0)
        .setPipeline(pipeline)
        .setPartNumber(0)
        .build();
    return keyInfo;
  }

  /**
   * Build an OmKeyLocationInfo with nodeCount random datanodes, assigned
   * replica indexes 1..nodeCount.
   */
  private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
      int nodeCount, long blockLength) {
    Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
    for (int i = 0; i < nodeCount; i++) {
      datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
    }

    return createKeyInfo(repConf, blockLength, datanodes);
  }

  /**
   * Test provider which records every stream it creates, so tests can
   * inspect the lengths passed and whether the streams were closed.
   */
  private static class TestBlockInputStreamProvider implements
      BlockInputStreamProvider {

    private List<TestBlockInputStream> blockStreams = new ArrayList<>();

    public List<TestBlockInputStream> getBlockStreams() {
      return blockStreams;
    }

    @Override
    public BlockInputStream provide(BlockID blockId, long blockLen,
        Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
        boolean verifyChecksum) {
      // Each stream is given a dataVal of its creation order, so reads can
      // be traced back to the stream which served them.
      TestBlockInputStream stream = new TestBlockInputStream(blockId, blockLen,
          pipeline, token, verifyChecksum, null, null,
          (byte)blockStreams.size());
      blockStreams.add(stream);
      return stream;
    }
  }

  /**
   * Stub BlockInputStream which serves a fixed byte value rather than real
   * block data, and tracks position and closed state.
   */
  private static class TestBlockInputStream extends BlockInputStream {

    private long position = 0;
    private boolean closed = false;
    // Byte value returned for every byte read from this stream.
    private byte dataVal = 1;
    private static final byte EOF = -1;

    @SuppressWarnings("checkstyle:parameternumber")
    TestBlockInputStream(BlockID blockId, long blockLen,
        Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
        boolean verifyChecksum, XceiverClientFactory xceiverClientFactory,
        Function<BlockID, Pipeline> refreshPipelineFunction, byte dataVal) {
      super(blockId, blockLen, pipeline, token, verifyChecksum,
          xceiverClientFactory, refreshPipelineFunction);
      this.dataVal = dataVal;
    }

    public boolean isClosed() {
      return closed;
    }

    public long getRemaining() {
      return getLength() - position;
    }

    @Override
    public int read(ByteBuffer buf) {
      if (getRemaining() == 0) {
        return EOF;
      }

      // Fill the buffer with dataVal, bounded by the remaining length.
      int toRead = Math.min(buf.remaining(), (int)getRemaining());
      for (int i=0; i<toRead; i++) {
        buf.put(dataVal);
      }
      position += toRead;
      return toRead;
    }

    @Override
    public void close() {
      closed = true;
    }
  }

}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]