yandrey321 commented on code in PR #6613:
URL: https://github.com/apache/ozone/pull/6613#discussion_r2547380335
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -504,6 +515,62 @@ private XceiverClientReply sendCommandWithRetry(
}
}
+ /**
+ * Starts a streaming read operation, intended to read entire blocks from
the datanodes. This method expects a
+ * {@link StreamingReaderSpi} to be passed in, which will be used to receive
the streamed data from the datanode.
+ * Upon successfully starting the streaming read, a {@link
StreamingReadResponse} is set into the pass StreamObserver,
+ * which contains information about the datanode used for the read, and the
request observer that can be used to
+ * manage the stream (e.g., to cancel it if needed). A semaphore is acquired
to limit the number of concurrent
+ * streaming reads so upon successful return of this method, the caller must
ensure to call
+ * {@link #completeStreamRead()} to release the semaphore once the streaming
read is complete.
+ * @param request The container command request to initiate the streaming
read.
+ * @param streamObserver The observer that will handle the streamed
responses.=
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public void streamRead(ContainerCommandRequestProto request,
+ StreamingReaderSpi streamObserver) throws IOException,
InterruptedException {
+ List<DatanodeDetails> datanodeList = sortDatanodes(request);
+ IOException lastException = null;
+ for (DatanodeDetails dn : datanodeList) {
+ try {
+ checkOpen(dn);
+ semaphore.acquire();
+ XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
+ if (stub == null) {
+ throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Executing command {} on datanode {}",
processForDebug(request), dn);
+ }
+ stub.withDeadlineAfter(timeout, TimeUnit.SECONDS)
+ .streamBlock(request, streamObserver);
+ streamObserver.setStreamingDatanode(dn);
+ return;
Review Comment:
do we do that only for one node?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]