szetszwo commented on code in PR #6888:
URL: https://github.com/apache/hadoop/pull/6888#discussion_r1686435548
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -283,6 +326,7 @@ static class Call {
boolean done; // true when call is done
private final Object externalHandler;
private AlignmentContext alignmentContext;
+ private CompletableFuture<Object> completableFuture;
Review Comment:
This change is a good first step!
Ideally, the `completableFuture` field should replace the `done`,
`rpcResponse` and `error` fields. I can see that the replacement of `error`
may not be easy. Let's replace `done` and `rpcResponse` ?
```java
@@ -277,10 +320,9 @@ static class Call {
final int id; // call id
final int retry; // retry count
final Writable rpcRequest; // the serialized rpc request
- Writable rpcResponse; // null if rpc has error
+ private final CompletableFuture<Writable> rpcResponseFuture = new
CompletableFuture<>();
IOException error; // exception, null if success
final RPC.RpcKind rpcKind; // Rpc EngineKind
- boolean done; // true when call is done
private final Object externalHandler;
private AlignmentContext alignmentContext;
@@ -313,9 +355,8 @@ public String toString() {
/** Indicate when the call is complete and the
* value or error are available. Notifies by default. */
- protected synchronized void callComplete() {
- this.done = true;
- notify(); // notify caller
+ protected synchronized void callComplete(Writable rpcResponse) {
+ rpcResponseFuture.complete(rpcResponse);
if (externalHandler != null) {
synchronized (externalHandler) {
@@ -340,7 +381,7 @@ public synchronized void
setAlignmentContext(AlignmentContext ac) {
*/
public synchronized void setException(IOException error) {
this.error = error;
- callComplete();
+ callComplete(null);
}
/** Set the return value when there is no error.
@@ -349,8 +390,7 @@ public synchronized void setException(IOException error)
{
* @param rpcResponse return value of the rpc call.
*/
public synchronized void setRpcResponse(Writable rpcResponse) {
- this.rpcResponse = rpcResponse;
- callComplete();
+ callComplete(rpcResponse);
}
public synchronized Writable getRpcResponse() {
@@ -1495,39 +1535,19 @@ Writable call(RPC.RpcKind rpcKind, Writable
rpcRequest,
}
if (isAsynchronousMode()) {
- final AsyncGet<Writable, IOException> asyncGet
- = new AsyncGet<Writable, IOException>() {
- @Override
- public Writable get(long timeout, TimeUnit unit)
- throws IOException, TimeoutException{
- boolean done = true;
- try {
- final Writable w = getRpcResponse(call, connection, timeout,
unit);
- if (w == null) {
- done = false;
- throw new TimeoutException(call + " timed out "
- + timeout + " " + unit);
- }
- return w;
- } finally {
- if (done) {
- releaseAsyncCall();
- }
- }
- }
-
- @Override
- public boolean isDone() {
- synchronized (call) {
- return call.done;
- }
+ CompletableFuture<Writable> result =
call.rpcResponseFuture.thenApply(o -> {
+ try {
+ return getRpcResponse(call, connection);
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ } finally {
+ releaseAsyncCall();
}
- };
-
- ASYNC_RPC_RESPONSE.set(asyncGet);
+ });
+ ASYNC_RPC_RESPONSE.set(result);
return null;
} else {
- return getRpcResponse(call, connection, -1, null);
+ return getRpcResponse(call, connection);
}
}
@@ -1564,19 +1584,17 @@ int getAsyncCallCount() {
}
/** @return the rpc response or, in case of timeout, null. */
- private Writable getRpcResponse(final Call call, final Connection
connection,
- final long timeout, final TimeUnit unit) throws IOException {
+ private Writable getRpcResponse(final Call call, final Connection
connection) throws IOException {
synchronized (call) {
- while (!call.done) {
- try {
- AsyncGet.Util.wait(call, timeout, unit);
- if (timeout >= 0 && !call.done) {
- return null;
- }
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException("Call interrupted");
- }
+ final Writable response;
+ try {
+ response = call.rpcResponseFuture.get();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Call interrupted");
+ } catch (ExecutionException e) {
+ // currently, it never has ExecutionException
+ throw new IllegalStateException(e);
}
if (call.error != null) {
@@ -1593,7 +1611,7 @@ private Writable getRpcResponse(final Call call, final
Connection connection,
call.error);
}
} else {
- return call.getRpcResponse();
+ return response;
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]