This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 08475e2 IGNITE-13595 Java thin client: Improve request processing latency - Fixes #8368.
08475e2 is described below
commit 08475e237cc12590840a42504082976f9c0a1fe3
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Mon Oct 19 20:30:57 2020 +0300
IGNITE-13595 Java thin client: Improve request processing latency - Fixes #8368.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../internal/client/thin/TcpClientChannel.java | 47 +++++++++-------------
1 file changed, 18 insertions(+), 29 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 2f65073..4f3ee40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -225,9 +225,9 @@ class TcpClientChannel implements ClientChannel {
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException {
- long id = send(op, payloadWriter);
+ ClientRequestFuture fut = send(op, payloadWriter);
- return receive(id, payloadReader);
+ return receive(fut, payloadReader);
}
/** {@inheritDoc} */
@@ -237,9 +237,9 @@ class TcpClientChannel implements ClientChannel {
Function<PayloadInputChannel, T> payloadReader
) {
try {
- long id = send(op, payloadWriter);
+ ClientRequestFuture fut = send(op, payloadWriter);
- return receiveAsync(id, payloadReader);
+ return receiveAsync(fut, payloadReader);
} catch (Throwable t) {
CompletableFuture<T> fut = new CompletableFuture<>();
fut.completeExceptionally(t);
@@ -251,9 +251,9 @@ class TcpClientChannel implements ClientChannel {
/**
* @param op Operation.
* @param payloadWriter Payload writer to stream or {@code null} if request has no payload.
- * @return Request ID.
+ * @return Request future.
*/
- private long send(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter)
+ private ClientRequestFuture send(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter)
throws ClientException {
long id = reqId.getAndIncrement();
@@ -266,7 +266,9 @@ class TcpClientChannel implements ClientChannel {
initReceiverThread(); // Start the receiver thread with the first request.
- pendingReqs.put(id, new ClientRequestFuture());
+ ClientRequestFuture fut = new ClientRequestFuture();
+
+ pendingReqs.put(id, fut);
BinaryOutputStream req = payloadCh.out();
@@ -280,6 +282,8 @@ class TcpClientChannel implements ClientChannel {
req.writeInt(0, req.position() - 4); // Actual size.
write(req.array(), req.position());
+
+ return fut;
}
catch (Throwable t) {
pendingReqs.remove(id);
@@ -289,21 +293,15 @@ class TcpClientChannel implements ClientChannel {
finally {
sndLock.unlock();
}
-
- return id;
}
/**
- * @param reqId ID of the request to receive the response for.
+ * @param pendingReq Request future.
* @param payloadReader Payload reader from stream.
* @return Received operation payload or {@code null} if response has no payload.
*/
- private <T> T receive(long reqId, Function<PayloadInputChannel, T> payloadReader)
+ private <T> T receive(ClientRequestFuture pendingReq, Function<PayloadInputChannel, T> payloadReader)
throws ClientException {
- ClientRequestFuture pendingReq = pendingReqs.get(reqId);
-
- assert pendingReq != null : "Pending request future not found for request " + reqId;
-
try {
byte[] payload = pendingReq.get();
@@ -315,39 +313,30 @@ class TcpClientChannel implements ClientChannel {
catch (IgniteCheckedException e) {
throw convertException(e);
}
- finally {
- pendingReqs.remove(reqId);
- }
}
/**
* Receives the response asynchronously.
*
- * @param reqId ID of the request to receive the response for.
+ * @param pendingReq Request future.
* @param payloadReader Payload reader from stream.
* @return Future for the operation.
*/
- private <T> CompletableFuture<T> receiveAsync(long reqId, Function<PayloadInputChannel, T> payloadReader) {
- ClientRequestFuture pendingReq = pendingReqs.get(reqId);
-
- assert pendingReq != null : "Pending request future not found for request " + reqId;
-
+ private <T> CompletableFuture<T> receiveAsync(ClientRequestFuture pendingReq, Function<PayloadInputChannel, T> payloadReader) {
CompletableFuture<T> fut = new CompletableFuture<>();
pendingReq.listen(payloadFut -> asyncContinuationExecutor.execute(() -> {
try {
byte[] payload = payloadFut.get();
- if (payload == null || payloadReader == null) {
+ if (payload == null || payloadReader == null)
fut.complete(null);
- } else {
+ else {
T res = payloadReader.apply(new PayloadInputChannel(this, payload));
fut.complete(res);
}
} catch (Throwable t) {
fut.completeExceptionally(convertException(t));
- } finally {
- pendingReqs.remove(reqId);
}
}));
@@ -483,7 +472,7 @@ class TcpClientChannel implements ClientChannel {
}
if (notificationOp == null) { // Respone received.
- ClientRequestFuture pendingReq = pendingReqs.get(resId);
+ ClientRequestFuture pendingReq = pendingReqs.remove(resId);
if (pendingReq == null)
throw new ClientProtocolError(String.format("Unexpected response ID [%s]", resId));