This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
     new f52c16620fe HDDS-13973. The ground work to support stream read block (#9342)
f52c16620fe is described below
commit f52c16620fe3a0032824ea0aa3a976f1e84bee07
Author: Chung En Lee <[email protected]>
AuthorDate: Wed Nov 26 04:14:31 2025 +0800
HDDS-13973. The ground work to support stream read block (#9342)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 19 +-
.../apache/hadoop/hdds/scm/XceiverClientGrpc.java | 73 +-
.../hdds/scm/storage/StreamBlockInputStream.java | 747 +++++++++++++++++++++
.../client/io/BlockInputStreamFactoryImpl.java | 20 +
.../scm/storage/DummyStreamBlockInputStream.java | 143 ++++
.../scm/storage/TestStreamBlockInputStream.java | 264 ++++++++
.../client/io/TestBlockInputStreamFactoryImpl.java | 15 +-
.../org/apache/hadoop/hdds/DatanodeVersion.java | 2 +
.../java/org/apache/hadoop/hdds/HddsUtils.java | 7 +
.../ContainerCommandResponseBuilders.java | 17 +
.../hdds/scm/storage/ContainerProtocolCalls.java | 56 ++
.../org/apache/hadoop/ozone/audit/DNAction.java | 3 +-
.../container/common/impl/HddsDispatcher.java | 79 +++
.../common/interfaces/ContainerDispatcher.java | 12 +
.../ozone/container/common/interfaces/Handler.java | 7 +
.../transport/server/GrpcXceiverService.java | 8 +-
.../transport/server/ratis/DispatcherContext.java | 7 +
.../ozone/container/keyvalue/KeyValueHandler.java | 115 +++-
.../container/keyvalue/TestKeyValueHandler.java | 93 +++
.../src/main/proto/DatanodeClientProtocol.proto | 16 +
.../rpc/read/TestStreamBlockInputStream.java | 245 +++++++
21 files changed, 1937 insertions(+), 11 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 5dc44f4d4ec..7329f2c16b7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -113,6 +113,13 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private long streamBufferMaxSize = 32 * 1024 * 1024;
+ @Config(key = "stream.readblock.enable",
+ defaultValue = "false",
+ type = ConfigType.BOOLEAN,
+ description = "Allow ReadBlock to stream all the readChunk in one
request.",
+ tags = ConfigTag.CLIENT)
+ private boolean streamReadBlock = false;
+
@Config(key = "ozone.client.max.retries",
defaultValue = "5",
description = "Maximum number of retries by Ozone Client on "
@@ -151,7 +158,7 @@ public class OzoneClientConfig {
description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] "
+ "determines which algorithm would be used to compute checksum for "
+ "chunk data. Default checksum type is CRC32.",
- tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE })
+ tags = {ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE})
private String checksumType = ChecksumType.CRC32.name();
@Config(key = "ozone.client.bytes.per.checksum",
@@ -160,7 +167,7 @@ public class OzoneClientConfig {
description = "Checksum will be computed for every bytes per checksum "
+ "number of bytes and stored sequentially. The minimum value for "
+ "this config is 8KB.",
- tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE })
+ tags = {ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE})
private int bytesPerChecksum = 16 * 1024;
@Config(key = "ozone.client.verify.checksum",
@@ -538,6 +545,14 @@ public int getMaxConcurrentWritePerKey() {
return this.maxConcurrentWritePerKey;
}
+ public boolean isStreamReadBlock() {
+ return streamReadBlock;
+ }
+
+ public void setStreamReadBlock(boolean streamReadBlock) {
+ this.streamReadBlock = streamReadBlock;
+ }
+
/**
* Enum for indicating what mode to use when combining chunk and block
* checksums to define an aggregate FileChecksum. This should be considered
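For illustration only (not part of the patch), a minimal sketch of flipping the new flag from client code; it reuses the OzoneConfiguration#getObject pattern that the tests in this commit use, and only calls the getter/setter added above.

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;

public final class StreamReadBlockConfigExample {
  private StreamReadBlockConfigExample() { }

  public static void main(String[] args) {
    // Load the client config object, flip the new stream.readblock.enable
    // flag (default false), and read it back.
    OzoneConfiguration conf = new OzoneConfiguration();
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    clientConfig.setStreamReadBlock(true);
    System.out.println("stream read block enabled: "
        + clientConfig.isStreamReadBlock());
  }
}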
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index b07cee4097c..09e01593feb 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -44,6 +44,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -451,7 +453,11 @@ private XceiverClientReply sendCommandWithRetry(
// sendCommandAsyncCall will create a new channel and async stub
// in case these don't exist for the specific datanode.
reply.addDatanode(dn);
- responseProto = sendCommandAsync(request, dn).getResponse().get();
+ if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
+ responseProto = sendCommandReadBlock(request, dn).getResponse().get();
+ } else {
+ responseProto = sendCommandAsync(request, dn).getResponse().get();
+ }
if (validators != null && !validators.isEmpty()) {
for (Validator validator : validators) {
validator.accept(request, responseProto);
@@ -495,7 +501,7 @@ private XceiverClientReply sendCommandWithRetry(
String message = "Failed to execute command {}";
if (LOG.isDebugEnabled()) {
LOG.debug(message + " on the pipeline {}.",
- processForDebug(request), pipeline);
+ processForDebug(request), pipeline);
} else {
LOG.warn(message + " on the pipeline {}.",
request.getCmdType(), pipeline);
@@ -623,6 +629,69 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
return new XceiverClientReply(replyFuture);
}
+ public XceiverClientReply sendCommandReadBlock(
+ ContainerCommandRequestProto request, DatanodeDetails dn)
+ throws IOException, InterruptedException {
+
+ CompletableFuture<ContainerCommandResponseProto> future =
+ new CompletableFuture<>();
+ ContainerCommandResponseProto.Builder response =
+ ContainerCommandResponseProto.newBuilder();
+ ContainerProtos.ReadBlockResponseProto.Builder readBlock =
+ ContainerProtos.ReadBlockResponseProto.newBuilder();
+ checkOpen(dn);
+ DatanodeID dnId = dn.getID();
+ Type cmdType = request.getCmdType();
+ semaphore.acquire();
+ long requestTime = System.currentTimeMillis();
+ metrics.incrPendingContainerOpsMetrics(cmdType);
+
+ final StreamObserver<ContainerCommandRequestProto> requestObserver =
+ asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS)
+ .send(new StreamObserver<ContainerCommandResponseProto>() {
+ @Override
+ public void onNext(
+ ContainerCommandResponseProto responseProto) {
+ if (responseProto.getResult() == Result.SUCCESS) {
+ readBlock.addReadChunk(responseProto.getReadChunk());
+ } else {
+ future.complete(
+ ContainerCommandResponseProto.newBuilder(responseProto)
+ .setCmdType(Type.ReadBlock).build());
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ future.completeExceptionally(t);
+ metrics.decrPendingContainerOpsMetrics(cmdType);
+ metrics.addContainerOpsLatency(
+ cmdType, Time.monotonicNow() - requestTime);
+ semaphore.release();
+ }
+
+ @Override
+ public void onCompleted() {
+ if (readBlock.getReadChunkCount() > 0) {
+ future.complete(response.setReadBlock(readBlock)
+ .setCmdType(Type.ReadBlock).setResult(Result.SUCCESS).build());
+ }
+ if (!future.isDone()) {
+ future.completeExceptionally(new IOException(
+ "Stream completed but no reply for request " +
+ processForDebug(request)));
+ }
+ metrics.decrPendingContainerOpsMetrics(cmdType);
+ metrics.addContainerOpsLatency(
+ cmdType, System.currentTimeMillis() - requestTime);
+ semaphore.release();
+ }
+ });
+ requestObserver.onNext(request);
+ requestObserver.onCompleted();
+ return new XceiverClientReply(future);
+ }
+
private synchronized void checkOpen(DatanodeDetails dn)
throws IOException {
if (closed) {
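The new sendCommandReadBlock above turns a stream of per-chunk responses into a single future. Below is a self-contained sketch of that aggregation pattern; the Observer interface is a hypothetical stand-in for the shaded gRPC StreamObserver so the example stays independent of the real stubs.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public final class StreamAggregationSketch {
  /** Hypothetical stand-in for gRPC's StreamObserver. */
  interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  private StreamAggregationSketch() { }

  /**
   * Collect every streamed chunk into one list and complete the future once
   * the server half-closes the stream, the same shape as
   * XceiverClientGrpc#sendCommandReadBlock without the metrics and semaphore
   * bookkeeping.
   */
  static <C> Observer<C> aggregating(CompletableFuture<List<C>> future) {
    List<C> chunks = new ArrayList<>();
    return new Observer<C>() {
      @Override
      public void onNext(C chunk) {
        chunks.add(chunk);
      }

      @Override
      public void onError(Throwable t) {
        future.completeExceptionally(t);
      }

      @Override
      public void onCompleted() {
        if (chunks.isEmpty()) {
          future.completeExceptionally(
              new IOException("Stream completed but no reply received"));
        } else {
          future.complete(chunks);
        }
      }
    };
  }

  public static void main(String[] args) throws Exception {
    CompletableFuture<List<String>> reply = new CompletableFuture<>();
    Observer<String> observer = aggregating(reply);
    observer.onNext("chunk-0");
    observer.onNext("chunk-1");
    observer.onCompleted();
    System.out.println(reply.get()); // [chunk-0, chunk-1]
  }
}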
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
new file mode 100644
index 00000000000..3db7fd8f660
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link java.io.InputStream} called from KeyInputStream to read a block from the
+ * container.
+ */
+public class StreamBlockInputStream extends BlockExtendedInputStream
+ implements Seekable, CanUnbuffer, ByteBufferReadable {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StreamBlockInputStream.class);
+ private final BlockID blockID;
+ private final long blockLength;
+ private final AtomicReference<Pipeline> pipelineRef =
+ new AtomicReference<>();
+ private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
+ new AtomicReference<>();
+ private XceiverClientFactory xceiverClientFactory;
+ private XceiverClientSpi xceiverClient;
+
+ private List<Long> bufferOffsets;
+ private int bufferIndex;
+ private long blockPosition = -1;
+ private List<ByteBuffer> buffers;
+ // Checks if the StreamBlockInputStream has already read data from the container.
+ private boolean allocated = false;
+ private long bufferOffsetWrtBlockData;
+ private long buffersSize;
+ private static final int EOF = -1;
+ private final List<XceiverClientSpi.Validator> validators;
+ private final boolean verifyChecksum;
+ private final Function<BlockID, BlockLocationInfo> refreshFunction;
+ private final RetryPolicy retryPolicy;
+ private int retries;
+
+ public StreamBlockInputStream(
+ BlockID blockID, long length, Pipeline pipeline,
+ Token<OzoneBlockTokenIdentifier> token,
+ XceiverClientFactory xceiverClientFactory,
+ Function<BlockID, BlockLocationInfo> refreshFunction,
+ OzoneClientConfig config) throws IOException {
+ this.blockID = blockID;
+ this.blockLength = length;
+ setPipeline(pipeline);
+ tokenRef.set(token);
+ this.xceiverClientFactory = xceiverClientFactory;
+ this.validators = ContainerProtocolCalls.toValidatorList(
+ (request, response) -> validateBlock(response));
+ this.verifyChecksum = config.isChecksumVerify();
+ this.refreshFunction = refreshFunction;
+ this.retryPolicy =
+ HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
+ TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
+
+ }
+
+ @Override
+ public BlockID getBlockID() {
+ return blockID;
+ }
+
+ @Override
+ public long getLength() {
+ return blockLength;
+ }
+
+ @Override
+ public synchronized long getPos() {
+ if (blockLength == 0) {
+ return 0;
+ }
+ if (blockPosition >= 0) {
+ return blockPosition;
+ }
+
+ if (buffersHaveData()) {
+ // BufferOffset w.r.t to BlockData + BufferOffset w.r.t buffers +
+ // Position of current Buffer
+ return bufferOffsetWrtBlockData + bufferOffsets.get(bufferIndex) +
+ buffers.get(bufferIndex).position();
+ }
+ if (allocated && !dataRemainingInBlock()) {
+ Preconditions.checkState(
+ bufferOffsetWrtBlockData + buffersSize == blockLength,
+ "EOF detected but not at the last byte of the chunk");
+ return blockLength;
+ }
+ if (buffersAllocated()) {
+ return bufferOffsetWrtBlockData + buffersSize;
+ }
+ return 0;
+ }
+
+ @Override
+ public synchronized int read() throws IOException {
+ int dataout = EOF;
+ int len = 1;
+ int available;
+ while (len > 0) {
+ try {
+ acquireClient();
+ available = prepareRead(1);
+ retries = 0;
+ } catch (SCMSecurityException ex) {
+ throw ex;
+ } catch (StorageContainerException e) {
+ handleStorageContainerException(e);
+ continue;
+ } catch (IOException ioe) {
+ handleIOException(ioe);
+ continue;
+ }
+ if (available == EOF) {
+ // There is no more data in the chunk stream. The buffers should have
+ // been released by now
+ Preconditions.checkState(buffers == null);
+ } else {
+ dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
+ }
+
+ len -= available;
+ if (bufferEOF()) {
+ releaseBuffers(bufferIndex);
+ }
+ }
+
+
+ return dataout;
+
+
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int off, int len) throws IOException {
+ // According to the JavaDocs for InputStream, it is recommended that
+ // subclasses provide an override of bulk read if possible for performance
+ // reasons. In addition to performance, we need to do it for correctness
+ // reasons. The Ozone REST service uses PipedInputStream and
+ // PipedOutputStream to relay HTTP response data between a Jersey thread and
+ // a Netty thread. It turns out that PipedInputStream/PipedOutputStream
+ // have a subtle dependency (bug?) on the wrapped stream providing separate
+ // implementations of single-byte read and bulk read. Without this, get key
+ // responses might close the connection before writing all of the bytes
+ // advertised in the Content-Length.
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0) {
+ return 0;
+ }
+ int total = 0;
+ int available;
+ while (len > 0) {
+ try {
+ acquireClient();
+ available = prepareRead(len);
+ retries = 0;
+ } catch (SCMSecurityException ex) {
+ throw ex;
+ } catch (StorageContainerException e) {
+ handleStorageContainerException(e);
+ continue;
+ } catch (IOException ioe) {
+ handleIOException(ioe);
+ continue;
+ }
+ if (available == EOF) {
+ // There is no more data in the block stream. The buffers should have
+ // been released by now
+ Preconditions.checkState(buffers == null);
+ return total != 0 ? total : EOF;
+ }
+ buffers.get(bufferIndex).get(b, off + total, available);
+ len -= available;
+ total += available;
+
+ if (bufferEOF()) {
+ releaseBuffers(bufferIndex);
+ }
+ }
+ return total;
+
+ }
+
+ @Override
+ public synchronized int read(ByteBuffer byteBuffer) throws IOException {
+ if (byteBuffer == null) {
+ throw new NullPointerException();
+ }
+ int len = byteBuffer.remaining();
+ if (len == 0) {
+ return 0;
+ }
+ int total = 0;
+ int available;
+ while (len > 0) {
+ try {
+ acquireClient();
+ available = prepareRead(len);
+ retries = 0;
+ } catch (SCMSecurityException ex) {
+ throw ex;
+ } catch (StorageContainerException e) {
+ handleStorageContainerException(e);
+ continue;
+ } catch (IOException ioe) {
+ handleIOException(ioe);
+ continue;
+ }
+ if (available == EOF) {
+ // There is no more data in the block stream. The buffers should have
+ // been released by now
+ Preconditions.checkState(buffers == null);
+ return total != 0 ? total : EOF;
+ }
+ ByteBuffer readBuf = buffers.get(bufferIndex);
+ ByteBuffer tmpBuf = readBuf.duplicate();
+ tmpBuf.limit(tmpBuf.position() + available);
+ byteBuffer.put(tmpBuf);
+ readBuf.position(tmpBuf.position());
+
+ len -= available;
+ total += available;
+
+ if (bufferEOF()) {
+ releaseBuffers(bufferIndex);
+ }
+ }
+ return total;
+ }
+
+ @Override
+ protected int readWithStrategy(ByteReaderStrategy strategy) throws IOException {
+ throw new NotImplementedException("readWithStrategy is not implemented.");
+ }
+
+ @Override
+ public synchronized void seek(long pos) throws IOException {
+ if (pos == 0 && blockLength == 0) {
+ // It is possible for length and pos to be zero in which case
+ // seek should return instead of throwing exception
+ return;
+ }
+ if (pos < 0 || pos > blockLength) {
+ throw new EOFException("EOF encountered at pos: " + pos + " for block: " + blockID);
+ }
+
+ if (buffersHavePosition(pos)) {
+ // The bufferPosition is w.r.t the current block.
+ // Adjust the bufferIndex and position to the seeked position.
+ adjustBufferPosition(pos - bufferOffsetWrtBlockData);
+ } else {
+ blockPosition = pos;
+ }
+ }
+
+ @Override
+ public synchronized boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public synchronized void unbuffer() {
+ blockPosition = getPos();
+ releaseClient();
+ releaseBuffers();
+ }
+
+ private void setPipeline(Pipeline pipeline) throws IOException {
+ if (pipeline == null) {
+ return;
+ }
+ long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();
+
+ if (replicaIndexes > 1) {
+ throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.",
+ pipeline));
+ }
+
+ // irrespective of the container state, we will always read via Standalone
+ // protocol.
+ boolean okForRead =
+ pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
+ || pipeline.getType() == HddsProtos.ReplicationType.EC;
+ Pipeline readPipeline = okForRead ? pipeline : pipeline.copyForRead().toBuilder()
+ .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+ getLegacyFactor(pipeline.getReplicationConfig())))
+ .build();
+ pipelineRef.set(readPipeline);
+ }
+
+ protected synchronized void checkOpen() throws IOException {
+ if (xceiverClientFactory == null) {
+ throw new IOException("StreamBlockInputStream has been closed.");
+ }
+ }
+
+ protected synchronized void acquireClient() throws IOException {
+ checkOpen();
+ if (xceiverClient == null) {
+ final Pipeline pipeline = pipelineRef.get();
+ try {
+ xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
+ } catch (IOException ioe) {
+ LOG.warn("Failed to acquire client for pipeline {}, block {}",
+ pipeline, blockID);
+ throw ioe;
+ }
+ }
+ }
+
+ private synchronized int prepareRead(int len) throws IOException {
+ for (;;) {
+ if (blockPosition >= 0) {
+ if (buffersHavePosition(blockPosition)) {
+ // The current buffers have the seeked position. Adjust the buffer
+ // index and position to point to the buffer position.
+ adjustBufferPosition(blockPosition - bufferOffsetWrtBlockData);
+ } else {
+ // Read a required block data to fill the buffers with seeked
+ // position data
+ readDataFromContainer(len);
+ }
+ }
+ if (buffersHaveData()) {
+ // Data is available from buffers
+ ByteBuffer bb = buffers.get(bufferIndex);
+ return Math.min(len, bb.remaining());
+ } else if (dataRemainingInBlock()) {
+ // There is more data in the block stream which has not
+ // been read into the buffers yet.
+ readDataFromContainer(len);
+ } else {
+ // All available input from this block stream has been consumed.
+ return EOF;
+ }
+ }
+
+
+ }
+
+ private boolean buffersHavePosition(long pos) {
+ // Check if buffers have been allocated
+ if (buffersAllocated()) {
+ // Check if the current buffers cover the input position
+ // Released buffers should not be considered when checking if position
+ // is available
+ return pos >= bufferOffsetWrtBlockData +
+ bufferOffsets.get(0) &&
+ pos < bufferOffsetWrtBlockData + buffersSize;
+ }
+ return false;
+ }
+
+ /**
+ * Check if the buffers have been allocated data and false otherwise.
+ */
+ @VisibleForTesting
+ protected boolean buffersAllocated() {
+ return buffers != null && !buffers.isEmpty();
+ }
+
+ /**
+ * Adjust the buffers position to account for seeked position and/ or checksum
+ * boundary reads.
+ * @param bufferPosition the position to which the buffers must be advanced
+ */
+ private void adjustBufferPosition(long bufferPosition) {
+ // The bufferPosition is w.r.t the current buffers.
+ // Adjust the bufferIndex and position to the seeked bufferPosition.
+ bufferIndex = Collections.binarySearch(bufferOffsets, bufferPosition);
+ // bufferIndex is negative if bufferPosition isn't found in bufferOffsets
+ // count (bufferIndex = -bufferIndex - 2) to get bufferPosition is between which offsets.
+ if (bufferIndex < 0) {
+ bufferIndex = -bufferIndex - 2;
+ }
+
+ buffers.get(bufferIndex).position(
+ (int) (bufferPosition - bufferOffsets.get(bufferIndex)));
+
+ // Reset buffers > bufferIndex to position 0. We do this to reset any
+ // previous reads/ seeks which might have updated any buffer position.
+ // For buffers < bufferIndex, we do not need to reset the position as it
+ // not required for this read. If a seek was done to a position in the
+ // previous indices, the buffer position reset would be performed in the
+ // seek call.
+ for (int i = bufferIndex + 1; i < buffers.size(); i++) {
+ buffers.get(i).position(0);
+ }
+
+ // Reset the blockPosition as chunk stream has been initialized i.e. the
+ // buffers have been allocated.
+ blockPosition = -1;
+ }
+
+ /**
+ * Reads full or partial Chunk from DN Container based on the current
+ * position of the ChunkInputStream, the number of bytes of data to read
+ * and the checksum boundaries.
+ * If successful, then the read data in saved in the buffers so that
+ * subsequent read calls can utilize it.
+ * @param len number of bytes of data to be read
+ * @throws IOException if there is an I/O error while performing the call
+ * to Datanode
+ */
+ private synchronized void readDataFromContainer(int len) throws IOException {
+ // index of first byte to be read from the block
+ long startByteIndex;
+ if (blockPosition >= 0) {
+ // If seek operation was called to advance the buffer position, the
+ // chunk should be read from that position onwards.
+ startByteIndex = blockPosition;
+ } else {
+ // Start reading the block from the last blockPosition onwards.
+ startByteIndex = bufferOffsetWrtBlockData + buffersSize;
+ }
+
+ // bufferOffsetWrtChunkData and buffersSize are updated after the data
+ // is read from Container and put into the buffers, but if read fails
+ // and is retried, we need the previous position. Position is reset after
+ // successful read in adjustBufferPosition()
+ blockPosition = getPos();
+ bufferOffsetWrtBlockData = readData(startByteIndex, len);
+ long tempOffset = 0L;
+ buffersSize = 0L;
+ bufferOffsets = new ArrayList<>(buffers.size());
+ for (ByteBuffer buffer : buffers) {
+ bufferOffsets.add(tempOffset);
+ tempOffset += buffer.limit();
+ buffersSize += buffer.limit();
+
+ }
+ bufferIndex = 0;
+ allocated = true;
+ adjustBufferPosition(startByteIndex - bufferOffsetWrtBlockData);
+
+ }
+
+ @VisibleForTesting
+ protected long readData(long startByteIndex, long len)
+ throws IOException {
+ Pipeline pipeline = pipelineRef.get();
+ buffers = new ArrayList<>();
+ ReadBlockResponseProto response =
+ ContainerProtocolCalls.readBlock(xceiverClient, startByteIndex,
+ len, blockID, validators, tokenRef.get(), pipeline.getReplicaIndexes(), verifyChecksum);
+ List<ReadChunkResponseProto> readBlocks = response.getReadChunkList();
+
+ for (ReadChunkResponseProto readBlock : readBlocks) {
+ if (readBlock.hasDataBuffers()) {
+ buffers.addAll(BufferUtils.getReadOnlyByteBuffers(
+ readBlock.getDataBuffers().getBuffersList()));
+ } else {
+ throw new IOException("Unexpected error while reading chunk data " +
+ "from container. No data returned.");
+ }
+ }
+ return response.getReadChunk(0)
+ .getChunkData().getOffset();
+ }
+
+ /**
+ * Check if the buffers have any data remaining between the current
+ * position and the limit.
+ */
+ private boolean buffersHaveData() {
+ boolean hasData = false;
+ if (buffersAllocated()) {
+ int buffersLen = buffers.size();
+ while (bufferIndex < buffersLen) {
+ ByteBuffer buffer = buffers.get(bufferIndex);
+ if (buffer != null && buffer.hasRemaining()) {
+ // current buffer has data
+ hasData = true;
+ break;
+ } else {
+ if (bufferIndex < buffersLen - 1) {
+ // move to next available buffer
+ ++bufferIndex;
+ Preconditions.checkState(bufferIndex < buffers.size());
+ } else {
+ // no more buffers remaining
+ break;
+ }
+ }
+ }
+ }
+
+ return hasData;
+ }
+
+ /**
+ * Check if there is more data in the chunk which has not yet been read
+ * into the buffers.
+ */
+ private boolean dataRemainingInBlock() {
+ long bufferPos;
+ if (blockPosition >= 0) {
+ bufferPos = blockPosition;
+ } else {
+ bufferPos = bufferOffsetWrtBlockData + buffersSize;
+ }
+
+ return bufferPos < blockLength;
+ }
+
+ /**
+ * Check if current buffer had been read till the end.
+ */
+ private boolean bufferEOF() {
+ return allocated && buffersAllocated() &&
!buffers.get(bufferIndex).hasRemaining();
+ }
+
+ /**
+ * Release the buffers upto the given index.
+ * @param releaseUptoBufferIndex bufferIndex (inclusive) upto which the
+ * buffers must be released
+ */
+ private void releaseBuffers(int releaseUptoBufferIndex) {
+ int buffersLen = buffers.size();
+ if (releaseUptoBufferIndex == buffersLen - 1) {
+ // Before releasing all the buffers, if block EOF is not reached, then
+ // blockPosition should be set to point to the last position of the
+ // buffers. This should be done so that getPos() can return the current
+ // block position
+ blockPosition = bufferOffsetWrtBlockData +
+ bufferOffsets.get(releaseUptoBufferIndex) +
+ buffers.get(releaseUptoBufferIndex).capacity();
+ // Release all the buffers
+ releaseBuffers();
+ } else {
+ buffers = buffers.subList(releaseUptoBufferIndex + 1, buffersLen);
+ bufferOffsets = bufferOffsets.subList(
+ releaseUptoBufferIndex + 1, buffersLen);
+ bufferIndex = 0;
+ }
+ }
+
+ /**
+ * If EOF is reached, release the buffers.
+ */
+ private void releaseBuffers() {
+ buffers = null;
+ bufferIndex = 0;
+ // We should not reset bufferOffsetWrtBlockData and buffersSize here
+ // because when getPos() is called we use these
+ // values and determine whether chunk is read completely or not.
+ }
+
+ protected synchronized void releaseClient() {
+ if (xceiverClientFactory != null && xceiverClient != null) {
+ xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
+ xceiverClient = null;
+ }
+ }
+
+ private void validateBlock(
+ ContainerProtos.ContainerCommandResponseProto response
+ ) throws IOException {
+
+ ReadBlockResponseProto readBlock = response.getReadBlock();
+ for (ReadChunkResponseProto readChunk : readBlock.getReadChunkList()) {
+ List<ByteString> byteStrings;
+
+ ContainerProtos.ChunkInfo chunkInfo =
+ readChunk.getChunkData();
+ if (chunkInfo.getLen() <= 0) {
+ throw new IOException("Failed to get chunk: chunkName == "
+ + chunkInfo.getChunkName() + "len == " + chunkInfo.getLen());
+ }
+ byteStrings = readChunk.getDataBuffers().getBuffersList();
+ long buffersLen = BufferUtils.getBuffersLen(byteStrings);
+ if (buffersLen != chunkInfo.getLen()) {
+ // Bytes read from chunk should be equal to chunk size.
+ throw new OzoneChecksumException(String.format(
+ "Inconsistent read for chunk=%s len=%d bytesRead=%d",
+ chunkInfo.getChunkName(), chunkInfo.getLen(),
+ buffersLen));
+ }
+
+
+ if (verifyChecksum) {
+ ChecksumData checksumData = ChecksumData.getFromProtoBuf(
+ chunkInfo.getChecksumData());
+ int startIndex = (int) readChunk.getChunkData().getOffset() / checksumData.getBytesPerChecksum();
+
+ // ChecksumData stores checksum for each 'numBytesPerChecksum'
+ // number of bytes in a list. Compute the index of the first
+ // checksum to match with the read data
+
+ Checksum.verifyChecksum(byteStrings, checksumData, startIndex);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ protected synchronized void setBuffers(List<ByteBuffer> buffers) {
+ this.buffers = buffers;
+ }
+
+ private boolean shouldRetryRead(IOException cause) throws IOException {
+ RetryPolicy.RetryAction retryAction;
+ try {
+ retryAction = retryPolicy.shouldRetry(cause, ++retries, 0, true);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
+ }
+
+ @VisibleForTesting
+ public boolean isVerifyChecksum() {
+ return verifyChecksum;
+ }
+
+ private void refreshBlockInfo(IOException cause) throws IOException {
+ LOG.info("Attempting to update pipeline and block token for block {} from
pipeline {}: {}",
+ blockID, pipelineRef.get().getId(), cause.getMessage());
+ if (refreshFunction != null) {
+ LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
+ BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
+ if (blockLocationInfo == null) {
+ LOG.warn("No new block location info for block {}", blockID);
+ } else {
+ setPipeline(blockLocationInfo.getPipeline());
+ LOG.info("New pipeline for block {}: {}", blockID,
+ blockLocationInfo.getPipeline());
+
+ tokenRef.set(blockLocationInfo.getToken());
+ if (blockLocationInfo.getToken() != null) {
+ OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
+ tokenId.readFromByteArray(tokenRef.get().getIdentifier());
+ LOG.info("A new token is added for block {}. Expiry: {}",
+ blockID, Instant.ofEpochMilli(tokenId.getExpiryDate()));
+ }
+ }
+ } else {
+ throw cause;
+ }
+ }
+
+ @VisibleForTesting
+ public synchronized ByteBuffer[] getCachedBuffers() {
+ return buffers == null ? null :
+ BufferUtils.getReadOnlyByteBuffers(buffers.toArray(new ByteBuffer[0]));
+ }
+
+ /**
+ * Check if this exception is because datanodes are not reachable.
+ */
+ private boolean isConnectivityIssue(IOException ex) {
+ return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ releaseClient();
+ releaseBuffers();
+ xceiverClientFactory = null;
+ }
+
+ private void handleStorageContainerException(StorageContainerException e) throws IOException {
+ if (shouldRetryRead(e)) {
+ releaseClient();
+ refreshBlockInfo(e);
+ } else {
+ throw e;
+ }
+ }
+
+ private void handleIOException(IOException ioe) throws IOException {
+ if (shouldRetryRead(ioe)) {
+ if (isConnectivityIssue(ioe)) {
+ releaseClient();
+ refreshBlockInfo(ioe);
+ } else {
+ releaseClient();
+ }
+ } else {
+ throw ioe;
+ }
+ }
+}
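To make the buffer bookkeeping in adjustBufferPosition/readData above easier to follow, here is a small self-contained example of locating the buffer that covers a given position with Collections.binarySearch and the -index - 2 insertion-point adjustment; the offsets used are made up.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public final class BufferOffsetLookupExample {
  private BufferOffsetLookupExample() { }

  /** Find the index of the buffer covering {@code position}, as the stream does. */
  static int findBufferIndex(List<Long> bufferOffsets, long position) {
    int index = Collections.binarySearch(bufferOffsets, position);
    // A negative result is (-(insertionPoint) - 1); the covering buffer is the
    // one that starts just before the insertion point, hence -index - 2.
    if (index < 0) {
      index = -index - 2;
    }
    return index;
  }

  public static void main(String[] args) {
    // Buffers start at offsets 0, 20, 40, 60 within the data fetched so far.
    List<Long> bufferOffsets = Arrays.asList(0L, 20L, 40L, 60L);
    System.out.println(findBufferIndex(bufferOffsets, 20)); // 1 (exact hit)
    System.out.println(findBufferIndex(bufferOffsets, 35)); // 1 (inside buffer 1)
    System.out.println(findBufferIndex(bufferOffsets, 61)); // 3 (last buffer)
  }
}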
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 3fbee6be871..9249abaf42a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.ozone.client.io;
+import static org.apache.hadoop.hdds.DatanodeVersion.STREAM_BLOCK_SUPPORT;
+
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -25,6 +27,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -32,6 +35,7 @@
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
+import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
@@ -85,6 +89,10 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
blockInfo, xceiverFactory, refreshFunction,
ecBlockStreamFactory, config);
+ } else if (config.isStreamReadBlock() && allDataNodesSupportStreamBlock(pipeline)) {
+ return new StreamBlockInputStream(
+ blockInfo.getBlockID(), blockInfo.getLength(),
+ pipeline, token, xceiverFactory, refreshFunction, config);
} else {
return new BlockInputStream(blockInfo,
pipeline, token, xceiverFactory, refreshFunction,
@@ -92,4 +100,16 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
}
}
+ private boolean allDataNodesSupportStreamBlock(Pipeline pipeline) {
+ // return true only if all DataNodes in the pipeline are on a version
+ // that supports for reading a block by streaming chunks..
+ for (DatanodeDetails dn : pipeline.getNodes()) {
+ if (dn.getCurrentVersion() <
+ STREAM_BLOCK_SUPPORT.toProtoValue()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
}
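The factory only takes the streaming path when every datanode in the pipeline is new enough. A minimal sketch of that gate, with plain ints standing in for DatanodeDetails#getCurrentVersion and the constant 3 taken from the STREAM_BLOCK_SUPPORT entry added to DatanodeVersion.java later in this patch:

import java.util.Arrays;
import java.util.List;

public final class StreamBlockVersionGateExample {
  // Proto value of DatanodeVersion.STREAM_BLOCK_SUPPORT as declared in this patch.
  private static final int STREAM_BLOCK_SUPPORT = 3;

  private StreamBlockVersionGateExample() { }

  /** Mirrors BlockInputStreamFactoryImpl#allDataNodesSupportStreamBlock. */
  static boolean allDataNodesSupportStreamBlock(List<Integer> dnVersions) {
    for (int version : dnVersions) {
      if (version < STREAM_BLOCK_SUPPORT) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(allDataNodesSupportStreamBlock(Arrays.asList(3, 3, 3))); // true
    System.out.println(allDataNodesSupportStreamBlock(Arrays.asList(3, 2, 3))); // false, fall back to BlockInputStream
  }
}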
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java
new file mode 100644
index 00000000000..e141845954d
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+/**
+ * A dummy StreamBlockInputStream to mock read block call to DN.
+ */
+class DummyStreamBlockInputStream extends StreamBlockInputStream {
+
+ private final List<ByteString> readByteBuffers = new ArrayList<>();
+ private final List<ChunkInfo> chunks;
+ private final long[] chunkOffsets;
+ private final Map<String, byte[]> chunkDataMap;
+
+ @SuppressWarnings("parameternumber")
+ DummyStreamBlockInputStream(
+ BlockID blockId,
+ long blockLen,
+ Pipeline pipeline,
+ Token<OzoneBlockTokenIdentifier> token,
+ XceiverClientFactory xceiverClientManager,
+ Function<BlockID, BlockLocationInfo> refreshFunction,
+ OzoneClientConfig config,
+ List<ChunkInfo> chunks,
+ Map<String, byte[]> chunkDataMap) throws IOException {
+ super(blockId, blockLen, pipeline, token, xceiverClientManager,
+ refreshFunction, config);
+ this.chunks = chunks;
+ this.chunkDataMap = chunkDataMap;
+ chunkOffsets = new long[chunks.size()];
+ long temp = 0;
+ for (int i = 0; i < chunks.size(); i++) {
+ chunkOffsets[i] = temp;
+ temp += chunks.get(i).getLen();
+ }
+ }
+
+ @Override
+ protected synchronized void checkOpen() throws IOException {
+ // No action needed
+ }
+
+ @Override
+ protected void acquireClient() {
+ // No action needed
+ }
+
+ @Override
+ protected void releaseClient() {
+ // no-op
+ }
+
+ @Override
+ protected long readData(long offset, long len) {
+ int chunkIndex = Arrays.binarySearch(chunkOffsets, offset);
+ if (chunkIndex < 0) {
+ chunkIndex = -chunkIndex - 2;
+ }
+ ChunkInfo chunkInfo = chunks.get(chunkIndex);
+ readByteBuffers.clear();
+ long chunkOffset = offset - chunkInfo.getOffset();
+ if (isVerifyChecksum()) {
+ ChecksumData checksumData = ChecksumData.getFromProtoBuf(
+ chunkInfo.getChecksumData());
+ int bytesPerChecksum = checksumData.getBytesPerChecksum();
+ chunkOffset = (chunkOffset / bytesPerChecksum) * bytesPerChecksum;
+ }
+ long bufferOffsetWrtBlockDataData = chunkOffsets[chunkIndex] + chunkOffset;
+ while (len > 0) {
+ ChunkInfo currentChunk = chunks.get(chunkIndex);
+ int bufferCapacity = currentChunk.getChecksumData().getBytesPerChecksum();
+ long chunkLen = currentChunk.getLen();
+ long remainingToRead = Math.min(chunkLen, len);
+ if (isVerifyChecksum()) {
+ if (len < chunkLen) {
+ final ChecksumData checksumData = ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
+ final long endByteIndex = len - 1;
+ final int bytesPerChecksum = checksumData.getBytesPerChecksum();
+ remainingToRead = (endByteIndex / bytesPerChecksum + 1) * bytesPerChecksum;
+ } else {
+ remainingToRead = chunkLen;
+ }
+ }
+
+ long bufferLen;
+ while (remainingToRead > 0) {
+ if (remainingToRead < bufferCapacity) {
+ bufferLen = remainingToRead;
+ } else {
+ bufferLen = bufferCapacity;
+ }
+ ByteString byteString = ByteString.copyFrom(chunkDataMap.get(chunks.get(chunkIndex).getChunkName()),
+ (int) chunkOffset, (int) bufferLen);
+
+ readByteBuffers.add(byteString);
+
+ chunkOffset += bufferLen;
+ remainingToRead -= bufferLen;
+ len -= bufferLen;
+ }
+ chunkOffset = 0;
+ chunkIndex++;
+ }
+ setBuffers(BufferUtils.getReadOnlyByteBuffers(readByteBuffers));
+ return bufferOffsetWrtBlockDataData;
+ }
+
+ public List<ByteString> getReadByteBuffers() {
+ return readByteBuffers;
+ }
+}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
new file mode 100644
index 00000000000..81cb6d4d62c
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import com.google.common.primitives.Bytes;
+import java.io.EOFException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Function;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link TestStreamBlockInputStream}'s functionality.
+ */
+public class TestStreamBlockInputStream {
+ private int blockSize;
+ private static final int CHUNK_SIZE = 100;
+ private static final int BYTES_PER_CHECKSUM = 20;
+ private static final Random RANDOM = new Random();
+ private DummyStreamBlockInputStream blockStream;
+ private byte[] blockData;
+ private List<ChunkInfo> chunks;
+ private Map<String, byte[]> chunkDataMap;
+ private Checksum checksum;
+ private BlockID blockID;
+ private static final String CHUNK_NAME = "chunk-";
+ private OzoneConfiguration conf = new OzoneConfiguration();
+
+ @BeforeEach
+ public void setup() throws Exception {
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamReadBlock(true);
+ blockID = new BlockID(new ContainerBlockID(1, 1));
+ checksum = new Checksum(ChecksumType.CRC32, BYTES_PER_CHECKSUM);
+ createChunkList(5);
+
+ Pipeline pipeline = MockPipeline.createSingleNodePipeline();
+ blockStream = new DummyStreamBlockInputStream(blockID, blockSize, pipeline,
+ null, null, mock(Function.class), clientConfig, chunks, chunkDataMap);
+ }
+
+ /**
+ * Create a mock list of chunks. The first n-1 chunks of length CHUNK_SIZE
+ * and the last chunk with length CHUNK_SIZE/2.
+ */
+ private void createChunkList(int numChunks)
+ throws Exception {
+
+ chunks = new ArrayList<>(numChunks);
+ chunkDataMap = new HashMap<>();
+ blockData = new byte[0];
+ int i, chunkLen;
+ byte[] byteData;
+ String chunkName;
+
+ for (i = 0; i < numChunks; i++) {
+ chunkName = CHUNK_NAME + i;
+ chunkLen = CHUNK_SIZE;
+ if (i == numChunks - 1) {
+ chunkLen = CHUNK_SIZE / 2;
+ }
+ byteData = generateRandomData(chunkLen);
+ ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+ .setChunkName(chunkName)
+ .setOffset(0)
+ .setLen(chunkLen)
+ .setChecksumData(checksum.computeChecksum(
+ byteData, 0, chunkLen).getProtoBufMessage())
+ .build();
+
+ chunkDataMap.put(chunkName, byteData);
+ chunks.add(chunkInfo);
+
+ blockSize += chunkLen;
+ blockData = Bytes.concat(blockData, byteData);
+ }
+ }
+
+ static byte[] generateRandomData(int length) {
+ byte[] bytes = new byte[length];
+ RANDOM.nextBytes(bytes);
+ return bytes;
+ }
+
+ /**
+ * Match readData with the chunkData byte-wise.
+ * @param readData Data read through ChunkInputStream
+ * @param inputDataStartIndex first index (inclusive) in chunkData to compare
+ * with read data
+ * @param length the number of bytes of data to match starting from
+ * inputDataStartIndex
+ */
+ private void matchWithInputData(byte[] readData, int inputDataStartIndex,
+ int length) {
+ for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
+ assertEquals(blockData[i], readData[i - inputDataStartIndex], "i: " + i);
+ }
+ }
+
+ private void matchWithInputData(List<ByteString> byteStrings,
+ int inputDataStartIndex, int length) {
+ int offset = inputDataStartIndex;
+ int totalBufferLen = 0;
+ for (ByteString byteString : byteStrings) {
+ int bufferLen = byteString.size();
+ matchWithInputData(byteString.toByteArray(), offset, bufferLen);
+ offset += bufferLen;
+ totalBufferLen += bufferLen;
+ }
+ assertEquals(length, totalBufferLen);
+ }
+
+ /**
+ * Seek to a position and verify through getPos().
+ */
+ private void seekAndVerify(int pos) throws Exception {
+ blockStream.seek(pos);
+ assertEquals(pos, blockStream.getPos(),
+ "Current position of buffer does not match with the sought position");
+ }
+
+ @Test
+ public void testFullChunkRead() throws Exception {
+ byte[] b = new byte[blockSize];
+ int numBytesRead = blockStream.read(b, 0, blockSize);
+ assertEquals(blockSize, numBytesRead);
+ matchWithInputData(b, 0, blockSize);
+ }
+
+ @Test
+ public void testPartialChunkRead() throws Exception {
+ int len = blockSize / 2;
+ byte[] b = new byte[len];
+
+ int numBytesRead = blockStream.read(b, 0, len);
+ assertEquals(len, numBytesRead);
+ matchWithInputData(b, 0, len);
+
+ // To read block data from index 0 to 225 (len = 225), we need to read
+ // chunk from offset 0 to 240 as the checksum boundary is at every 20
+ // bytes. Verify that 60 bytes of chunk data are read and stored in the
+ // buffers. Since checksum boundary is at every 20 bytes, there should be
+ // 240/20 number of buffers.
+ matchWithInputData(blockStream.getReadByteBuffers(), 0, 240);
+ }
+
+ @Test
+ public void testSeek() throws Exception {
+ seekAndVerify(0);
+ EOFException eofException = assertThrows(EOFException.class, () -> seekAndVerify(blockSize + 1));
+ assertThat(eofException).hasMessage("EOF encountered at pos: " + (blockSize + 1) + " for block: " + blockID);
+
+ // Seek before read should update the BlockInputStream#blockPosition
+ seekAndVerify(25);
+
+ // Read from the sought position.
+ // Reading from index 25 to 54 should result in the BlockInputStream
+ // copying chunk data from index 20 to 59 into the buffers (checksum
+ // boundaries).
+ byte[] b = new byte[30];
+ int numBytesRead = blockStream.read(b, 0, 30);
+ assertEquals(30, numBytesRead);
+ matchWithInputData(b, 25, 30);
+ matchWithInputData(blockStream.getReadByteBuffers(), 20, 40);
+
+ // After read, the position of the blockStream is evaluated from the
+ // buffers and the chunkPosition should be reset to -1.
+
+ // Only the last BYTES_PER_CHECKSUM will be cached in the buffers as
+ // buffers are released after each checksum boundary is read. So the
+ // buffers should contain data from index 40 to 59.
+ // Seek to a position within the cached buffers. BlockPosition should
+ // still not be used to set the position.
+ seekAndVerify(45);
+
+ // Seek to a position outside the current cached buffers. In this case, the
+ // chunkPosition should be updated to the seeked position.
+ seekAndVerify(75);
+
+ // Read upto checksum boundary should result in all the buffers being
+ // released and hence chunkPosition updated with current position of chunk.
+ seekAndVerify(25);
+ b = new byte[15];
+ numBytesRead = blockStream.read(b, 0, 15);
+ assertEquals(15, numBytesRead);
+ matchWithInputData(b, 25, 15);
+ }
+
+ @Test
+ public void testSeekAndRead() throws Exception {
+ // Seek to a position and read data
+ seekAndVerify(50);
+ byte[] b1 = new byte[20];
+ int numBytesRead = blockStream.read(b1, 0, 20);
+ assertEquals(20, numBytesRead);
+ matchWithInputData(b1, 50, 20);
+
+ // Next read should start from the position of the last read + 1 i.e. 70
+ byte[] b2 = new byte[20];
+ numBytesRead = blockStream.read(b2, 0, 20);
+ assertEquals(20, numBytesRead);
+ matchWithInputData(b2, 70, 20);
+
+ byte[] b3 = new byte[20];
+ seekAndVerify(80);
+ numBytesRead = blockStream.read(b3, 0, 20);
+ assertEquals(20, numBytesRead);
+ matchWithInputData(b3, 80, 20);
+ }
+
+ @Test
+ public void testUnbuffered() throws Exception {
+ byte[] b1 = new byte[20];
+ int numBytesRead = blockStream.read(b1, 0, 20);
+ assertEquals(20, numBytesRead);
+ matchWithInputData(b1, 0, 20);
+
+ blockStream.unbuffer();
+
+ assertFalse(blockStream.buffersAllocated());
+
+ // Next read should start from the position of the last read + 1 i.e. 20
+ byte[] b2 = new byte[20];
+ numBytesRead = blockStream.read(b2, 0, 20);
+ assertEquals(20, numBytesRead);
+ matchWithInputData(b2, 20, 20);
+ }
+
+}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java
index dbc42816036..2fd81fc91dc 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java
@@ -39,7 +39,10 @@
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
+import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
/**
@@ -49,8 +52,9 @@ public class TestBlockInputStreamFactoryImpl {
private OzoneConfiguration conf = new OzoneConfiguration();
- @Test
- public void testNonECGivesBlockInputStream() throws IOException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testNonECGivesBlockInputStream(boolean streamReadBlockEnabled) throws IOException {
BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
ReplicationConfig repConfig =
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
@@ -62,11 +66,16 @@ public void testNonECGivesBlockInputStream() throws IOException {
Mockito.when(pipeline.getReplicaIndex(any(DatanodeDetails.class))).thenReturn(1);
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setChecksumVerify(true);
+ clientConfig.setStreamReadBlock(streamReadBlockEnabled);
BlockExtendedInputStream stream =
factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
blockInfo.getToken(), null, null,
clientConfig);
- assertInstanceOf(BlockInputStream.class, stream);
+ if (streamReadBlockEnabled) {
+ assertInstanceOf(StreamBlockInputStream.class, stream);
+ } else {
+ assertInstanceOf(BlockInputStream.class, stream);
+ }
assertEquals(stream.getBlockID(), blockInfo.getBlockID());
assertEquals(stream.getLength(), blockInfo.getLength());
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
index 4c0bb03c165..2717e8eb3d9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
@@ -33,6 +33,8 @@ public enum DatanodeVersion implements ComponentVersion {
SEPARATE_RATIS_PORTS_AVAILABLE(1, "Version with separated Ratis port."),
COMBINED_PUTBLOCK_WRITECHUNK_RPC(2, "WriteChunk can optionally support " +
"a PutBlock request"),
+ STREAM_BLOCK_SUPPORT(3,
+ "This version has support for reading a block by streaming chunks."),
FUTURE_VERSION(-1, "Used internally in the client when the server side is "
+ " newer and an unknown server version has arrived to the client.");
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index c07a21680ef..78ff1feabca 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -418,6 +418,7 @@ public static boolean isReadOnly(
switch (proto.getCmdType()) {
case ReadContainer:
case ReadChunk:
+ case ReadBlock:
case ListBlock:
case GetBlock:
case GetSmallFile:
@@ -478,6 +479,7 @@ public static boolean requireBlockToken(Type cmdType) {
case PutBlock:
case PutSmallFile:
case ReadChunk:
+ case ReadBlock:
case WriteChunk:
case FinalizeBlock:
return true;
@@ -553,6 +555,11 @@ public static BlockID getBlockID(ContainerCommandRequestProtoOrBuilder msg) {
blockID = msg.getReadChunk().getBlockID();
}
break;
+ case ReadBlock:
+ if (msg.hasReadBlock()) {
+ blockID = msg.getReadBlock().getBlockID();
+ }
+ break;
case WriteChunk:
if (msg.hasWriteChunk()) {
blockID = msg.getWriteChunk().getBlockID();
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index 5e547898662..ededa8d070b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -334,6 +334,23 @@ public static ContainerCommandResponseProto getReadChunkResponse(
.build();
}
+ public static ContainerCommandResponseProto getReadBlockResponse(
+ ContainerCommandRequestProto request, DatanodeBlockID blockID,
+ ChunkInfo chunkInfo, ChunkBufferToByteString data,
+ Function<ByteBuffer, ByteString> byteBufferToByteString) {
+
+ ReadChunkResponseProto.Builder response;
+ response = ReadChunkResponseProto.newBuilder()
+ .setChunkData(chunkInfo)
+ .setDataBuffers(DataBuffers.newBuilder()
+ .addAllBuffers(data.toByteStringList(byteBufferToByteString))
+ .build())
+ .setBlockID(blockID);
+ return getSuccessResponseBuilder(request)
+ .setReadChunk(response)
+ .build();
+ }
+
public static ContainerCommandResponseProto getFinalizeBlockResponse(
ContainerCommandRequestProto msg, BlockData data) {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index a934fc51372..1117d29c1aa 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -55,6 +55,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
@@ -904,4 +906,58 @@ public static List<Validator> toValidatorList(Validator
validator) {
return datanodeToResponseMap;
}
+ /**
+ * Calls the container protocol to read a block by streaming its chunks.
+ *
+ * @param xceiverClient client to perform call
+ * @param offset offset within the block where the read starts
+ * @param len length of data to read
+ * @param blockID ID of the block
+ * @param validators functions to validate the response
+ * @param token a token for this block (may be null)
+ * @param replicaIndexes replica index for each datanode in the pipeline
+ * @param verifyChecksum whether reads should be aligned to checksum boundaries
+ * @return container protocol read block response
+ * @throws IOException if there is an I/O error while performing the call
+ */
+ @SuppressWarnings("checkstyle:ParameterNumber")
+ public static ContainerProtos.ReadBlockResponseProto readBlock(
+ XceiverClientSpi xceiverClient, long offset, long len, BlockID blockID,
+ List<Validator> validators, Token<? extends TokenIdentifier> token,
+ Map<DatanodeDetails, Integer> replicaIndexes, boolean verifyChecksum)
throws IOException {
+ final ReadBlockRequestProto.Builder readBlockRequest =
+ ReadBlockRequestProto.newBuilder()
+ .setOffset(offset)
+ .setVerifyChecksum(verifyChecksum)
+ .setLen(len);
+ final ContainerCommandRequestProto.Builder builder =
+ ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadBlock)
+ .setContainerID(blockID.getContainerID());
+ if (token != null) {
+ builder.setEncodedToken(token.encodeToUrlString());
+ }
+
+ return tryEachDatanode(xceiverClient.getPipeline(),
+ d -> readBlock(xceiverClient,
+ validators, blockID, builder, readBlockRequest, d, replicaIndexes),
+ d -> toErrorMessage(blockID, d));
+ }
+
+ private static ReadBlockResponseProto readBlock(
+     XceiverClientSpi xceiverClient, List<Validator> validators,
+     BlockID blockID, ContainerCommandRequestProto.Builder builder,
+     ReadBlockRequestProto.Builder readBlockBuilder, DatanodeDetails datanode,
+     Map<DatanodeDetails,
+ final DatanodeBlockID.Builder datanodeBlockID =
blockID.getDatanodeBlockIDProtobufBuilder();
+ int replicaIndex = replicaIndexes.getOrDefault(datanode, 0);
+ if (replicaIndex > 0) {
+ datanodeBlockID.setReplicaIndex(replicaIndex);
+ }
+ readBlockBuilder.setBlockID(datanodeBlockID);
+ final ContainerCommandRequestProto request = builder
+ .setDatanodeUuid(datanode.getUuidString())
+ .setReadBlock(readBlockBuilder).build();
+ ContainerCommandResponseProto response =
+ xceiverClient.sendCommand(request, validators);
+ return response.getReadBlock();
+ }
}
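
As a rough illustration of how the new ContainerProtocolCalls.readBlock helper could be called, the sketch below reads a whole block with checksum alignment enabled; the class name ReadBlockCallSketch, the empty validator list, the null token and the empty replica-index map are assumptions made for brevity, not what the real client passes.

// Illustrative only: assumes an already connected XceiverClientSpi and a
// BlockID obtained elsewhere; validator, token and replica-index handling
// are simplified.
import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;

public final class ReadBlockCallSketch {
  private ReadBlockCallSketch() { }

  static ReadBlockResponseProto readWholeBlock(
      XceiverClientSpi client, BlockID blockID, long blockLength)
      throws IOException {
    // Read the whole block from offset 0 and ask the datanode to align the
    // returned data to checksum boundaries so the client can verify checksums.
    return ContainerProtocolCalls.readBlock(
        client, 0, blockLength, blockID,
        Collections.emptyList(),   // no extra response validators in this sketch
        null,                      // no block token in this sketch
        Collections.emptyMap(),    // no EC replica indexes
        true);                     // verifyChecksum
  }
}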
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index 09117335617..61d1c49da04 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -43,7 +43,8 @@ public enum DNAction implements AuditAction {
STREAM_INIT,
FINALIZE_BLOCK,
ECHO,
- GET_CONTAINER_CHECKSUM_INFO;
+ GET_CONTAINER_CHECKSUM_INFO,
+ READ_BLOCK;
@Override
public String getAction() {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index ea47c4945b8..e6be4a490b4 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -24,6 +24,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
+import io.opentelemetry.api.trace.Span;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
@@ -52,6 +53,7 @@
import org.apache.hadoop.hdds.security.token.NoopTokenVerifier;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
@@ -75,6 +77,8 @@
import org.apache.hadoop.util.Time;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.UncheckedAutoCloseable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -812,6 +816,80 @@ public StateMachine.DataChannel getStreamDataChannel(
}
}
+ @Override
+ public void streamDataReadOnly(ContainerCommandRequestProto msg,
+ StreamObserver<ContainerCommandResponseProto> streamObserver,
+ DispatcherContext dispatcherContext) {
+ Type cmdType = msg.getCmdType();
+ String traceID = msg.getTraceID();
+ Span span = TracingUtil.importAndCreateSpan(cmdType.toString(), traceID);
+ AuditAction action = getAuditAction(msg.getCmdType());
+ EventType eventType = getEventType(msg);
+
+ try (UncheckedAutoCloseable ignored = protocolMetrics.measure(cmdType)) {
+ Preconditions.checkNotNull(msg);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Command {}, trace ID: {}.", msg.getCmdType(),
+ traceID);
+ }
+
+ PerformanceStringBuilder perf = new PerformanceStringBuilder();
+ ContainerCommandResponseProto responseProto = null;
+ long containerID = msg.getContainerID();
+ Container container = getContainer(containerID);
+ long startTime = Time.monotonicNow();
+
+ if (DispatcherContext.op(dispatcherContext).validateToken()) {
+ validateToken(msg);
+ }
+ if (getMissingContainerSet().contains(containerID)) {
+ throw new StorageContainerException(
+ "ContainerID " + containerID
+ + " has been lost and and cannot be recreated on this
DataNode",
+ ContainerProtos.Result.CONTAINER_MISSING);
+ }
+ if (container == null) {
+ throw new StorageContainerException(
+ "ContainerID " + containerID + " does not exist",
+ ContainerProtos.Result.CONTAINER_NOT_FOUND);
+ }
+ ContainerType containerType = getContainerType(container);
+ Handler handler = getHandler(containerType);
+ if (handler == null) {
+ throw new StorageContainerException("Invalid " +
+ "ContainerType " + containerType,
+ ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+ }
+ perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime);
+ responseProto = handler.readBlock(
+ msg, container, dispatcherContext, streamObserver);
+ long oPLatencyMS = Time.monotonicNow() - startTime;
+ metrics.incContainerOpsLatencies(cmdType, oPLatencyMS);
+ if (responseProto == null) {
+ audit(action, eventType, msg, dispatcherContext,
AuditEventStatus.SUCCESS, null);
+ } else {
+ containerSet.scanContainer(containerID, "ReadBlock failed " +
responseProto.getResult());
+ audit(action, eventType, msg, dispatcherContext,
AuditEventStatus.FAILURE,
+ new Exception(responseProto.getMessage()));
+ streamObserver.onNext(responseProto);
+ }
+ perf.appendOpLatencyMs(oPLatencyMS);
+ performanceAudit(action, msg, dispatcherContext, perf, oPLatencyMS);
+
+ } catch (StorageContainerException sce) {
+ audit(action, eventType, msg, dispatcherContext,
AuditEventStatus.FAILURE, sce);
+ streamObserver.onNext(ContainerUtils.logAndReturnError(LOG, sce, msg));
+ } catch (IOException ioe) {
+ final String s = ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED
+ + " for " + dispatcherContext + ": " + ioe.getMessage();
+ final StorageContainerException sce = new StorageContainerException(
+ s, ioe, ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
+ streamObserver.onNext(ContainerUtils.logAndReturnError(LOG, sce, msg));
+ } finally {
+ span.end();
+ }
+ }
+
private static DNAction getAuditAction(Type cmdType) {
switch (cmdType) {
case CreateContainer : return DNAction.CREATE_CONTAINER;
@@ -836,6 +914,7 @@ private static DNAction getAuditAction(Type cmdType) {
case FinalizeBlock : return DNAction.FINALIZE_BLOCK;
case Echo : return DNAction.ECHO;
case GetContainerChecksumInfo: return DNAction.GET_CONTAINER_CHECKSUM_INFO;
+ case ReadBlock : return DNAction.READ_BLOCK;
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index 1c3071a3791..d6c33455008 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -24,6 +24,7 @@
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
/**
* Dispatcher acts as the bridge between the transport layer and
@@ -89,4 +90,15 @@ default StateMachine.DataChannel getStreamDataChannel(
throw new UnsupportedOperationException(
"getStreamDataChannel not supported.");
}
+
+ /**
+ * Serves a read-only request by streaming chunk responses back to the client.
+ */
+ default void streamDataReadOnly(
+ ContainerCommandRequestProto msg,
+ StreamObserver<ContainerCommandResponseProto> streamObserver,
+ DispatcherContext dispatcherContext) {
+ throw new UnsupportedOperationException(
+ "streamDataReadOnly not supported.");
+ }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 4337d667618..0abcab5afea 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -40,9 +40,11 @@
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
/**
* Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@@ -264,4 +266,9 @@ public void setClusterID(String clusterID) {
this.clusterId = clusterID;
}
+ public abstract ContainerCommandResponseProto readBlock(
+ ContainerCommandRequestProto msg, Container container,
+ DispatcherContext dispatcherContext,
+ StreamObserver<ContainerCommandResponseProto> streamObserver);
+
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
index 6728744d147..041958b4227 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -104,8 +104,12 @@ public void onNext(ContainerCommandRequestProto request) {
.build();
try {
- final ContainerCommandResponseProto resp =
dispatcher.dispatch(request, context);
- responseObserver.onNext(resp);
+ if (request.getCmdType() == Type.ReadBlock) {
+ dispatcher.streamDataReadOnly(request, responseObserver, null);
+ } else {
+ final ContainerCommandResponseProto resp =
dispatcher.dispatch(request, context);
+ responseObserver.onNext(resp);
+ }
} catch (Throwable e) {
LOG.error("Got exception when processing"
+ " ContainerCommandRequestProto {}", request, e);
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
index f9ee0a4bd0f..e0f0ccbe193 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
@@ -34,6 +34,8 @@
public final class DispatcherContext {
private static final DispatcherContext HANDLE_READ_CHUNK
= newBuilder(Op.HANDLE_READ_CHUNK).build();
+ private static final DispatcherContext HANDLE_READ_BLOCK
+ = newBuilder(Op.HANDLE_READ_BLOCK).build();
private static final DispatcherContext HANDLE_WRITE_CHUNK
= newBuilder(Op.HANDLE_WRITE_CHUNK).build();
private static final DispatcherContext HANDLE_GET_SMALL_FILE
@@ -60,6 +62,10 @@ public static DispatcherContext getHandleReadChunk() {
return HANDLE_READ_CHUNK;
}
+ public static DispatcherContext getHandleReadBlock() {
+ return HANDLE_READ_BLOCK;
+ }
+
public static DispatcherContext getHandleWriteChunk() {
return HANDLE_WRITE_CHUNK;
}
@@ -92,6 +98,7 @@ public enum Op {
NULL,
HANDLE_READ_CHUNK,
+ HANDLE_READ_BLOCK,
HANDLE_WRITE_CHUNK,
HANDLE_GET_SMALL_FILE,
HANDLE_PUT_SMALL_FILE,
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 584cb98b367..ecf13cf12af 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -46,6 +46,7 @@
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess;
+import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadBlockResponse;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse;
@@ -87,6 +88,7 @@
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
@@ -102,6 +104,7 @@
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -162,6 +165,7 @@
import org.apache.hadoop.util.Time;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -2046,6 +2050,115 @@ public void deleteUnreferenced(Container container,
long localID)
}
}
+ @Override
+ public ContainerCommandResponseProto readBlock(
+ ContainerCommandRequestProto request, Container kvContainer,
+ DispatcherContext dispatcherContext,
+ StreamObserver<ContainerCommandResponseProto> streamObserver) {
+ ContainerCommandResponseProto responseProto = null;
+ if (!request.hasReadBlock()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Malformed Read Block request. trace ID: {}",
+ request.getTraceID());
+ }
+ return malformedRequest(request);
+ }
+ try {
+ ReadBlockRequestProto readBlock = request.getReadBlock();
+
+ BlockID blockID = BlockID.getFromProtobuf(
+ readBlock.getBlockID());
+ // This is a new API, so the block should always be checked.
+ BlockUtils.verifyReplicaIdx(kvContainer, blockID);
+ BlockUtils.verifyBCSId(kvContainer, blockID);
+
+ BlockData blockData = getBlockManager().getBlock(kvContainer, blockID);
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ long blockOffset = 0;
+ int chunkIndex = -1;
+ long chunkOffset = 0;
+ long offset = readBlock.getOffset();
+ for (int i = 0; i < chunkInfos.size(); i++) {
+ final long chunkLen = chunkInfos.get(i).getLen();
+ blockOffset += chunkLen;
+ if (blockOffset > offset) {
+ chunkIndex = i;
+ chunkOffset = offset - blockOffset + chunkLen;
+ break;
+ }
+ }
+ Preconditions.checkState(chunkIndex >= 0);
+
+ if (dispatcherContext == null) {
+ dispatcherContext = DispatcherContext.getHandleReadBlock();
+ }
+
+ ChunkBufferToByteString data;
+
+ long len = readBlock.getLen();
+ long adjustedChunkOffset, adjustedChunkLen;
+ do {
+ ContainerProtos.ChunkInfo chunk = chunkInfos.get(chunkIndex);
+ if (readBlock.getVerifyChecksum()) {
+ Pair<Long, Long> adjustedOffsetAndLength =
+ computeChecksumBoundaries(chunk, chunkOffset, len);
+ adjustedChunkOffset = adjustedOffsetAndLength.getLeft();
+ adjustedChunkLen = adjustedOffsetAndLength.getRight();
+ adjustedChunkOffset += chunk.getOffset();
+ } else {
+ adjustedChunkOffset = chunkOffset;
+ adjustedChunkLen = Math.min(
+ chunk.getLen() + chunk.getOffset() - chunkOffset, len);
+ }
+
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
+ ContainerProtos.ChunkInfo.newBuilder(chunk)
+ .setOffset(adjustedChunkOffset)
+ .setLen(adjustedChunkLen).build());
+ BlockUtils.verifyReplicaIdx(kvContainer, blockID);
+ BlockUtils.verifyBCSId(kvContainer, blockID);
+ data = getChunkManager().readChunk(
+ kvContainer, blockID, chunkInfo, dispatcherContext);
+
+ Preconditions.checkNotNull(data, "Chunk data is null");
+ streamObserver.onNext(
+ getReadBlockResponse(request,
+ blockData.getProtoBufMessage().getBlockID(),
+ chunkInfo.getProtoBufMessage(),
+ data, byteBufferToByteString));
+ len -= adjustedChunkLen + adjustedChunkOffset - chunkOffset;
+ chunkOffset = 0;
+ chunkIndex++;
+ } while (len > 0);
+
+ metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen());
+ } catch (StorageContainerException ex) {
+ responseProto = ContainerUtils.logAndReturnError(LOG, ex, request);
+ } catch (IOException ioe) {
+ responseProto = ContainerUtils.logAndReturnError(LOG,
+ new StorageContainerException("Read Block failed", ioe,
IO_EXCEPTION),
+ request);
+ }
+ return responseProto;
+ }
+
+ private Pair<Long, Long> computeChecksumBoundaries(
+ ContainerProtos.ChunkInfo chunkInfo, long startByteIndex, long dataLen) {
+
+ int bytesPerChecksum = chunkInfo.getChecksumData().getBytesPerChecksum();
+
+ // index of the last byte to be read from the chunk, inclusive.
+ final long endByteIndex = startByteIndex + dataLen - 1;
+
+ long adjustedChunkOffset = (startByteIndex / bytesPerChecksum)
+ * bytesPerChecksum; // inclusive
+ final long endIndex = ((endByteIndex / bytesPerChecksum) + 1)
+ * bytesPerChecksum; // exclusive
+ long adjustedChunkLen =
+ Math.min(endIndex, chunkInfo.getLen()) - adjustedChunkOffset;
+ return Pair.of(adjustedChunkOffset, adjustedChunkLen);
+ }
+
@Override
public void addFinalizedBlock(Container container, long localID) {
KeyValueContainer keyValueContainer = (KeyValueContainer)container;
@@ -2084,7 +2197,7 @@ private boolean logBlocksIfNonZero(Container container)
}
if (nonZero) {
LOG.error("blocks in rocksDB on container delete: {}",
- stringBuilder.toString());
+ stringBuilder);
}
}
return nonZero;
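
The boundary math in computeChecksumBoundaries is easiest to follow with concrete numbers: with bytesPerChecksum of 256 KB, a request for 100 bytes starting at offset 300000 of a 1 MB chunk is widened to the enclosing checksum window starting at 262144 with length 262144. A small standalone sketch of the same arithmetic, using assumed values, is:

// Standalone illustration of the checksum-boundary widening; values are assumed.
public final class ChecksumBoundaryExample {
  private ChecksumBoundaryExample() { }

  public static void main(String[] args) {
    long bytesPerChecksum = 256 * 1024;   // from the chunk's ChecksumData
    long chunkLen = 1024 * 1024;          // 1 MB chunk
    long startByteIndex = 300_000;        // requested read offset within the chunk
    long dataLen = 100;                   // requested read length

    long endByteIndex = startByteIndex + dataLen - 1;                           // inclusive
    long adjustedOffset = (startByteIndex / bytesPerChecksum) * bytesPerChecksum;
    long endIndex = ((endByteIndex / bytesPerChecksum) + 1) * bytesPerChecksum; // exclusive
    long adjustedLen = Math.min(endIndex, chunkLen) - adjustedOffset;

    // Prints: adjustedOffset=262144, adjustedLen=262144
    System.out.println("adjustedOffset=" + adjustedOffset + ", adjustedLen=" + adjustedLen);
  }
}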
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 6afee1c5d77..efede65e524 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -42,8 +42,10 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -53,9 +55,11 @@
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -68,22 +72,29 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -94,22 +105,28 @@
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner;
import org.apache.hadoop.util.Time;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.GenericTestUtils.LogCapturer;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,7 +144,10 @@ public class TestKeyValueHandler {
private Path dbFile;
private static final long DUMMY_CONTAINER_ID = 9999;
+ private static final long LOCAL_ID = 1;
private static final String DUMMY_PATH = "dummy/dir/doesnt/exist";
+ private static final long CHUNK_SIZE = 1024 * 1024; // 1MB
+ private static final long BYTES_PER_CHECKSUM = 256 * 1024;
private static final String DATANODE_UUID = UUID.randomUUID().toString();
private static final String CLUSTER_ID = UUID.randomUUID().toString();
@@ -938,4 +958,77 @@ private KeyValueHandler createKeyValueHandler(Path path)
throws IOException {
return kvHandler;
}
+
+ @Test
+ public void testReadBlock() throws IOException {
+
+ StreamObserver<ContainerCommandResponseProto> streamObserver =
mock(StreamObserver.class);
+ KeyValueContainer container = mock(KeyValueContainer.class);
+ final KeyValueHandler kvHandler = new KeyValueHandler(new
OzoneConfiguration(),
+ UUID.randomUUID().toString(), mock(ContainerSet.class),
mock(VolumeSet.class), mock(ContainerMetrics.class),
+ mock(IncrementalReportSender.class),
mock(ContainerChecksumTreeManager.class));
+ final KeyValueHandler keyValueHandler = spy(kvHandler);
+ DispatcherContext dispatcherContext = mock(DispatcherContext.class);
+
+ List<ContainerProtos.ChunkInfo> chunkInfoList = new ArrayList<>();
+ BlockData blockData = new BlockData(new BlockID(1, 1));
+ for (int i = 0; i < 4; i++) {
+ chunkInfoList.add(ContainerProtos.ChunkInfo
+ .newBuilder()
+ .setOffset(0)
+ .setLen(CHUNK_SIZE)
+ .setChecksumData(
+ ChecksumData.newBuilder().setBytesPerChecksum((int)
BYTES_PER_CHECKSUM)
+ .setType(ChecksumType.CRC32).build())
+ .setChunkName("chunkName" + i)
+ .build());
+ }
+ blockData.setChunks(chunkInfoList);
+
+ try (MockedStatic<BlockUtils> blockUtils = mockStatic(BlockUtils.class)) {
+ BlockManager blockManager = mock(BlockManager.class);
+ ChunkManager chunkManager = mock(ChunkManager.class);
+ when(keyValueHandler.getBlockManager()).thenReturn(blockManager);
+ when(keyValueHandler.getChunkManager()).thenReturn(chunkManager);
+ when(blockManager.getBlock(any(), any())).thenReturn(blockData);
+ ChunkBuffer data = ChunkBuffer.wrap(ByteBuffer.allocate(0));
+ when(chunkManager.readChunk(any(), any(),
+ any(), any()))
+ .thenReturn(data);
+ testReadBlock(0, 1, keyValueHandler, dispatcherContext,
+ streamObserver, container);
+ testReadBlock(0, CHUNK_SIZE + 1, keyValueHandler, dispatcherContext,
+ streamObserver, container);
+ testReadBlock(CHUNK_SIZE / 2, 2 * CHUNK_SIZE, keyValueHandler,
dispatcherContext,
+ streamObserver, container);
+ }
+ }
+
+ private static ContainerCommandRequestProto readBlockRequest(
+ long offset, long length) {
+ return ContainerCommandRequestProto.newBuilder()
+ .setCmdType(Type.ReadBlock)
+ .setReadBlock(
+ ContainerProtos.ReadBlockRequestProto.newBuilder()
+ .setBlockID(
+ ContainerProtos.DatanodeBlockID.newBuilder()
+ .setContainerID(DUMMY_CONTAINER_ID)
+ .setLocalID(LOCAL_ID))
+ .setOffset(offset)
+ .setLen(length)
+ .setVerifyChecksum(true))
+ .setContainerID(DUMMY_CONTAINER_ID)
+ .setDatanodeUuid(UUID.randomUUID().toString())
+ .build();
+ }
+
+ private static void testReadBlock(
+ long offset, long length, KeyValueHandler keyValueHandler,
DispatcherContext dispatcherContext,
+ StreamObserver<ContainerCommandResponseProto> streamObserver,
KeyValueContainer container) {
+ int responseCount = (int) (((offset + length - 1) / CHUNK_SIZE) + 1 -
(offset / CHUNK_SIZE));
+ ContainerCommandRequestProto requestProto = readBlockRequest(offset,
length);
+ keyValueHandler.readBlock(requestProto, container, dispatcherContext,
streamObserver);
+ verify(streamObserver, times(responseCount)).onNext(any());
+ clearInvocations(streamObserver);
+ }
}
diff --git
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index bd890eae64a..c5548244560 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -77,6 +77,8 @@ package hadoop.hdds.datanode;
* 18. CopyContainer - Copies a container from a remote machine.
*
* 19. FinalizeBlock - Finalize block request from client.
+ *
+ * 20. ReadBlock - Reads a block by streaming its chunks to the client.
*/
enum Type {
@@ -108,6 +110,7 @@ enum Type {
FinalizeBlock = 21;
Echo = 22;
GetContainerChecksumInfo = 23;
+ ReadBlock = 24;
}
@@ -218,6 +221,7 @@ message ContainerCommandRequestProto {
optional FinalizeBlockRequestProto finalizeBlock = 25;
optional EchoRequestProto echo = 26;
optional GetContainerChecksumInfoRequestProto getContainerChecksumInfo =
27;
+ optional ReadBlockRequestProto readBlock = 28;
}
message ContainerCommandResponseProto {
@@ -250,6 +254,7 @@ message ContainerCommandResponseProto {
optional FinalizeBlockResponseProto finalizeBlock = 22;
optional EchoResponseProto echo = 23;
optional GetContainerChecksumInfoResponseProto getContainerChecksumInfo =
24;
+ optional ReadBlockResponseProto readBlock = 25;
}
message ContainerDataProto {
@@ -393,6 +398,17 @@ message ListBlockResponseProto {
repeated BlockData blockData = 1;
}
+message ReadBlockRequestProto {
+ required DatanodeBlockID blockID = 1;
+ required uint64 offset = 2;
+ required uint64 len = 3;
+ required bool verifyChecksum = 4;
+}
+
+message ReadBlockResponseProto {
+ repeated ReadChunkResponseProto readChunk = 1;
+}
+
message EchoRequestProto {
optional bytes payload = 1;
optional int32 payloadSizeResp = 2;
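
On the wire, a ReadBlock command is just a ContainerCommandRequestProto with the new readBlock field populated. A minimal construction using the messages defined above might look as follows; the container ID, local ID, length and datanode UUID are made-up values.

// Minimal construction of a ReadBlock request; IDs and sizes are made up.
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

public final class ReadBlockRequestExample {
  private ReadBlockRequestExample() { }

  static ContainerProtos.ContainerCommandRequestProto buildRequest() {
    ContainerProtos.DatanodeBlockID blockId =
        ContainerProtos.DatanodeBlockID.newBuilder()
            .setContainerID(42L)
            .setLocalID(7L)
            .build();
    ContainerProtos.ReadBlockRequestProto readBlock =
        ContainerProtos.ReadBlockRequestProto.newBuilder()
            .setBlockID(blockId)
            .setOffset(0)               // start of the block
            .setLen(4 * 1024 * 1024)    // read 4 MB
            .setVerifyChecksum(true)    // align reads to checksum boundaries
            .build();
    return ContainerProtos.ContainerCommandRequestProto.newBuilder()
        .setCmdType(ContainerProtos.Type.ReadBlock)
        .setContainerID(42L)
        .setDatanodeUuid("datanode-uuid-placeholder")
        .setReadBlock(readBlock)
        .build();
  }
}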
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
new file mode 100644
index 00000000000..80ae5118467
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
+import org.apache.hadoop.ozone.om.TestBucket;
+
+/**
+ * Tests {@link StreamBlockInputStream}.
+ */
+public class TestStreamBlockInputStream extends TestInputStreamBase {
+ /**
+ * Run the tests as a single test method to avoid needing a new mini-cluster
+ * for each test.
+ */
+ @ContainerLayoutTestInfo.ContainerTest
+ void testAll(ContainerLayoutVersion layout) throws Exception {
+ try (MiniOzoneCluster cluster = newCluster()) {
+ cluster.waitForClusterToBeReady();
+
+ OzoneConfiguration conf = cluster.getConf();
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamReadBlock(true);
+ OzoneConfiguration copy = new OzoneConfiguration(conf);
+ copy.setFromObject(clientConfig);
+ try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
+ updateConfig(layout);
+ TestBucket bucket = TestBucket.newBuilder(client).build();
+
+ testBlockReadBuffers(bucket);
+ testBufferRelease(bucket);
+ testCloseReleasesBuffers(bucket);
+ testReadEmptyBlock(bucket);
+ }
+ }
+ }
+
+ /**
+ * Test to verify that data read from blocks is stored in a list of buffers
+ * with max capacity equal to the bytes per checksum.
+ */
+ private void testBlockReadBuffers(TestBucket bucket) throws Exception {
+ String keyName = getNewKeyName();
+ int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
+ byte[] inputData = bucket.writeRandomBytes(keyName, dataLength);
+
+ try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
+
+ StreamBlockInputStream block0Stream =
+ (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
+
+
+ // To read 1 byte of chunk data, ChunkInputStream should get one full
+ // checksum boundary worth of data from Container and store it in
buffers.
+ IOUtils.readFully(block0Stream, new byte[1]);
+ checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+
+ // Read > checksum boundary of data from chunk0
+ int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
+ byte[] readData = readDataFromBlock(block0Stream, 0, readDataLen);
+ bucket.validateData(inputData, 0, readData);
+
+ // The first checksum boundary size of data was already existing in the
+ // ChunkStream buffers. Once that data is read, the next checksum
+ // boundary size of data will be fetched again to read the remaining
data.
+ // Hence, there should be 1 checksum boundary size of data stored in the
+ // ChunkStreams buffers at the end of the read.
+ checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+
+ // Seek to a position in the third checksum boundary (so that current
+ // buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM
+ // bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of
+ // data being read into the buffers. There should be 2 buffers in the
+ // stream but the first buffer should be released after it is read
+ // and the second buffer should have BYTES_PER_CHECKSUM capacity.
+ int offset = 2 * BYTES_PER_CHECKSUM + 1;
+ readData = readDataFromBlock(block0Stream, offset, readDataLen);
+ bucket.validateData(inputData, offset, readData);
+ checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+
+
+ // Read the full chunk data -1 and verify that all chunk data is read
into
+ // buffers. We read CHUNK_SIZE - 1 as otherwise all the buffers will be
+ // released once all chunk data is read.
+ readData = readDataFromBlock(block0Stream, 0, CHUNK_SIZE - 1);
+ bucket.validateData(inputData, 0, readData);
+ checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+
+ // Read the last byte of chunk and verify that the buffers are released.
+ IOUtils.readFully(block0Stream, new byte[1]);
+ assertNull(block0Stream.getCachedBuffers(),
+ "ChunkInputStream did not release buffers after reaching EOF.");
+ }
+ }
+
+ private void testCloseReleasesBuffers(TestBucket bucket) throws Exception {
+ String keyName = getNewKeyName();
+ bucket.writeRandomBytes(keyName, CHUNK_SIZE);
+
+ try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
+ StreamBlockInputStream block0Stream =
+ (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
+
+ readDataFromBlock(block0Stream, 0, 1);
+ assertNotNull(block0Stream.getCachedBuffers());
+
+ block0Stream.close();
+
+ assertNull(block0Stream.getCachedBuffers());
+ }
+ }
+
+ /**
+ * Test that the stream's cached buffers are released as soon as the last byte
+ * of the buffer is read.
+ */
+ private void testBufferRelease(TestBucket bucket) throws Exception {
+ String keyName = getNewKeyName();
+ byte[] inputData = bucket.writeRandomBytes(keyName, CHUNK_SIZE);
+
+ try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
+
+ StreamBlockInputStream block0Stream =
+ (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
+
+ // Read checksum boundary - 1 bytes of data
+ int readDataLen = BYTES_PER_CHECKSUM - 1;
+ byte[] readData = readDataFromBlock(block0Stream, 0, readDataLen);
+ bucket.validateData(inputData, 0, readData);
+
+ // There should be 1 byte of data remaining in the buffer which is not
+ // yet read. Hence, the buffer should not be released.
+ checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+ assertEquals(1, block0Stream.getCachedBuffers()[0].remaining());
+
+ // Reading the last byte in the buffer should result in all the buffers
+ // being released.
+ readData = readDataFromBlock(block0Stream, 1);
+ bucket.validateData(inputData, readDataLen, readData);
+ assertNull(block0Stream.getCachedBuffers(),
+ "Chunk stream buffers not released after last byte is read");
+
+ // Read more data to get the data till the next checksum boundary.
+ readDataLen = BYTES_PER_CHECKSUM / 2;
+ readDataFromBlock(block0Stream, readDataLen);
+ // There should be one buffer and the buffer should not be released as
+ // there is data pending to be read from the buffer
+ checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+ ByteBuffer lastCachedBuffer = block0Stream.getCachedBuffers()[0];
+ assertEquals(BYTES_PER_CHECKSUM - readDataLen,
+ lastCachedBuffer.remaining());
+
+ // Read more than the remaining data in buffer (but less than the next
+ // checksum boundary).
+ int position = (int) block0Stream.getPos();
+ readDataLen = lastCachedBuffer.remaining() + BYTES_PER_CHECKSUM / 2;
+ readData = readDataFromBlock(block0Stream, readDataLen);
+ bucket.validateData(inputData, position, readData);
+ // After reading the remaining data in the buffer, the buffer should be
+ // released and next checksum size of data must be read into the buffers
+ checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+ // Verify that the previously cached buffer is released by comparing it
+ // with the current cached buffer
+ assertNotEquals(lastCachedBuffer,
+ block0Stream.getCachedBuffers()[0]);
+ }
+ }
+
+ private byte[] readDataFromBlock(StreamBlockInputStream streamBlockInputStream,
+     int offset, int readDataLength) throws IOException {
+ byte[] readData = new byte[readDataLength];
+ streamBlockInputStream.seek(offset);
+ IOUtils.readFully(streamBlockInputStream, readData);
+ return readData;
+ }
+
+ private byte[] readDataFromBlock(StreamBlockInputStream streamBlockInputStream,
+     int readDataLength) throws IOException {
+ byte[] readData = new byte[readDataLength];
+ IOUtils.readFully(streamBlockInputStream, readData);
+ return readData;
+ }
+
+ /**
+ * Verify number of buffers and their capacities.
+ * @param buffers chunk stream buffers
+ */
+ private void checkBufferSizeAndCapacity(ByteBuffer[] buffers) {
+ assertEquals(1, buffers.length,
+ "ChunkInputStream does not have expected number of " +
+ "ByteBuffers");
+ for (ByteBuffer buffer : buffers) {
+ assertEquals(BYTES_PER_CHECKSUM, buffer.capacity(),
+ "ChunkInputStream ByteBuffer capacity is wrong");
+ }
+ }
+
+ private void testReadEmptyBlock(TestBucket bucket) throws Exception {
+ String keyName = getNewKeyName();
+ int dataLength = 10;
+ bucket.writeRandomBytes(keyName, 0);
+
+ try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
+
+ byte[] readData = new byte[dataLength];
+ assertTrue(keyInputStream.getPartStreams().isEmpty());
+ IOUtils.read(keyInputStream, readData);
+ for (byte b : readData) {
+ assertEquals((byte) 0, b);
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]