sodonnel commented on code in PR #6613:
URL: https://github.com/apache/ozone/pull/6613#discussion_r2481109699


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -504,6 +516,65 @@ private XceiverClientReply sendCommandWithRetry(
     }
   }
 
+  /**
+   * Starts a streaming read operation, intended to read entire blocks from 
the datanodes. This method expects a
+   * {@link StreamObserver} to be passed in, which will be used to receive the 
streamed data from the datanode.
+   * Upon successfully starting the streaming read, a {@link 
StreamingReadResponse} is returned, which contains
+   * information about the datanode used for the read, and the request 
observer that can be used to manage the stream
+   * (e.g., to cancel it if needed). A semaphore is acquired to limit the 
number of concurrent streaming reads so upon
+   * successful return of this method, the caller must ensure to call {@link 
#completeStreamRead(StreamingReadResponse)}
+   * to release the semaphore once the streaming read is complete.
+   * @param request The container command request to initiate the streaming 
read.
+   * @param streamObserver The observer that will handle the streamed 
responses.
+   * @return A {@link StreamingReadResponse} containing details of the 
streaming read operation.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public StreamingReadResponse streamRead(ContainerCommandRequestProto request,
+      StreamObserver<ContainerCommandResponseProto> streamObserver) throws 
IOException, InterruptedException {
+    List<DatanodeDetails> datanodeList = sortDatanodes(request);
+    IOException lastException = null;
+    for (DatanodeDetails dn : datanodeList) {
+      try {
+        checkOpen(dn);
+        semaphore.acquire();
+        XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
+        if (stub == null) {
+          throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Executing command {} on datanode {}", 
processForDebug(request), dn);
+        }
+        StreamObserver<ContainerCommandRequestProto> requestObserver = stub
+            .withDeadlineAfter(timeout, TimeUnit.SECONDS)
+            .send(streamObserver);
+        requestObserver.onNext(request);
+        requestObserver.onCompleted();
+        return new StreamingReadResponse(dn, 
(ClientCallStreamObserver<ContainerCommandRequestProto>) requestObserver);

Review Comment:
   >  The server should open the file in the first gRPC onNext() call and close 
it in onComplete()/onError().
   
   How is this supposed to work? The server opens the file in the context of 
the handler thread, which is has to release after onNext() and hence close the 
open file down, along with all its offset information and checksum object is 
deserialized from rocksDB.
   
   > Increasing handler pool for performance makes sense. However, it is not a 
good solution to avoid deadlocks.
   
   The handler pool needs to be sized for the typical workload the server is 
supposed to handle. If our workload is large reads, by context switching and 
chunking things up, the re-initialization of the thread each time is going to 
require net more work as it has to repeat the setup for each call.
   
   > BTW, how large is enough? Would it slows down other operations for 
creating such a large handler pool?
   
   I don't know, but I have seen HDFS work just fine with 32k threads active, 
provided the open file limits are high enough.
   
   The only way to know for sure is to try it and see, and iterate on the 
approach as problems are identified.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to