This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new db35f8c  HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup (#2507)
db35f8c is described below

commit db35f8c003b34fe790694a11d2a49ef50100e12f
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu Aug 26 08:16:39 2021 +0100

    HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup (#2507)
---
 .../ozone/client/io/BlockInputStreamProvider.java  |  34 ++
 .../client/io/BlockInputStreamProviderImpl.java    |  51 +++
 .../hadoop/ozone/client/io/ECBlockInputStream.java | 322 +++++++++++++++++
 .../client/rpc/read/TestECBlockInputStream.java    | 384 +++++++++++++++++++++
 4 files changed, 791 insertions(+)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProvider.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProvider.java
new file mode 100644
index 0000000..d1cc74a
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProvider.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Factory abstraction for creating the BlockInputStream used to read a single
+ * block. Callers depend on this interface rather than constructing
+ * BlockInputStream directly, so the stream implementation can be swapped.
+ */
+public interface BlockInputStreamProvider {
+
+  /**
+   * Creates a BlockInputStream for one block.
+   *
+   * @param blockId Identifier of the block to read
+   * @param blockLen Length of the block in bytes
+   * @param pipeline Pipeline the block is to be read through
+   * @param token Block token passed on to the created stream
+   * @param verifyChecksum Checksum verification flag passed on to the created
+   *                       stream
+   * @return the stream to read the block from
+   */
+  BlockInputStream provide(
+      BlockID blockId,
+      long blockLen,
+      Pipeline pipeline,
+      Token<OzoneBlockTokenIdentifier> token,
+      boolean verifyChecksum);
+
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProviderImpl.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProviderImpl.java
new file mode 100644
index 0000000..507dadc
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProviderImpl.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.function.Function;
+
+/**
+ * BlockInputStreamProvider for use in a real cluster: every call to provide()
+ * constructs a new BlockInputStream wired to the client factory and refresh
+ * function this provider was created with.
+ */
+public class BlockInputStreamProviderImpl implements BlockInputStreamProvider {
+
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
+
+  /**
+   * @param xceiverFactory Client factory handed to each stream created
+   * @param refreshFunction Maps a BlockID to a Pipeline; handed to each stream
+   *                        created
+   */
+  public BlockInputStreamProviderImpl(XceiverClientFactory xceiverFactory,
+      Function<BlockID, Pipeline> refreshFunction) {
+    this.refreshFunction = refreshFunction;
+    this.xceiverClientFactory = xceiverFactory;
+  }
+
+  /**
+   * Creates a new BlockInputStream for the given block, combining the
+   * per-call arguments with the factory and function held by this provider.
+   */
+  @Override
+  public BlockInputStream provide(BlockID blockId, long blockLen,
+      Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
+      boolean verifyChecksum) {
+    final BlockInputStream created = new BlockInputStream(blockId, blockLen,
+        pipeline, token, verifyChecksum, xceiverClientFactory,
+        refreshFunction);
+    return created;
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
new file mode 100644
index 0000000..9c9d0a6
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ * <p>
+ * The data of a block group is striped across the data blocks in cells of
+ * ecChunkSize bytes. This stream reads the cells back in order, opening one
+ * BlockInputStream per data block on demand. Parity locations are recorded but
+ * never read, as reconstruction reads are not implemented here, and seek is
+ * not supported.
+ */
+public class ECBlockInputStream extends InputStream
+    implements Seekable, CanUnbuffer, ByteBufferReadable {
+
+  private static final int EOF = -1;
+
+  private final ECReplicationConfig repConfig;
+  private final int ecChunkSize;
+  private final BlockInputStreamProvider streamProvider;
+  private final boolean verifyChecksum;
+  private final OmKeyLocationInfo blockInfo;
+  private final DatanodeDetails[] dataLocations;
+  private final DatanodeDetails[] parityLocations;
+  private final BlockInputStream[] blockStreams;
+  private final int maxLocations;
+
+  // Held as a long because the block group length is handled as a long and
+  // can exceed Integer.MAX_VALUE bytes; an int position would overflow.
+  private long position = 0;
+  private boolean closed = false;
+
+  /**
+   * Creates a stream over a single EC block group.
+   *
+   * @param repConfig EC replication config giving the data and parity counts
+   * @param ecChunkSize Size in bytes of each EC cell
+   * @param blockInfo Block group to read, including its length and pipeline
+   * @param verifyChecksum Passed through to each BlockInputStream created
+   * @param streamProvider Factory used to create the per-block streams
+   */
+  public ECBlockInputStream(ECReplicationConfig repConfig, int ecChunkSize,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      BlockInputStreamProvider streamProvider) {
+    this.repConfig = repConfig;
+    this.ecChunkSize = ecChunkSize;
+    this.verifyChecksum = verifyChecksum;
+    this.blockInfo = blockInfo;
+    this.streamProvider = streamProvider;
+    this.maxLocations = repConfig.getData() + repConfig.getParity();
+    this.dataLocations = new DatanodeDetails[repConfig.getData()];
+    this.parityLocations = new DatanodeDetails[repConfig.getParity()];
+    this.blockStreams = new BlockInputStream[repConfig.getData()];
+
+    setBlockLocations(this.blockInfo.getPipeline());
+  }
+
+  /**
+   * Checks that a location is known for every data block needed to read this
+   * block group.
+   * @return true if all the required data locations are present
+   */
+  public synchronized boolean hasSufficientLocations() {
+    // Until we implement "on the fly" recovery, all data location must be
+    // present and we have enough locations if that is the case.
+    //
+    // The number of locations needed is a function of the EC Chunk size. If
+    // the block length is <= the chunk size, we should only have location 1.
+    // If it is greater than the chunk size but no more than chunk_size * 2,
+    // then we must have two locations. If it is greater than
+    // chunk_size * (data_num - 1), then we must have all data_num locations.
+    int expectedDataBlocks =
+        (int)Math.min(
+            Math.ceil((double)blockInfo.getLength() / ecChunkSize),
+            repConfig.getData());
+    for (int i = 0; i < expectedDataBlocks; i++) {
+      if (dataLocations[i] == null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Using the current position, returns the index of the blockStream we should
+   * be reading from. This is the index in the internal array holding the
+   * stream reference. The block group index will be one greater than this.
+   * @return Zero based index into the blockStreams and dataLocations arrays
+   */
+  private int currentStreamIndex() {
+    // The modulo result is always below the data block count, so it fits in
+    // an int even though position is a long.
+    return (int)((position / ecChunkSize) % repConfig.getData());
+  }
+
+  /**
+   * Uses the current position and ecChunkSize to determine which of the
+   * internal block streams the next read should come from. Also opens the
+   * stream if it has not been opened already.
+   * @return BlockInput stream to read from.
+   */
+  private BlockInputStream getOrOpenStream() {
+    int ind = currentStreamIndex();
+    BlockInputStream stream = blockStreams[ind];
+    if (stream == null) {
+      // To read an EC block, we create a STANDALONE pipeline that contains the
+      // single location for the block index we want to read. The EC blocks are
+      // indexed from 1 to N, however the data locations are stored in the
+      // dataLocations array indexed from zero.
+      Pipeline pipeline = Pipeline.newBuilder()
+          .setReplicationConfig(new StandaloneReplicationConfig(
+              HddsProtos.ReplicationFactor.ONE))
+          .setNodes(Arrays.asList(dataLocations[ind]))
+          .setId(PipelineID.randomId())
+          .setState(Pipeline.PipelineState.CLOSED)
+          .build();
+
+      stream = streamProvider.provide(blockInfo.getBlockID(),
+          internalBlockLength(ind + 1), pipeline, blockInfo.getToken(),
+          verifyChecksum);
+      blockStreams[ind] = stream;
+    }
+    return stream;
+  }
+
+  /**
+   * Returns the length of the Nth block in the block group, taking account of
+   * a potentially partial last stripe. Note that the internal block index is
+   * numbered starting from 1.
+   * @param index - Index number of the internal block, starting from 1
+   * @return Length in bytes of the internal block with the given index
+   */
+  private long internalBlockLength(int index) {
+    // Bytes left over after all the complete stripes.
+    long lastStripe = blockInfo.getLength()
+        % ((long)ecChunkSize * repConfig.getData());
+    // Bytes each internal block holds from the complete stripes.
+    long blockSize =
+        (blockInfo.getLength() - lastStripe) / repConfig.getData();
+    // One based index of the block holding the final, possibly partial, cell.
+    long lastCell = lastStripe / ecChunkSize + 1;
+    long lastCellLength = lastStripe % ecChunkSize;
+
+    if (index < lastCell) {
+      return blockSize + ecChunkSize;
+    } else if (index == lastCell) {
+      return blockSize + lastCellLength;
+    } else {
+      return blockSize;
+    }
+  }
+
+  /**
+   * Records the location of every node in the pipeline against its replica
+   * index.
+   * @param pipeline Pipeline holding the nodes and their replica indexes
+   */
+  private void setBlockLocations(Pipeline pipeline) {
+    for (DatanodeDetails node : pipeline.getNodes()) {
+      int index = pipeline.getReplicaIndex(node);
+      addBlockLocation(index, node);
+    }
+  }
+
+  /**
+   * Stores a location in the data or parity array, depending on its index.
+   * @param index One based replica index of the location
+   * @param location The datanode holding that replica
+   * @throws IndexOutOfBoundsException if the index is less than 1 or greater
+   *         than the number of data plus parity blocks
+   */
+  private void addBlockLocation(int index, DatanodeDetails location) {
+    // Without this check an index below 1 fails on dataLocations[index - 1]
+    // with an array bounds error that does not explain the cause.
+    if (index < 1) {
+      throw new IndexOutOfBoundsException("The index " + index + " is less "
+          + "than 1, but EC replica indexes are numbered from 1");
+    }
+    if (index > maxLocations) {
+      throw new IndexOutOfBoundsException("The index " + index + " is greater "
+          + "than the EC Replication Config (" + repConfig + ")");
+    }
+    if (index <= repConfig.getData()) {
+      dataLocations[index - 1] = location;
+    } else {
+      parityLocations[index - repConfig.getData() - 1] = location;
+    }
+  }
+
+  private long blockLength() {
+    return blockInfo.getLength();
+  }
+
+  private long remaining() {
+    return blockLength() - position;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    byte[] buf = new byte[1];
+    if (read(buf, 0, 1) == EOF) {
+      return EOF;
+    }
+    return Byte.toUnsignedInt(buf[0]);
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    ByteReaderStrategy strategy = new ByteArrayReader(b, off, len);
+    int bufferLen = strategy.getTargetLength();
+    if (bufferLen == 0) {
+      return 0;
+    }
+    return readWithStrategy(strategy);
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer byteBuffer) throws IOException {
+    ByteReaderStrategy strategy = new ByteBufferReader(byteBuffer);
+    int bufferLen = strategy.getTargetLength();
+    if (bufferLen == 0) {
+      return 0;
+    }
+    return readWithStrategy(strategy);
+  }
+
+  /**
+   * Read from the internal BlockInputStreams one EC cell at a time into the
+   * strategy buffer. This call may read from several internal
+   * BlockInputStreams if there is sufficient space in the buffer.
+   * @param strategy The ReaderStrategy wrapping the buffer to read into
+   * @return The number of bytes read, or EOF (-1) if the stream was already
+   *         at the end of the block group
+   * @throws IOException if the stream is closed or an internal read fails
+   */
+  private synchronized int readWithStrategy(ByteReaderStrategy strategy) throws
+      IOException {
+    Preconditions.checkArgument(strategy != null);
+    checkOpen();
+
+    if (remaining() == 0) {
+      return EOF;
+    }
+
+    int totalRead = 0;
+    while (strategy.getTargetLength() > 0 && remaining() > 0) {
+      BlockInputStream stream = getOrOpenStream();
+      int read = readFromStream(stream, strategy);
+      totalRead += read;
+      position += read;
+    }
+    return totalRead;
+  }
+
+  /**
+   * Read the most allowable amount of data from the current stream. This
+   * ensures we don't read past the end of an EC cell or the overall block
+   * group length.
+   * @param stream Stream to read from
+   * @param strategy The ReaderStrategy to read data into
+   * @return The number of bytes read from the stream
+   * @throws IOException if the stream reports EOF before the expected number
+   *         of bytes is available, or the underlying read fails
+   */
+  private int readFromStream(BlockInputStream stream,
+      ByteReaderStrategy strategy)
+      throws IOException {
+    // Number of bytes left to read from this streams EC cell.
+    long ecLimit = ecChunkSize - (position % ecChunkSize);
+    // Free space in the buffer to read into
+    long bufLimit = strategy.getTargetLength();
+    // How much we can read, the lower of the EC Cell, buffer and overall block
+    // remaining.
+    int expectedRead = (int)Math.min(Math.min(ecLimit, bufLimit), remaining());
+    int actualRead = strategy.readFromBlock(stream, expectedRead);
+    if (actualRead == -1) {
+      // The Block Stream reached EOF, but we did not expect it to, so the
+      // block might be corrupt. The index is parenthesised so that one is
+      // added to it, rather than the digit "1" being appended to the message.
+      throw new IOException("Expected to read " + expectedRead + " but got EOF"
+          + " from blockGroup " + stream.getBlockID() + " index "
+          + (currentStreamIndex() + 1));
+    }
+    return actualRead;
+  }
+
+  /**
+   * Verify that the input stream is open.
+   * @throws IOException if the connection is closed.
+   */
+  private void checkOpen() throws IOException {
+    if (closed) {
+      throw new IOException(
+          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Block: "
+              + blockInfo.getBlockID());
+    }
+  }
+
+  /**
+   * Closes every internal block stream that has been opened and marks this
+   * stream as closed, so that later reads fail.
+   */
+  @Override
+  public synchronized void close() {
+    for (BlockInputStream stream : blockStreams) {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+    closed = true;
+  }
+
+  /**
+   * Calls unbuffer on every internal block stream that has been opened.
+   */
+  @Override
+  public synchronized void unbuffer() {
+    for (BlockInputStream stream : blockStreams) {
+      if (stream != null) {
+        stream.unbuffer();
+      }
+    }
+  }
+
+  /**
+   * Seek is not supported; this always throws NotImplementedException.
+   */
+  @Override
+  public synchronized void seek(long l) throws IOException {
+    throw new NotImplementedException("Seek is not implemented for EC");
+  }
+
+  /**
+   * @return The number of bytes read from the block group so far
+   */
+  @Override
+  public synchronized long getPos() throws IOException {
+    return position;
+  }
+
+  /**
+   * Switching to another source is not supported.
+   * @return always false
+   */
+  @Override
+  public synchronized boolean seekToNewSource(long l) throws IOException {
+    return false;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
new file mode 100644
index 0000000..c5f9d0b
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamProvider;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ * <p>
+ * The tests replace the real block streams with TestBlockInputStream, which
+ * serves a fixed byte value per stream, so that the data returned by a read
+ * shows which internal stream each byte came from.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  private ECReplicationConfig repConfig;
+  private TestBlockInputStreamProvider streamProvider;
+
+  @Before
+  public void setup() {
+    repConfig = new ECReplicationConfig(3, 2);
+    streamProvider = new TestBlockInputStreamProvider();
+  }
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+            keyInfo, true, new TestBlockInputStreamProvider())) {
+      Assert.assertTrue(ecb.hasSufficientLocations());
+    }
+
+    // EC-3-2, very large block, so all 3 data locations are needed. The
+    // length is computed as a long, as 5000 * ONEMB overflows an int.
+    keyInfo = createKeyInfo(repConfig, 5, 5000L * ONEMB);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamProvider())) {
+      Assert.assertTrue(ecb.hasSufficientLocations());
+    }
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamProvider())) {
+      Assert.assertTrue(ecb.hasSufficientLocations());
+    }
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamProvider())) {
+      Assert.assertFalse(ecb.hasSufficientLocations());
+    }
+
+    // EC-3-2, 5MB blocks, only 1 of the 3 data locations is passed so we do
+    // not have sufficient locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamProvider())) {
+      Assert.assertFalse(ecb.hasSufficientLocations());
+    }
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamProvider())) {
+      Assert.assertFalse(ecb.hasSufficientLocations());
+    }
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStreamLessThanCell()
+      throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB - 100);
+
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, streamProvider)) {
+      ecb.read(buf);
+      // We expect only 1 block stream and it should have a length passed of
+      // ONEMB - 100.
+      List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
+      Assert.assertEquals(ONEMB - 100, streams.get(0).getLength());
+    }
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStreamTwoCells()
+      throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB + 100);
+
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, streamProvider)) {
+      ecb.read(buf);
+      List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(100, streams.get(1).getLength());
+    }
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStreamThreeCells()
+      throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
+
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, streamProvider)) {
+      ecb.read(buf);
+      List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(ONEMB, streams.get(1).getLength());
+      Assert.assertEquals(100, streams.get(2).getLength());
+    }
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStreamThreeFullAndPartialStripe()
+      throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
+
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, streamProvider)) {
+      ecb.read(buf);
+      List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
+      Assert.assertEquals(4 * ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(3 * ONEMB + 100, streams.get(1).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
+    }
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStreamSingleFullCell()
+      throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB);
+
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, streamProvider)) {
+      ecb.read(buf);
+      List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+    }
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStreamSeveralFullCells()
+      throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 9 * ONEMB);
+
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, streamProvider)) {
+      ecb.read(buf);
+      List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
+      Assert.assertEquals(3 * ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(1).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
+    }
+  }
+
+  @Test
+  public void testSimpleRead() throws IOException {
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, streamProvider)) {
+
+      ByteBuffer buf = ByteBuffer.allocate(100);
+
+      int read = ecb.read(buf);
+      Assert.assertEquals(100, read);
+      validateBufferContents(buf, 0, 100, (byte) 0);
+      Assert.assertEquals(100, ecb.getPos());
+    }
+    // Closing the EC stream must close each block stream it opened.
+    for (TestBlockInputStream s : streamProvider.getBlockStreams()) {
+      Assert.assertTrue(s.isClosed());
+    }
+  }
+
+  @Test
+  public void testReadPastEOF() throws IOException {
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 50);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, streamProvider)) {
+
+      ByteBuffer buf = ByteBuffer.allocate(100);
+
+      int read = ecb.read(buf);
+      Assert.assertEquals(50, read);
+      read = ecb.read(buf);
+      Assert.assertEquals(-1, read);
+    }
+  }
+
+  @Test
+  public void testReadCrossingMultipleECChunkBounds() throws IOException {
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, 100,
+        keyInfo, true, streamProvider)) {
+
+      // EC Chunk size is 100 and 3-2. Create a byte buffer to read 3.5 chunks,
+      // so 350
+      ByteBuffer buf = ByteBuffer.allocate(350);
+      int read = ecb.read(buf);
+      Assert.assertEquals(350, read);
+
+      validateBufferContents(buf, 0, 100, (byte) 0);
+      validateBufferContents(buf, 100, 200, (byte) 1);
+      validateBufferContents(buf, 200, 300, (byte) 2);
+      validateBufferContents(buf, 300, 350, (byte) 0);
+
+      // The second read starts half way through a cell of the first stream.
+      buf.clear();
+      read = ecb.read(buf);
+      Assert.assertEquals(350, read);
+
+      validateBufferContents(buf, 0, 50, (byte) 0);
+      validateBufferContents(buf, 50, 150, (byte) 1);
+      validateBufferContents(buf, 150, 250, (byte) 2);
+      validateBufferContents(buf, 250, 350, (byte) 0);
+
+    }
+    for (TestBlockInputStream s : streamProvider.getBlockStreams()) {
+      Assert.assertTrue(s.isClosed());
+    }
+  }
+
+  /**
+   * Asserts that every byte of buf in the range [from, to) equals val.
+   */
+  private void validateBufferContents(ByteBuffer buf, int from, int to,
+      byte val) {
+    for (int i = from; i < to; i++) {
+      Assert.assertEquals(val, buf.get(i));
+    }
+  }
+
+  /**
+   * Builds a key location of the given length, whose pipeline holds the nodes
+   * in dnMap with the replica indexes they are mapped to.
+   */
+  private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+      long blockLength, Map<DatanodeDetails, Integer> dnMap) {
+
+    Pipeline pipeline = Pipeline.newBuilder()
+        .setState(Pipeline.PipelineState.CLOSED)
+        .setId(PipelineID.randomId())
+        .setNodes(new ArrayList<>(dnMap.keySet()))
+        .setReplicaIndexes(dnMap)
+        .setReplicationConfig(repConf)
+        .build();
+
+    OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
+        .setBlockID(new BlockID(1, 1))
+        .setLength(blockLength)
+        .setOffset(0)
+        .setPipeline(pipeline)
+        .setPartNumber(0)
+        .build();
+    return keyInfo;
+  }
+
+  /**
+   * Builds a key location of the given length with nodeCount random nodes,
+   * assigned replica indexes 1 to nodeCount.
+   */
+  private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+      int nodeCount, long blockLength) {
+    Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
+    for (int i = 0; i < nodeCount; i++) {
+      datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
+    }
+
+    return createKeyInfo(repConf, blockLength, datanodes);
+  }
+
+  /**
+   * Provider which hands out TestBlockInputStreams and remembers them in the
+   * order they were created. Each stream serves a byte value equal to the
+   * number of streams created before it.
+   */
+  private static class TestBlockInputStreamProvider implements
+      BlockInputStreamProvider {
+
+    private final List<TestBlockInputStream> blockStreams = new ArrayList<>();
+
+    public List<TestBlockInputStream> getBlockStreams() {
+      return blockStreams;
+    }
+
+    @Override
+    public BlockInputStream provide(BlockID blockId, long blockLen,
+        Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
+        boolean verifyChecksum) {
+      TestBlockInputStream stream = new TestBlockInputStream(blockId, blockLen,
+          pipeline, token, verifyChecksum, null, null,
+          (byte)blockStreams.size());
+      blockStreams.add(stream);
+      return stream;
+    }
+  }
+
+  /**
+   * BlockInputStream which fills reads with a single repeated byte value,
+   * up to the block length it was created with, and records when it has been
+   * closed.
+   */
+  private static class TestBlockInputStream extends BlockInputStream {
+
+    private long position = 0;
+    private boolean closed = false;
+    private byte dataVal = 1;
+    private static final byte EOF = -1;
+
+    @SuppressWarnings("checkstyle:parameternumber")
+    TestBlockInputStream(BlockID blockId, long blockLen,
+        Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
+        boolean verifyChecksum, XceiverClientFactory xceiverClientFactory,
+        Function<BlockID, Pipeline> refreshPipelineFunction, byte dataVal) {
+      super(blockId, blockLen, pipeline, token, verifyChecksum,
+          xceiverClientFactory, refreshPipelineFunction);
+      this.dataVal = dataVal;
+    }
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    public long getRemaining() {
+      return getLength() - position;
+    }
+
+    @Override
+    public int read(ByteBuffer buf) {
+      if (getRemaining() == 0) {
+        return EOF;
+      }
+
+      // Serve as much as the buffer can take, up to the end of the block.
+      int toRead = Math.min(buf.remaining(), (int)getRemaining());
+      for (int i = 0; i < toRead; i++) {
+        buf.put(dataVal);
+      }
+      position += toRead;
+      return toRead;
+    }
+
+    @Override
+    public void close() {
+      closed = true;
+    }
+  }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to