[ 
https://issues.apache.org/jira/browse/HDFS-17552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867742#comment-17867742
 ] 

ASF GitHub Bot commented on HDFS-17552:
---------------------------------------

szetszwo commented on code in PR #6888:
URL: https://github.com/apache/hadoop/pull/6888#discussion_r1686435548


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -283,6 +326,7 @@ static class Call {
     boolean done;               // true when call is done
     private final Object externalHandler;
     private AlignmentContext alignmentContext;
+    private CompletableFuture<Object> completableFuture;

Review Comment:
   This change is a good first step!
   
   Ideally, the `completableFuture` field should replace the `done`, `rpcResponse` and `error` fields. I can see that replacing `error` may not be easy. How about replacing just `done` and `rpcResponse`?
   
   ```java
   @@ -277,10 +320,9 @@ static class Call {
        final int id;               // call id
        final int retry;           // retry count
        final Writable rpcRequest;  // the serialized rpc request
   -    Writable rpcResponse;       // null if rpc has error
   +    private final CompletableFuture<Writable> rpcResponseFuture = new CompletableFuture<>();
        IOException error;          // exception, null if success
        final RPC.RpcKind rpcKind;      // Rpc EngineKind
   -    boolean done;               // true when call is done
        private final Object externalHandler;
        private AlignmentContext alignmentContext;
    
   @@ -313,9 +355,8 @@ public String toString() {
    
        /** Indicate when the call is complete and the
         * value or error are available.  Notifies by default.  */
   -    protected synchronized void callComplete() {
   -      this.done = true;
   -      notify();                                 // notify caller
   +    protected synchronized void callComplete(Writable rpcResponse) {
   +      rpcResponseFuture.complete(rpcResponse);
    
          if (externalHandler != null) {
            synchronized (externalHandler) {
   @@ -340,7 +381,7 @@ public synchronized void setAlignmentContext(AlignmentContext ac) {
         */
        public synchronized void setException(IOException error) {
          this.error = error;
   -      callComplete();
   +      callComplete(null);
        }
        
        /** Set the return value when there is no error. 
   @@ -349,8 +390,7 @@ public synchronized void setException(IOException error) {
         * @param rpcResponse return value of the rpc call.
         */
        public synchronized void setRpcResponse(Writable rpcResponse) {
   -      this.rpcResponse = rpcResponse;
   -      callComplete();
   +      callComplete(rpcResponse);
        }
        
        public synchronized Writable getRpcResponse() {
   @@ -1495,39 +1535,19 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
        }
    
        if (isAsynchronousMode()) {
   -      final AsyncGet<Writable, IOException> asyncGet
   -          = new AsyncGet<Writable, IOException>() {
   -        @Override
   -        public Writable get(long timeout, TimeUnit unit)
   -            throws IOException, TimeoutException{
   -          boolean done = true;
   -          try {
   -            final Writable w = getRpcResponse(call, connection, timeout, unit);
   -            if (w == null) {
   -              done = false;
   -              throw new TimeoutException(call + " timed out "
   -                  + timeout + " " + unit);
   -            }
   -            return w;
   -          } finally {
   -            if (done) {
   -              releaseAsyncCall();
   -            }
   -          }
   -        }
   -
   -        @Override
   -        public boolean isDone() {
   -          synchronized (call) {
   -            return call.done;
   -          }
   +      CompletableFuture<Writable> result = call.rpcResponseFuture.thenApply(o -> {
   +        try {
   +          return getRpcResponse(call, connection);
   +        } catch (IOException e) {
   +          throw new CompletionException(e);
   +        } finally {
   +          releaseAsyncCall();
            }
   -      };
   -
   -      ASYNC_RPC_RESPONSE.set(asyncGet);
   +      });
   +      ASYNC_RPC_RESPONSE.set(result);
          return null;
        } else {
   -      return getRpcResponse(call, connection, -1, null);
   +      return getRpcResponse(call, connection);
        }
      }
    
   @@ -1564,19 +1584,17 @@ int getAsyncCallCount() {
      }
    
      /** @return the rpc response or, in case of timeout, null. */
   -  private Writable getRpcResponse(final Call call, final Connection connection,
   -      final long timeout, final TimeUnit unit) throws IOException {
   +  private Writable getRpcResponse(final Call call, final Connection connection) throws IOException {
        synchronized (call) {
   -      while (!call.done) {
   -        try {
   -          AsyncGet.Util.wait(call, timeout, unit);
   -          if (timeout >= 0 && !call.done) {
   -            return null;
   -          }
   -        } catch (InterruptedException ie) {
   -          Thread.currentThread().interrupt();
   -          throw new InterruptedIOException("Call interrupted");
   -        }
   +      final Writable response;
   +      try {
   +        response = call.rpcResponseFuture.get();
   +      } catch (InterruptedException ie) {
   +        Thread.currentThread().interrupt();
   +        throw new InterruptedIOException("Call interrupted");
   +      } catch (ExecutionException e) {
   +        // currently, it never has ExecutionException
   +        throw new IllegalStateException(e);
          }
    
          if (call.error != null) {
   @@ -1593,7 +1611,7 @@ private Writable getRpcResponse(final Call call, final Connection connection,
                      call.error);
            }
          } else {
   -        return call.getRpcResponse();
   +        return response;
          }
        }
      }
   ```
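
For illustration only, here is a standalone sketch of the same idea outside of `Client.java`: a future stands in for the `done` flag and the `wait()`/`notify()` pair. `SketchCall`, `awaitResponse` and the use of `String` in place of `Writable` are hypothetical names chosen for this example and are not part of the PR. The sketch waits outside any `synchronized` block, because `CompletableFuture.get()` does not release a held monitor the way `Object.wait()` does.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical stand-in for Client.Call; String replaces Writable. */
class SketchCall {
  // Completed exactly once, with the response or with null on error.
  private final CompletableFuture<String> responseFuture = new CompletableFuture<>();
  // Kept as a separate field, mirroring the suggestion to leave `error` alone.
  private volatile IOException error;

  /** Called by the receiving thread when the call succeeds. */
  void setResponse(String response) {
    responseFuture.complete(response);
  }

  /** Called by the receiving thread when the call fails. */
  void setException(IOException e) {
    error = e;                       // publish the error before completing
    responseFuture.complete(null);
  }

  /** Replaces the old `done` flag. */
  boolean isDone() {
    return responseFuture.isDone();
  }

  /** Blocks until the call completes; no monitor is held while waiting. */
  String awaitResponse() throws IOException {
    final String response;
    try {
      response = responseFuture.get();
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new InterruptedIOException("Call interrupted");
    } catch (ExecutionException ee) {
      // The future is never completed exceptionally in this sketch.
      throw new IllegalStateException(ee);
    }
    if (error != null) {
      throw error;
    }
    return response;
  }
}
```

A caller thread would invoke `awaitResponse()` while another thread calls `setResponse(...)` or `setException(...)`; asynchronous callers could instead chain on the future with `thenApply`.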
   
   





> [ARR] IPC client uses CompletableFuture to support asynchronous operations.
> ---------------------------------------------------------------------------
>
>                 Key: HDFS-17552
>                 URL: https://issues.apache.org/jira/browse/HDFS-17552
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: Jian Zhang
>            Assignee: Jian Zhang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HDFS-17552.patch
>
>
> h3. Description
> The existing asynchronous implementation of the IPC client comes mainly 
> from HADOOP-13226, HDFS-10224, etc.
> However, that implementation does not support `CompletableFuture`; 
> instead, it relies on setting up callbacks, which can lead to the "callback 
> hell" problem. `CompletableFuture` organizes asynchronous callbacks better. 
> Therefore, building on the existing implementation, this change uses 
> `CompletableFuture` so that once `client.call` completes, an asynchronous 
> thread handles the response of the call without blocking the main thread.
>  
> *Test*
> New UT: TestAsyncIPC#testAsyncCallWithCompletableFuture()
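
As a rough illustration of the point in the description about organizing callbacks, the sketch below chains two dependent asynchronous calls with `thenCompose` and `thenApply` instead of nesting callbacks. `AsyncChainSketch`, `asyncCall` and `chained` are hypothetical names for this example only; they are not Hadoop APIs, and the real client code may be structured differently.

```java
import java.util.concurrent.CompletableFuture;

class AsyncChainSketch {
  /** Hypothetical asynchronous call that echoes its request. */
  static CompletableFuture<String> asyncCall(String request) {
    return CompletableFuture.supplyAsync(() -> "response(" + request + ")");
  }

  /**
   * Issues a second call that depends on the first response, then
   * post-processes the result. Each stage runs when the previous one
   * completes, so the calling thread is not blocked.
   */
  static CompletableFuture<Integer> chained(String request) {
    return asyncCall(request)
        .thenCompose(first -> asyncCall(first))  // dependent second call
        .thenApply(String::length);              // transform the final response
  }
}
```

The caller only blocks if and when it asks for the result, for example with `chained("a").join()`.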



