This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ecbe35e102 IGNITE-20869 Got rid of using CompletableFuture#orTimeout
method on operations hot path (#4261)
ecbe35e102 is described below
commit ecbe35e102fad3cdd7608aa8f0c3845862c0ed46
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Tue Aug 27 16:45:08 2024 +0300
IGNITE-20869 Got rid of using CompletableFuture#orTimeout method on
operations hot path (#4261)
---
.../ignite/internal/client/TcpClientChannel.java | 68 +++++--
.../org/apache/ignite/client/ConnectionTest.java | 2 +
.../internal/future/timeout/TimeoutObject.java | 40 ++++
.../internal/future/timeout/TimeoutWorker.java | 109 +++++++++++
.../internal/network/DefaultMessagingService.java | 65 ++++++-
.../raft/jraft/rpc/impl/IgniteRpcClient.java | 29 +--
.../internal/benchmark/FutureTimeoutBenchmark.java | 208 +++++++++++++++++++++
7 files changed, 483 insertions(+), 38 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 0901fa94e4..5ffba47985 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.client;
import static
org.apache.ignite.internal.util.ExceptionUtils.copyExceptionWithCause;
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
@@ -26,11 +28,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
+import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@@ -52,7 +56,10 @@ import
org.apache.ignite.internal.client.proto.ErrorExtensions;
import org.apache.ignite.internal.client.proto.HandshakeExtension;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ResponseFlags;
+import org.apache.ignite.internal.future.timeout.TimeoutObject;
+import org.apache.ignite.internal.future.timeout.TimeoutWorker;
import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.thread.IgniteThread;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.ViewUtils;
import org.apache.ignite.lang.ErrorGroups.Table;
@@ -87,7 +94,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
private final AtomicLong reqId = new AtomicLong(1);
/** Pending requests. */
- private final Map<Long, ClientRequestFuture<?>> pendingReqs = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, ClientRequestFuture<?>> pendingReqs =
new ConcurrentHashMap<>();
/** Notification handlers. */
private final Map<Long, CompletableFuture<PayloadInputChannel>>
notificationHandlers = new ConcurrentHashMap<>();
@@ -104,6 +111,9 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/** Executor for async operation listeners. */
private final Executor asyncContinuationExecutor;
+ /** Timeout worker. */
+ private final TimeoutWorker timeoutWorker;
+
/** Connect timeout in milliseconds. */
private final long connectTimeout;
@@ -141,6 +151,16 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
log = ClientUtils.logger(cfg.clientConfiguration(),
TcpClientChannel.class);
+ this.timeoutWorker = new TimeoutWorker(
+ log,
+ cfg.getAddress().getHostString() + cfg.getAddress().getPort(),
+ "TcpClientChannel-timeout-worker",
+ pendingReqs,
+ // Client-facing future will fail with a timeout, but internal
ClientRequestFuture will stay in the map -
+ // otherwise we'll fail with "protocol breakdown" error when a
late response arrives from the server.
+ false
+ );
+
asyncContinuationExecutor =
cfg.clientConfiguration().asyncContinuationExecutor() == null
? ForkJoinPool.commonPool()
: cfg.clientConfiguration().asyncContinuationExecutor();
@@ -158,6 +178,9 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
log.debug("Connection established [remoteAddress=" +
s.remoteAddress() + ']');
}
+ // TODO: IGNITE-23076 Start a single timeout worker thread
for several clients in one JVM.
+ new IgniteThread(timeoutWorker).start();
+
sock = s;
return handshakeAsync(DEFAULT_VERSION);
@@ -238,6 +261,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
// Ignore.
}
}
+
+ awaitForWorkersStop(List.of(timeoutWorker), true, log);
}
}
@@ -287,13 +312,9 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
notificationHandlers.put(id, notificationFut);
}
- ClientRequestFuture<T> fut = send(opCode, id, payloadWriter,
payloadReader, notificationFut);
+ ClientRequestFuture<T> fut = send(opCode, id, payloadWriter,
payloadReader, notificationFut, operationTimeout);
- // Client-facing future will fail with a timeout, but internal
ClientRequestFuture will stay in the map - otherwise
- // we'll fail with "protocol breakdown" error when a late response
arrives from the server.
- return operationTimeout <= 0
- ? fut
- : fut.orTimeout(operationTimeout, TimeUnit.MILLISECONDS);
+ return fut;
} catch (Throwable t) {
return CompletableFuture.failedFuture(t);
@@ -314,12 +335,15 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
long id,
@Nullable PayloadWriter payloadWriter,
@Nullable PayloadReader<T> payloadReader,
- @Nullable CompletableFuture<PayloadInputChannel> notificationFut) {
+ @Nullable CompletableFuture<PayloadInputChannel> notificationFut,
+ long timeout
+ ) {
if (closed()) {
throw new IgniteClientConnectionException(CONNECTION_ERR, "Channel
is closed", endpoint());
}
- ClientRequestFuture<T> fut = new ClientRequestFuture<>(payloadReader,
notificationFut);
+ ClientRequestFuture<T> fut = new ClientRequestFuture<>(payloadReader,
notificationFut, timeout);
+
pendingReqs.put(id, fut);
metrics.requestsActiveIncrement();
@@ -413,6 +437,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
ClientRequestFuture<?> pendingReq = pendingReqs.remove(resId);
+
if (pendingReq == null) {
log.error("Unexpected response ID [remoteAddress=" +
cfg.getAddress() + "]: " + resId);
@@ -555,7 +580,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/** Client handshake. */
private CompletableFuture<Object> handshakeAsync(ProtocolVersion ver)
throws IgniteClientConnectionException {
- ClientRequestFuture<Object> fut = new ClientRequestFuture<>(r ->
handshakeRes(r.in()), null);
+ ClientRequestFuture<Object> fut = new ClientRequestFuture<>(r ->
handshakeRes(r.in()), null, connectTimeout);
pendingReqs.put(-1L, fut);
handshakeReqAsync(ver).addListener(f -> {
@@ -565,10 +590,6 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
});
- if (connectTimeout > 0) {
- fut.orTimeout(connectTimeout, TimeUnit.MILLISECONDS);
- }
-
return fut
.handle((res, err) -> {
if (err != null) {
@@ -737,18 +758,33 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/**
* Client request future.
*/
- private static class ClientRequestFuture<T> extends CompletableFuture<T> {
+ private static class ClientRequestFuture<T> extends CompletableFuture<T>
implements TimeoutObject<CompletableFuture<T>> {
@Nullable
private final PayloadReader<T> payloadReader;
@Nullable
private final CompletableFuture<PayloadInputChannel> notificationFut;
+ private final long endTime;
+
private ClientRequestFuture(
@Nullable PayloadReader<T> payloadReader,
- @Nullable CompletableFuture<PayloadInputChannel>
notificationFut) {
+ @Nullable CompletableFuture<PayloadInputChannel>
notificationFut,
+ long timeout
+ ) {
this.payloadReader = payloadReader;
this.notificationFut = notificationFut;
+ this.endTime = timeout > 0 ? coarseCurrentTimeMillis() + timeout :
0;
+ }
+
+ @Override
+ public long endTime() {
+ return endTime;
+ }
+
+ @Override
+ public CompletableFuture<T> future() {
+ return this;
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
index 3e4795819b..e129bb9104 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -34,6 +34,7 @@ import java.util.function.Function;
import org.apache.ignite.client.IgniteClient.Builder;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.lang.IgniteException;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -109,6 +110,7 @@ public class ConnectionTest extends AbstractClientTest {
@SuppressWarnings("ThrowableNotThrown")
@Test
+ @WithSystemProperty(key = "IGNITE_TIMEOUT_WORKER_SLEEP_INTERVAL", value =
"10")
public void
testNoResponseFromServerWithinOperationTimeoutThrowsException() {
Function<Integer, Integer> responseDelay = x -> x > 2 ? 100 : 0;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutObject.java
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutObject.java
new file mode 100644
index 0000000000..18bcda42bf
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutObject.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future.timeout;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Timeout object interface.
+ * It is used to limit the time spent waiting for a future to complete.
+ *
+ * @param <T> Type of the future the timeout applies to.
+ */
+public interface TimeoutObject<T extends CompletableFuture<?>> {
+ /**
+ * Gets the end timestamp, i.e. the moment after which the target future is considered timed out.
+ *
+ * @return End timestamp in milliseconds.
+ */
+ long endTime();
+
+ /**
+ * Gets the target future.
+ *
+ * @return The future the timeout applies to.
+ */
+ T future();
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
new file mode 100644
index 0000000000..e69dca0834
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future.timeout;
+
+import static org.apache.ignite.internal.lang.IgniteSystemProperties.getLong;
+import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.util.worker.IgniteWorker;
+
+/**
+ * Timeout object worker.
+ *
+ * <p>Until it is cancelled, the worker repeatedly scans the operations map and completes with a
+ * {@link TimeoutException} every future that is not done yet and whose positive end time has already passed.
+ * Objects with a non-positive end time are never timed out. Between two scans the worker sleeps for the
+ * sleep interval, so a timeout is detected with a delay of up to roughly one interval.
+ */
+public class TimeoutWorker extends IgniteWorker {
+ /** Worker sleep interval in milliseconds (value of IGNITE_TIMEOUT_WORKER_SLEEP_INTERVAL, 500 if it is not set). */
+ private final long sleepInterval =
getLong("IGNITE_TIMEOUT_WORKER_SLEEP_INTERVAL", 500);
+
+ /** Active operations that are scanned for expired timeouts. */
+ public final ConcurrentMap<Long, TimeoutObject<?>> requestsMap;
+
+ /** If true, a timed out object is also removed from the operation map; otherwise it stays in the map. */
+ private final boolean removeOnTimeout;
+
+ /**
+ * Constructor.
+ *
+ * @param log Logger.
+ * @param igniteInstanceName Name of the Ignite instance this runnable is used in.
+ * @param name Worker name. Note that in general thread name and worker (runnable) name are two different things. The same
+ * worker can be executed by multiple threads and therefore for logging and debugging purposes we separate the two.
+ * @param requestsMap Active operations to scan. Accepted as a raw map; its values are expected to be {@link TimeoutObject}s.
+ * @param removeOnTimeout Whether a timed out object is removed from {@code requestsMap}.
+ */
+ public TimeoutWorker(
+ IgniteLogger log,
+ String igniteInstanceName,
+ String name,
+ ConcurrentMap requestsMap,
+ boolean removeOnTimeout
+ ) {
+ super(log, igniteInstanceName, name, null);
+
+ this.requestsMap = requestsMap;
+ this.removeOnTimeout = removeOnTimeout;
+ }
+
+ @Override
+ protected void body() {
+ try {
+ TimeoutObject<?> timeoutObject;
+
+ while (!isCancelled()) {
+ long now = coarseCurrentTimeMillis();
+
+ for (Entry<Long, TimeoutObject<?>> entry :
requestsMap.entrySet()) {
+ updateHeartbeat();
+
+ timeoutObject = entry.getValue();
+
+ assert timeoutObject != null : "Unexpected null in timeout
operation map.";
+
+ if (timeoutObject.endTime() > 0 && now >
timeoutObject.endTime()) {
+ CompletableFuture<?> fut = timeoutObject.future();
+
+ if (!fut.isDone()) {
+ fut.completeExceptionally(new TimeoutException());
+
+ if (removeOnTimeout) {
+ requestsMap.remove(entry.getKey(),
timeoutObject);
+ }
+ }
+ }
+ }
+
+ try {
+ Thread.sleep(sleepInterval);
+ } catch (InterruptedException e) {
+ log.info("The timeout worker was interrupted, probably the
client is stopping.");
+ }
+
+ updateHeartbeat();
+ }
+
+ } catch (Throwable t) {
+ // TODO: IGNITE-23075 Call FH here.
+ throw new IgniteInternalException(t);
+ }
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index d9dd8e9cfb..e3d4dfcfde 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -23,7 +23,10 @@ import static
org.apache.ignite.internal.network.serialization.PerSessionSeriali
import static
org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
import static
org.apache.ignite.internal.tostring.IgniteToStringBuilder.includeSensitive;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
@@ -44,6 +47,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.Function;
+import org.apache.ignite.internal.future.timeout.TimeoutObject;
+import org.apache.ignite.internal.future.timeout.TimeoutWorker;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -58,9 +63,9 @@ import
org.apache.ignite.internal.network.recovery.StaleIdDetector;
import
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import
org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
import org.apache.ignite.internal.thread.ExecutorChooser;
+import org.apache.ignite.internal.thread.IgniteThread;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.StripedExecutor;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.worker.CriticalSingleThreadExecutor;
import org.apache.ignite.internal.worker.CriticalStripedThreadPoolExecutor;
import org.apache.ignite.internal.worker.CriticalWorker;
@@ -101,7 +106,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
private volatile ConnectionManager connectionManager;
/** Collection that maps correlation id to the future for an invocation
request. */
- private final ConcurrentMap<Long, CompletableFuture<NetworkMessage>>
requestsMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, TimeoutObjectImpl> requestsMap = new
ConcurrentHashMap<>();
/** Correlation id generator. */
private final AtomicLong correlationIdGenerator = new AtomicLong();
@@ -112,6 +117,9 @@ public class DefaultMessagingService extends
AbstractMessagingService {
/** Executors for inbound messages. */
private final LazyStripedExecutors inboundExecutors;
+ /** Network timeout worker thread. */
+ private final TimeoutWorker timeoutWorker;
+
// TODO: IGNITE-18493 - remove/move this
@Nullable
private volatile BiPredicate<String, NetworkMessage> dropMessagesPredicate;
@@ -153,7 +161,10 @@ public class DefaultMessagingService extends
AbstractMessagingService {
outboundExecutor = new CriticalSingleThreadExecutor(
IgniteThreadFactory.create(nodeName,
"MessagingService-outbound", LOG, NOTHING_ALLOWED)
);
+
inboundExecutors = new CriticalLazyStripedExecutors(nodeName,
"MessagingService-inbound", criticalWorkerRegistry);
+
+ timeoutWorker = new TimeoutWorker(LOG, nodeName,
"MessagingService-timeout-worker", requestsMap, true);
}
/**
@@ -287,10 +298,9 @@ public class DefaultMessagingService extends
AbstractMessagingService {
long correlationId = createCorrelationId();
- CompletableFuture<NetworkMessage> responseFuture = new
CompletableFuture<NetworkMessage>()
- .orTimeout(timeout, TimeUnit.MILLISECONDS);
+ CompletableFuture<NetworkMessage> responseFuture = new
CompletableFuture<>();
- requestsMap.put(correlationId, responseFuture);
+ requestsMap.put(correlationId, new TimeoutObjectImpl(timeout > 0 ?
coarseCurrentTimeMillis() + timeout : 0, responseFuture));
InetSocketAddress recipientAddress =
resolveRecipientAddress(recipient);
@@ -560,9 +570,10 @@ public class DefaultMessagingService extends
AbstractMessagingService {
* @param correlationId Request's correlation id.
*/
private void onInvokeResponse(NetworkMessage response, Long correlationId)
{
- CompletableFuture<NetworkMessage> responseFuture =
requestsMap.remove(correlationId);
+ TimeoutObjectImpl responseFuture = requestsMap.remove(correlationId);
+
if (responseFuture != null) {
- responseFuture.complete(response);
+ responseFuture.future().complete(response);
}
}
@@ -601,6 +612,8 @@ public class DefaultMessagingService extends
AbstractMessagingService {
* Starts the service.
*/
public void start() {
+ new IgniteThread(timeoutWorker).start();
+
criticalWorkerRegistry.register(outboundExecutor);
topologyService.addEventHandler(new TopologyEventHandler() {
@@ -617,7 +630,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
public void stop() {
var exception = new NodeStoppingException();
- requestsMap.values().forEach(fut ->
fut.completeExceptionally(exception));
+ requestsMap.values().forEach(fut ->
fut.future().completeExceptionally(exception));
requestsMap.clear();
@@ -626,7 +639,9 @@ public class DefaultMessagingService extends
AbstractMessagingService {
recipientInetAddrByNodeId.clear();
inboundExecutors.close();
- IgniteUtils.shutdownAndAwaitTermination(outboundExecutor, 10,
TimeUnit.SECONDS);
+ shutdownAndAwaitTermination(outboundExecutor, 10, TimeUnit.SECONDS);
+
+ awaitForWorkersStop(List.of(timeoutWorker), true, LOG);
}
private static int stripeCountForIndex(int executorIndex) {
@@ -711,6 +726,38 @@ public class DefaultMessagingService extends
AbstractMessagingService {
}
}
+ /**
+ * Timeout object wrapper for the completable future.
+ * It pairs the response future of an invocation request with the moment at which the request times out.
+ */
+ private static class TimeoutObjectImpl implements
TimeoutObject<CompletableFuture<NetworkMessage>> {
+ /** End time (milliseconds since Unix epoch), 0 if the request has no timeout. */
+ private final long endTime;
+
+ /** Target future. */
+ private final CompletableFuture<NetworkMessage> fut;
+
+ /**
+ * Constructor.
+ *
+ * @param endTime End timestamp in milliseconds, or 0 if the request has no timeout.
+ * @param fut Target future that is completed with the response.
+ */
+ public TimeoutObjectImpl(long endTime,
CompletableFuture<NetworkMessage> fut) {
+ this.endTime = endTime;
+ this.fut = fut;
+ }
+
+ @Override
+ public long endTime() {
+ return endTime;
+ }
+
+ @Override
+ public CompletableFuture<NetworkMessage> future() {
+ return fut;
+ }
+ }
+
/**
* Returns the resolved address of the target node, {@code null} if the
target node is the current node.
*
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
index 9ef1e6ba0d..6f3d821f0e 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
@@ -87,23 +87,22 @@ public class IgniteRpcClient implements RpcClientEx {
) {
CompletableFuture<Message> fut = new CompletableFuture<>();
- fut.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).
- whenComplete((res, err) -> {
- assert !(res == null && err == null) : res + " " + err;
+ fut.whenComplete((res, err) -> {
+ assert !(res == null && err == null) : res + " " + err;
- if (err == null && recordPred != null && recordPred.test(res,
this.toString()))
- recordedMsgs.add(new Object[] {res, this.toString(),
fut.hashCode(), System.currentTimeMillis(), null});
+ if (err == null && recordPred != null && recordPred.test(res,
this.toString()))
+ recordedMsgs.add(new Object[] {res, this.toString(),
fut.hashCode(), System.currentTimeMillis(), null});
- if (err instanceof ExecutionException)
- err = new RemotingException(err);
- else if (err instanceof TimeoutException) // Translate timeout
exception.
- err = new InvokeTimeoutException();
+ if (err instanceof ExecutionException)
+ err = new RemotingException(err);
+ else if (err instanceof TimeoutException) // Translate timeout
exception.
+ err = new InvokeTimeoutException();
- Throwable finalErr = err;
+ Throwable finalErr = err;
- // Avoid deadlocks if a closure has completed in the same
thread.
- Utils.runInThread(callback.executor(), () ->
callback.complete(res, finalErr));
- });
+ // Avoid deadlocks if a closure has completed in the same thread.
+ Utils.runInThread(callback.executor(), () ->
callback.complete(res, finalErr));
+ });
// Future hashcode used as corellation id.
if (recordPred != null && recordPred.test(request, peerId.toString()))
@@ -121,6 +120,10 @@ public class IgniteRpcClient implements RpcClientEx {
blockedMsgs.add(msgData);
+ if (timeoutMs > 0) {
+ fut.orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
+ }
+
LOG.info("Blocked message to={} id={} msg={}",
peerId.toString(), msgData[2], S.toString(request));
return fut;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/FutureTimeoutBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/FutureTimeoutBenchmark.java
new file mode 100644
index 0000000000..3b6a795bb7
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/FutureTimeoutBenchmark.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.future.timeout.TimeoutObject;
+import org.apache.ignite.internal.future.timeout.TimeoutWorker;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Future timeout benchmark - measures the latency of the assignment of the
future timeout in two ways:
+ * 1. Based on the embedded CompletableFuture#orTimeout.
+ * 2. Based on an additional thread that scans the collection and
completes all the futures whose timeout has already expired.
+ *
+ * <p>Results on 11th Gen Intel® Core™ i7-1165G7 @ 2.80GHz, openjdk 11.0.24,
Windows 10 Pro:
+ * Benchmark (useFutureEmbeddedTimeout) Mode Cnt Score
Error Units
+ * FutureTimeoutBenchmark.test false avgt 20 0,760
± 0,002 us/op
+ * FutureTimeoutBenchmark.test true avgt 20 36,123
± 44,414 us/op
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+public class FutureTimeoutBenchmark {
+ private static AtomicLong ID_GEN = new AtomicLong();
+
+ /** Active operations. */
+ public ConcurrentMap<Long, TimeoutObjectImpl> requestsMap;
+
+ private TimeoutWorker timeoutWorker;
+
+ private ConcurrentMap<Long, CompletableFuture<?>> futs;
+
+ @Param({"false", "true"})
+ private boolean useFutureEmbeddedTimeout;
+
+ /**
+ * Prepare to start the benchmark.
+ */
+ @Setup
+ public void setUp() {
+ if (useFutureEmbeddedTimeout) {
+ futs = new ConcurrentHashMap<>();
+ } else {
+ requestsMap = new ConcurrentHashMap<>();
+ this.timeoutWorker = new TimeoutWorker(
+ Loggers.forClass(FutureTimeoutBenchmark.class),
+ "test-node",
+ "FutureTimeoutBenchmark-timeout-worker",
+ requestsMap,
+ // Timed-out entries are removed from the map (removeOnTimeout = true),
+ // so the map drains over time and tearDown() can wait for it to become empty.
+ true
+ );
+
+
+ new IgniteThread(timeoutWorker).start();
+ }
+ }
+
+ /**
+ * Closes resources.
+ */
+ @TearDown
+ public void tearDown() throws InterruptedException {
+ if (useFutureEmbeddedTimeout) {
+ for (CompletableFuture<?> fut : futs.values()) {
+ if (!fut.isDone()) {
+ try {
+ fut.get(10, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ assert e.getCause() instanceof TimeoutException :
"Unexpected exception type: " + e.getCause().getClass();
+ } catch (TimeoutException e) {
+ // Ignore exception.
+ break;
+ }
+ }
+ }
+
+ futs.clear();
+ futs = null;
+ } else {
+ assert waitForCondition(requestsMap::isEmpty, 10_000);
+
+ awaitForWorkersStop(List.of(timeoutWorker), true, null);
+ }
+ }
+
+ /**
+ * Target method to benchmark.
+ */
+ @Benchmark
+ public void test() {
+ if (useFutureEmbeddedTimeout) {
+ for (int i = 0; i < 10; i++) {
+ var fut = new CompletableFuture<Void>();
+
+ futs.put(ID_GEN.incrementAndGet(), fut);
+
+ fut.orTimeout(10, TimeUnit.MILLISECONDS);
+ }
+
+ if (futs.size() > 100_000) {
+ futs.clear();
+ }
+ } else {
+ for (int i = 0; i < 10; i++) {
+ requestsMap.put(ID_GEN.incrementAndGet(), new
TimeoutObjectImpl(
+ System.currentTimeMillis() + 10,
+ new CompletableFuture()
+ ));
+ }
+
+ if (requestsMap.size() > 100_000) {
+ requestsMap.clear();
+ }
+ }
+ }
+
+ /**
+ * Benchmark's entry point.
+ */
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(".*" + FutureTimeoutBenchmark.class.getSimpleName() +
".*")
+ .build();
+
+ new Runner(opt).run();
+ }
+
+ /**
+ * Timeout object wrapper for the completable future.
+ */
+ private static class TimeoutObjectImpl implements
TimeoutObject<CompletableFuture<Void>> {
+ /** End time (milliseconds since Unix epoch). */
+ private final long endTime;
+
+ /** Target future. */
+ private final CompletableFuture<Void> fut;
+
+ /**
+ * Constructor.
+ *
+ * @param endTime End timestamp in milliseconds.
+ * @param fut Target future.
+ */
+ public TimeoutObjectImpl(long endTime, CompletableFuture<Void> fut) {
+ this.endTime = endTime;
+ this.fut = fut;
+ }
+
+ @Override
+ public long endTime() {
+ return endTime;
+ }
+
+ @Override
+ public CompletableFuture<Void> future() {
+ return fut;
+ }
+ }
+}