This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new d673fbf634c HDDS-13975. Limit the number of responses in stream read block. (#9375)
d673fbf634c is described below
commit d673fbf634c2c8af79ffbbd7358365621f6e7195
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Dec 1 08:40:14 2025 -0800
HDDS-13975. Limit the number of responses in stream read block. (#9375)
---
.../apache/hadoop/hdds/scm/XceiverClientGrpc.java | 46 +--
.../hdds/scm/storage/StreamBlockInputStream.java | 260 ++++++++-------
.../scm/storage/TestStreamBlockInputStream.java | 354 ---------------------
.../hadoop/hdds/scm/StreamingReadResponse.java | 9 +
.../apache/hadoop/hdds/scm/XceiverClientSpi.java | 10 +-
.../common/helpers/RandomAccessBlockFile.java | 92 ++++++
.../hdds/scm/storage/ContainerProtocolCalls.java | 52 ++-
.../org/apache/hadoop/ozone/common/Checksum.java | 9 +
.../container/common/impl/HddsDispatcher.java | 5 +-
.../common/interfaces/ContainerDispatcher.java | 2 +
.../ozone/container/common/interfaces/Handler.java | 4 +-
.../transport/server/GrpcXceiverService.java | 25 +-
.../ozone/container/keyvalue/KeyValueHandler.java | 169 ++++++----
.../src/main/proto/DatanodeClientProtocol.proto | 2 +
.../rpc/read/TestStreamBlockInputStream.java | 67 ++++
.../ozone/client/rpc/read/TestStreamRead.java | 227 +++++++++++++
16 files changed, 748 insertions(+), 585 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index b6a3d00f010..d53cc957cbf 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -37,6 +37,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
@@ -59,6 +60,7 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.util.Time;
+import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
@@ -386,11 +388,14 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(
}
private List<DatanodeDetails> sortDatanodes(ContainerCommandRequestProto
request) throws IOException {
+ return sortDatanodes(getRequestBlockID(request), request.getCmdType());
+ }
+
+ List<DatanodeDetails> sortDatanodes(DatanodeBlockID blockID, ContainerProtos.Type cmdType) throws IOException {
List<DatanodeDetails> datanodeList = null;
- DatanodeBlockID blockID = getRequestBlockID(request);
if (blockID != null) {
- if (request.getCmdType() != ContainerProtos.Type.ReadChunk) {
+ if (cmdType != ContainerProtos.Type.ReadChunk) {
datanodeList = pipeline.getNodes();
int getBlockDNLeaderIndex =
datanodeList.indexOf(pipeline.getLeaderNode());
if (getBlockDNLeaderIndex > 0) {
@@ -516,23 +521,19 @@ private XceiverClientReply sendCommandWithRetry(
}
}
- /**
- * Starts a streaming read operation, intended to read entire blocks from the datanodes. This method expects a
- * {@link StreamingReaderSpi} to be passed in, which will be used to receive the streamed data from the datanode.
- * Upon successfully starting the streaming read, a {@link StreamingReadResponse} is set into the pass StreamObserver,
- * which contains information about the datanode used for the read, and the request observer that can be used to
- * manage the stream (e.g., to cancel it if needed). A semaphore is acquired to limit the number of concurrent
- * streaming reads so upon successful return of this method, the caller must ensure to call
- * {@link #completeStreamRead(StreamingReadResponse)} to release the semaphore once the streaming read is complete.
- * @param request The container command request to initiate the streaming read.
- * @param streamObserver The observer that will handle the streamed responses.=
- * @throws IOException
- * @throws InterruptedException
- */
@Override
public void streamRead(ContainerCommandRequestProto request,
- StreamingReaderSpi streamObserver) throws IOException, InterruptedException {
- List<DatanodeDetails> datanodeList = sortDatanodes(request);
+ StreamingReadResponse streamObserver) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("->{}, send onNext request {}",
+ streamObserver, TextFormat.shortDebugString(request.getReadBlock()));
+ }
+ streamObserver.getRequestObserver().onNext(request);
+ }
+
+ @Override
+ public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) throws IOException {
+ final List<DatanodeDetails> datanodeList = sortDatanodes(null, ContainerProtos.Type.ReadBlock);
IOException lastException = null;
for (DatanodeDetails dn : datanodeList) {
try {
@@ -542,21 +543,20 @@ public void streamRead(ContainerCommandRequestProto request,
if (stub == null) {
throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Executing command {} on datanode {}", processForDebug(request), dn);
- }
+ LOG.debug("initStreamRead {} on datanode {}", blockID.getContainerBlockID(), dn);
StreamObserver<ContainerCommandRequestProto> requestObserver = stub
.withDeadlineAfter(timeout, TimeUnit.SECONDS)
.send(streamObserver);
streamObserver.setStreamingReadResponse(new StreamingReadResponse(dn,
(ClientCallStreamObserver<ContainerCommandRequestProto>)
requestObserver));
- requestObserver.onNext(request);
- requestObserver.onCompleted();
return;
} catch (IOException e) {
LOG.error("Failed to start streaming read to DataNode {}", dn, e);
semaphore.release();
lastException = e;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted initStreamRead to " + dn + " for " + blockID, e);
}
}
if (lastException != null) {
@@ -572,7 +572,7 @@ public void streamRead(ContainerCommandRequestProto request,
* needed.
*/
@Override
- public void completeStreamRead(StreamingReadResponse streamingReadResponse) {
+ public void completeStreamRead() {
semaphore.release();
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
index 223ce65881f..8ddd5c4220d 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
@@ -17,9 +17,13 @@
package org.apache.hadoop.hdds.scm.storage;
+import static org.apache.ratis.thirdparty.io.grpc.Status.Code.CANCELLED;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,6 +34,7 @@
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
@@ -37,6 +42,7 @@
import org.apache.hadoop.hdds.scm.StreamingReadResponse;
import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -44,9 +50,10 @@
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
+import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,18 +65,19 @@ public class StreamBlockInputStream extends BlockExtendedInputStream
implements Seekable, CanUnbuffer, ByteBufferReadable {
private static final Logger LOG =
LoggerFactory.getLogger(StreamBlockInputStream.class);
private static final int EOF = -1;
- private static final Throwable CANCELLED_EXCEPTION = new Throwable("Cancelled by client");
private final BlockID blockID;
private final long blockLength;
+ private final int responseDataSize = 1 << 20; // 1 MB
+ private final long preReadSize = 32 << 20; // 32 MB
private final AtomicReference<Pipeline> pipelineRef = new
AtomicReference<>();
private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
new AtomicReference<>();
private XceiverClientFactory xceiverClientFactory;
- private XceiverClientSpi xceiverClient;
+ private XceiverClientGrpc xceiverClient;
private ByteBuffer buffer;
private long position = 0;
- private boolean initialized = false;
+ private long requestedLength = 0;
private StreamingReader streamingReader;
private final boolean verifyChecksum;
@@ -111,7 +119,7 @@ public synchronized long getPos() {
@Override
public synchronized int read() throws IOException {
checkOpen();
- if (!dataAvailableToRead()) {
+ if (!dataAvailableToRead(1)) {
return EOF;
}
position++;
@@ -129,7 +137,7 @@ public synchronized int read(ByteBuffer targetBuf) throws IOException {
checkOpen();
int read = 0;
while (targetBuf.hasRemaining()) {
- if (!dataAvailableToRead()) {
+ if (!dataAvailableToRead(targetBuf.remaining())) {
break;
}
int toCopy = Math.min(buffer.remaining(), targetBuf.remaining());
@@ -143,16 +151,21 @@ public synchronized int read(ByteBuffer targetBuf) throws IOException {
return read > 0 ? read : EOF;
}
- private boolean dataAvailableToRead() throws IOException {
+ private synchronized boolean dataAvailableToRead(int length) throws IOException {
if (position >= blockLength) {
return false;
}
initialize();
- if (buffer == null || buffer.remaining() == 0) {
- int loaded = fillBuffer();
- return loaded != EOF;
+
+ if (bufferHasRemaining()) {
+ return true;
}
- return true;
+ buffer = streamingReader.read(length);
+ return bufferHasRemaining();
+ }
+
+ private synchronized boolean bufferHasRemaining() {
+ return buffer != null && buffer.hasRemaining();
}
@Override
@@ -174,6 +187,7 @@ public synchronized void seek(long pos) throws IOException {
}
closeStream();
position = pos;
+ requestedLength = pos;
}
@Override
@@ -188,12 +202,11 @@ public synchronized void unbuffer() {
releaseClient();
}
- private void closeStream() {
+ private synchronized void closeStream() {
if (streamingReader != null) {
- streamingReader.cancel();
+ streamingReader.onCompleted();
streamingReader = null;
}
- initialized = false;
buffer = null;
}
@@ -207,36 +220,61 @@ protected synchronized void acquireClient() throws IOException {
checkOpen();
if (xceiverClient == null) {
final Pipeline pipeline = pipelineRef.get();
+ final XceiverClientSpi client;
try {
- xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
+ client = xceiverClientFactory.acquireClientForReadData(pipeline);
} catch (IOException ioe) {
LOG.warn("Failed to acquire client for pipeline {}, block {}",
pipeline, blockID);
throw ioe;
}
+
+ if (client == null) {
+ throw new IOException("Failed to acquire client for " + pipeline);
+ }
+ if (!(client instanceof XceiverClientGrpc)) {
+ throw new IOException("Unexpected client class: " + client.getClass().getName() + ", " + pipeline);
+ }
+
+ xceiverClient = (XceiverClientGrpc) client;
}
}
- private void initialize() throws IOException {
- if (initialized) {
- return;
- }
- while (true) {
+ private synchronized void initialize() throws IOException {
+ while (streamingReader == null) {
try {
acquireClient();
- streamingReader = new StreamingReader();
- ContainerProtocolCalls.readBlock(xceiverClient, position, blockID, tokenRef.get(),
- pipelineRef.get().getReplicaIndexes(), streamingReader);
- initialized = true;
- return;
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- handleExceptions(new IOException("Interrupted", ie));
} catch (IOException ioe) {
handleExceptions(ioe);
}
+
+ streamingReader = new StreamingReader();
+ xceiverClient.initStreamRead(blockID, streamingReader);
}
}
+ synchronized void readBlock(int length) throws IOException {
+ final long diff = position + length - requestedLength;
+ if (diff > 0) {
+ final long rounded = roundUp(diff + preReadSize, responseDataSize);
+ LOG.debug("position {}, length {}, requested {}, diff {}, rounded {},
preReadSize={}",
+ position, length, requestedLength, diff, rounded, preReadSize);
+ readBlockImpl(rounded);
+ requestedLength += rounded;
+ }
+ }
+
+ synchronized void readBlockImpl(long length) throws IOException {
+ if (streamingReader == null) {
+ throw new IOException("Uninitialized StreamingReader: " + blockID);
+ }
+ final StreamingReadResponse r = streamingReader.getResponse();
+ if (r == null) {
+ throw new IOException("Uninitialized StreamingReadResponse: " + blockID);
+ }
+ xceiverClient.streamRead(ContainerProtocolCalls.buildReadBlockCommandProto(
+ blockID, requestedLength, length, responseDataSize, tokenRef.get(), pipelineRef.get()), r);
+ }
+
private void handleExceptions(IOException cause) throws IOException {
if (cause instanceof StorageContainerException ||
isConnectivityIssue(cause)) {
if (shouldRetryRead(cause, retryPolicy, retries++)) {
@@ -251,14 +289,6 @@ private void handleExceptions(IOException cause) throws IOException {
}
}
- private int fillBuffer() throws IOException {
- if (!streamingReader.hasNext()) {
- return EOF;
- }
- buffer = streamingReader.readNext();
- return buffer == null ? EOF : buffer.limit();
- }
-
protected synchronized void releaseClient() {
if (xceiverClientFactory != null && xceiverClient != null) {
closeStream();
@@ -277,9 +307,9 @@ private void refreshBlockInfo(IOException cause) throws IOException {
refreshBlockInfo(cause, blockID, pipelineRef, tokenRef, refreshFunction);
}
- private synchronized void releaseStreamResources(StreamingReadResponse response) {
+ private synchronized void releaseStreamResources() {
if (xceiverClient != null) {
- xceiverClient.completeStreamRead(response);
+ xceiverClient.completeStreamRead();
}
}
@@ -288,44 +318,52 @@ private synchronized void releaseStreamResources(StreamingReadResponse response)
*/
public class StreamingReader implements StreamingReaderSpi {
- private final BlockingQueue<ContainerProtos.ReadBlockResponseProto> responseQueue = new LinkedBlockingQueue<>(1);
- private final AtomicBoolean completed = new AtomicBoolean(false);
- private final AtomicBoolean failed = new AtomicBoolean(false);
+ /** Response queue: poll is blocking while offer is non-blocking. */
+ private final BlockingQueue<ReadBlockResponseProto> responseQueue = new LinkedBlockingQueue<>();
+
+ private final CompletableFuture<Void> future = new CompletableFuture<>();
private final AtomicBoolean semaphoreReleased = new AtomicBoolean(false);
- private final AtomicReference<Throwable> error = new AtomicReference<>();
- private volatile StreamingReadResponse response;
+ private final AtomicReference<StreamingReadResponse> response = new AtomicReference<>();
- public boolean hasNext() {
- return !responseQueue.isEmpty() || !completed.get();
+ void checkError() throws IOException {
+ if (future.isCompletedExceptionally()) {
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException("Streaming read failed", e);
+ }
+ }
}
- public ByteBuffer readNext() throws IOException {
- if (failed.get()) {
- Throwable cause = error.get();
- throw new IOException("Streaming read failed", cause);
+ ReadBlockResponseProto poll() throws IOException {
+ while (true) {
+ checkError();
+ if (future.isDone()) {
+ return null; // Stream ended
+ }
+
+ final ReadBlockResponseProto proto;
+ try {
+ proto = responseQueue.poll(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for response", e);
+ }
+ if (proto != null) {
+ return proto;
+ }
}
+ }
- if (completed.get() && responseQueue.isEmpty()) {
+ private ByteBuffer read(int length) throws IOException {
+ checkError();
+ if (future.isDone()) {
return null; // Stream ended
}
- ReadBlockResponseProto readBlock;
- try {
- readBlock = responseQueue.poll(30, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while waiting for response", e);
- }
- if (readBlock == null) {
- if (failed.get()) {
- Throwable cause = error.get();
- throw new IOException("Streaming read failed", cause);
- } else if (completed.get()) {
- return null; // Stream ended
- } else {
- throw new IOException("Timed out waiting for response");
- }
- }
+ readBlock(length);
+
+ final ReadBlockResponseProto readBlock = poll();
// The server always returns data starting from the last checksum boundary. Therefore if the reader position is
// ahead of the position we received from the server, we need to adjust the buffer position accordingly.
// If the reader position is behind
@@ -339,39 +377,48 @@ public ByteBuffer readNext() throws IOException {
}
if (pos > readBlock.getOffset()) {
int offset = (int)(pos - readBlock.getOffset());
+ if (offset > buf.limit()) {
+ offset = buf.limit();
+ }
buf.position(offset);
}
return buf;
}
private void releaseResources() {
- // release resources only if it was not yet completed
if (semaphoreReleased.compareAndSet(false, true)) {
- releaseStreamResources(response);
+ releaseStreamResources();
}
}
@Override
public void onNext(ContainerProtos.ContainerCommandResponseProto
containerCommandResponseProto) {
+ final ReadBlockResponseProto readBlock = containerCommandResponseProto.getReadBlock();
try {
- ReadBlockResponseProto readBlock = containerCommandResponseProto.getReadBlock();
ByteBuffer data = readBlock.getData().asReadOnlyByteBuffer();
if (verifyChecksum) {
ChecksumData checksumData =
ChecksumData.getFromProtoBuf(readBlock.getChecksumData());
Checksum.verifyChecksum(data, checksumData, 0);
}
offerToQueue(readBlock);
- } catch (OzoneChecksumException e) {
- LOG.warn("Checksum verification failed for block {} from datanode {}",
- getBlockID(), response.getDatanodeDetails(), e);
- cancelDueToError(e);
+ } catch (Exception e) {
+ final ByteString data = readBlock.getData();
+ final long offset = readBlock.getOffset();
+ final StreamingReadResponse r = getResponse();
+ LOG.warn("Failed to process block {} response at offset={}, size={}:
{}, {}",
+ getBlockID().getContainerBlockID(),
+ offset, data.size(), StringUtils.bytes2Hex(data.substring(0,
10).asReadOnlyByteBuffer()),
+ readBlock.getChecksumData(), e);
+ setFailed(e);
+ r.getRequestObserver().onError(e);
+ releaseResources();
}
}
@Override
public void onError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
- if (((StatusRuntimeException) throwable).getStatus().getCode().name().equals("CANCELLED")) {
+ if (((StatusRuntimeException) throwable).getStatus().getCode() == CANCELLED) {
// This is expected when the client cancels the stream.
setCompleted();
}
@@ -387,57 +434,52 @@ public void onCompleted() {
releaseResources();
}
- /**
- * By calling cancel, the client will send a cancel signal to the server, which will stop sending more data and
- * cause the onError() to be called in this observer with a CANCELLED exception.
- */
- public void cancel() {
- if (response != null && response.getRequestObserver() != null) {
- response.getRequestObserver().cancel("Cancelled by client", CANCELLED_EXCEPTION);
- setCompleted();
- releaseResources();
- }
- }
-
- public void cancelDueToError(Throwable exception) {
- if (response != null && response.getRequestObserver() != null) {
- response.getRequestObserver().onError(exception);
- setFailed(exception);
- releaseResources();
- }
+ StreamingReadResponse getResponse() {
+ return response.get();
}
private void setFailed(Throwable throwable) {
- if (completed.get()) {
- throw new IllegalArgumentException("Cannot mark a completed stream as failed");
+ final boolean completed = future.completeExceptionally(throwable);
+ if (!completed) {
+ LOG.warn("Already failed: suppressed ", throwable);
}
- failed.set(true);
- error.set(throwable);
}
private void setCompleted() {
- if (!failed.get()) {
- completed.set(true);
+ final boolean changed = future.complete(null);
+ if (!changed) {
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Failed to setCompleted", e);
+ }
}
+
+ releaseResources();
}
private void offerToQueue(ReadBlockResponseProto item) {
- while (!completed.get() && !failed.get()) {
- try {
- if (responseQueue.offer(item, 100, TimeUnit.MILLISECONDS)) {
- return;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
- }
+ if (LOG.isDebugEnabled()) {
+ final ContainerProtos.ChecksumData checksumData = item.getChecksumData();
+ LOG.debug("offerToQueue {} bytes, numChecksums {}, bytesPerChecksum={}",
+ item.getData().size(), checksumData.getChecksumsList().size(), checksumData.getBytesPerChecksum());
}
+ final boolean offered = responseQueue.offer(item);
+ Preconditions.assertTrue(offered, () -> "Failed to offer " + item);
}
@Override
public void setStreamingReadResponse(StreamingReadResponse
streamingReadResponse) {
- response = streamingReadResponse;
+ final boolean set = response.compareAndSet(null, streamingReadResponse);
+ Preconditions.assertTrue(set, () -> "Failed to set streamingReadResponse");
}
}
+ static long roundUp(long required, int packet) {
+ final long n = (required - 1) / packet;
+ final long rounded = (n + 1) * packet;
+ Preconditions.assertTrue(rounded >= required);
+ Preconditions.assertTrue(rounded - packet < required);
+ return rounded;
+ }
}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
deleted file mode 100644
index 83784499110..00000000000
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.storage;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.SecureRandom;
-import java.util.Collections;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
-import java.util.stream.Stream;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.client.ContainerBlockID;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.StreamingReadResponse;
-import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.ozone.common.Checksum;
-import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.OzoneChecksumException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Time;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.Status;
-import org.apache.ratis.thirdparty.io.grpc.StatusException;
-import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.mockito.invocation.InvocationOnMock;
-
-/**
- * Tests for {@link TestStreamBlockInputStream}'s functionality.
- */
-public class TestStreamBlockInputStream {
- private static final int BYTES_PER_CHECKSUM = 1024;
- private static final int BLOCK_SIZE = 1024;
- private StreamBlockInputStream blockStream;
- private final OzoneConfiguration conf = new OzoneConfiguration();
- private XceiverClientFactory xceiverClientFactory;
- private XceiverClientGrpc xceiverClient;
- private Checksum checksum;
- private ChecksumData checksumData;
- private byte[] data;
- private ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> requestObserver;
- private Function<BlockID, BlockLocationInfo> refreshFunction;
-
- @BeforeEach
- public void setup() throws Exception {
- Token<OzoneBlockTokenIdentifier> token = mock(Token.class);
- when(token.encodeToUrlString()).thenReturn("url");
-
- Set<HddsProtos.BlockTokenSecretProto.AccessModeProto> modes =
-
Collections.singleton(HddsProtos.BlockTokenSecretProto.AccessModeProto.READ);
- OzoneBlockTokenIdentifier tokenIdentifier = new
OzoneBlockTokenIdentifier("owner", new BlockID(1, 1),
- modes, Time.monotonicNow() + 10000, 10);
- tokenIdentifier.setSecretKeyId(UUID.randomUUID());
- when(token.getIdentifier()).thenReturn(tokenIdentifier.getBytes());
- Pipeline pipeline = MockPipeline.createSingleNodePipeline();
-
- BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class);
- when(blockLocationInfo.getPipeline()).thenReturn(pipeline);
- when(blockLocationInfo.getToken()).thenReturn(token);
-
- xceiverClient = mock(XceiverClientGrpc.class);
- when(xceiverClient.getPipeline()).thenReturn(pipeline);
- xceiverClientFactory = mock(XceiverClientFactory.class);
- when(xceiverClientFactory.acquireClientForReadData(any()))
- .thenReturn(xceiverClient);
- requestObserver = mock(ClientCallStreamObserver.class);
-
- OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
- clientConfig.setStreamReadBlock(true);
- clientConfig.setMaxReadRetryCount(1);
- refreshFunction = mock(Function.class);
- when(refreshFunction.apply(any())).thenReturn(blockLocationInfo);
- BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
- checksum = new Checksum(ChecksumType.CRC32, BYTES_PER_CHECKSUM);
- createDataAndChecksum();
- blockStream = new StreamBlockInputStream(blockID, BLOCK_SIZE, pipeline,
- token, xceiverClientFactory, refreshFunction, clientConfig);
- }
-
- @AfterEach
- public void teardown() {
- if (blockStream != null) {
- try {
- blockStream.close();
- } catch (IOException e) {
- // ignore
- }
- }
- }
-
- @Test
- public void testCloseStreamReleasesResources() throws IOException, InterruptedException {
- setupSuccessfulRead();
- assertEquals(data[0], blockStream.read());
- blockStream.close();
- // Verify that cancel() was called on the requestObserver mock
- verify(requestObserver).cancel(any(), any());
- // Verify that release() was called on the xceiverClient mock
- verify(xceiverClientFactory).releaseClientForReadData(xceiverClient,
false);
- verify(xceiverClient, times(1)).completeStreamRead(any());
- }
-
- @Test
- public void testUnbufferReleasesResourcesAndResumesFromLastPosition() throws
IOException, InterruptedException {
- setupSuccessfulRead();
- assertEquals(data[0], blockStream.read());
- assertEquals(1, blockStream.getPos());
- blockStream.unbuffer();
- // Verify that cancel() was called on the requestObserver mock
- verify(requestObserver).cancel(any(), any());
- // Verify that release() was called on the xceiverClient mock
- verify(xceiverClientFactory).releaseClientForReadData(xceiverClient,
false);
- verify(xceiverClient, times(1)).completeStreamRead(any());
- // The next read should "rebuffer" and continue from the last position
- assertEquals(data[1], blockStream.read());
- assertEquals(2, blockStream.getPos());
- }
-
- @Test
- public void testSeekReleasesTheStreamAndStartsFromNewPosition() throws
IOException, InterruptedException {
- setupSuccessfulRead();
- assertEquals(data[0], blockStream.read());
- blockStream.seek(100);
- assertEquals(100, blockStream.getPos());
- // Verify that cancel() was called on the requestObserver mock
- verify(requestObserver).cancel(any(), any());
- verify(xceiverClient, times(1)).completeStreamRead(any());
- // The xceiverClient should not be released
- verify(xceiverClientFactory, never())
- .releaseClientForReadData(xceiverClient, false);
-
- assertEquals(data[100], blockStream.read());
- assertEquals(101, blockStream.getPos());
- }
-
- @Test
- public void testErrorThrownIfStreamReturnsError() throws IOException,
InterruptedException {
- // Note the error will only be thrown when the buffer needs to be refilled. I think case, as its the first
- // read it will try to fill the buffer and encounter the error, but a reader could continue reading until the
- // buffer is exhausted before seeing the error.
- doAnswer((InvocationOnMock invocation) -> {
- StreamingReaderSpi streamObserver = invocation.getArgument(1);
- StreamingReadResponse resp =
- new
StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(),
requestObserver);
- streamObserver.setStreamingReadResponse(resp);
- streamObserver.onError(new IOException("Test induced error"));
- return null;
- }).when(xceiverClient).streamRead(any(), any());
- assertThrows(IOException.class, () -> blockStream.read());
- verify(xceiverClient, times(1)).completeStreamRead(any());
- }
-
- @Test
- public void seekOutOfBounds() throws IOException, InterruptedException {
- setupSuccessfulRead();
- assertThrows(IOException.class, () -> blockStream.seek(-1));
- assertThrows(IOException.class, () -> blockStream.seek(BLOCK_SIZE + 1));
- }
-
- @Test
- public void readPastEOFReturnsEOF() throws IOException, InterruptedException
{
- setupSuccessfulRead();
- blockStream.seek(BLOCK_SIZE);
- // Ensure the stream is at EOF even after two attempts to read
- assertEquals(-1, blockStream.read());
- assertEquals(-1, blockStream.read());
- assertEquals(BLOCK_SIZE, blockStream.getPos());
- }
-
- @Test
- public void ensureExceptionThrownForReadAfterClosed() throws IOException,
InterruptedException {
- setupSuccessfulRead();
- blockStream.close();
- ByteBuffer byteBuffer = ByteBuffer.allocate(10);
- byte[] byteArray = new byte[10];
- assertThrows(IOException.class, () -> blockStream.read());
- assertThrows(IOException.class, () -> {
- // Findbugs complains about ignored return value without this :(
- int r = blockStream.read(byteArray, 0, 10);
- });
- assertThrows(IOException.class, () -> blockStream.read(byteBuffer));
- assertThrows(IOException.class, () -> blockStream.seek(10));
- }
-
- @ParameterizedTest
- @MethodSource("exceptionsTriggeringRefresh")
- public void testRefreshFunctionCalledForAllDNsBadOnInitialize(IOException
thrown)
- throws IOException, InterruptedException {
- // In this case, if the first attempt to connect to any of the DNs fails, it should retry by refreshing the pipeline
-
- doAnswer((InvocationOnMock invocation) -> {
- throw thrown;
- }).doAnswer((InvocationOnMock invocation) -> {
- StreamingReaderSpi streamObserver = invocation.getArgument(1);
- StreamingReadResponse resp =
- new
StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(),
requestObserver);
- streamObserver.setStreamingReadResponse(resp);
- streamObserver.onNext(createChunkResponse(false));
- streamObserver.onCompleted();
- return null;
- }).when(xceiverClient).streamRead(any(), any());
- blockStream.read();
- verify(refreshFunction, times(1)).apply(any());
- }
-
- @ParameterizedTest
- @MethodSource("exceptionsNotTriggeringRefresh")
- public void testRefreshNotCalledForAllDNsBadOnInitialize(IOException thrown)
- throws IOException, InterruptedException {
- // In this case, if the first attempt to connect to any of the DNs fails,
it should retry by refreshing the pipeline
- doAnswer((InvocationOnMock invocation) -> {
- throw thrown;
- }).when(xceiverClient).streamRead(any(), any());
- assertThrows(IOException.class, () -> blockStream.read());
- verify(refreshFunction, times(0)).apply(any());
- }
-
- @Test
- public void testExceptionThrownAfterRetriesExhausted() throws IOException,
InterruptedException {
- // In this case, if the first attempt to connect to any of the DNs fails,
it should retry by refreshing the pipeline
- doAnswer((InvocationOnMock invocation) -> {
- throw new StorageContainerException(CONTAINER_NOT_FOUND);
- }).when(xceiverClient).streamRead(any(), any());
-
- assertThrows(IOException.class, () -> blockStream.read());
- verify(refreshFunction, times(1)).apply(any());
- }
-
- @Test
- public void testInvalidChecksumThrowsException() throws IOException,
InterruptedException {
- doAnswer((InvocationOnMock invocation) -> {
- StreamingReaderSpi streamObserver = invocation.getArgument(1);
- StreamingReadResponse resp =
- new
StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(),
requestObserver);
- streamObserver.setStreamingReadResponse(resp);
- streamObserver.onNext(createChunkResponse(true));
- streamObserver.onCompleted();
- return null;
- }).when(xceiverClient).streamRead(any(), any());
- assertThrows(IOException.class, () -> blockStream.read());
- }
-
- private void createDataAndChecksum() throws OzoneChecksumException {
- data = new byte[BLOCK_SIZE];
- new SecureRandom().nextBytes(data);
- checksumData = checksum.computeChecksum(data);
- }
-
- private void setupSuccessfulRead() throws IOException, InterruptedException {
- doAnswer((InvocationOnMock invocation) -> {
- StreamingReaderSpi streamObserver = invocation.getArgument(1);
- StreamingReadResponse resp =
- new
StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(),
requestObserver);
- streamObserver.setStreamingReadResponse(resp);
- streamObserver.onNext(createChunkResponse(false));
- streamObserver.onCompleted();
- return null;
- }).when(xceiverClient).streamRead(any(), any());
- }
-
- private ContainerProtos.ContainerCommandResponseProto createChunkResponse(boolean invalidChecksum) {
- ContainerProtos.ReadBlockResponseProto response = invalidChecksum ?
- createInValidChecksumResponse() : createValidResponse();
-
- return ContainerProtos.ContainerCommandResponseProto.newBuilder()
- .setCmdType(ContainerProtos.Type.ReadBlock)
- .setReadBlock(response)
- .setResult(ContainerProtos.Result.SUCCESS)
- .build();
- }
-
- private ContainerProtos.ReadBlockResponseProto createValidResponse() {
- return ContainerProtos.ReadBlockResponseProto.newBuilder()
- .setChecksumData(checksumData.getProtoBufMessage())
- .setData(ByteString.copyFrom(data))
- .setOffset(0)
- .build();
- }
-
- private ContainerProtos.ReadBlockResponseProto createInValidChecksumResponse() {
- byte[] invalidData = new byte[data.length];
- System.arraycopy(data, 0, invalidData, 0, data.length);
- // Corrupt the data
- invalidData[0] = (byte) (invalidData[0] + 1);
- return ContainerProtos.ReadBlockResponseProto.newBuilder()
- .setChecksumData(checksumData.getProtoBufMessage())
- .setData(ByteString.copyFrom(invalidData))
- .setOffset(0)
- .build();
- }
-
- private static Stream<Arguments> exceptionsTriggeringRefresh() {
- return Stream.of(
- Arguments.of(new StorageContainerException(CONTAINER_NOT_FOUND)),
- Arguments.of(new IOException(new ExecutionException(
- new StatusException(Status.UNAVAILABLE))))
- );
- }
-
- private static Stream<Arguments> exceptionsNotTriggeringRefresh() {
- return Stream.of(
- Arguments.of(new SCMSecurityException("Security problem")),
- Arguments.of(new OzoneChecksumException("checksum missing")),
- Arguments.of(new IOException("Some random exception."))
- );
- }
-
-}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java
index ea8694cd8b7..3018fda7ea6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java
@@ -29,11 +29,15 @@ public class StreamingReadResponse {
private final DatanodeDetails dn;
private final ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> requestObserver;
+ private final String name;
public StreamingReadResponse(DatanodeDetails dn,
ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> requestObserver) {
this.dn = dn;
this.requestObserver = requestObserver;
+
+ final String s = dn.getID().toString();
+ this.name = "dn" + s.substring(s.lastIndexOf('-')) + "_stream";
}
public DatanodeDetails getDatanodeDetails() {
@@ -43,4 +47,9 @@ public DatanodeDetails getDatanodeDetails() {
public ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> getRequestObserver() {
return requestObserver;
}
+
+ @Override
+ public String toString() {
+ return name;
+ }
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index f1bf7a8ef85..54be3c5686a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -26,6 +26,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -144,12 +145,15 @@ public ContainerCommandResponseProto sendCommand(
}
}
- public void streamRead(ContainerCommandRequestProto request,
- StreamingReaderSpi streamObserver) throws IOException, InterruptedException {
+ public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) throws IOException {
throw new UnsupportedOperationException("Stream read is not supported");
}
- public void completeStreamRead(StreamingReadResponse streamingReadResponse) {
+ public void streamRead(ContainerCommandRequestProto request, StreamingReadResponse streamObserver) {
+ throw new UnsupportedOperationException("Stream read is not supported");
+ }
+
+ public void completeStreamRead() {
throw new UnsupportedOperationException("Stream read is not supported");
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/RandomAccessBlockFile.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/RandomAccessBlockFile.java
new file mode 100644
index 00000000000..fa5dd1ca21c
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/RandomAccessBlockFile.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Objects;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** {@link RandomAccessFile} for blocks. */
+public class RandomAccessBlockFile {
+ private static final Logger LOG = LoggerFactory.getLogger(RandomAccessBlockFile.class);
+
+ private File blockFile;
+ private RandomAccessFile raf;
+ private FileChannel channel;
+
+ public RandomAccessBlockFile() {
+ }
+
+ public synchronized boolean isOpen() {
+ return blockFile != null;
+ }
+
+ public synchronized void open(File file) throws FileNotFoundException {
+ Preconditions.assertNull(blockFile, "blockFile");
+ blockFile = Objects.requireNonNull(file, "blockFile == null");
+ raf = new RandomAccessFile(blockFile, "r");
+ channel = raf.getChannel();
+ }
+
+ public synchronized void position(long newPosition) throws IOException {
+ Preconditions.assertTrue(isOpen(), "Not opened");
+ final long oldPosition = channel.position();
+ if (newPosition != oldPosition) {
+ LOG.debug("seek {} -> {} for file {}", oldPosition, newPosition,
blockFile);
+ channel.position(newPosition);
+ }
+ }
+
+ public synchronized boolean read(ByteBuffer buffer) throws IOException {
+ Preconditions.assertTrue(isOpen(), "Not opened");
+ while (buffer.hasRemaining()) {
+ final int r = channel.read(buffer);
+ if (r == -1) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public synchronized void close() {
+ if (blockFile == null) {
+ return;
+ }
+ blockFile = null;
+ try {
+ channel.close();
+ channel = null;
+ } catch (IOException e) {
+ LOG.warn("Failed to close channel for {}", blockFile, e);
+ throw new RuntimeException(e);
+ }
+ try {
+ raf.close();
+ raf = null;
+ } catch (IOException e) {
+ LOG.warn("Failed to close RandomAccessFile for {}", blockFile, e);
+ }
+ }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index c6e5d75b5ca..15879fb4764 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -62,7 +62,6 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
-import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
@@ -906,23 +905,17 @@ public static List<Validator> toValidatorList(Validator validator) {
return datanodeToResponseMap;
}
- /**
- * Calls the container protocol to read a chunk.
- *
- * @param xceiverClient client to perform call
- * @param offset offset where block starts
- * @param blockID ID of the block
- * @param token a token for this block (may be null)
- * @throws IOException if there is an I/O error while performing the call
- */
- @SuppressWarnings("checkstyle:ParameterNumber")
- public static void readBlock(
- XceiverClientSpi xceiverClient, long offset, BlockID blockID, Token<? extends TokenIdentifier> token,
- Map<DatanodeDetails, Integer> replicaIndexes, StreamingReaderSpi streamObserver)
- throws IOException, InterruptedException {
- final ReadBlockRequestProto.Builder readBlockRequest =
- ReadBlockRequestProto.newBuilder()
- .setOffset(offset);
+ public static ContainerCommandRequestProto buildReadBlockCommandProto(
+ BlockID blockID, long offset, long length, int responseDataSize,
+ Token<? extends TokenIdentifier> token, Pipeline pipeline)
+ throws IOException {
+ final DatanodeDetails datanode = pipeline.getClosestNode();
+ final DatanodeBlockID datanodeBlockID = getDatanodeBlockID(blockID, datanode, pipeline.getReplicaIndexes());
+ final ReadBlockRequestProto.Builder readBlockRequest = ReadBlockRequestProto.newBuilder()
+ .setOffset(offset)
+ .setLength(length)
+ .setResponseDataSize(responseDataSize)
+ .setBlockID(datanodeBlockID);
final ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadBlock)
.setContainerID(blockID.getContainerID());
@@ -930,23 +923,18 @@ public static void readBlock(
builder.setEncodedToken(token.encodeToUrlString());
}
- readBlock(xceiverClient, blockID, builder, readBlockRequest, xceiverClient.getPipeline().getFirstNode(),
- replicaIndexes, streamObserver);
+ return builder.setDatanodeUuid(datanode.getUuidString())
+ .setReadBlock(readBlockRequest)
+ .build();
}
- private static void readBlock(XceiverClientSpi xceiverClient,
- BlockID blockID, ContainerCommandRequestProto.Builder builder, ReadBlockRequestProto.Builder readBlockBuilder,
- DatanodeDetails datanode, Map<DatanodeDetails, Integer> replicaIndexes,
- StreamingReaderSpi streamObserver) throws IOException, InterruptedException {
- final DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder();
- int replicaIndex = replicaIndexes.getOrDefault(datanode, 0);
+ static DatanodeBlockID getDatanodeBlockID(BlockID blockID, DatanodeDetails datanode,
+ Map<DatanodeDetails, Integer> replicaIndexes) {
+ final DatanodeBlockID.Builder b = blockID.getDatanodeBlockIDProtobufBuilder();
+ final int replicaIndex = replicaIndexes.getOrDefault(datanode, 0);
if (replicaIndex > 0) {
- datanodeBlockID.setReplicaIndex(replicaIndex);
+ b.setReplicaIndex(replicaIndex);
}
- readBlockBuilder.setBlockID(datanodeBlockID);
- final ContainerCommandRequestProto request = builder
- .setDatanodeUuid(datanode.getUuidString())
- .setReadBlock(readBlockBuilder).build();
- xceiverClient.streamRead(request, streamObserver);
+ return b.build();
}
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
index fbb29cfcd70..e911e1cb7a8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
@@ -23,6 +23,7 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -270,6 +271,14 @@ protected static ByteString computeChecksum(ByteBuffer data,
}
}
+ public static void verifySingleChecksum(ByteBuffer buffer, int offset, int bytesPerChecksum,
+ ByteString checksum, ChecksumType checksumType) throws OzoneChecksumException {
+ final ByteBuffer duplicated = buffer.duplicate();
+ duplicated.position(offset).limit(offset + bytesPerChecksum);
+ final ChecksumData cd = new ChecksumData(checksumType, bytesPerChecksum, Collections.singletonList(checksum));
+ verifyChecksum(duplicated, cd, 0);
+ }
+
/**
* Computes the ChecksumData for the input data and verifies that it
* matches with that of the input checksumData.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 6dba6abf9d0..4e8becfb10c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -49,6 +49,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.InvalidContainerStateException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.RandomAccessBlockFile;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.security.token.NoopTokenVerifier;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
@@ -819,7 +820,7 @@ public StateMachine.DataChannel getStreamDataChannel(
@Override
public void streamDataReadOnly(ContainerCommandRequestProto msg,
StreamObserver<ContainerCommandResponseProto> streamObserver,
- DispatcherContext dispatcherContext) {
+ RandomAccessBlockFile blockFile, DispatcherContext dispatcherContext) {
Type cmdType = msg.getCmdType();
String traceID = msg.getTraceID();
Span span = TracingUtil.importAndCreateSpan(cmdType.toString(), traceID);
@@ -858,7 +859,7 @@ public void streamDataReadOnly(ContainerCommandRequestProto msg,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
}
perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime);
- responseProto = handler.readBlock(msg, container, dispatcherContext, streamObserver);
+ responseProto = handler.readBlock(msg, container, blockFile, streamObserver);
long oPLatencyMS = Time.monotonicNow() - startTime;
metrics.incContainerOpsLatencies(cmdType, oPLatencyMS);
if (responseProto == null) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index 8a4a675187d..8edf6c34b9f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.RandomAccessBlockFile;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.ratis.statemachine.StateMachine;
@@ -97,6 +98,7 @@ default StateMachine.DataChannel getStreamDataChannel(
default void streamDataReadOnly(
ContainerCommandRequestProto msg,
StreamObserver<ContainerCommandResponseProto> streamObserver,
+ RandomAccessBlockFile blockFile,
DispatcherContext dispatcherContext) {
throw new UnsupportedOperationException("streamDataReadOnly not supported.");
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 0abcab5afea..af4670bc3a8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.RandomAccessBlockFile;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
@@ -40,7 +41,6 @@
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.ratis.statemachine.StateMachine;
@@ -268,7 +268,7 @@ public void setClusterID(String clusterID) {
public abstract ContainerCommandResponseProto readBlock(
ContainerCommandRequestProto msg, Container container,
- DispatcherContext dispatcherContext,
+ RandomAccessBlockFile blockFile,
StreamObserver<ContainerCommandResponseProto> streamObserver);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
index 041958b4227..5a05b3f48ca 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
+import org.apache.hadoop.hdds.scm.container.common.helpers.RandomAccessBlockFile;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
@@ -31,6 +32,8 @@
import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler;
import org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,6 +98,15 @@ public StreamObserver<ContainerCommandRequestProto> send(
StreamObserver<ContainerCommandResponseProto> responseObserver) {
return new StreamObserver<ContainerCommandRequestProto>() {
private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final RandomAccessBlockFile blockFile = new RandomAccessBlockFile();
+
+ boolean close() {
+ if (isClosed.compareAndSet(false, true)) {
+ blockFile.close();
+ return true;
+ }
+ return false;
+ }
@Override
public void onNext(ContainerCommandRequestProto request) {
@@ -105,7 +117,7 @@ public void onNext(ContainerCommandRequestProto request) {
try {
if (request.getCmdType() == Type.ReadBlock) {
- dispatcher.streamDataReadOnly(request, responseObserver, null);
+ dispatcher.streamDataReadOnly(request, responseObserver, blockFile, context);
} else {
final ContainerCommandResponseProto resp = dispatcher.dispatch(request, context);
responseObserver.onNext(resp);
@@ -113,7 +125,7 @@ public void onNext(ContainerCommandRequestProto request) {
} catch (Throwable e) {
LOG.error("Got exception when processing"
+ " ContainerCommandRequestProto {}", request, e);
- isClosed.set(true);
+ close();
responseObserver.onError(e);
} finally {
zeroCopyMessageMarshaller.release(request);
@@ -125,13 +137,20 @@ public void onNext(ContainerCommandRequestProto request) {
@Override
public void onError(Throwable t) {
+ close();
+ if (t instanceof StatusRuntimeException) {
+ if (((StatusRuntimeException) t).getStatus().getCode() == Status.Code.CANCELLED) {
+ return;
+ }
+ }
+
// for now we just log a msg
LOG.error("ContainerCommand send on error. Exception: ", t);
}
@Override
public void onCompleted() {
- if (isClosed.compareAndSet(false, true)) {
+ if (close()) {
LOG.debug("ContainerCommand send completed");
responseObserver.onCompleted();
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 2f4177b2d06..60224078625 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -60,6 +60,8 @@
import static
org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient.createSingleNodePipeline;
import static
org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.DEFAULT_LAYOUT;
import static
org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
+import static org.apache.ratis.util.Preconditions.assertSame;
+import static org.apache.ratis.util.Preconditions.assertTrue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -70,9 +72,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -80,7 +80,6 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -101,6 +100,7 @@
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
@@ -114,6 +114,7 @@
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.RandomAccessBlockFile;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
@@ -2058,7 +2059,7 @@ public void deleteUnreferenced(Container container, long localID)
@Override
public ContainerCommandResponseProto readBlock(
ContainerCommandRequestProto request, Container kvContainer,
- DispatcherContext dispatcherContext,
+ RandomAccessBlockFile blockFile,
StreamObserver<ContainerCommandResponseProto> streamObserver) {
if (kvContainer.getContainerData().getLayoutVersion() != FILE_PER_BLOCK) {
@@ -2074,67 +2075,121 @@ public ContainerCommandResponseProto readBlock(
return malformedRequest(request);
}
try {
- ReadBlockRequestProto readBlock = request.getReadBlock();
-
- BlockID blockID = BlockID.getFromProtobuf(readBlock.getBlockID());
- // This is a new api the block should always be checked.
- BlockUtils.verifyReplicaIdx(kvContainer, blockID);
- BlockUtils.verifyBCSId(kvContainer, blockID);
-
- File blockFile = FILE_PER_BLOCK.getChunkFile(kvContainer.getContainerData(), blockID, "unused");
-
- BlockData blockData = getBlockManager().getBlock(kvContainer, blockID);
- List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
- // To get the chunksize, check the first chunk. Either there is only 1 chunk and its the largest, or there are
- // multiple chunks and they are all the same size except the last one.
- long bytesPerChunk = chunkInfos.get(0).getLen();
- // The bytes per checksum is stored in the checksum data of each chunk, so check the first chunk as they all
- // must be the same.
- ContainerProtos.ChecksumType checksumType = chunkInfos.get(0).getChecksumData().getType();
- ChecksumData checksumData = null;
- int bytesPerChecksum = STREAMING_BYTES_PER_CHUNK;
- if (checksumType == ContainerProtos.ChecksumType.NONE) {
- checksumData = new ChecksumData(checksumType, 0);
- } else {
- bytesPerChecksum = chunkInfos.get(0).getChecksumData().getBytesPerChecksum();
- }
- // We have to align the read to checksum boundaries, so whatever offset is requested, we have to move back to the
- // previous checksum boundary.
- // eg if bytesPerChecksum is 512, and the requested offset is 600, we have to move back to 512.
- // If the checksum type is NONE, we don't have to do this, but using no checksums should be rare in practice and
- // it simplifies the code to always do this.
- long adjustedOffset = readBlock.getOffset() - readBlock.getOffset() % bytesPerChecksum;
- try (RandomAccessFile file = new RandomAccessFile(blockFile, "r");
- FileChannel channel = file.getChannel()) {
- ByteBuffer buffer = ByteBuffer.allocate(bytesPerChecksum);
- channel.position(adjustedOffset);
- while (channel.read(buffer) != -1) {
- buffer.flip();
- if (checksumType != ContainerProtos.ChecksumType.NONE) {
- // As the checksums are stored "chunk by chunk", we need to figure out which chunk we start reading from,
- // and its offset to pull out the correct checksum bytes for each read.
- int chunkIndex = (int) (adjustedOffset / bytesPerChunk);
- int chunkOffset = (int) (adjustedOffset % bytesPerChunk);
- int checksumIndex = chunkOffset / bytesPerChecksum;
- ByteString checksum = blockData.getChunks().get(chunkIndex).getChecksumData().getChecksums(checksumIndex);
- checksumData = new ChecksumData(checksumType, bytesPerChecksum, Collections.singletonList(checksum));
- }
- streamObserver.onNext(getReadBlockResponse(request, checksumData, buffer, adjustedOffset));
- buffer.clear();
-
- adjustedOffset += bytesPerChecksum;
- }
- }
+ readBlockImpl(request, blockFile, kvContainer, streamObserver, false);
// TODO metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen());
} catch (StorageContainerException ex) {
responseProto = ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ioe) {
- responseProto = ContainerUtils.logAndReturnError(LOG,
- new StorageContainerException("Read Block failed", ioe, IO_EXCEPTION), request);
+ final StorageContainerException sce = new StorageContainerException(
+ "Failed to readBlock " + request.getReadBlock(), ioe, IO_EXCEPTION);
+ responseProto = ContainerUtils.logAndReturnError(LOG, sce, request);
+ } catch (Exception e) {
+ final StorageContainerException sce = new StorageContainerException(
+ "Failed to readBlock " + request.getReadBlock(), e, CONTAINER_INTERNAL_ERROR);
+ LOG.error("", sce);
+ responseProto = ContainerUtils.logAndReturnError(LOG, sce, request);
}
return responseProto;
}
+ private void readBlockImpl(ContainerCommandRequestProto request, RandomAccessBlockFile blockFile,
+ Container kvContainer, StreamObserver<ContainerCommandResponseProto> streamObserver, boolean verifyChecksum)
+ throws IOException {
+ final ReadBlockRequestProto readBlock = request.getReadBlock();
+ int responseDataSize = readBlock.getResponseDataSize();
+ if (responseDataSize == 0) {
+ responseDataSize = 1 << 20;
+ }
+
+ final BlockID blockID = BlockID.getFromProtobuf(readBlock.getBlockID());
+ if (!blockFile.isOpen()) {
+ final File file = FILE_PER_BLOCK.getChunkFile(kvContainer.getContainerData(), blockID, "unused");
+ blockFile.open(file);
+ }
+
+ // This is a new api the block should always be checked.
+ BlockUtils.verifyReplicaIdx(kvContainer, blockID);
+ BlockUtils.verifyBCSId(kvContainer, blockID);
+
+ final BlockData blockData = getBlockManager().getBlock(kvContainer, blockID);
+ final List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ final int bytesPerChunk = Math.toIntExact(chunkInfos.get(0).getLen());
+ final ChecksumType checksumType = chunkInfos.get(0).getChecksumData().getType();
+ ChecksumData checksumData = null;
+ int bytesPerChecksum = STREAMING_BYTES_PER_CHUNK;
+ if (checksumType == ContainerProtos.ChecksumType.NONE) {
+ checksumData = new ChecksumData(checksumType, 0);
+ } else {
+ bytesPerChecksum = chunkInfos.get(0).getChecksumData().getBytesPerChecksum();
+ }
+ // We have to align the read to checksum boundaries, so whatever offset is requested, we have to move back to the
+ // previous checksum boundary.
+ // eg if bytesPerChecksum is 512, and the requested offset is 600, we have to move back to 512.
+ // If the checksum type is NONE, we don't have to do this, but using no checksums should be rare in practice and
+ // it simplifies the code to always do this.
+ long adjustedOffset = readBlock.getOffset() - readBlock.getOffset() % bytesPerChecksum;
+
+ final ByteBuffer buffer = ByteBuffer.allocate(responseDataSize);
+ blockFile.position(adjustedOffset);
+ int totalDataLength = 0;
+ int numResponses = 0;
+ for (boolean shouldRead = true; totalDataLength < readBlock.getLength() && shouldRead;) {
+ shouldRead = blockFile.read(buffer);
+ buffer.flip();
+ final int readLength = buffer.remaining();
+ assertTrue(readLength > 0, () -> "readLength = " + readLength + " <= 0");
+ if (checksumType != ContainerProtos.ChecksumType.NONE) {
+ final List<ByteString> checksums = getChecksums(adjustedOffset, readLength,
+ bytesPerChunk, bytesPerChecksum, chunkInfos);
+ LOG.debug("Read {} at adjustedOffset {}, readLength {}, bytesPerChunk {}, bytesPerChecksum {}",
+ readBlock, adjustedOffset, readLength, bytesPerChunk, bytesPerChecksum);
+ checksumData = new ChecksumData(checksumType, bytesPerChecksum, checksums);
+ if (verifyChecksum) {
+ Checksum.verifyChecksum(buffer.duplicate(), checksumData, 0);
+ }
+ }
+ final ContainerCommandResponseProto response = getReadBlockResponse(
+ request, checksumData, buffer, adjustedOffset);
+ final int dataLength = response.getReadBlock().getData().size();
+ LOG.debug("server onNext response {}: dataLength={}, numChecksums={}",
+ numResponses, dataLength, response.getReadBlock().getChecksumData().getChecksumsList().size());
+ streamObserver.onNext(response);
+ buffer.clear();
+
+ adjustedOffset += readLength;
+ totalDataLength += dataLength;
+ numResponses++;
+ }
+ }
+
+ static List<ByteString> getChecksums(long blockOffset, int readLength, int bytesPerChunk, int bytesPerChecksum,
+ final List<ContainerProtos.ChunkInfo> chunks) {
+ assertSame(0, blockOffset % bytesPerChecksum, "blockOffset % bytesPerChecksum");
+ final int numChecksums = readLength / bytesPerChecksum;
+ final List<ByteString> checksums = new ArrayList<>(numChecksums);
+ for (int i = 0; i < numChecksums; i++) {
+ // As the checksums are stored "chunk by chunk", we need to figure out which chunk we start reading from,
+ // and its offset to pull out the correct checksum bytes for each read.
+ final int n = i * bytesPerChecksum;
+ final long offset = blockOffset + n;
+ final int c = Math.toIntExact(offset / bytesPerChunk);
+ final int chunkOffset = Math.toIntExact(offset % bytesPerChunk);
+ final int csi = chunkOffset / bytesPerChecksum;
+
+ assertTrue(c < chunks.size(),
+ () -> "chunkIndex = " + c + " >= chunk.size()" + chunks.size());
+ final ContainerProtos.ChunkInfo chunk = chunks.get(c);
+ assertSame(bytesPerChunk, chunk.getLen(), "bytesPerChunk");
+ final ContainerProtos.ChecksumData checksumDataProto = chunks.get(c).getChecksumData();
+ assertSame(bytesPerChecksum, checksumDataProto.getBytesPerChecksum(), "bytesPerChecksum");
+ final List<ByteString> checksumsList = checksumDataProto.getChecksumsList();
+ assertTrue(csi < checksumsList.size(),
+ () -> "checksumIndex = " + csi + " >= checksumsList.size()" + checksumsList.size());
+ checksums.add(checksumsList.get(csi));
+ }
+ return checksums;
+ }
+
@Override
public void addFinalizedBlock(Container container, long localID) {
KeyValueContainer keyValueContainer = (KeyValueContainer)container;
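
The arithmetic in readBlockImpl and getChecksums above can be followed with the numbers from the code comment (bytesPerChecksum = 512, requested offset = 600) and an assumed 4 MB chunk size. A small standalone example of the same calculation:

/** Standalone illustration of the offset/index arithmetic used by readBlockImpl and getChecksums. */
public class ChecksumIndexExample {
  public static void main(String[] args) {
    final int bytesPerChunk = 4 << 20;   // assumed chunk size for this example
    final int bytesPerChecksum = 512;    // as in the code comment above
    final long requestedOffset = 600;

    // Move back to the previous checksum boundary: 600 - (600 % 512) = 512.
    final long adjustedOffset = requestedOffset - requestedOffset % bytesPerChecksum;

    // For a bytesPerChecksum-sized slice starting at adjustedOffset,
    // locate the chunk it falls in and the checksum index within that chunk.
    final int chunkIndex = Math.toIntExact(adjustedOffset / bytesPerChunk);   // 0
    final int chunkOffset = Math.toIntExact(adjustedOffset % bytesPerChunk);  // 512
    final int checksumIndex = chunkOffset / bytesPerChecksum;                 // 1

    System.out.printf("adjustedOffset=%d, chunkIndex=%d, checksumIndex=%d%n",
        adjustedOffset, chunkIndex, checksumIndex);
    // Prints: adjustedOffset=512, chunkIndex=0, checksumIndex=1
  }
}
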
diff --git
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 6b4d8f1bd7f..cb2b1fb27f3 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -401,6 +401,8 @@ message ListBlockResponseProto {
message ReadBlockRequestProto {
required DatanodeBlockID blockID = 1;
required uint64 offset = 2;
+ optional uint64 length = 3;
+ optional uint32 responseDataSize = 4;
}
message ReadBlockResponseProto {
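
The two new optional fields let a client bound the total number of bytes streamed back (length) and the size of each response message (responseDataSize); readBlockImpl above falls back to 1 MB responses when responseDataSize is left at 0. A rough sketch of building such a request with the standard protobuf builders, where the container and local IDs are placeholders:

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockRequestProto;

/** Sketch only: shows the new optional fields on ReadBlockRequestProto. */
public class ReadBlockRequestExample {
  static ReadBlockRequestProto buildRequest() {
    ContainerProtos.DatanodeBlockID blockId = ContainerProtos.DatanodeBlockID.newBuilder()
        .setContainerID(1L)    // placeholder container id
        .setLocalID(100L)      // placeholder block local id
        .build();
    return ReadBlockRequestProto.newBuilder()
        .setBlockID(blockId)
        .setOffset(0)
        .setLength(8L << 20)            // total bytes the client wants streamed back
        .setResponseDataSize(1 << 20)   // bytes of block data per streamed response
        .build();
  }
}
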
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
index bb66a303155..ab46d2fd218 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
@@ -31,13 +31,33 @@
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import
org.apache.hadoop.ozone.container.common.transport.server.GrpcXceiverService;
import org.apache.hadoop.ozone.om.TestBucket;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
/**
* Tests {@link StreamBlockInputStream}.
*/
public class TestStreamBlockInputStream extends TestInputStreamBase {
+ private static final Logger LOG = LoggerFactory.getLogger(TestStreamBlockInputStream.class);
+
+ {
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("com"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ipc"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.server.http"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.container"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.ha"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.safemode"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.container.common"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.om"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.ratis"), Level.ERROR);
+ GenericTestUtils.setLogLevel(GrpcXceiverService.class, Level.ERROR);
+ }
+
/**
* Run the tests as a single test method to avoid needing a new mini-cluster
* for each test.
@@ -46,6 +66,53 @@ public class TestStreamBlockInputStream extends TestInputStreamBase {
private byte[] inputData;
private TestBucket bucket;
+ @Test
+ void testReadKey() throws Exception {
+ try (MiniOzoneCluster cluster = newCluster()) {
+ cluster.waitForClusterToBeReady();
+
+ LOG.info("cluster ready");
+
+ OzoneConfiguration conf = cluster.getConf();
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamReadBlock(true);
+ OzoneConfiguration copy = new OzoneConfiguration(conf);
+ copy.setFromObject(clientConfig);
+ String keyName = getNewKeyName();
+ try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
+ bucket = TestBucket.newBuilder(client).build();
+ inputData = bucket.writeRandomBytes(keyName, DATA_LENGTH);
+ LOG.info("writeRandomBytes {} bytes", inputData.length);
+
+ for (int i = 1; i <= 10; i++) {
+ runTestReadKey(keyName, DATA_LENGTH / i);
+ }
+
+ for (int n = 4; n <= 16 << 10; n <<= 2) {
+ runTestReadKey(keyName, n << 10); // 4kB
+ }
+ }
+ }
+ }
+
+ private void runTestReadKey(String key, int bufferSize) throws Exception {
+ LOG.info("---------------------------------------------------------");
+ LOG.info("read {} bytes with bufferSize {}", DATA_LENGTH, bufferSize);
+ // Read the data fully into a large enough byte array
+ final byte[] buffer = new byte[bufferSize];
+ try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+ int pos = 0;
+ for (; pos < DATA_LENGTH;) {
+ final int read = keyInputStream.read(buffer, 0, buffer.length);
+ for (int i = 0; i < read; i++) {
+ assertEquals(inputData[pos + i], buffer[i], "pos=" + pos + ", i=" + i);
+ }
+ pos += read;
+ }
+ assertEquals(DATA_LENGTH, pos);
+ }
+ }
+
@Test
void testAll() throws Exception {
try (MiniOzoneCluster cluster = newCluster()) {
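
Stripped of the mini-cluster scaffolding, the client-side opt-in exercised by this test reduces to roughly the following sketch, using only calls that appear in the test above:

import java.io.IOException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;

/** Sketch of opting a client into streaming block reads, mirroring the test setup above. */
public class StreamReadClientExample {
  static OzoneClient newStreamingClient(OzoneConfiguration conf) throws IOException {
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    clientConfig.setStreamReadBlock(true);           // enable the streaming ReadBlock path
    OzoneConfiguration copy = new OzoneConfiguration(conf);
    copy.setFromObject(clientConfig);                // write the client config back
    return OzoneClientFactory.getRpcClient(copy);    // key reads through this client stream blocks
  }
}
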
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
new file mode 100644
index 00000000000..134a87bf2ad
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.OutputStream;
+import java.security.MessageDigest;
+import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.ozone.ClientConfigForTesting;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.om.TestBucket;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.jupiter.api.Test;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+/**
+ * Tests {@link StreamBlockInputStream}.
+ */
+public class TestStreamRead {
+ private TestBucket bucket;
+
+ {
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("com"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ipc"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.server.http"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.container"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.ha"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.safemode"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.utils"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.container.common"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.om"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.ratis"), Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger(CodecBuffer.class), Level.ERROR);
+ }
+
+ static final int CHUNK_SIZE = 1 << 20; // 1MB
+ static final int FLUSH_SIZE = 2 * CHUNK_SIZE; // 2MB
+ static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE; // 4MB
+
+ static final int BLOCK_SIZE = 128 << 20;
+ static final SizeInBytes KEY_SIZE = SizeInBytes.valueOf("256M");
+
+ static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
+ final OzoneConfiguration conf = new OzoneConfiguration();
+
+ OzoneClientConfig config = conf.getObject(OzoneClientConfig.class);
+ config.setBytesPerChecksum(bytesPerChecksum);
+ conf.setFromObject(config);
+
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
+ conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 5);
+ conf.setQuietMode(true);
+ conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64, StorageUnit.MB);
+
+ ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
+ .setBlockSize(BLOCK_SIZE)
+ .setChunkSize(CHUNK_SIZE)
+ .setStreamBufferFlushSize(FLUSH_SIZE)
+ .setStreamBufferMaxSize(MAX_FLUSH_SIZE)
+ .applyTo(conf);
+
+ return MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(3)
+ .build();
+ }
+
+ @Test
+ void testReadKey512() throws Exception {
+ final SizeInBytes bytesPerChecksum = SizeInBytes.valueOf(512);
+ runTestReadKey(KEY_SIZE, bytesPerChecksum);
+ }
+
+ @Test
+ void testReadKey16k() throws Exception {
+ final SizeInBytes bytesPerChecksum = SizeInBytes.valueOf("16k");
+ runTestReadKey(KEY_SIZE, bytesPerChecksum);
+ }
+
+ @Test
+ void testReadKey256k() throws Exception {
+ final SizeInBytes bytesPerChecksum = SizeInBytes.valueOf("256k");
+ runTestReadKey(KEY_SIZE, bytesPerChecksum);
+ }
+
+ void runTestReadKey(SizeInBytes keySize, SizeInBytes bytesPerChecksum) throws Exception {
+ try (MiniOzoneCluster cluster = newCluster(bytesPerChecksum.getSizeInt())) {
+ cluster.waitForClusterToBeReady();
+
+ System.out.println("cluster ready");
+
+ OzoneConfiguration conf = cluster.getConf();
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamReadBlock(true);
+ OzoneConfiguration copy = new OzoneConfiguration(conf);
+ copy.setFromObject(clientConfig);
+
+ final int n = 10;
+ final SizeInBytes writeBufferSize = SizeInBytes.valueOf("8MB");
+ final SizeInBytes[] readBufferSizes = {
+ SizeInBytes.valueOf("4k"),
+ SizeInBytes.valueOf("1M"),
+ SizeInBytes.valueOf("8M"),
+ SizeInBytes.valueOf("32M"),
+ };
+
+ try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
+ bucket = TestBucket.newBuilder(client).build();
+
+ for (int i = 0; i < n; i++) {
+ final String keyName = "key" + i;
+ System.out.println("---------------------------------------------------------");
+ System.out.printf("%s with %s bytes and %s bytesPerChecksum%n",
+ keyName, keySize, bytesPerChecksum);
+
+ final String md5 = createKey(bucket.delegate(), keyName, keySize, writeBufferSize);
+ for (SizeInBytes readBufferSize : readBufferSizes) {
+ runTestReadKey(keyName, keySize, readBufferSize, null);
+ runTestReadKey(keyName, keySize, readBufferSize, md5);
+ }
+ }
+ }
+ }
+ }
+
+ static void print(String name, long keySizeByte, long elapsedNanos, SizeInBytes bufferSize, String computedMD5) {
+ final double keySizeMb = keySizeByte * 1.0 / (1 << 20);
+ final double elapsedSeconds = elapsedNanos / 1_000_000_000.0;
+ System.out.printf("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f MB, md5=%s)%n",
+ name, keySizeMb / elapsedSeconds, elapsedSeconds, bufferSize, keySizeMb, computedMD5);
+ }
+
+ static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize, SizeInBytes bufferSize)
+ throws Exception {
+ final byte[] buffer = new byte[bufferSize.getSizeInt()];
+ ThreadLocalRandom.current().nextBytes(buffer);
+
+ final long keySizeByte = keySize.getSize();
+ final long startTime = System.nanoTime();
+ try (OutputStream stream = bucket.createStreamKey(keyName, keySizeByte,
+ RatisReplicationConfig.getInstance(THREE), Collections.emptyMap())) {
+ for (long pos = 0; pos < keySizeByte;) {
+ final int writeSize = Math.toIntExact(Math.min(buffer.length, keySizeByte - pos));
+ stream.write(buffer, 0, writeSize);
+ pos += writeSize;
+ }
+ }
+ final long elapsedNanos = System.nanoTime() - startTime;
+
+ final MessageDigest md5 = MessageDigest.getInstance("MD5");
+ for (long pos = 0; pos < keySizeByte;) {
+ final int writeSize = Math.toIntExact(Math.min(buffer.length, keySizeByte - pos));
+ md5.update(buffer, 0, writeSize);
+ pos += writeSize;
+ }
+
+ final String computedMD5 = StringUtils.bytes2Hex(md5.digest());
+ print("createStreamKey", keySizeByte, elapsedNanos, bufferSize,
computedMD5);
+ return computedMD5;
+ }
+
+ private void runTestReadKey(String keyName, SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5)
+ throws Exception {
+ final long keySizeByte = keySize.getSize();
+ final MessageDigest md5 = MessageDigest.getInstance("MD5");
+ // Read the data fully into a large enough byte array
+ final byte[] buffer = new byte[bufferSize.getSizeInt()];
+ final long startTime = System.nanoTime();
+ try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
+ int pos = 0;
+ for (; pos < keySizeByte;) {
+ final int read = keyInputStream.read(buffer, 0, buffer.length);
+ if (read == -1) {
+ break;
+ }
+
+ if (expectedMD5 != null) {
+ md5.update(buffer, 0, read);
+ }
+ pos += read;
+ }
+ assertEquals(keySizeByte, pos);
+ }
+ final long elapsedNanos = System.nanoTime() - startTime;
+
+ final String computedMD5;
+ if (expectedMD5 == null) {
+ computedMD5 = null;
+ } else {
+ computedMD5 = StringUtils.bytes2Hex(md5.digest());
+ assertEquals(expectedMD5, computedMD5);
+ }
+ print("readStreamKey", keySizeByte, elapsedNanos, bufferSize, computedMD5);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]