sodonnel commented on code in PR #6613:
URL: https://github.com/apache/ozone/pull/6613#discussion_r2463018964


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -504,6 +516,65 @@ private XceiverClientReply sendCommandWithRetry(
     }
   }
 
+  /**
+   * Starts a streaming read operation, intended to read entire blocks from the datanodes. This method expects a
+   * {@link StreamObserver} to be passed in, which will be used to receive the streamed data from the datanode.
+   * Upon successfully starting the streaming read, a {@link StreamingReadResponse} is returned, which contains
+   * information about the datanode used for the read, and the request observer that can be used to manage the stream
+   * (e.g., to cancel it if needed). A semaphore is acquired to limit the number of concurrent streaming reads so upon
+   * successful return of this method, the caller must ensure to call {@link #completeStreamRead(StreamingReadResponse)}
+   * to release the semaphore once the streaming read is complete.
+   * @param request The container command request to initiate the streaming read.
+   * @param streamObserver The observer that will handle the streamed responses.
+   * @return A {@link StreamingReadResponse} containing details of the streaming read operation.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public StreamingReadResponse streamRead(ContainerCommandRequestProto request,
+      StreamObserver<ContainerCommandResponseProto> streamObserver) throws IOException, InterruptedException {
+    List<DatanodeDetails> datanodeList = sortDatanodes(request);
+    IOException lastException = null;
+    for (DatanodeDetails dn : datanodeList) {
+      try {
+        checkOpen(dn);
+        semaphore.acquire();
+        XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
+        if (stub == null) {
+          throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Executing command {} on datanode {}", processForDebug(request), dn);
+        }
+        StreamObserver<ContainerCommandRequestProto> requestObserver = stub
+            .withDeadlineAfter(timeout, TimeUnit.SECONDS)
+            .send(streamObserver);
+        requestObserver.onNext(request);
+        requestObserver.onCompleted();
+        return new StreamingReadResponse(dn, (ClientCallStreamObserver<ContainerCommandRequestProto>) requestObserver);

Review Comment:
   There is also the option for the client to "unbuffer" if it sits idle for too
long: that terminates the server thread while the client saves its position.
   
   I feel that the common use case is clients consuming large stretches of a
block, or even the entire block, rather than opening a file and holding it open
for long periods without reading.
   
   I have also seen that the common read size is not "50MB" but more like 4 KB
to 64 KB, and in those cases the overhead of starting the read over each time,
including opening and seeking the block file, can be significant. Many
downstream InputStreams operate with quite small buffers by default, including
the Hadoop FS ones, so we tend to see a lot of tiny sequential reads as they
consume the block from start to end.
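   
   To put rough numbers on that, here is a minimal sketch using only the JDK.
Nothing in it is Ozone code, and the class and method names are made up for
illustration. It counts how many read calls it takes to drain a 1 MiB stand-in
for block data through a buffer of a given size. If every call had to start the
read over on the datanode, that cost would be paid once per call.

```java
import java.io.ByteArrayInputStream;

/** Illustration only: counts read calls needed to drain a block-sized buffer. */
class SmallReadCount {

  // Drains blockSize bytes through a buffer of bufferSize bytes (must be > 0)
  // and returns the number of read calls that returned data.
  static int countReads(int blockSize, int bufferSize) {
    ByteArrayInputStream in = new ByteArrayInputStream(new byte[blockSize]);
    byte[] buffer = new byte[bufferSize];
    int calls = 0;
    // read() returns -1 once the in-memory "block" is exhausted.
    while (in.read(buffer, 0, buffer.length) > 0) {
      calls++;
    }
    return calls;
  }

  public static void main(String[] args) {
    int block = 1024 * 1024; // 1 MiB stand-in for block data
    System.out.println("4 KiB buffer:  " + countReads(block, 4 * 1024) + " reads");
    System.out.println("64 KiB buffer: " + countReads(block, 64 * 1024) + " reads");
  }
}
```

   Draining the 1 MiB stand-in takes 256 calls with a 4 KiB buffer and 16 calls
with a 64 KiB buffer.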
   
   One struggle is that there are at least 2 types of readers: the HBase style,
which wants to read a given length at offsets scattered all over the file, and
the Hive / Spark type of workload, which scans a block from start to finish. In
between is the ORC / Parquet reader, which seeks a bit to read some metadata and
then does relatively long runs of sequential reads, but quite likely into a
small buffer, pulling and processing the rows piece by piece.
   
   Given this, it probably makes sense to have different modes of operation,
but how that fits into the existing interfaces is another problem! I don't think
the same code path can be optimal for such different usage patterns.
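   
   As a rough sketch of what I mean by modes, and not a proposal for the actual
interface: the selector below watches the offsets a reader asks for and switches
from a positional mode to a streaming mode once it sees enough back-to-back
sequential reads. `ReadModeSelector`, `ReadMode` and the threshold are all
hypothetical names and values, not anything that exists in Ozone today.

```java
/** Hypothetical sketch: choose a read mode from the observed access pattern. */
class ReadModeSelector {

  enum ReadMode { POSITIONAL, STREAMING }

  private final int sequentialThreshold; // assumed tunable, value is arbitrary
  private long expectedOffset = -1;      // where a sequential reader would read next
  private int sequentialReads = 0;       // consecutive reads that continued the last one

  ReadModeSelector(int sequentialThreshold) {
    this.sequentialThreshold = sequentialThreshold;
  }

  // Records a read of `length` bytes at `offset` and returns the mode to use for it.
  ReadMode onRead(long offset, int length) {
    if (offset == expectedOffset) {
      sequentialReads++;   // continues exactly where the previous read ended
    } else {
      sequentialReads = 0; // a seek: treat the reader as random access again
    }
    expectedOffset = offset + length;
    return sequentialReads >= sequentialThreshold
        ? ReadMode.STREAMING
        : ReadMode.POSITIONAL;
  }
}
```

   With a threshold of 2, a scan reading 4 KB at offsets 0, 4096 and 8192 would
switch to STREAMING on the third read, and a seek elsewhere in the block would
drop it back to POSITIONAL. An ORC / Parquet style reader would flip between the
two, which is exactly where a single code path struggles.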



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

