This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 08475e2  IGNITE-13595 Java thin client: Improve request processing latency - Fixes #8368.
08475e2 is described below

commit 08475e237cc12590840a42504082976f9c0a1fe3
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Mon Oct 19 20:30:57 2020 +0300

    IGNITE-13595 Java thin client: Improve request processing latency - Fixes #8368.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../internal/client/thin/TcpClientChannel.java     | 47 +++++++++-------------
 1 file changed, 18 insertions(+), 29 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 2f65073..4f3ee40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -225,9 +225,9 @@ class TcpClientChannel implements ClientChannel {
         Consumer<PayloadOutputChannel> payloadWriter,
         Function<PayloadInputChannel, T> payloadReader
     ) throws ClientException {
-        long id = send(op, payloadWriter);
+        ClientRequestFuture fut = send(op, payloadWriter);
 
-        return receive(id, payloadReader);
+        return receive(fut, payloadReader);
     }
 
     /** {@inheritDoc} */
@@ -237,9 +237,9 @@ class TcpClientChannel implements ClientChannel {
             Function<PayloadInputChannel, T> payloadReader
     ) {
         try {
-            long id = send(op, payloadWriter);
+            ClientRequestFuture fut = send(op, payloadWriter);
 
-            return receiveAsync(id, payloadReader);
+            return receiveAsync(fut, payloadReader);
         } catch (Throwable t) {
             CompletableFuture<T> fut = new CompletableFuture<>();
             fut.completeExceptionally(t);
@@ -251,9 +251,9 @@ class TcpClientChannel implements ClientChannel {
     /**
      * @param op Operation.
      * @param payloadWriter Payload writer to stream or {@code null} if 
request has no payload.
-     * @return Request ID.
+     * @return Request future.
      */
-    private long send(ClientOperation op, Consumer<PayloadOutputChannel> 
payloadWriter)
+    private ClientRequestFuture send(ClientOperation op, 
Consumer<PayloadOutputChannel> payloadWriter)
         throws ClientException {
         long id = reqId.getAndIncrement();
 
@@ -266,7 +266,9 @@ class TcpClientChannel implements ClientChannel {
 
             initReceiverThread(); // Start the receiver thread with the first 
request.
 
-            pendingReqs.put(id, new ClientRequestFuture());
+            ClientRequestFuture fut = new ClientRequestFuture();
+
+            pendingReqs.put(id, fut);
 
             BinaryOutputStream req = payloadCh.out();
 
@@ -280,6 +282,8 @@ class TcpClientChannel implements ClientChannel {
             req.writeInt(0, req.position() - 4); // Actual size.
 
             write(req.array(), req.position());
+
+            return fut;
         }
         catch (Throwable t) {
             pendingReqs.remove(id);
@@ -289,21 +293,15 @@ class TcpClientChannel implements ClientChannel {
         finally {
             sndLock.unlock();
         }
-
-        return id;
     }
 
     /**
-     * @param reqId ID of the request to receive the response for.
+     * @param pendingReq Request future.
      * @param payloadReader Payload reader from stream.
      * @return Received operation payload or {@code null} if response has no 
payload.
      */
-    private <T> T receive(long reqId, Function<PayloadInputChannel, T> 
payloadReader)
+    private <T> T receive(ClientRequestFuture pendingReq, 
Function<PayloadInputChannel, T> payloadReader)
         throws ClientException {
-        ClientRequestFuture pendingReq = pendingReqs.get(reqId);
-
-        assert pendingReq != null : "Pending request future not found for 
request " + reqId;
-
         try {
             byte[] payload = pendingReq.get();
 
@@ -315,39 +313,30 @@ class TcpClientChannel implements ClientChannel {
         catch (IgniteCheckedException e) {
             throw convertException(e);
         }
-        finally {
-            pendingReqs.remove(reqId);
-        }
     }
 
     /**
      * Receives the response asynchronously.
      *
-     * @param reqId ID of the request to receive the response for.
+     * @param pendingReq Request future.
      * @param payloadReader Payload reader from stream.
      * @return Future for the operation.
      */
-    private <T> CompletableFuture<T> receiveAsync(long reqId, 
Function<PayloadInputChannel, T> payloadReader) {
-        ClientRequestFuture pendingReq = pendingReqs.get(reqId);
-
-        assert pendingReq != null : "Pending request future not found for 
request " + reqId;
-
+    private <T> CompletableFuture<T> receiveAsync(ClientRequestFuture 
pendingReq, Function<PayloadInputChannel, T> payloadReader) {
         CompletableFuture<T> fut = new CompletableFuture<>();
 
         pendingReq.listen(payloadFut -> asyncContinuationExecutor.execute(() 
-> {
             try {
                 byte[] payload = payloadFut.get();
 
-                if (payload == null || payloadReader == null) {
+                if (payload == null || payloadReader == null)
                     fut.complete(null);
-                } else {
+                else {
                     T res = payloadReader.apply(new PayloadInputChannel(this, 
payload));
                     fut.complete(res);
                 }
             } catch (Throwable t) {
                 fut.completeExceptionally(convertException(t));
-            } finally {
-                pendingReqs.remove(reqId);
             }
         }));
 
@@ -483,7 +472,7 @@ class TcpClientChannel implements ClientChannel {
         }
 
         if (notificationOp == null) { // Respone received.
-            ClientRequestFuture pendingReq = pendingReqs.get(resId);
+            ClientRequestFuture pendingReq = pendingReqs.remove(resId);
 
             if (pendingReq == null)
                 throw new ClientProtocolError(String.format("Unexpected 
response ID [%s]", resId));

Reply via email to