sodonnel commented on code in PR #6613:
URL: https://github.com/apache/ozone/pull/6613#discussion_r2465368649


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -504,6 +516,65 @@ private XceiverClientReply sendCommandWithRetry(
     }
   }
 
+  /**
+   * Starts a streaming read operation, intended to read entire blocks from the datanodes. This method expects a
+   * {@link StreamObserver} to be passed in, which will be used to receive the streamed data from the datanode.
+   * Upon successfully starting the streaming read, a {@link StreamingReadResponse} is returned, which contains
+   * information about the datanode used for the read, and the request observer that can be used to manage the stream
+   * (e.g., to cancel it if needed). A semaphore is acquired to limit the number of concurrent streaming reads so upon
+   * successful return of this method, the caller must ensure to call {@link #completeStreamRead(StreamingReadResponse)}
+   * to release the semaphore once the streaming read is complete.
+   * @param request The container command request to initiate the streaming read.
+   * @param streamObserver The observer that will handle the streamed responses.
+   * @return A {@link StreamingReadResponse} containing details of the streaming read operation.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public StreamingReadResponse streamRead(ContainerCommandRequestProto request,
+      StreamObserver<ContainerCommandResponseProto> streamObserver) throws IOException, InterruptedException {
+    List<DatanodeDetails> datanodeList = sortDatanodes(request);
+    IOException lastException = null;
+    for (DatanodeDetails dn : datanodeList) {
+      try {
+        checkOpen(dn);
+        semaphore.acquire();
+        XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
+        if (stub == null) {
+          throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Executing command {} on datanode {}", processForDebug(request), dn);
+        }
+        StreamObserver<ContainerCommandRequestProto> requestObserver = stub
+            .withDeadlineAfter(timeout, TimeUnit.SECONDS)
+            .send(streamObserver);
+        requestObserver.onNext(request);
+        requestObserver.onCompleted();
+        return new StreamingReadResponse(dn, (ClientCallStreamObserver<ContainerCommandRequestProto>) requestObserver);

Review Comment:
   My observation is that the HDFS model works well: the approach used here 
keeps the code relatively simple and performs very well. Because of the buffer 
sizes used in the downstream readers, reading a large block with reads of the 
requested size results in many round trips to the server. If we make the server 
truly async, then on each of these calls, often for only 16 KB of data, it has 
to queue for a thread, get a thread, context switch to the thread, open the 
block file, seek in the block file, read 16 KB ... and so on.
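
   To put a rough number on the round trips, here is a minimal sketch. The 
256 MB block size is an assumption chosen for illustration, and the class and 
method names are hypothetical; only the 16 KB read size comes from the 
paragraph above.

```java
public class ReadRoundTrips {

  /**
   * Number of requests needed to read blockBytes when every request
   * returns at most readBytes (ceiling division).
   */
  public static long roundTrips(long blockBytes, long readBytes) {
    if (blockBytes < 0 || readBytes <= 0) {
      throw new IllegalArgumentException("blockBytes must be >= 0 and readBytes > 0");
    }
    return (blockBytes + readBytes - 1) / readBytes;
  }

  public static void main(String[] args) {
    long blockBytes = 256L * 1024 * 1024; // assumed block size, for illustration only
    long readBytes = 16L * 1024;          // the 16 KB read size mentioned above
    System.out.println(roundTrips(blockBytes, readBytes));
  }
}
```

   Under those assumptions a single block takes 16384 separate requests, each 
paying the queue, thread, open and seek costs listed above, whereas a streamed 
read pays them once.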
   
   Of course we can cache open files, but we cannot have just one cached entry 
per file, as many clients could be reading a file from different positions, and 
a client can seek, effectively defeating the cache. Then we have to manage the 
cache and worry about too many open files, etc.
   
   HDFS has had some problems with its thread-per-open-file model, and we have 
seen clusters with 32k or more active xceiver threads per DN. This is usually 
caused by long-running writers rather than long-running readers. A long-running 
reader would probably be a rare case.
   
   There are things we can do to avoid holding too many handlers on the 
server. For example, the client can unbuffer after some small configurable 
window. The server could time out. We could have a separate handler pool for 
reads vs other operations.
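
   As a rough illustration of the last option, here is a sketch of two 
independent handler pools built from plain `java.util.concurrent` executors. 
The class `SplitHandlerPools` and its `submit` method are hypothetical names 
for this example, not the datanode's actual handler code; the point is only 
that a read parked on a blocked thread cannot starve other operations.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: streaming reads and other operations get separate pools. */
public class SplitHandlerPools implements AutoCloseable {
  private final ExecutorService readPool;
  private final ExecutorService otherPool;

  public SplitHandlerPools(int readThreads, int otherThreads) {
    this.readPool = Executors.newFixedThreadPool(readThreads);
    this.otherPool = Executors.newFixedThreadPool(otherThreads);
  }

  /** Routes a task to the read pool or to the pool for everything else. */
  public <T> Future<T> submit(boolean streamingRead, Callable<T> task) {
    ExecutorService pool = streamingRead ? readPool : otherPool;
    return pool.submit(task);
  }

  @Override
  public void close() {
    readPool.shutdownNow();
    otherPool.shutdownNow();
  }

  public static void main(String[] args) throws Exception {
    CountDownLatch clientDone = new CountDownLatch(1);
    try (SplitHandlerPools pools = new SplitHandlerPools(1, 1)) {
      // A long-lived read that parks its handler thread until the client finishes.
      Future<String> read = pools.submit(true, () -> {
        clientDone.await();
        return "read done";
      });
      // Another operation still gets a thread, because it uses the other pool.
      Future<String> other = pools.submit(false, () -> "put done");
      System.out.println(other.get(1, TimeUnit.SECONDS));
      clientDone.countDown();
      System.out.println(read.get(1, TimeUnit.SECONDS));
    }
  }
}
```

   With a single shared pool of one thread, the second task would wait behind 
the blocked read; with the split, the read pool can be sized very large 
without affecting the other operations.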
   
   I already talked with and about the approach here and we agreed the handler 
pool would need to be greatly increased. I don't see any big problem with 
having a very large handler pool of threads that are mostly blocked. Yes, they 
have some memory overhead, but Ozone datanodes, like HDFS DNs, are not exactly 
memory constrained.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

