szetszwo commented on code in PR #10522:
URL: https://github.com/apache/ozone/pull/10522#discussion_r3462901343


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java:
##########
@@ -430,15 +432,27 @@ ReadBlockResponseProto poll() throws IOException {
           return null; // Stream ended, queue is empty
         }
 
-        final long elapsedNanos = System.nanoTime() - startTime;
-        if (elapsedNanos >= readTimeoutNanos) {
+        if (System.nanoTime() >= deadlineNs) {

Review Comment:
   
   
   When using  System.nanoTime(), we should always use the difference of two 
nanoTime value since [System.nanoTime() has arbitrary origin 
time](https://docs.oracle.com/javase/8/docs/api/java/lang/System.html#nanoTime--).
  So 
   - keeping startTime and compute elapsedNanos is correct.
   - comparing System.nanoTime() >= deadlineNs is incorrect.
   - comparing  System.nanoTime() > 0 is incorrect.
   
   



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -567,12 +569,38 @@ private XceiverClientReply sendCommandWithRetry(
 
   @Override
   public void streamRead(ContainerCommandRequestProto request,
-      StreamingReadResponse streamObserver) {
+      StreamingReadResponse streamObserver) throws IOException {
+    final ClientCallStreamObserver<ContainerCommandRequestProto> obs = 
streamObserver.getRequestObserver();
+
+    if (!obs.isReady()) {
+      LOG.debug("->{}: flow control stall (isReady=false) for block={} 
offset={} length={}. Waiting.",
+          streamObserver,
+          request.getReadBlock().getBlockID().getLocalID(),
+          request.getReadBlock().getOffset(),
+          request.getReadBlock().getLength());
+      final long now = System.nanoTime();
+      final long callerDeadlineNs = streamObserver.getReadDeadlineNs();
+      final long waitTimeoutNanos = callerDeadlineNs > 0 ? Math.max(0, 
callerDeadlineNs - now)
+          : TimeUnit.SECONDS.toNanos(timeout);
+      final long deadlineNs = callerDeadlineNs > 0 ? callerDeadlineNs : now + 
waitTimeoutNanos;
+      while (!obs.isReady() && System.nanoTime() < deadlineNs) {
+        LockSupport.parkNanos(10_000_000L);
+        if (Thread.currentThread().isInterrupted()) {
+          Thread.currentThread().interrupt();
+          throw new InterruptedIOException("Interrupted while waiting for 
stream to become ready: " + streamObserver);
+        }
+      }

Review Comment:
   The calculation can be simplified as below:
   ```java
         for(long startTime = System.nanoTime(); !obs.isReady() && 
System.nanoTime() - startTime < readTimeoutNanos; ) {
           LockSupport.parkNanos(10_000_000L);
           if (Thread.currentThread().isInterrupted()) {
             Thread.currentThread().interrupt();
             throw new InterruptedIOException("Interrupted while waiting for 
stream to become ready: " + streamObserver);
           }
         }
   ```



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java:
##########
@@ -447,6 +461,7 @@ private ByteBuffer read(int length, boolean preRead) throws 
IOException {
         return responseQueue.isEmpty() ? null : readFromQueue();
       }
 
+      refreshReadDeadline();

Review Comment:
   This seems not needed since readBlock -> readBlockImpl -> setReadDeadlineNs. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to