yandrey321 commented on code in PR #6613:
URL: https://github.com/apache/ozone/pull/6613#discussion_r2547380335


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -504,6 +515,62 @@ private XceiverClientReply sendCommandWithRetry(
     }
   }
 
+  /**
+   * Starts a streaming read operation, intended to read entire blocks from the datanodes. This method expects a
+   * {@link StreamingReaderSpi} to be passed in, which will be used to receive the streamed data from the datanode.
+   * Upon successfully starting the streaming read, a {@link StreamingReadResponse} is set into the pass StreamObserver,
+   * which contains information about the datanode used for the read, and the request observer that can be used to
+   * manage the stream (e.g., to cancel it if needed). A semaphore is acquired to limit the number of concurrent
+   * streaming reads so upon successful return of this method, the caller must ensure to call
+   * {@link #completeStreamRead()} to release the semaphore once the streaming read is complete.
+   * @param request The container command request to initiate the streaming read.
+   * @param streamObserver The observer that will handle the streamed responses.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public void streamRead(ContainerCommandRequestProto request,
+      StreamingReaderSpi streamObserver) throws IOException, InterruptedException {
+    List<DatanodeDetails> datanodeList = sortDatanodes(request);
+    IOException lastException = null;
+    for (DatanodeDetails dn : datanodeList) {
+      try {
+        checkOpen(dn);
+        semaphore.acquire();
+        XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
+        if (stub == null) {
+          throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Executing command {} on datanode {}", processForDebug(request), dn);
+        }
+        stub.withDeadlineAfter(timeout, TimeUnit.SECONDS)
+            .streamBlock(request, streamObserver);
+        streamObserver.setStreamingDatanode(dn);
+        return;

Review Comment:
   Do we do that for only one node?
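
For context, the quoted loop returns right after the first datanode on which `streamBlock` is started. Below is a minimal, self-contained sketch of that "first node that starts wins" pattern. It is not the PR's code: `FailoverSketch`, `StreamStarter`, and `startOnFirstHealthy` are hypothetical names, and releasing the permit on failure is an assumption, since the `catch` block is not visible in the quoted hunk.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Semaphore;

class FailoverSketch {
  /** Hypothetical stand-in for starting a stream on a single node. */
  interface StreamStarter {
    void start(String node) throws IOException;
  }

  /**
   * Tries each node in order and returns the first one on which the stream
   * starts. On success the permit stays held, so the caller must release it
   * when the read completes. On an IOException the permit is released
   * (assumed behavior) and the next node is tried.
   */
  static String startOnFirstHealthy(List<String> nodes, StreamStarter starter,
      Semaphore permits) throws IOException, InterruptedException {
    IOException last = null;
    for (String node : nodes) {
      permits.acquire();
      try {
        starter.start(node);
        return node; // only one node is ever used per call
      } catch (IOException e) {
        permits.release(); // give the permit back before trying the next node
        last = e;
      }
    }
    throw last != null ? last : new IOException("No nodes available");
  }
}
```

In this sketch the remaining nodes act only as fallbacks: they are contacted only if starting the stream on an earlier node throws an `IOException`.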



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

