sodonnel commented on code in PR #6613:
URL: https://github.com/apache/ozone/pull/6613#discussion_r2461050997


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -504,6 +516,65 @@ private XceiverClientReply sendCommandWithRetry(
     }
   }
 
+  /**
+   * Starts a streaming read operation, intended to read entire blocks from the datanodes. This method expects a
+   * {@link StreamObserver} to be passed in, which will be used to receive the streamed data from the datanode.
+   * Upon successfully starting the streaming read, a {@link StreamingReadResponse} is returned, which contains
+   * information about the datanode used for the read, and the request observer that can be used to manage the stream
+   * (e.g., to cancel it if needed). A semaphore is acquired to limit the number of concurrent streaming reads so upon
+   * successful return of this method, the caller must ensure to call {@link #completeStreamRead(StreamingReadResponse)}
+   * to release the semaphore once the streaming read is complete.
+   * @param request The container command request to initiate the streaming read.
+   * @param streamObserver The observer that will handle the streamed responses.
+   * @return A {@link StreamingReadResponse} containing details of the streaming read operation.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public StreamingReadResponse streamRead(ContainerCommandRequestProto request,
+      StreamObserver<ContainerCommandResponseProto> streamObserver) throws IOException, InterruptedException {
+    List<DatanodeDetails> datanodeList = sortDatanodes(request);
+    IOException lastException = null;
+    for (DatanodeDetails dn : datanodeList) {
+      try {
+        checkOpen(dn);
+        semaphore.acquire();
+        XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
+        if (stub == null) {
+          throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Executing command {} on datanode {}", processForDebug(request), dn);
+        }
+        StreamObserver<ContainerCommandRequestProto> requestObserver = stub
+            .withDeadlineAfter(timeout, TimeUnit.SECONDS)
+            .send(streamObserver);
+        requestObserver.onNext(request);
+        requestObserver.onCompleted();
+        return new StreamingReadResponse(dn, (ClientCallStreamObserver<ContainerCommandRequestProto>) requestObserver);

Review Comment:
   I had to refactor things a bit to fix the failing checksum problem.
   
   However, it's still not clear to me whether I need to call `onCompleted()` on the client-side request observer at all, or whether it will cause a resource leak if I don't. Finding useful gRPC docs is proving challenging, and AI tools are giving me inconsistent answers.
   
   It seems to be possible to call `cancel` after `onCompleted`. But the docs (at least those I can find, as it's far from clear) suggest you should not call `onError` after `onCompleted`. However, I tested this and it doesn't throw anything if you do.
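   The ordering rule above can be made checkable with a small test-only wrapper around the request observer. The wrapper records any call made after `onCompleted` or `onError`, since the `io.grpc.stub.StreamObserver` javadoc says each of those must be the last call. It still allows `cancel` at any time, because grpc-java documents cancellation as permitted even after half-close. This is a hypothetical sketch: `RequestSide` is a local stand-in for `ClientCallStreamObserver`, reduced to the four methods discussed here. `ContractCheckingObserver` is an invented name, not part of gRPC or Ozone.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for the parts of gRPC's ClientCallStreamObserver discussed above.
   interface RequestSide<T> {
     void onNext(T value);
     void onError(Throwable t);
     void onCompleted();
     void cancel(String message, Throwable cause);
   }

   // Hypothetical test helper: forwards every call to the delegate and records any call
   // that breaks the rule "onCompleted/onError may be called once and must be last".
   class ContractCheckingObserver<T> implements RequestSide<T> {
     private final RequestSide<T> delegate;
     private final List<String> violations = new ArrayList<>();
     private boolean terminated; // true once onCompleted or onError has been called

     ContractCheckingObserver(RequestSide<T> delegate) {
       this.delegate = delegate;
     }

     private void checkNotTerminated(String call) {
       if (terminated) {
         violations.add(call + " after terminal call");
       }
     }

     @Override public void onNext(T value) {
       checkNotTerminated("onNext");
       delegate.onNext(value);
     }

     @Override public void onError(Throwable t) {
       checkNotTerminated("onError");
       terminated = true;
       delegate.onError(t);
     }

     @Override public void onCompleted() {
       checkNotTerminated("onCompleted");
       terminated = true;
       delegate.onCompleted();
     }

     @Override public void cancel(String message, Throwable cause) {
       // Cancellation is allowed after half-close, so it is never recorded as a violation.
       delegate.cancel(message, cause);
     }

     List<String> violations() {
       return violations;
     }
   }
   ```

   In a test, wrap the real request observer with this helper. A clean read, a seek, and a checksum failure should each leave `violations()` empty.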
   
   As it stands, if the stream is read to successful completion, `onCompleted` will be called on the **response** observer, but `onCompleted` is not called on the **request** observer.
   
   If the read is cancelled, `cancel` gets called on the request observer, and that should result in `onError` being triggered on the response observer.
   
   On a checksum error, `onError` gets called on the request observer.
   
   It's all very confusing and makes this quite difficult to reason about, IMO. What we really want is a way to iterate over the stream synchronously, but what we get with gRPC is all this complex async logic, which I fear has a lot of potential for problems.
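   One common way to get synchronous iteration on top of gRPC's callback API uses a `BlockingQueue`. The response observer pushes each event into the queue, and the reader drains it through an `Iterator` on its own thread. The sketch below assumes a local `StreamObserver` stand-in with the same three callbacks as `io.grpc.stub.StreamObserver`. `BlockingResponseIterator` is a hypothetical name, not an existing gRPC or Ozone class.

   ```java
   import java.util.Iterator;
   import java.util.NoSuchElementException;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   // Local stand-in mirroring the three callbacks of io.grpc.stub.StreamObserver.
   interface StreamObserver<T> {
     void onNext(T value);
     void onError(Throwable t);
     void onCompleted();
   }

   // Hypothetical adapter: gRPC would invoke the StreamObserver callbacks on its own
   // threads, while the reader consumes the same events synchronously via Iterator.
   class BlockingResponseIterator<T> implements StreamObserver<T>, Iterator<T> {
     private static final Object END = new Object(); // sentinel for normal completion
     private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(); // unbounded
     private Object pending; // event already taken from the queue but not yet consumed

     @Override public void onNext(T value) { queue.add(value); }
     @Override public void onError(Throwable t) { queue.add(new Failure(t)); }
     @Override public void onCompleted() { queue.add(END); }

     @Override public boolean hasNext() {
       if (pending == null) {
         try {
           pending = queue.take(); // blocks until the stream produces the next event
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new IllegalStateException("Interrupted while waiting for the stream", e);
         }
       }
       if (pending instanceof Failure) {
         // Surface the stream error to the synchronous caller on every attempt.
         throw new IllegalStateException("Stream failed", ((Failure) pending).cause);
       }
       return pending != END;
     }

     @SuppressWarnings("unchecked")
     @Override public T next() {
       if (!hasNext()) {
         throw new NoSuchElementException();
       }
       T value = (T) pending;
       pending = null;
       return value;
     }

     // Wraps a terminal error so it can travel through the same queue as data.
     private static final class Failure {
       final Throwable cause;
       Failure(Throwable cause) { this.cause = cause; }
     }
   }
   ```

   Because the queue is unbounded, this sketch does no flow control, so a fast datanode could buffer an entire block in memory. A real version would need bounded buffering, for example via gRPC's manual inbound flow control. It would also need a way to cancel the call if the reader abandons the iterator.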
   
   What seems safest is to call:

   ```java
   requestObserver.onNext(readBlockRequest);
   // We are never going to send another request, so signal that the request side is complete
   requestObserver.onCompleted();
   ```
   
   Later in the code, we might call `cancel` for a seek, and we could call `cancel` rather than `onError` for a checksum violation, since `onError` is not supposed to be called after `onCompleted`. That way we don't have to worry about what happens if `onCompleted` is never called on the request observer. As I said earlier, I have no idea whether you must call it or not!
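   The approach proposed above can be sketched as a small wrapper with three steps. It sends the single read request, half-closes immediately, and afterwards aborts only through `cancel`. This is a hypothetical sketch, not the PR's code. `ClientRequestStream` is a local stand-in for the subset of `ClientCallStreamObserver` used here, and its `cancel(String message, Throwable cause)` mirrors the grpc-java signature. `SingleRequestReadStream`, `cancelForSeek` and `cancelForChecksumFailure` are invented names.

   ```java
   // Local stand-in for the subset of gRPC's ClientCallStreamObserver used here.
   interface ClientRequestStream<T> {
     void onNext(T value);
     void onCompleted();
     void cancel(String message, Throwable cause);
   }

   // Hypothetical wrapper: send the one read request, half-close immediately,
   // and from then on terminate early only via cancel, never onError.
   class SingleRequestReadStream<T> {
     private final ClientRequestStream<T> requestStream;
     private boolean cancelled;

     SingleRequestReadStream(ClientRequestStream<T> requestStream, T readBlockRequest) {
       this.requestStream = requestStream;
       requestStream.onNext(readBlockRequest);
       // No further requests will ever be sent, so close the request side now.
       requestStream.onCompleted();
     }

     // Used when a seek makes the rest of the stream unnecessary.
     void cancelForSeek() {
       cancelOnce("Seek requested", null);
     }

     // Used on a checksum failure instead of onError, which must not follow onCompleted.
     void cancelForChecksumFailure(Exception cause) {
       cancelOnce("Checksum verification failed", cause);
     }

     // Cancel at most once so the sequence of calls on the stream stays easy to reason about.
     private void cancelOnce(String message, Throwable cause) {
       if (!cancelled) {
         cancelled = true;
         requestStream.cancel(message, cause);
       }
     }
   }
   ```

   The stand-in interface deliberately leaves out `onError`. The wrapper therefore cannot call it after half-close, and the question of whether `onError` after `onCompleted` is legal never arises.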



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
