szetszwo commented on code in PR #6613:
URL: https://github.com/apache/ozone/pull/6613#discussion_r2461180608
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -504,6 +516,65 @@ private XceiverClientReply sendCommandWithRetry(
}
}
+ /**
+ * Starts a streaming read operation, intended to read entire blocks from
the datanodes. This method expects a
+ * {@link StreamObserver} to be passed in, which will be used to receive the
streamed data from the datanode.
+ * Upon successfully starting the streaming read, a {@link
StreamingReadResponse} is returned, which contains
+ * information about the datanode used for the read, and the request
observer that can be used to manage the stream
+ * (e.g., to cancel it if needed). A semaphore is acquired to limit the
number of concurrent streaming reads so upon
+ * successful return of this method, the caller must ensure to call {@link
#completeStreamRead(StreamingReadResponse)}
+ * to release the semaphore once the streaming read is complete.
+ * @param request The container command request to initiate the streaming
read.
+ * @param streamObserver The observer that will handle the streamed
responses.
+ * @return A {@link StreamingReadResponse} containing details of the
streaming read operation.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public StreamingReadResponse streamRead(ContainerCommandRequestProto request,
+ StreamObserver<ContainerCommandResponseProto> streamObserver) throws
IOException, InterruptedException {
+ List<DatanodeDetails> datanodeList = sortDatanodes(request);
+ IOException lastException = null;
+ for (DatanodeDetails dn : datanodeList) {
+ try {
+ checkOpen(dn);
+ semaphore.acquire();
+ XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
+ if (stub == null) {
+ throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Executing command {} on datanode {}",
processForDebug(request), dn);
+ }
+ StreamObserver<ContainerCommandRequestProto> requestObserver = stub
+ .withDeadlineAfter(timeout, TimeUnit.SECONDS)
+ .send(streamObserver);
+ requestObserver.onNext(request);
+ requestObserver.onCompleted();
+ return new StreamingReadResponse(dn,
(ClientCallStreamObserver<ContainerCommandRequestProto>) requestObserver);
Review Comment:
> However its still not clear to me if I need to call onComplete() ...
It should not be. If we call `onComplete()` right after `onNext()`, a gRPC
stream can only send one request with a fixed length. Then, a stream won't be
able to support multiple read below or it has to read the entire blocks all the
time.
Suppose A block has 128 MB.
- Case 1
1. A client application opens the file.
2. It calls read() with 10KB initially
3. It calls read() with 50MB more.
4. ... // the client could call read() any number of times with an
arbitrary length.
- Case 2
1. A client application opens the file.
2. It calls read() with 10KB.
3. It leaves the file open for 1 minute.
4. It closes the file.
If we only have one request per stream, how to set the data length for both
cases? If we read the entire block all the time, Case 2 is inefficient.
Otherwise, we don't read the entire block. Then, it have to open multiple gRPC
streams for case 1.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]