This is an automated email from the ASF dual-hosted git repository.
hanishakoneru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 02c9efc HDDS-1491. Ozone KeyInputStream seek() should not read the
chunk file. (#795)
02c9efc is described below
commit 02c9efcb8174140c75d26dfd3ee0c280bde58fc8
Author: Hanisha Koneru <[email protected]>
AuthorDate: Mon May 13 20:49:52 2019 -0700
HDDS-1491. Ozone KeyInputStream seek() should not read the chunk file.
(#795)
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 222 +++++++++++++++------
.../hdds/scm/storage/TestBlockInputStream.java | 193 ++++++++++++++++++
.../hadoop/hdds/scm/storage/package-info.java | 21 ++
.../hadoop/ozone/client/io/KeyInputStream.java | 153 +++++++++-----
.../ozone/client/rpc/TestKeyInputStream.java | 175 ++++++++++++++++
.../apache/hadoop/ozone/om/TestChunkStreams.java | 4 +-
6 files changed, 655 insertions(+), 113 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 6667163..bb4a5b0 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.storage;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -61,10 +62,18 @@ public class BlockInputStream extends InputStream
implements Seekable {
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
private List<ChunkInfo> chunks;
+ // chunkIndex points to the index of the current chunk in the buffers, or
+ // to the index of the chunk that will be read next into the buffers by
+ // readChunkFromContainer().
private int chunkIndex;
+ // ChunkIndexOfCurrentBuffer points to the index of chunk read into the
+ // buffers or index of the last chunk in the buffers. It is updated only
+ // when a new chunk is read from container into the buffers.
+ private int chunkIndexOfCurrentBuffer;
private long[] chunkOffset;
private List<ByteBuffer> buffers;
private int bufferIndex;
+ private long bufferPosition;
private final boolean verifyChecksum;
/**
@@ -76,24 +85,34 @@ public class BlockInputStream extends InputStream
implements Seekable {
* @param chunks list of chunks to read
* @param traceID container protocol call traceID
* @param verifyChecksum verify checksum
+ * @param initialPosition the initial position of the stream pointer. This
+ * position is seeked now if the up-stream was seeked
+ * before this was created.
*/
public BlockInputStream(
BlockID blockID, XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID,
- boolean verifyChecksum) {
+ boolean verifyChecksum, long initialPosition) throws IOException {
this.blockID = blockID;
this.traceID = traceID;
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
this.chunks = chunks;
- this.chunkIndex = -1;
+ // Start positioned on the first chunk. No chunk data is fetched here;
+ // prepareRead() calls readChunkFromContainer() once buffers are needed.
+ this.chunkIndex = 0;
+ // -1: no chunk has been read into the buffers yet.
+ this.chunkIndexOfCurrentBuffer = -1;
// chunkOffset[i] stores offset at which chunk i stores data in
// BlockInputStream
this.chunkOffset = new long[this.chunks.size()];
initializeChunkOffset();
this.buffers = null;
this.bufferIndex = 0;
+ // -1: no seek is pending (see adjustBufferIndex()).
+ this.bufferPosition = -1;
this.verifyChecksum = verifyChecksum;
+ // NOTE(review): seek() appears to be an overridable public method invoked
+ // from the constructor, and a negative initialPosition is silently ignored
+ // here rather than rejected — confirm both are intended.
+ if (initialPosition > 0) {
+ // The stream was seeked to a position before the stream was
+ // initialized. So seeking to the position now.
+ seek(initialPosition);
+ }
}
private void initializeChunkOffset() {
@@ -176,7 +195,7 @@ public class BlockInputStream extends InputStream
implements Seekable {
*
* @return true if EOF, false if more data is available
*/
- private boolean blockStreamEOF() {
+ protected boolean blockStreamEOF() {
if (buffersHaveData() || chunksRemaining()) {
return false;
} else {
@@ -223,12 +242,19 @@ public class BlockInputStream extends InputStream
implements Seekable {
*/
private synchronized int prepareRead(int len) throws IOException {
for (;;) {
+ if (!buffersAllocated()) {
+ // The current chunk at chunkIndex has not been read from the
+ // container. Read the chunk and put the data into buffers.
+ readChunkFromContainer();
+ }
if (buffersHaveData()) {
// Data is available from buffers
ByteBuffer bb = buffers.get(bufferIndex);
return len > bb.remaining() ? bb.remaining() : len;
} else if (chunksRemaining()) {
// There are additional chunks available.
+ // Read the next chunk in the block.
+ chunkIndex += 1;
readChunkFromContainer();
} else {
// All available input has been consumed.
@@ -237,26 +263,31 @@ public class BlockInputStream extends InputStream
implements Seekable {
}
}
- private boolean buffersHaveData() {
- boolean hasData = false;
-
+ /**
+ * Checks whether chunk data has been loaded into {@code buffers}.
+ *
+ * @return true if {@code buffers} is non-null and non-empty
+ */
+ private boolean buffersAllocated() {
if (buffers == null || buffers.isEmpty()) {
return false;
}
+ return true;
+ }
- while (bufferIndex < (buffers.size())) {
- if (buffers.get(bufferIndex).hasRemaining()) {
- // current buffer has data
- hasData = true;
- break;
- } else {
- if (buffersRemaining()) {
- // move to next available buffer
- ++bufferIndex;
- Preconditions.checkState(bufferIndex < buffers.size());
- } else {
- // no more buffers remaining
+ private boolean buffersHaveData() {
+ boolean hasData = false;
+
+ if (buffersAllocated()) {
+ while (bufferIndex < (buffers.size())) {
+ if (buffers.get(bufferIndex).hasRemaining()) {
+ // current buffer has data
+ hasData = true;
break;
+ } else {
+ if (buffersRemaining()) {
+ // move to next available buffer
+ ++bufferIndex;
+ Preconditions.checkState(bufferIndex < buffers.size());
+ } else {
+ // no more buffers remaining
+ break;
+ }
}
}
}
@@ -272,7 +303,14 @@ public class BlockInputStream extends InputStream
implements Seekable {
if ((chunks == null) || chunks.isEmpty()) {
return false;
}
- return (chunkIndex < (chunks.size() - 1));
+ // Check if more chunks are remaining in the stream after chunkIndex
+ if (chunkIndex < (chunks.size() - 1)) {
+ return true;
+ }
+ // ChunkIndex is the last chunk in the stream. Check if this chunk has
+ // been read from container or not. Return true if chunkIndex has not
+ // been read yet and false otherwise.
+ return chunkIndexOfCurrentBuffer != chunkIndex;
}
/**
@@ -283,34 +321,14 @@ public class BlockInputStream extends InputStream
implements Seekable {
* @throws IOException if there is an I/O error while performing the call
*/
private synchronized void readChunkFromContainer() throws IOException {
- // On every chunk read chunkIndex should be increased so as to read the
- // next chunk
- chunkIndex += 1;
- XceiverClientReply reply;
- ReadChunkResponseProto readChunkResponse = null;
+ // Read the chunk at chunkIndex
final ChunkInfo chunkInfo = chunks.get(chunkIndex);
List<DatanodeDetails> excludeDns = null;
ByteString byteString;
- List<DatanodeDetails> dnList = xceiverClient.getPipeline().getNodes();
+ List<DatanodeDetails> dnList = getDatanodeList();
while (true) {
- try {
- reply = ContainerProtocolCalls
- .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
- ContainerProtos.ContainerCommandResponseProto response;
- response = reply.getResponse().get();
- ContainerProtocolCalls.validateContainerResponse(response);
- readChunkResponse = response.getReadChunk();
- } catch (IOException e) {
- if (e instanceof StorageContainerException) {
- throw e;
- }
- throw new IOException("Unexpected OzoneException: " + e.toString(), e);
- } catch (ExecutionException | InterruptedException e) {
- throw new IOException(
- "Failed to execute ReadChunk command for chunk " + chunkInfo
- .getChunkName(), e);
- }
- byteString = readChunkResponse.getData();
+ List<DatanodeDetails> dnListFromReadChunkCall = new ArrayList<>();
+ byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall);
try {
if (byteString.size() != chunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
@@ -333,7 +351,7 @@ public class BlockInputStream extends InputStream
implements Seekable {
if (excludeDns == null) {
excludeDns = new ArrayList<>();
}
- excludeDns.addAll(reply.getDatanodes());
+ excludeDns.addAll(dnListFromReadChunkCall);
if (excludeDns.size() == dnList.size()) {
throw ioe;
}
@@ -342,6 +360,47 @@ public class BlockInputStream extends InputStream
implements Seekable {
buffers = byteString.asReadOnlyByteBufferList();
bufferIndex = 0;
+ chunkIndexOfCurrentBuffer = chunkIndex;
+
+ // The bufferIndex and position might need to be adjusted if seek() was
+ // called on the stream before. This needs to be done so that the buffer
+ // position can be advanced to the 'seeked' position.
+ adjustBufferIndex();
+ }
+
+ /**
+ * Send RPC call to get the chunk from the container.
+ *
+ * @param chunkInfo the chunk to read
+ * @param excludeDns datanodes to exclude for this read; may be null
+ * @param dnListFromReply output list; on success the datanodes reported by
+ * the reply are appended so the caller can exclude them on a retry
+ * @return the chunk data returned by the datanode
+ * @throws IOException if the call fails, the response fails validation, or
+ * the calling thread is interrupted (its interrupt flag is restored)
+ */
+ @VisibleForTesting
+ protected ByteString readChunk(final ChunkInfo chunkInfo,
+ List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
+ throws IOException {
+ ReadChunkResponseProto readChunkResponse;
+ try {
+ XceiverClientReply reply = ContainerProtocolCalls
+ .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
+ ContainerProtos.ContainerCommandResponseProto response =
+ reply.getResponse().get();
+ ContainerProtocolCalls.validateContainerResponse(response);
+ readChunkResponse = response.getReadChunk();
+ dnListFromReply.addAll(reply.getDatanodes());
+ } catch (IOException e) {
+ if (e instanceof StorageContainerException) {
+ throw e;
+ }
+ throw new IOException("Unexpected OzoneException: " + e.toString(), e);
+ } catch (InterruptedException e) {
+ // Converting to IOException must not swallow the interrupt: restore the
+ // flag so code further up the stack can still observe it.
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ "Failed to execute ReadChunk command for chunk " + chunkInfo
+ .getChunkName(), e);
+ } catch (ExecutionException e) {
+ throw new IOException(
+ "Failed to execute ReadChunk command for chunk " + chunkInfo
+ .getChunkName(), e);
+ }
+ return readChunkResponse.getData();
+ }
+
+ @VisibleForTesting
+ protected List<DatanodeDetails> getDatanodeList() {
+ return xceiverClient.getPipeline().getNodes();
}
@Override
@@ -352,9 +411,8 @@ public class BlockInputStream extends InputStream
implements Seekable {
throw new EOFException("EOF encountered pos: " + pos + " container key: "
+ blockID.getLocalID());
}
- if (chunkIndex == -1) {
- chunkIndex = Arrays.binarySearch(chunkOffset, pos);
- } else if (pos < chunkOffset[chunkIndex]) {
+
+ if (pos < chunkOffset[chunkIndex]) {
chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
} else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
.getLen()) {
@@ -368,40 +426,71 @@ public class BlockInputStream extends InputStream
implements Seekable {
// accordingly so that chunkIndex = insertionPoint - 1
chunkIndex = -chunkIndex -2;
}
- // adjust chunkIndex so that readChunkFromContainer reads the correct chunk
- chunkIndex -= 1;
- readChunkFromContainer();
- adjustBufferIndex(pos);
+
+ // The bufferPosition should be adjusted to account for the chunk offset
+ // of the chunk that pos actually points to.
+ bufferPosition = pos - chunkOffset[chunkIndex];
+
+ // Check if current buffers correspond to the chunk index being seeked
+ // and if the buffers have any data.
+ if (chunkIndex == chunkIndexOfCurrentBuffer && buffersAllocated()) {
+ // Position the buffer to the seeked position.
+ adjustBufferIndex();
+ } else {
+ // Release the current buffers. The next readChunkFromContainer will
+ // read the required chunk and position the buffer to the seeked
+ // position.
+ releaseBuffers();
+ }
}
- private void adjustBufferIndex(long pos) {
- long tempOffest = chunkOffset[chunkIndex];
+ /**
+ * Applies a pending seek to the current chunk's buffers. It selects the
+ * buffer containing {@code bufferPosition} (an offset relative to the start
+ * of the current chunk), sets that buffer's position, and then clears the
+ * pending seek. Callers invoke it only once buffers are allocated.
+ * NOTE(review): if bufferPosition is not below the buffers' total capacity,
+ * the loop never updates bufferIndex and position() may receive an
+ * out-of-range value — confirm seek()'s bounds check rules this out.
+ */
+ private void adjustBufferIndex() {
+ if (bufferPosition == -1) {
+ // The stream has not been seeked to a position. No need to adjust the
+ // buffer Index and position.
+ return;
+ }
+ // The bufferPosition is w.r.t the buffers for current chunk.
+ // Adjust the bufferIndex and position to the seeked position.
+ long tempOffest = 0;
for (int i = 0; i < buffers.size(); i++) {
- if (pos - tempOffest >= buffers.get(i).capacity()) {
+ if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
tempOffest += buffers.get(i).capacity();
} else {
bufferIndex = i;
break;
}
}
- buffers.get(bufferIndex).position((int) (pos - tempOffest));
+ buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
+ // Reset the bufferPosition as the seek() operation has been completed.
+ bufferPosition = -1;
}
@Override
public synchronized long getPos() throws IOException {
- if (chunkIndex == -1) {
- // no data consumed yet, a new stream OR after seek
- return 0;
- }
-
- if (blockStreamEOF()) {
+ // position = chunkOffset of current chunk (at chunkIndex) + position of
+ // the buffer corresponding to the chunk.
+ long bufferPos = 0;
+
+ if (bufferPosition >= 0) {
+ // seek has been called but the buffers were empty. Hence, the buffer
+ // position will be advanced after the buffers are filled.
+ // We return the chunkOffset + bufferPosition here as that will be the
+ // position of the buffer pointer after reading the chunk file.
+ bufferPos = bufferPosition;
+
+ } else if (blockStreamEOF()) {
// all data consumed, buffers have been released.
// get position from the chunk offset and chunk length of last chunk
- return chunkOffset[chunkIndex] + chunks.get(chunkIndex).getLen();
+ bufferPos = chunks.get(chunkIndex).getLen();
+
+ } else if (buffersAllocated()) {
+ // get position from available buffers of current chunk
+ bufferPos = buffers.get(bufferIndex).position();
+
}
- // get position from available buffers of current chunk
- return chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
+ return chunkOffset[chunkIndex] + bufferPos;
}
@Override
@@ -412,4 +501,9 @@ public class BlockInputStream extends InputStream
implements Seekable {
public BlockID getBlockID() {
return blockID;
}
+
+ @VisibleForTesting
+ protected int getChunkIndex() {
+ return chunkIndex;
+ }
}
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
new file mode 100644
index 0000000..35c1022
--- /dev/null
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ChecksumData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ChecksumType;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Tests {@link BlockInputStream}.
+ */
+public class TestBlockInputStream {
+
+ private static final int CHUNK_SIZE = 20;
+
+ // Per-test fixtures. JUnit creates a fresh instance for every test method
+ // and setup() re-initializes these, so they are instance fields rather
+ // than static state shared between tests.
+ private BlockInputStream blockInputStream;
+ private List<ChunkInfo> chunks;
+ private int blockSize;
+
+ @Before
+ public void setup() throws Exception {
+ BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
+ chunks = createChunkList(10);
+ String traceID = UUID.randomUUID().toString();
+ blockInputStream = new DummyBlockInputStream(blockID, null, null, chunks,
+ traceID, false, 0);
+
+ blockSize = 0;
+ for (ChunkInfo chunk : chunks) {
+ blockSize += chunk.getLen();
+ }
+ }
+
+ /**
+ * Create a mock list of chunks: the first numChunks-1 chunks have length
+ * CHUNK_SIZE and the last chunk has length CHUNK_SIZE/2.
+ * @param numChunks number of chunks to create
+ * @return the chunks, in block order
+ */
+ private static List<ChunkInfo> createChunkList(int numChunks) {
+ ChecksumData dummyChecksumData = ChecksumData.newBuilder()
+ .setType(ChecksumType.NONE)
+ .setBytesPerChecksum(100)
+ .build();
+ List<ChunkInfo> chunkList = new ArrayList<>(numChunks);
+ int i;
+ for (i = 0; i < numChunks - 1; i++) {
+ String chunkName = "chunk-" + i;
+ ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+ .setChunkName(chunkName)
+ .setOffset(0)
+ .setLen(CHUNK_SIZE)
+ .setChecksumData(dummyChecksumData)
+ .build();
+ chunkList.add(chunkInfo);
+ }
+ ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+ .setChunkName("chunk-" + i)
+ .setOffset(0)
+ .setLen(CHUNK_SIZE/2)
+ .setChecksumData(dummyChecksumData)
+ .build();
+ chunkList.add(chunkInfo);
+
+ return chunkList;
+ }
+
+ /**
+ * A BlockInputStream that serves chunk data from memory instead of
+ * contacting a datanode.
+ */
+ private static class DummyBlockInputStream extends BlockInputStream {
+
+ DummyBlockInputStream(BlockID blockID,
+ XceiverClientManager xceiverClientManager,
+ XceiverClientSpi xceiverClient,
+ List<ChunkInfo> chunks,
+ String traceID,
+ boolean verifyChecksum,
+ long initialPosition) throws IOException {
+ super(blockID, xceiverClientManager, xceiverClient, chunks, traceID,
+ verifyChecksum, initialPosition);
+ }
+
+ @Override
+ protected ByteString readChunk(final ChunkInfo chunkInfo,
+ List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
+ throws IOException {
+ return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen());
+ }
+
+ @Override
+ protected List<DatanodeDetails> getDatanodeList() {
+ // No real pipeline is involved in this test; an empty list suffices.
+ return new ArrayList<>();
+ }
+
+ /**
+ * Create a ByteString of exactly {@code length} bytes. It holds
+ * {@code data} right-padded with '0' characters, or truncated, to that
+ * length.
+ */
+ private static ByteString getByteString(String data, int length) {
+ StringBuilder padded = new StringBuilder(data);
+ while (padded.length() < length) {
+ padded.append('0');
+ }
+ byte[] bytes = padded.toString()
+ .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ return ByteString.copyFrom(bytes, 0, length);
+ }
+ }
+
+ @Test
+ public void testSeek() throws Exception {
+ // Seek to position 0
+ int pos = 0;
+ seekAndVerify(pos);
+ Assert.assertEquals("ChunkIndex is incorrect", 0,
+ blockInputStream.getChunkIndex());
+
+ pos = CHUNK_SIZE;
+ seekAndVerify(pos);
+ Assert.assertEquals("ChunkIndex is incorrect", 1,
+ blockInputStream.getChunkIndex());
+
+ pos = (CHUNK_SIZE * 5) + 5;
+ seekAndVerify(pos);
+ Assert.assertEquals("ChunkIndex is incorrect", 5,
+ blockInputStream.getChunkIndex());
+
+ try {
+ // Try seeking beyond the blockSize.
+ pos = blockSize + 10;
+ seekAndVerify(pos);
+ Assert.fail("Seek to position beyond block size should fail.");
+ } catch (EOFException e) {
+ // Expected
+ }
+
+ // Seek to random positions between 0 and the block size.
+ Random random = new Random();
+ for (int i = 0; i < 10; i++) {
+ pos = random.nextInt(blockSize);
+ seekAndVerify(pos);
+ }
+ }
+
+ @Test
+ public void testBlockEOF() throws Exception {
+ // Seek to some position < blockSize and verify EOF is not reached.
+ seekAndVerify(CHUNK_SIZE);
+ Assert.assertFalse(blockInputStream.blockStreamEOF());
+
+ // Seek to blockSize-1 and verify that EOF is not reached as the chunk
+ // has not been read from container yet.
+ seekAndVerify(blockSize-1);
+ Assert.assertFalse(blockInputStream.blockStreamEOF());
+ }
+
+ private void seekAndVerify(int pos) throws Exception {
+ blockInputStream.seek(pos);
+ Assert.assertEquals("Current position of buffer does not match with the " +
+ "seeked position", pos, blockInputStream.getPos());
+ }
+}
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/package-info.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/package-info.java
new file mode 100644
index 0000000..abdd04e
--- /dev/null
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This package contains Ozone InputStream related tests.
+ */
+package org.apache.hadoop.hdds.scm.storage;
\ No newline at end of file
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 3a92e01..5b63420 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -84,11 +84,28 @@ public class KeyInputStream extends InputStream implements
Seekable {
* @param streamLength the max number of bytes that should be written to this
* stream.
*/
+ @VisibleForTesting
+ // Wraps an already-built BlockInputStream. getFromOmKeyInfo() does not use
+ // this overload; it registers lazily-initialized entries instead.
public synchronized void addStream(BlockInputStream stream,
long streamLength) {
streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
}
+ /**
+ * Appends an entry for the given block to the end of the list. No
+ * BlockInputStream is created here; the entry builds it on first access
+ * (see ChunkInputStreamEntry#getStream()).
+ */
+ private synchronized void addStream(OmKeyLocationInfo omKeyLocationInfo,
+ XceiverClientManager xceiverClientMngr, String clientRequestId,
+ boolean verifyChecksum) {
+ ChunkInputStreamEntry lazyEntry = new ChunkInputStreamEntry(
+ omKeyLocationInfo, xceiverClientMngr, clientRequestId, verifyChecksum);
+ streamEntries.add(lazyEntry);
+ }
+
+ /**
+ * Returns the entry at {@code index}, first creating its BlockInputStream
+ * if that has not happened yet.
+ */
+ private synchronized ChunkInputStreamEntry getStreamEntry(int index)
+ throws IOException {
+ ChunkInputStreamEntry entry = streamEntries.get(index);
+ return entry.getStream();
+ }
@Override
public synchronized int read() throws IOException {
@@ -120,7 +137,7 @@ public class KeyInputStream extends InputStream implements
Seekable {
.getRemaining() == 0)) {
return totalReadLen == 0 ? EOF : totalReadLen;
}
- ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
+ ChunkInputStreamEntry current = getStreamEntry(currentStreamIndex);
int numBytesToRead = Math.min(len, (int)current.getRemaining());
int numBytesRead = current.read(b, off, numBytesToRead);
if (numBytesRead != numBytesToRead) {
@@ -212,13 +229,81 @@ public class KeyInputStream extends InputStream
implements Seekable {
public static class ChunkInputStreamEntry extends InputStream
implements Seekable {
- private final BlockInputStream blockInputStream;
+ private BlockInputStream blockInputStream;
+ private final OmKeyLocationInfo blockLocationInfo;
private final long length;
+ private final XceiverClientManager xceiverClientManager;
+ private final String requestId;
+ private boolean verifyChecksum;
+
+ // the position of the blockInputStream is maintained by this variable
+ // till the stream is initialized
+ private long position;
+
+ /**
+ * Creates an entry whose BlockInputStream is built lazily, on first access,
+ * from the given block location.
+ */
+ public ChunkInputStreamEntry(OmKeyLocationInfo omKeyLocationInfo,
+ XceiverClientManager xceiverClientMngr, String clientRequestId,
+ boolean verifyChecksum) {
+ this.length = omKeyLocationInfo.getLength();
+ this.blockLocationInfo = omKeyLocationInfo;
+ this.requestId = clientRequestId;
+ this.xceiverClientManager = xceiverClientMngr;
+ this.verifyChecksum = verifyChecksum;
+ }
+ @VisibleForTesting
+ // Test-only constructor. The BlockInputStream is supplied up front, so the
+ // lazy-initialization fields are set to null and verifyChecksum keeps its
+ // default. getStream() only initializes when blockInputStream is null, so
+ // passing a null stream here would cause a NullPointerException on first
+ // access. NOTE(review): confirm callers never do that.
public ChunkInputStreamEntry(BlockInputStream blockInputStream,
long length) {
this.blockInputStream = blockInputStream;
this.length = length;
+ this.blockLocationInfo = null;
+ this.xceiverClientManager = null;
+ this.requestId = null;
+ }
+
+ /**
+ * Returns this entry after making sure its BlockInputStream exists.
+ */
+ private ChunkInputStreamEntry getStream() throws IOException {
+ if (blockInputStream != null) {
+ return this;
+ }
+ initializeBlockInputStream();
+ return this;
+ }
+
+ /**
+ * Builds the BlockInputStream for this entry. It acquires a standalone
+ * client for the block's pipeline, fetches the block's chunk list with
+ * getBlock, and creates the stream positioned at any offset recorded by an
+ * earlier seek(). The client is released if any step fails before the
+ * stream has been created.
+ */
+ private void initializeBlockInputStream() throws IOException {
+ BlockID blockID = blockLocationInfo.getBlockID();
+ long containerID = blockID.getContainerID();
+ Pipeline pipeline = blockLocationInfo.getPipeline();
+
+ // irrespective of the container state, we will always read via Standalone
+ // protocol.
+ if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+ pipeline = Pipeline.newBuilder(pipeline)
+ .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
+ }
+ XceiverClientSpi xceiverClient = xceiverClientManager
+ .acquireClient(pipeline);
+ boolean success = false;
+ long containerKey = blockLocationInfo.getLocalID();
+ try {
+ LOG.debug("Initializing stream for get key to access {} {}",
+ containerID, containerKey);
+ ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
+ .getDatanodeBlockIDProtobuf();
+ if (blockLocationInfo.getToken() != null) {
+ UserGroupInformation.getCurrentUser().
+ addToken(blockLocationInfo.getToken());
+ }
+ ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
+ .getBlock(xceiverClient, datanodeBlockID, requestId);
+ List<ContainerProtos.ChunkInfo> chunks =
+ response.getBlockData().getChunksList();
+ // The BlockInputStream constructor can throw (it seeks to the pending
+ // position). Mark success only after it returns, otherwise the acquired
+ // client would never be released.
+ this.blockInputStream = new BlockInputStream(
+ blockLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
+ chunks, requestId, verifyChecksum, position);
+ success = true;
+ } finally {
+ if (!success) {
+ xceiverClientManager.releaseClient(xceiverClient, false);
+ }
+ }
}
synchronized long getRemaining() throws IOException {
@@ -240,17 +325,27 @@ public class KeyInputStream extends InputStream
implements Seekable {
@Override
public synchronized void close() throws IOException {
- blockInputStream.close();
+ // The BlockInputStream is created lazily on first access; if that never
+ // happened there is nothing to close.
+ if (blockInputStream != null) {
+ blockInputStream.close();
+ }
}
@Override
public void seek(long pos) throws IOException {
- blockInputStream.seek(pos);
+ if (blockInputStream != null) {
+ blockInputStream.seek(pos);
+ } else {
+ // Not initialized yet: remember the position. It is passed to the
+ // BlockInputStream constructor when the stream is created, so an
+ // out-of-range pos is only reported at that point.
+ // NOTE(review): unlike close() and getRemaining(), this method is not
+ // synchronized, although it writes the shared 'position' field.
+ position = pos;
+ }
}
@Override
public long getPos() throws IOException {
- return blockInputStream.getPos();
+ if (blockInputStream != null) {
+ return blockInputStream.getPos();
+ } else {
+ // Not initialized yet: report the position recorded by seek(), or 0 if
+ // seek() was never called.
+ return position;
+ }
}
@Override
@@ -266,7 +361,6 @@ public class KeyInputStream extends InputStream implements
Seekable {
storageContainerLocationClient,
String requestId, boolean verifyChecksum) throws IOException {
long length = 0;
- long containerKey;
KeyInputStream groupInputStream = new KeyInputStream();
groupInputStream.key = keyInfo.getKeyName();
List<OmKeyLocationInfo> keyLocationInfos =
@@ -274,48 +368,13 @@ public class KeyInputStream extends InputStream
implements Seekable {
groupInputStream.streamOffset = new long[keyLocationInfos.size()];
for (int i = 0; i < keyLocationInfos.size(); i++) {
OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
- BlockID blockID = omKeyLocationInfo.getBlockID();
- long containerID = blockID.getContainerID();
- Pipeline pipeline = omKeyLocationInfo.getPipeline();
+ LOG.debug("Adding stream for accessing {}. The stream will be " +
+ "initialized later.", omKeyLocationInfo);
+ groupInputStream.addStream(omKeyLocationInfo, xceiverClientManager,
+ requestId, verifyChecksum);
- // irrespective of the container state, we will always read via
Standalone
- // protocol.
- if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
- pipeline = Pipeline.newBuilder(pipeline)
- .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
- }
- XceiverClientSpi xceiverClient = xceiverClientManager
- .acquireClient(pipeline);
- boolean success = false;
- containerKey = omKeyLocationInfo.getLocalID();
- try {
- LOG.debug("get key accessing {} {}",
- containerID, containerKey);
- groupInputStream.streamOffset[i] = length;
- ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
- .getDatanodeBlockIDProtobuf();
- if (omKeyLocationInfo.getToken() != null) {
- UserGroupInformation.getCurrentUser().
- addToken(omKeyLocationInfo.getToken());
- }
- ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
- .getBlock(xceiverClient, datanodeBlockID, requestId);
- List<ContainerProtos.ChunkInfo> chunks =
- response.getBlockData().getChunksList();
- for (ContainerProtos.ChunkInfo chunk : chunks) {
- length += chunk.getLen();
- }
- success = true;
- BlockInputStream inputStream = new BlockInputStream(
- omKeyLocationInfo.getBlockID(), xceiverClientManager,
xceiverClient,
- chunks, requestId, verifyChecksum);
- groupInputStream.addStream(inputStream,
- omKeyLocationInfo.getLength());
- } finally {
- if (!success) {
- xceiverClientManager.releaseClient(xceiverClient, false);
- }
- }
+ groupInputStream.streamOffset[i] = length;
+ length += omKeyLocationInfo.getLength();
}
groupInputStream.length = length;
return new LengthInputStream(groupInputStream, length);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
new file mode 100644
index 0000000..fa8a289
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests {@link KeyInputStream}.
+ */
+public class TestKeyInputStream {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+
+  /**
+   * Creates a MiniOzoneCluster with small chunk, flush and block sizes so
+   * that a few hundred bytes of key data span several chunks, then creates
+   * the volume and bucket shared by the tests.
+   *
+   * @throws Exception if the cluster, client, volume or bucket cannot be
+   *     created
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    chunkSize = 100;
+    flushSize = 4 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000,
TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    // One client is shared by all tests for writing and reading keys.
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "test-key-input-stream-volume";
+    bucketName = "test-key-input-stream-bucket";
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Closes the shared client and shuts down the MiniOzoneCluster.
+   *
+   * @throws IOException if closing the client fails
+   */
+  @AfterClass
+  public static void shutdown() throws IOException {
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } finally {
+      // Shut the cluster down even if closing the client failed.
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private String getKeyName() {
+    return UUID.randomUUID().toString();
+  }
+
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return ContainerTestHelper
+        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+  }
+
+  /**
+   * Verifies that seek() on a KeyInputStream issues no ReadChunk calls, and
+   * that a subsequent read fetches only the chunks it overlaps and returns
+   * the bytes at the sought position.
+   */
+  @Test
+  public void testSeek() throws Exception {
+    XceiverClientMetrics metrics = XceiverClientManager
+        .getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long readChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.ReadChunk);
+
+    String keyName = getKeyName();
+
+    // write data spanning 3 chunks
+    int dataLength = (2 * chunkSize) + (chunkSize / 2);
+    byte[] inputData = ContainerTestHelper.getFixedLengthString(
+        keyString, dataLength).getBytes(UTF_8);
+    try (OzoneOutputStream key =
+             createKey(keyName, ReplicationType.RATIS, 0)) {
+      key.write(inputData);
+    }
+
+    Assert.assertEquals(writeChunkCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+
+    byte[] readData = new byte[chunkSize];
+    try (KeyInputStream keyInputStream = (KeyInputStream) objectStore
+        .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+        .getInputStream()) {
+      // Seek to position 150
+      keyInputStream.seek(150);
+
+      Assert.assertEquals(150, keyInputStream.getPos());
+
+      // Seek operation should not result in any readChunk operation: none
+      // may be pending, and the completed count must be unchanged.
+      Assert.assertEquals(0, metrics
+          .getContainerOpsMetrics(ContainerProtos.Type.ReadChunk));
+      Assert.assertEquals(readChunkCount, metrics
+          .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
+
+      // read() should return the full requested length; asserting it gives
+      // a clear failure instead of a confusing byte mismatch below.
+      int bytesRead = keyInputStream.read(readData, 0, chunkSize);
+      Assert.assertEquals(chunkSize, bytesRead);
+
+      // Since we are reading data from offset 150 to 249 and the chunk
+      // boundary is 100 bytes, we need to read 2 chunks.
+      Assert.assertEquals(readChunkCount + 2, metrics
+          .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
+    }
+
+    // Verify that the data read matches with the input data at corresponding
+    // indices.
+    for (int i = 0; i < chunkSize; i++) {
+      Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
+    }
+  }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
index e4e449b..45f04df 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
@@ -49,7 +49,7 @@ public class TestChunkStreams {
int tempOffset = offset;
BlockInputStream in =
new BlockInputStream(null, null, null, new ArrayList<>(), null,
- true) {
+ true, 0) {
private long pos = 0;
private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100);
@@ -106,7 +106,7 @@ public class TestChunkStreams {
int tempOffset = offset;
BlockInputStream in =
new BlockInputStream(null, null, null, new ArrayList<>(), null,
- true) {
+ true, 0) {
private long pos = 0;
private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]