This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ecbe35e102 IGNITE-20869 Got rid of using CompletableFuture#orTimeout 
method on operations hot path (#4261)
ecbe35e102 is described below

commit ecbe35e102fad3cdd7608aa8f0c3845862c0ed46
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Tue Aug 27 16:45:08 2024 +0300

    IGNITE-20869 Got rid of using CompletableFuture#orTimeout method on 
operations hot path (#4261)
---
 .../ignite/internal/client/TcpClientChannel.java   |  68 +++++--
 .../org/apache/ignite/client/ConnectionTest.java   |   2 +
 .../internal/future/timeout/TimeoutObject.java     |  40 ++++
 .../internal/future/timeout/TimeoutWorker.java     | 109 +++++++++++
 .../internal/network/DefaultMessagingService.java  |  65 ++++++-
 .../raft/jraft/rpc/impl/IgniteRpcClient.java       |  29 +--
 .../internal/benchmark/FutureTimeoutBenchmark.java | 208 +++++++++++++++++++++
 7 files changed, 483 insertions(+), 38 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 0901fa94e4..5ffba47985 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.client;
 
 import static 
org.apache.ignite.internal.util.ExceptionUtils.copyExceptionWithCause;
 import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
 import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
 
@@ -26,11 +28,13 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import java.net.InetSocketAddress;
+import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
@@ -52,7 +56,10 @@ import 
org.apache.ignite.internal.client.proto.ErrorExtensions;
 import org.apache.ignite.internal.client.proto.HandshakeExtension;
 import org.apache.ignite.internal.client.proto.ProtocolVersion;
 import org.apache.ignite.internal.client.proto.ResponseFlags;
+import org.apache.ignite.internal.future.timeout.TimeoutObject;
+import org.apache.ignite.internal.future.timeout.TimeoutWorker;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.thread.IgniteThread;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.ViewUtils;
 import org.apache.ignite.lang.ErrorGroups.Table;
@@ -87,7 +94,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     private final AtomicLong reqId = new AtomicLong(1);
 
     /** Pending requests. */
-    private final Map<Long, ClientRequestFuture<?>> pendingReqs = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, ClientRequestFuture<?>> pendingReqs = 
new ConcurrentHashMap<>();
 
     /** Notification handlers. */
     private final Map<Long, CompletableFuture<PayloadInputChannel>> 
notificationHandlers = new ConcurrentHashMap<>();
@@ -104,6 +111,9 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     /** Executor for async operation listeners. */
     private final Executor asyncContinuationExecutor;
 
+    /** Timeout worker. */
+    private final TimeoutWorker timeoutWorker;
+
     /** Connect timeout in milliseconds. */
     private final long connectTimeout;
 
@@ -141,6 +151,16 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
 
         log = ClientUtils.logger(cfg.clientConfiguration(), 
TcpClientChannel.class);
 
+        this.timeoutWorker = new TimeoutWorker(
+                log,
+                cfg.getAddress().getHostString() + cfg.getAddress().getPort(),
+                "TcpClientChannel-timeout-worker",
+                pendingReqs,
+                // Client-facing future will fail with a timeout, but internal 
ClientRequestFuture will stay in the map -
+                // otherwise we'll fail with "protocol breakdown" error when a 
late response arrives from the server.
+                false
+        );
+
         asyncContinuationExecutor = 
cfg.clientConfiguration().asyncContinuationExecutor() == null
                 ? ForkJoinPool.commonPool()
                 : cfg.clientConfiguration().asyncContinuationExecutor();
@@ -158,6 +178,9 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                         log.debug("Connection established [remoteAddress=" + 
s.remoteAddress() + ']');
                     }
 
+                    // TODO: IGNITE-23076 Start single timeout worker thread 
for several clients in one JVM.
+                    new IgniteThread(timeoutWorker).start();
+
                     sock = s;
 
                     return handshakeAsync(DEFAULT_VERSION);
@@ -238,6 +261,8 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                     // Ignore.
                 }
             }
+
+            awaitForWorkersStop(List.of(timeoutWorker), true, log);
         }
     }
 
@@ -287,13 +312,9 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                 notificationHandlers.put(id, notificationFut);
             }
 
