This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 56614d4e6a IGNITE-23076 Add shared timeout worker for all client
channels (#4730)
56614d4e6a is described below
commit 56614d4e6a140a1a15ce5a49402d2a57e898aa8d
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Nov 18 15:23:08 2024 +0200
IGNITE-23076 Add shared timeout worker for all client channels (#4730)
Use shared timeout worker for all client channels within the JVM instead of
one worker per channel.
The worker starts on demand, and shuts down if there are no active channels
during some period of time.
---
.../internal/client/ClientTimeoutWorker.java | 91 ++++++++++++++++++++++
.../ignite/internal/client/TcpClientChannel.java | 36 ++++-----
.../apache/ignite/client/ClientMetricsTest.java | 2 +
.../org/apache/ignite/client/ConnectionTest.java | 2 +-
.../internal/future/timeout/TimeoutWorker.java | 16 ++--
.../internal/future/timeout/TimeoutWorkerTest.java | 2 +-
.../internal/network/DefaultMessagingService.java | 1 -
.../internal/benchmark/FutureTimeoutBenchmark.java | 3 -
8 files changed, 116 insertions(+), 37 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTimeoutWorker.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTimeoutWorker.java
new file mode 100644
index 0000000000..77e73cceee
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTimeoutWorker.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.future.timeout.TimeoutWorker;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Shared timeout worker for all {@link TcpClientChannel} instances within the JVM.
+ *
+ * <p>A single scheduled thread is started when a channel registers while the worker is stopped. On every tick it
+ * drops closed channels from the registry and asks each channel to check its timeouts. After the registry has been
+ * found empty on {@link #EMPTY_COUNT_THRESHOLD} ticks in a row the thread is shut down; the next registration
+ * starts a new one.
+ */
+final class ClientTimeoutWorker {
+    /** Singleton instance. */
+    static final ClientTimeoutWorker INSTANCE = new ClientTimeoutWorker();
+
+    /** Number of ticks with an empty registry after which the executor is shut down. */
+    private static final int EMPTY_COUNT_THRESHOLD = 10;
+
+    /** Executor running the periodic check; {@code null} while the worker is stopped. Accessed under {@code this}. */
+    private @Nullable ScheduledExecutorService executor;
+
+    /** Registered channels. */
+    private final Set<TcpClientChannel> channels = ConcurrentHashMap.newKeySet();
+
+    /** Number of ticks that found the registry empty since the last registration. Accessed under {@code this}. */
+    private int emptyCount;
+
+    private ClientTimeoutWorker() {
+        // No-op.
+    }
+
+    /**
+     * Registers a channel for periodic timeout checks and starts the worker thread if it is not running.
+     *
+     * @param ch Channel to register.
+     */
+    synchronized void registerClientChannel(TcpClientChannel ch) {
+        channels.add(ch);
+        emptyCount = 0;
+
+        if (executor == null) {
+            long sleepInterval = TimeoutWorker.getSleepInterval();
+
+            executor = createExecutor();
+            executor.scheduleAtFixedRate(this::checkTimeouts, sleepInterval, sleepInterval, MILLISECONDS);
+        }
+    }
+
+    /** Counts an empty tick and shuts the executor down once the threshold is reached. */
+    private synchronized void shutdownIfEmpty() {
+        if (executor != null && channels.isEmpty()) {
+            emptyCount++;
+
+            if (emptyCount >= EMPTY_COUNT_THRESHOLD) {
+                executor.shutdown();
+                executor = null;
+            }
+        }
+    }
+
+    private static ScheduledExecutorService createExecutor() {
+        return Executors.newSingleThreadScheduledExecutor(
+                new NamedThreadFactory(
+                        "TcpClientChannel-timeout-worker",
+                        Loggers.voidLogger()));
+    }
+
+    /** Periodic task: drops closed channels, checks timeouts on every channel, then considers shutting down. */
+    private void checkTimeouts() {
+        long now = coarseCurrentTimeMillis();
+
+        // The set is backed by ConcurrentHashMap, so removing elements while iterating is safe.
+        for (TcpClientChannel ch : channels) {
+            try {
+                if (ch.closed()) {
+                    channels.remove(ch);
+                }
+
+                ch.checkTimeouts(now);
+            } catch (Throwable t) {
+                // scheduleAtFixedRate suppresses all subsequent executions once the task throws, and the executor
+                // would stay non-null, so timeouts would silently stop for every channel in the JVM.
+                // Contain the failure to the offending channel instead.
+                Loggers.forClass(ClientTimeoutWorker.class).warn("Failed to check timeouts for client channel", t);
+            }
+        }
+
+        shutdownIfEmpty();
+    }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 1e9a64b8ee..d501566c7f 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -22,7 +22,6 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.ExceptionUtils.copyExceptionWithCause;
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
-import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
@@ -33,6 +32,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
@@ -61,9 +61,7 @@ import
org.apache.ignite.internal.client.proto.HandshakeExtension;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ResponseFlags;
import org.apache.ignite.internal.future.timeout.TimeoutObject;
-import org.apache.ignite.internal.future.timeout.TimeoutWorker;
import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.thread.IgniteThread;
import org.apache.ignite.internal.thread.PublicApiThreading;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.ViewUtils;
@@ -116,9 +114,6 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/** Executor for async operation listeners. */
private final Executor asyncContinuationExecutor;
- /** Timeout worker. */
- private final TimeoutWorker timeoutWorker;
-
/** Connect timeout in milliseconds. */
private final long connectTimeout;
@@ -159,17 +154,6 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
log = ClientUtils.logger(cfg.clientConfiguration(),
TcpClientChannel.class);
- this.timeoutWorker = new TimeoutWorker(
- log,
- cfg.getAddress().getHostString() + cfg.getAddress().getPort(),
- "TcpClientChannel-timeout-worker",
- pendingReqs,
- // Client-facing future will fail with a timeout, but internal
ClientRequestFuture will stay in the map -
- // otherwise we'll fail with "protocol breakdown" error when a
late response arrives from the server.
- false,
- null
- );
-
asyncContinuationExecutor =
cfg.clientConfiguration().asyncContinuationExecutor() == null
? ForkJoinPool.commonPool()
: cfg.clientConfiguration().asyncContinuationExecutor();
@@ -187,8 +171,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
log.debug("Connection established [remoteAddress=" +
s.remoteAddress() + ']');
}
- // TODO: IGNITE-23076 Start single timeout worker thread
for several client in one JVM.
- new IgniteThread(timeoutWorker).start();
+ ClientTimeoutWorker.INSTANCE.registerClientChannel(this);
sock = s;
@@ -270,8 +253,6 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
// Ignore.
}
}
-
- awaitForWorkersStop(List.of(timeoutWorker), true, log);
}
}
@@ -829,6 +810,19 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
});
}
+    /**
+     * Fails the client-facing futures of pending requests whose end time has passed.
+     *
+     * @param now Current timestamp supplied by the timeout worker; compared against each request's end time.
+     */
+    void checkTimeouts(long now) {
+        for (Entry<Long, TimeoutObjectImpl> req : pendingReqs.entrySet()) {
+            TimeoutObject<CompletableFuture<ClientMessageUnpacker>> timeoutObject = req.getValue();
+
+            // Requests with a non-positive end time are never timed out here.
+            if (timeoutObject != null && timeoutObject.endTime() > 0 && now > timeoutObject.endTime()) {
+                // Client-facing future will fail with a timeout, but internal ClientRequestFuture will stay in the map -
+                // otherwise we'll fail with "protocol breakdown" error when a late response arrives from the server.
+                CompletableFuture<?> fut = timeoutObject.future();
+
+                // Because timed-out entries stay in the map, they are visited again on every check. Skip futures
+                // that are already completed so that a new exception is not created for each of them every time.
+                if (!fut.isDone()) {
+                    fut.completeExceptionally(new TimeoutException());
+                }
+            }
+        }
+    }
+
/**
* Timeout object wrapper for the completable future.
*/
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
index c438f8d53d..7e434cfb04 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
@@ -43,6 +43,7 @@ import
org.apache.ignite.internal.metrics.AbstractMetricSource;
import org.apache.ignite.internal.metrics.MetricSet;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.Table;
@@ -55,6 +56,7 @@ import org.junit.jupiter.params.provider.ValueSource;
/**
* Tests client-side metrics (see also server-side metrics tests in {@link
ServerMetricsTest}).
*/
+@WithSystemProperty(key = "IGNITE_TIMEOUT_WORKER_SLEEP_INTERVAL", value = "10")
public class ClientMetricsTest extends BaseIgniteAbstractTest {
private TestServer server;
private IgniteClient client;
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
index 44ad102909..0ff3ddf2de 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.api.Test;
/**
* Tests client connection to various addresses.
*/
+@WithSystemProperty(key = "IGNITE_TIMEOUT_WORKER_SLEEP_INTERVAL", value = "10")
public class ConnectionTest extends AbstractClientTest {
@Test
public void testEmptyNodeAddress() {
@@ -111,7 +112,6 @@ public class ConnectionTest extends AbstractClientTest {
@SuppressWarnings("ThrowableNotThrown")
@Test
- @WithSystemProperty(key = "IGNITE_TIMEOUT_WORKER_SLEEP_INTERVAL", value =
"10")
public void
testNoResponseFromServerWithinOperationTimeoutThrowsException() {
Function<Integer, Integer> responseDelay = x -> x > 2 ? 100 : 0;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
index 3dcc3d75a5..44a13ed53e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
@@ -36,14 +36,11 @@ import org.jetbrains.annotations.Nullable;
*/
public class TimeoutWorker extends IgniteWorker {
/** Worker sleep interval. */
- private final long sleepInterval =
getLong("IGNITE_TIMEOUT_WORKER_SLEEP_INTERVAL", 500);
+ private final long sleepInterval = getSleepInterval();
/** Active operations. */
public final ConcurrentMap<Long, TimeoutObject<?>> requestsMap;
- /** True means removing object from the operation map on timeout. */
- private final boolean removeOnTimeout;
-
/** Closure to process throwables in the worker thread. */
@Nullable
private final FailureProcessor failureProcessor;
@@ -56,7 +53,6 @@ public class TimeoutWorker extends IgniteWorker {
* @param name Worker name. Note that in general thread name and worker
(runnable) name are two different things. The same
* worker can be executed by multiple threads and therefore for
logging and debugging purposes we separate the two.
* @param requestsMap Active operations.
- * @param removeOnTimeout Remove operation from map.
* @param failureProcessor Closure to process throwables in the worker
thread.
*/
public TimeoutWorker(
@@ -64,13 +60,11 @@ public class TimeoutWorker extends IgniteWorker {
String igniteInstanceName,
String name,
ConcurrentMap requestsMap,
- boolean removeOnTimeout,
@Nullable FailureProcessor failureProcessor
) {
super(log, igniteInstanceName, name, null);
this.requestsMap = requestsMap;
- this.removeOnTimeout = removeOnTimeout;
this.failureProcessor = failureProcessor;
}
@@ -95,9 +89,7 @@ public class TimeoutWorker extends IgniteWorker {
if (!fut.isDone()) {
fut.completeExceptionally(new TimeoutException());
- if (removeOnTimeout) {
- requestsMap.remove(entry.getKey(),
timeoutObject);
- }
+ requestsMap.remove(entry.getKey(), timeoutObject);
}
}
}
@@ -119,4 +111,8 @@ public class TimeoutWorker extends IgniteWorker {
}
}
}
+
+    /**
+     * Returns the worker sleep interval read via {@code IGNITE_TIMEOUT_WORKER_SLEEP_INTERVAL}.
+     *
+     * @return Value obtained for the property, with {@code 500} passed as the default.
+     */
+    public static long getSleepInterval() {
+        long defaultInterval = 500;
+
+        return getLong("IGNITE_TIMEOUT_WORKER_SLEEP_INTERVAL", defaultInterval);
+    }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/future/timeout/TimeoutWorkerTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/future/timeout/TimeoutWorkerTest.java
index e3c0796bcf..b51b6e6807 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/future/timeout/TimeoutWorkerTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/future/timeout/TimeoutWorkerTest.java
@@ -52,7 +52,7 @@ public class TimeoutWorkerTest {
private static final IgniteLogger LOG =
Loggers.forClass(TimeoutWorkerTest.class);
private TimeoutWorker createTimeoutWorker(String name, ConcurrentMap
reqMap, FailureProcessor failureProcessor) {
- return new TimeoutWorker(LOG, "node", name, reqMap, true,
failureProcessor);
+ return new TimeoutWorker(LOG, "node", name, reqMap, failureProcessor);
}
@Test
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index ada50bdcad..deb130cfe3 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -173,7 +173,6 @@ public class DefaultMessagingService extends
AbstractMessagingService {
nodeName,
"MessagingService-timeout-worker",
requestsMap,
- true,
failureManager
);
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/FutureTimeoutBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/FutureTimeoutBenchmark.java
index ae9a346428..f9f366139e 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/FutureTimeoutBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/FutureTimeoutBenchmark.java
@@ -94,9 +94,6 @@ public class FutureTimeoutBenchmark {
"test-node",
"FutureTimeoutBenchmark-timeout-worker",
requestsMap,
- // Client-facing future will fail with a timeout, but
internal ClientRequestFuture will stay in the map -
- // otherwise we'll fail with "protocol breakdown" error
when a late response arrives from the server.
- true,
null
);