[
https://issues.apache.org/jira/browse/HDFS-17552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867742#comment-17867742
]
ASF GitHub Bot commented on HDFS-17552:
---------------------------------------
szetszwo commented on code in PR #6888:
URL: https://github.com/apache/hadoop/pull/6888#discussion_r1686435548
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -283,6 +326,7 @@ static class Call {
boolean done; // true when call is done
private final Object externalHandler;
private AlignmentContext alignmentContext;
+ private CompletableFuture<Object> completableFuture;
Review Comment:
This change is a good first step!
Ideally, the `completableFuture` field should replace the `done`,
`rpcResponse` and `error` fields. I can see that the replacement of `error`
may not be easy. Let's replace `done` and `rpcResponse` ?
```java
@@ -277,10 +320,9 @@ static class Call {
final int id; // call id
final int retry; // retry count
final Writable rpcRequest; // the serialized rpc request
- Writable rpcResponse; // null if rpc has error
+ private final CompletableFuture<Writable> rpcResponseFuture = new
CompletableFuture<>();
IOException error; // exception, null if success
final RPC.RpcKind rpcKind; // Rpc EngineKind
- boolean done; // true when call is done
private final Object externalHandler;
private AlignmentContext alignmentContext;
@@ -313,9 +355,8 @@ public String toString() {
/** Indicate when the call is complete and the
* value or error are available. Notifies by default. */
- protected synchronized void callComplete() {
- this.done = true;
- notify(); // notify caller
+ protected synchronized void callComplete(Writable rpcResponse) {
+ rpcResponseFuture.complete(rpcResponse);
if (externalHandler != null) {
synchronized (externalHandler) {
@@ -340,7 +381,7 @@ public synchronized void
setAlignmentContext(AlignmentContext ac) {
*/
public synchronized void setException(IOException error) {
this.error = error;
- callComplete();
+ callComplete(null);
}
/** Set the return value when there is no error.
@@ -349,8 +390,7 @@ public synchronized void setException(IOException error)
{
* @param rpcResponse return value of the rpc call.
*/
public synchronized void setRpcResponse(Writable rpcResponse) {
- this.rpcResponse = rpcResponse;
- callComplete();
+ callComplete(rpcResponse);
}
public synchronized Writable getRpcResponse() {
@@ -1495,39 +1535,19 @@ Writable call(RPC.RpcKind rpcKind, Writable
rpcRequest,
}
if (isAsynchronousMode()) {
- final AsyncGet<Writable, IOException> asyncGet
- = new AsyncGet<Writable, IOException>() {
- @Override
- public Writable get(long timeout, TimeUnit unit)
- throws IOException, TimeoutException{
- boolean done = true;
- try {
- final Writable w = getRpcResponse(call, connection, timeout,
unit);
- if (w == null) {
- done = false;
- throw new TimeoutException(call + " timed out "
- + timeout + " " + unit);
- }
- return w;
- } finally {
- if (done) {
- releaseAsyncCall();
- }
- }
- }
-
- @Override
- public boolean isDone() {
- synchronized (call) {
- return call.done;
- }
+ CompletableFuture<Writable> result =
call.rpcResponseFuture.thenApply(o -> {
+ try {
+ return getRpcResponse(call, connection);
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ } finally {
+ releaseAsyncCall();
}
- };
-
- ASYNC_RPC_RESPONSE.set(asyncGet);
+ });
+ ASYNC_RPC_RESPONSE.set(result);
return null;
} else {
- return getRpcResponse(call, connection, -1, null);
+ return getRpcResponse(call, connection);
}
}
@@ -1564,19 +1584,17 @@ int getAsyncCallCount() {
}
/** @return the rpc response or, in case of timeout, null. */
- private Writable getRpcResponse(final Call call, final Connection
connection,
- final long timeout, final TimeUnit unit) throws IOException {
+ private Writable getRpcResponse(final Call call, final Connection
connection) throws IOException {
synchronized (call) {
- while (!call.done) {
- try {
- AsyncGet.Util.wait(call, timeout, unit);
- if (timeout >= 0 && !call.done) {
- return null;
- }
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException("Call interrupted");
- }
+ final Writable response;
+ try {
+ response = call.rpcResponseFuture.get();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Call interrupted");
+ } catch (ExecutionException e) {
+ // currently, it never has ExecutionException
+ throw new IllegalStateException(e);
}
if (call.error != null) {
@@ -1593,7 +1611,7 @@ private Writable getRpcResponse(final Call call, final
Connection connection,
call.error);
}
} else {
- return call.getRpcResponse();
+ return response;
}
}
}
```
> [ARR] IPC client uses CompletableFuture to support asynchronous operations.
> ---------------------------------------------------------------------------
>
> Key: HDFS-17552
> URL: https://issues.apache.org/jira/browse/HDFS-17552
> Project: Hadoop HDFS
> Issue Type: Sub-task
> Reporter: Jian Zhang
> Assignee: Jian Zhang
> Priority: Major
> Labels: pull-request-available
> Attachments: HDFS-17552.patch
>
>
> h3. Description
> In the implementation of asynchronous Ipc.client, the main methods used
> include HADOOP-13226, HDFS-10224, etc.
> However, the existing implementation does not support `CompletableFuture`;
> instead, it relies on setting up callbacks, which can lead to the "callback
> hell" problem. Using `CompletableFuture` can better organize asynchronous
> callbacks. Therefore, on the basis of the existing implementation, by using
> `CompletableFuture`, once the `client.call` is completed, the asynchronous
> thread handles the response of this call without blocking the main thread.
>
> *Test*
> new UT TestAsyncIPC#testAsyncCallWithCompletableFuture()
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]