-            ClientRequestFuture<T> fut = send(opCode, id, payloadWriter, 
payloadReader, notificationFut);
+            ClientRequestFuture<T> fut = send(opCode, id, payloadWriter, 
payloadReader, notificationFut, operationTimeout);
 
-            // Client-facing future will fail with a timeout, but internal 
ClientRequestFuture will stay in the map - otherwise
-            // we'll fail with "protocol breakdown" error when a late response 
arrives from the server.
-            return operationTimeout <= 0
-                    ? fut
-                    : fut.orTimeout(operationTimeout, TimeUnit.MILLISECONDS);
+            return fut;
 
         } catch (Throwable t) {
             return CompletableFuture.failedFuture(t);
@@ -314,12 +335,15 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             long id,
             @Nullable PayloadWriter payloadWriter,
             @Nullable PayloadReader<T> payloadReader,
-            @Nullable CompletableFuture<PayloadInputChannel> notificationFut) {
+            @Nullable CompletableFuture<PayloadInputChannel> notificationFut,
+            long timeout
+    ) {
         if (closed()) {
             throw new IgniteClientConnectionException(CONNECTION_ERR, "Channel 
is closed", endpoint());
         }
 
-        ClientRequestFuture<T> fut = new ClientRequestFuture<>(payloadReader, 
notificationFut);
+        ClientRequestFuture<T> fut = new ClientRequestFuture<>(payloadReader, 
notificationFut, timeout);
+
         pendingReqs.put(id, fut);
 
         metrics.requestsActiveIncrement();
@@ -413,6 +437,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         }
 
         ClientRequestFuture<?> pendingReq = pendingReqs.remove(resId);
+
         if (pendingReq == null) {
             log.error("Unexpected response ID [remoteAddress=" + 
cfg.getAddress() + "]: " + resId);
 
@@ -555,7 +580,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     /** Client handshake. */
     private CompletableFuture<Object> handshakeAsync(ProtocolVersion ver)
             throws IgniteClientConnectionException {
-        ClientRequestFuture<Object> fut = new ClientRequestFuture<>(r -> 
handshakeRes(r.in()), null);
+        ClientRequestFuture<Object> fut = new ClientRequestFuture<>(r -> 
handshakeRes(r.in()), null, connectTimeout);
         pendingReqs.put(-1L, fut);
 
         handshakeReqAsync(ver).addListener(f -> {
@@ -565,10 +590,6 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             }
         });
 
-        if (connectTimeout > 0) {
-            fut.orTimeout(connectTimeout, TimeUnit.MILLISECONDS);
-        }
-
         return fut
                 .handle((res, err) -> {
                     if (err != null) {
@@ -737,18 +758,33 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     /**
      * Client request future.
      */
-    private static class ClientRequestFuture<T> extends CompletableFuture<T> {
+    private static class ClientRequestFuture<T> extends CompletableFuture<T> 
implements TimeoutObject<CompletableFuture<T>> {
         @Nullable
         private final PayloadReader<T> payloadReader;
 
         @Nullable
         private final CompletableFuture<PayloadInputChannel> notificationFut;
 
+        private final long endTime;
+
         private ClientRequestFuture(
                 @Nullable PayloadReader<T> payloadReader,
-                @Nullable CompletableFuture<PayloadInputChannel> 
notificationFut) {
+                @Nullable CompletableFuture<PayloadInputChannel> 
notificationFut,
+                long timeout
+        ) {
             this.payloadReader = payloadReader;
             this.notificationFut = notificationFut;
+            this.endTime = timeout > 0 ? coarseCurrentTimeMillis() + timeout : 
0;
+        }
+
+        @Override
+        public long endTime() {
+            return endTime;
+        }
+
+        @Override
+        public CompletableFuture<T> future() {
+            return this;
         }
     }
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
index 3e4795819b..e129bb9104 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -34,6 +34,7 @@ import java.util.function.Function;
 import org.apache.ignite.client.IgniteClient.Builder;
 import org.apache.ignite.client.fakes.FakeIgnite;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.lang.IgniteException;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -109,6 +110,7 @@ public class ConnectionTest extends AbstractClientTest {
 
     @SuppressWarnings("ThrowableNotThrown")
     @Test
+    @WithSystemProperty(key = "IGNITE_TIMEOUT_WORKER_SLEEP_INTERVAL", value = 
"10")
     public void 
testNoResponseFromServerWithinOperationTimeoutThrowsException() {
         Function<Integer, Integer> responseDelay = x -> x > 2 ? 100 : 0;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutObject.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutObject.java
new file mode 100644
index 0000000000..18bcda42bf
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutObject.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future.timeout;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Timeout object interface.
+ * It is used to limit the time to wait for the future's completion.
+ */
+public interface TimeoutObject<T extends CompletableFuture<?>> {
+    /**
+     * Gets end timestamp.
+     *
+     * @return End timestamp in milliseconds.
+     */
+    long endTime();
+
+    /**
+     * Gets a target future.
+     *
+     * @return A future.
+     */
+    T future();
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
new file mode 100644
index 0000000000..e69dca0834
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future.timeout;
+
+import static org.apache.ignite.internal.lang.IgniteSystemProperties.getLong;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.util.worker.IgniteWorker;
+
+/**
+ * Timeout object worker.
+ */
+public class TimeoutWorker extends IgniteWorker {
+    /** Worker sleep interval. */
+    private final long sleepInterval = 
getLong("IGNITE_TIMEOUT_WORKER_SLEEP_INTERVAL", 500);
+
+    /** Active operations. */
+    public final ConcurrentMap<Long, TimeoutObject<?>> requestsMap;
+
+    /** If {@code true}, a timed-out object is removed from the operation map. */
+    private final boolean removeOnTimeout;
+
+    /**
+     * Constructor.
+     *
+     * @param log Logger.
+     * @param igniteInstanceName Name of the Ignite instance this runnable is 
used in.
+     * @param name Worker name. Note that in general thread name and worker 
(runnable) name are two different things. The same
+     *         worker can be executed by multiple threads and therefore for 
logging and debugging purposes we separate the two.
+     * @param requestsMap Active operations.
+     * @param removeOnTimeout Whether to remove a timed-out operation from the map.
+     */
+    public TimeoutWorker(
+            IgniteLogger log,
+            String igniteInstanceName,
+            String name,
+            ConcurrentMap requestsMap,
+            boolean removeOnTimeout
+    ) {
+        super(log, igniteInstanceName, name, null);
+
+        this.requestsMap = requestsMap;
+        this.removeOnTimeout = removeOnTimeout;
+    }
+
+    @Override
+    protected void body() {
+        try {
+            TimeoutObject<?> timeoutObject;
+
+            while (!isCancelled()) {
+                long now = coarseCurrentTimeMillis();
+
+                for (Entry<Long, TimeoutObject<?>> entry : 
requestsMap.entrySet()) {
+                    updateHeartbeat();
+
+                    timeoutObject = entry.getValue();
+
+                    assert timeoutObject != null : "Unexpected null in timeout 
operation map.";
+
+                    if (timeoutObject.endTime() > 0 && now > 
timeoutObject.endTime()) {
+                        CompletableFuture<?> fut = timeoutObject.future();
+
+                        if (!fut.isDone()) {
+                            fut.completeExceptionally(new TimeoutException());
+
+                            if (removeOnTimeout) {
+                                requestsMap.remove(entry.getKey(), 
timeoutObject);
+                            }
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(sleepInterval);
+                } catch (InterruptedException e) {
+                    log.info("The timeout worker was interrupted, probably the 
client is stopping.");
+                }
+
+                updateHeartbeat();
+            }
+
+        } catch (Throwable t) {
+            // TODO: IGNITE-23075 Call FH here.
+            throw new IgniteInternalException(t);
+        }
+    }
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index d9dd8e9cfb..e3d4dfcfde 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -23,7 +23,10 @@ import static 
org.apache.ignite.internal.network.serialization.PerSessionSeriali
 import static 
org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
 import static 
org.apache.ignite.internal.tostring.IgniteToStringBuilder.includeSensitive;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
 import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
@@ -44,6 +47,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiPredicate;
 import java.util.function.Function;
+import org.apache.ignite.internal.future.timeout.TimeoutObject;
+import org.apache.ignite.internal.future.timeout.TimeoutWorker;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -58,9 +63,9 @@ import 
org.apache.ignite.internal.network.recovery.StaleIdDetector;
 import 
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
 import 
org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
 import org.apache.ignite.internal.thread.ExecutorChooser;
+import org.apache.ignite.internal.thread.IgniteThread;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.StripedExecutor;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.worker.CriticalSingleThreadExecutor;
 import org.apache.ignite.internal.worker.CriticalStripedThreadPoolExecutor;
 import org.apache.ignite.internal.worker.CriticalWorker;
@@ -101,7 +106,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
     private volatile ConnectionManager connectionManager;
 
     /** Collection that maps correlation id to the future for an invocation 
request. */
-    private final ConcurrentMap<Long, CompletableFuture<NetworkMessage>> 
requestsMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, TimeoutObjectImpl> requestsMap = new 
ConcurrentHashMap<>();
 
     /** Correlation id generator. */
     private final AtomicLong correlationIdGenerator = new AtomicLong();
@@ -112,6 +117,9 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
     /** Executors for inbound messages. */
     private final LazyStripedExecutors inboundExecutors;
 
+    /** Network timeout worker thread. */
+    private final TimeoutWorker timeoutWorker;
+
     // TODO: IGNITE-18493 - remove/move this
     @Nullable
     private volatile BiPredicate<String, NetworkMessage> dropMessagesPredicate;
@@ -153,7 +161,10 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
         outboundExecutor = new CriticalSingleThreadExecutor(
                 IgniteThreadFactory.create(nodeName, 
"MessagingService-outbound", LOG, NOTHING_ALLOWED)
         );
+
         inboundExecutors = new CriticalLazyStripedExecutors(nodeName, 
"MessagingService-inbound", criticalWorkerRegistry);
+
+        timeoutWorker = new TimeoutWorker(LOG, nodeName, 
"MessagingService-timeout-worker", requestsMap, true);
     }
 
     /**
@@ -287,10 +298,9 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
 
         long correlationId = createCorrelationId();
 
-        CompletableFuture<NetworkMessage> responseFuture = new 
CompletableFuture<NetworkMessage>()
-                .orTimeout(timeout, TimeUnit.MILLISECONDS);
+        CompletableFuture<NetworkMessage> responseFuture = new 
CompletableFuture<>();
 
-        requestsMap.put(correlationId, responseFuture);
+        requestsMap.put(correlationId, new TimeoutObjectImpl(timeout > 0 ? 
coarseCurrentTimeMillis() + timeout : 0, responseFuture));
 
         InetSocketAddress recipientAddress = 
resolveRecipientAddress(recipient);
 
@@ -560,9 +570,10 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
      * @param correlationId Request's correlation id.
      */
     private void onInvokeResponse(NetworkMessage response, Long correlationId) 
{
-        CompletableFuture<NetworkMessage> responseFuture = 
requestsMap.remove(correlationId);
+        TimeoutObjectImpl responseFuture = requestsMap.remove(correlationId);
+
         if (responseFuture != null) {
-            responseFuture.complete(response);
+            responseFuture.future().complete(response);
         }
     }
 
@@ -601,6 +612,8 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
      * Starts the service.
      */
     public void start() {
+        new IgniteThread(timeoutWorker).start();
+
         criticalWorkerRegistry.register(outboundExecutor);
 
         topologyService.addEventHandler(new TopologyEventHandler() {
@@ -617,7 +630,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
     public void stop() {
         var exception = new NodeStoppingException();
 
-        requestsMap.values().forEach(fut -> 
fut.completeExceptionally(exception));
+        requestsMap.values().forEach(fut -> 
fut.future().completeExceptionally(exception));
 
         requestsMap.clear();
 
@@ -626,7 +639,9 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
         recipientInetAddrByNodeId.clear();
 
         inboundExecutors.close();
-        IgniteUtils.shutdownAndAwaitTermination(outboundExecutor, 10, 
TimeUnit.SECONDS);
+        shutdownAndAwaitTermination(outboundExecutor, 10, TimeUnit.SECONDS);
+
+        awaitForWorkersStop(List.of(timeoutWorker), true, LOG);
     }
 
     private static int stripeCountForIndex(int executorIndex) {
@@ -711,6 +726,38 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
         }
     }
 
+    /**
+     * Timeout object wrapper for the completable future.
+     */
+    private static class TimeoutObjectImpl implements 
TimeoutObject<CompletableFuture<NetworkMessage>> {
+        /** End time (milliseconds since Unix epoch). */
+        private final long endTime;
+
+        /** Target future. */
+        private final CompletableFuture<NetworkMessage> fut;
+
+        /**
+         * Constructor.
+         *
+         * @param endTime End timestamp in milliseconds.
+         * @param fut Target future.
+         */
+        public TimeoutObjectImpl(long endTime, 
CompletableFuture<NetworkMessage> fut) {
+            this.endTime = endTime;
+            this.fut = fut;
+        }
+
+        @Override
+        public long endTime() {
+            return endTime;
+        }
+
+        @Override
+        public CompletableFuture<NetworkMessage> future() {
+            return fut;
+        }
+    }
+
     /**
      * Returns the resolved address of the target node, {@code null} if the 
target node is the current node.
      *
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
index 9ef1e6ba0d..6f3d821f0e 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
@@ -87,23 +87,22 @@ public class IgniteRpcClient implements RpcClientEx {
     ) {
         CompletableFuture<Message> fut = new CompletableFuture<>();
 
-        fut.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).
-            whenComplete((res, err) -> {
-                assert !(res == null && err == null) : res + " " + err;
+        fut.whenComplete((res, err) -> {
+            assert !(res == null && err == null) : res + " " + err;
 
-                if (err == null && recordPred != null && recordPred.test(res, 
this.toString()))
-                    recordedMsgs.add(new Object[] {res, this.toString(), 
fut.hashCode(), System.currentTimeMillis(), null});
+            if (err == null && recordPred != null && recordPred.test(res, 
this.toString()))
+                recordedMsgs.add(new Object[] {res, this.toString(), 
fut.hashCode(), System.currentTimeMillis(), null});
 
-                if (err instanceof ExecutionException)
-                    err = new RemotingException(err);
-                else if (err instanceof TimeoutException) // Translate timeout 
exception.
-                    err = new InvokeTimeoutException();
+            if (err instanceof ExecutionException)
+                err = new RemotingException(err);
+            else if (err instanceof TimeoutException) // Translate timeout 
exception.
+                err = new InvokeTimeoutException();
 
-                Throwable finalErr = err;
+            Throwable finalErr = err;
 
-                // Avoid deadlocks if a closure has completed in the same 
thread.
-                Utils.runInThread(callback.executor(), () -> 
callback.complete(res, finalErr));
-            });
+            // Avoid deadlocks if a closure has completed in the same thread.
+            Utils.runInThread(callback.executor(), () -> 
callback.complete(res, finalErr));
+        });
 
         // Future hashcode used as corellation id.
         if (recordPred != null && recordPred.test(request, peerId.toString()))
@@ -121,6 +120,10 @@ public class IgniteRpcClient implements RpcClientEx {
 
                 blockedMsgs.add(msgData);
 
+                if (timeoutMs > 0) {
+                    fut.orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
+                }
+
                 LOG.info("Blocked message to={} id={} msg={}", 
peerId.toString(), msgData[2], S.toString(request));
 
                 return fut;
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/FutureTimeoutBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/FutureTimeoutBenchmark.java
new file mode 100644
index 0000000000..3b6a795bb7
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/FutureTimeoutBenchmark.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.future.timeout.TimeoutObject;
+import org.apache.ignite.internal.future.timeout.TimeoutWorker;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Future timeout benchmark - measures the latency of assigning a timeout to a 
future in two ways:
+ * 1. Based on the embedded CompletableFuture#orTimeout.
+ * 2. Based on an additional thread that scans a collection and 
completes all the futures that have already expired.
+ *
+ * <p>Results on 11th Gen Intel® Core™ i7-1165G7 @ 2.80GHz, openjdk 11.0.24, 
Windows 10 Pro:
+ * Benchmark                    (useFutureEmbeddedTimeout)  Mode  Cnt   Score  
  Error  Units
+ * FutureTimeoutBenchmark.test                       false  avgt   20   0,760 
±  0,002  us/op
+ * FutureTimeoutBenchmark.test                        true  avgt   20  36,123 
± 44,414  us/op
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+public class FutureTimeoutBenchmark {
+    private static AtomicLong ID_GEN = new AtomicLong();
+
+    /** Active operations. */
+    public ConcurrentMap<Long, TimeoutObjectImpl> requestsMap;
+
+    private TimeoutWorker timeoutWorker;
+
+    private ConcurrentMap<Long, CompletableFuture<?>> futs;
+
+    @Param({"false", "true"})
+    private boolean useFutureEmbeddedTimeout;
+
+    /**
+     * Prepare to start the benchmark.
+     */
+    @Setup
+    public void setUp() {
+        if (useFutureEmbeddedTimeout) {
+            futs = new ConcurrentHashMap<>();
+        } else {
+            requestsMap = new ConcurrentHashMap<>();
+            this.timeoutWorker = new TimeoutWorker(
+                    Loggers.forClass(FutureTimeoutBenchmark.class),
+                    "test-node",
+                    "FutureTimeoutBenchmark-timeout-worker",
+                    requestsMap,
+                    // Client-facing future will fail with a timeout, but 
internal ClientRequestFuture will stay in the map -
+                    // otherwise we'll fail with "protocol breakdown" error 
when a late response arrives from the server.
+                    true
+            );
+
+
+            new IgniteThread(timeoutWorker).start();
+        }
+    }
+
+    /**
+     * Closes resources.
+     */
+    @TearDown
+    public void tearDown() throws InterruptedException {
+        if (useFutureEmbeddedTimeout) {
+            for (CompletableFuture<?> fut : futs.values()) {
+                if (!fut.isDone()) {
+                    try {
+                        fut.get(10, TimeUnit.SECONDS);
+                    } catch (ExecutionException e) {
+                        assert e.getCause() instanceof TimeoutException : 
"Unexpected exception type: " + e.getCause().getClass();
+                    } catch (TimeoutException e) {
+                        // Ignore exception.
+                        break;
+                    }
+                }
+            }
+
+            futs.clear();
+            futs = null;
+        } else {
+            assert waitForCondition(requestsMap::isEmpty, 10_000);
+
+            awaitForWorkersStop(List.of(timeoutWorker), true, null);
+        }
+    }
+
+    /**
+     * Target method to benchmark.
+     */
+    @Benchmark
+    public void test() {
+        if (useFutureEmbeddedTimeout) {
+            for (int i = 0; i < 10; i++) {
+                var fut = new CompletableFuture<Void>();
+
+                futs.put(ID_GEN.incrementAndGet(), fut);
+
+                fut.orTimeout(10, TimeUnit.MILLISECONDS);
+            }
+
+            if (futs.size() > 100_000) {
+                futs.clear();
+            }
+        } else {
+            for (int i = 0; i < 10; i++) {
+                requestsMap.put(ID_GEN.incrementAndGet(), new 
TimeoutObjectImpl(
+                        System.currentTimeMillis() + 10,
+                        new CompletableFuture()
+                ));
+            }
+
+            if (requestsMap.size() > 100_000) {
+                requestsMap.clear();
+            }
+        }
+    }
+
+    /**
+     * Benchmark's entry point.
+     */
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+                .include(".*" + FutureTimeoutBenchmark.class.getSimpleName() + 
".*")
+                .build();
+
+        new Runner(opt).run();
+    }
+
+    /**
+     * Timeout object wrapper for the completable future.
+     */
+    private static class TimeoutObjectImpl implements 
TimeoutObject<CompletableFuture<Void>> {
+        /** End time (milliseconds since Unix epoch). */
+        private final long endTime;
+
+        /** Target future. */
+        private final CompletableFuture<Void> fut;
+
+        /**
+         * Constructor.
+         *
+         * @param endTime End timestamp in milliseconds.
+         * @param fut Target future.
+         */
+        public TimeoutObjectImpl(long endTime, CompletableFuture<Void> fut) {
+            this.endTime = endTime;
+            this.fut = fut;
+        }
+
+        @Override
+        public long endTime() {
+            return endTime;
+        }
+
+        @Override
+        public CompletableFuture<Void> future() {
+            return fut;
+        }
+    }
+}


Reply via email to