This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 56f15cac54 IGNITE-21626 Make each component of the Inbound pool striped (#3353)
56f15cac54 is described below
commit 56f15cac54753938ff5cc537d0cba242c9a34665
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Mar 6 14:44:05 2024 +0400
IGNITE-21626 Make each component of the Inbound pool striped (#3353)
---
.../replicator/ReplicationGroupStripes.java | 2 +-
.../thread/AbstractStripedThreadPoolExecutor.java | 12 +--
.../StripedExecutor.java} | 17 ++--
.../thread/StripedScheduledThreadPoolExecutor.java | 2 +-
.../internal/thread/StripedThreadPoolExecutor.java | 8 +-
.../internal/network/DefaultMessagingService.java | 39 ++++++---
...ipedExecutor.java => LazyStripedExecutors.java} | 57 +++++--------
.../network/recovery/HandshakeManagerUtils.java | 3 +-
.../worker/CriticalSingleThreadExecutor.java | 11 ++-
.../worker/CriticalStripedThreadPoolExecutor.java | 96 ++++++++++++++++++++++
10 files changed, 177 insertions(+), 70 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java b/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java
index d7a0f5d28c..840dea3ba2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java
@@ -31,6 +31,6 @@ public class ReplicationGroupStripes {
* @param stripedExecutor Striped executor from which to take a stripe.
*/
public static ExecutorService stripeFor(ReplicationGroupId groupId, StripedThreadPoolExecutor stripedExecutor) {
- return stripedExecutor.commandExecutor(groupId.hashCode());
+ return stripedExecutor.stripeExecutor(groupId.hashCode());
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/AbstractStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/AbstractStripedThreadPoolExecutor.java
index edf7f53a5c..e1341a17fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/thread/AbstractStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/AbstractStripedThreadPoolExecutor.java
@@ -45,7 +45,7 @@ public abstract class AbstractStripedThreadPoolExecutor<E extends ExecutorServic
*
* @param execs Executors.
*/
- AbstractStripedThreadPoolExecutor(E[] execs) {
+ public AbstractStripedThreadPoolExecutor(E[] execs) {
this.execs = execs;
}
@@ -58,13 +58,13 @@ public abstract class AbstractStripedThreadPoolExecutor<E extends ExecutorServic
* @throws NullPointerException If command is null.
*/
public void execute(Runnable task, int idx) {
- commandExecutor(idx).execute(task);
+ stripeExecutor(idx).execute(task);
}
/** {@inheritDoc} */
@Override
public void execute(Runnable task) {
- commandExecutor(random.nextInt(execs.length)).execute(task);
+ stripeExecutor(random.nextInt(execs.length)).execute(task);
}
/**
@@ -78,7 +78,7 @@ public abstract class AbstractStripedThreadPoolExecutor<E extends ExecutorServic
* @throws NullPointerException If the task is {@code null}.
*/
public CompletableFuture<?> submit(Runnable task, int idx) {
- return CompletableFuture.runAsync(task, commandExecutor(idx));
+ return CompletableFuture.runAsync(task, stripeExecutor(idx));
}
/** {@inheritDoc} */
@@ -199,7 +199,7 @@ public abstract class AbstractStripedThreadPoolExecutor<E extends ExecutorServic
* @param idx Index of executor.
* @return Executor.
*/
- public E commandExecutor(int idx) {
+ public E stripeExecutor(int idx) {
return execs[threadId(idx)];
}
@@ -210,6 +210,8 @@ public abstract class AbstractStripedThreadPoolExecutor<E extends ExecutorServic
* @return Stripped thread ID.
*/
private int threadId(int idx) {
+ assert idx >= 0 : "Index is negative: " + idx;
+
return idx < execs.length ? idx : idx % execs.length;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedExecutor.java
similarity index 56%
copy from modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java
copy to modules/core/src/main/java/org/apache/ignite/internal/thread/StripedExecutor.java
index d7a0f5d28c..f4da848a6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedExecutor.java
@@ -15,22 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.replicator;
+package org.apache.ignite.internal.thread;
import java.util.concurrent.ExecutorService;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
/**
- * Logic used to calculate a stripe (or its index) to use to handle a replication request by its replication group ID.
+ * An executor that has stripes: that is, sub-executors to which work may be submitted by index.
*/
-public class ReplicationGroupStripes {
+public interface StripedExecutor extends ExecutorService {
/**
- * Returns an executor that will execute requests belonging to the given replication group ID.
+ * Returns an executor by an index.
*
- * @param groupId ID of the group.
- * @param stripedExecutor Striped executor from which to take a stripe.
+ * @param idx Index of executor.
+ * @return Executor.
*/
- public static ExecutorService stripeFor(ReplicationGroupId groupId, StripedThreadPoolExecutor stripedExecutor) {
- return stripedExecutor.commandExecutor(groupId.hashCode());
- }
+ ExecutorService stripeExecutor(int idx);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedScheduledThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedScheduledThreadPoolExecutor.java
index c976768e63..813216888c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedScheduledThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedScheduledThreadPoolExecutor.java
@@ -80,7 +80,7 @@ public class StripedScheduledThreadPoolExecutor extends AbstractStripedThreadPoo
* @throws NullPointerException If command or unit is null.
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit, int idx) {
- return commandExecutor(idx).schedule(command, delay, unit);
+ return stripeExecutor(idx).schedule(command, delay, unit);
}
@Override
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedThreadPoolExecutor.java
index 66286f1bd7..6ec29e66f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedThreadPoolExecutor.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
/**
* An {@link ExecutorService} that executes submitted tasks using pooled grid threads.
*/
-public class StripedThreadPoolExecutor extends AbstractStripedThreadPoolExecutor<ExecutorService> {
+public class StripedThreadPoolExecutor extends AbstractStripedThreadPoolExecutor<ExecutorService> implements StripedExecutor {
/**
* Create striped thread pool.
*
@@ -41,7 +41,8 @@ public class StripedThreadPoolExecutor extends AbstractStripedThreadPoolExecutor
int concurrencyLvl,
ThreadFactory threadFactory,
boolean allowCoreThreadTimeOut,
- long keepAliveTime) {
+ long keepAliveTime
+ ) {
super(createExecutors(concurrencyLvl, threadFactory, allowCoreThreadTimeOut, keepAliveTime));
}
@@ -49,7 +50,8 @@ public class StripedThreadPoolExecutor extends AbstractStripedThreadPoolExecutor
int concurrencyLvl,
ThreadFactory threadFactory,
boolean allowCoreThreadTimeOut,
- long keepAliveTime) {
+ long keepAliveTime
+ ) {
ExecutorService[] execs = new ExecutorService[concurrencyLvl];
for (int i = 0; i < concurrencyLvl; i++) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index 8c8d9d1fde..2071209884 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -22,6 +22,7 @@ import static org.apache.ignite.internal.network.NettyBootstrapFactory.isInNetwo
import static org.apache.ignite.internal.network.serialization.PerSessionSerializationService.createClassDescriptorsMessages;
import static org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
@@ -36,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -57,8 +57,10 @@ import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
import org.apache.ignite.internal.thread.ExecutorChooser;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.thread.StripedExecutor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.worker.CriticalSingleThreadExecutor;
+import org.apache.ignite.internal.worker.CriticalStripedThreadPoolExecutor;
import org.apache.ignite.internal.worker.CriticalWorker;
import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
import org.apache.ignite.lang.IgniteException;
@@ -71,6 +73,12 @@ import org.jetbrains.annotations.TestOnly;
public class DefaultMessagingService extends AbstractMessagingService {
private static final IgniteLogger LOG = Loggers.forClass(DefaultMessagingService.class);
+ /**
+ * Maximum number of stripes in the thread pool in which incoming network messages for the {@link ChannelType#DEFAULT} channel
+ * are handled.
+ */
+ private static final int DEFAULT_CHANNEL_INBOUND_WORKERS = 4;
+
/** Network messages factory. */
private final NetworkMessagesFactory factory;
@@ -100,7 +108,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
private final CriticalSingleThreadExecutor outboundExecutor;
/** Executors for inbound messages. */
- private final LazyStripedExecutor inboundExecutors;
+ private final LazyStripedExecutors inboundExecutors;
// TODO: IGNITE-18493 - remove/move this
@Nullable
@@ -134,7 +142,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
outboundExecutor = new CriticalSingleThreadExecutor(
IgniteThreadFactory.create(nodeName, "MessagingService-outbound", LOG, NOTHING_ALLOWED)
);
- inboundExecutors = new CriticalLazyStripedExecutor(nodeName, "MessagingService-inbound", criticalWorkerRegistry);
+ inboundExecutors = new CriticalLazyStripedExecutors(nodeName, "MessagingService-inbound", criticalWorkerRegistry);
}
/**
@@ -436,7 +444,9 @@ public class DefaultMessagingService extends AbstractMessagingService {
}
private Executor chooseExecutorInInboundPool(InNetworkObject obj) {
- return inboundExecutors.stripeFor(obj.connectionIndex());
+ int stripeIndex = safeAbs(obj.sender().id().hashCode());
+
+ return inboundExecutors.executorFor(obj.connectionIndex(), stripeIndex);
}
/**
@@ -596,6 +606,10 @@ public class DefaultMessagingService extends AbstractMessagingService {
IgniteUtils.shutdownAndAwaitTermination(outboundExecutor, 10, TimeUnit.SECONDS);
}
+ private static int stripeCountForIndex(int executorIndex) {
+ return executorIndex == ChannelType.DEFAULT.id() ? DEFAULT_CHANNEL_INBOUND_WORKERS : 1;
+ }
+
// TODO: IGNITE-18493 - remove/move this
/**
* Installs a predicate, it will be consulted with for each message being sent; when it returns {@code true}, the
@@ -635,7 +649,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
return connectionManager;
}
- private static class CriticalLazyStripedExecutor extends LazyStripedExecutor {
+ private static class CriticalLazyStripedExecutors extends LazyStripedExecutors {
private final String nodeName;
private final String poolName;
@@ -643,20 +657,23 @@ public class DefaultMessagingService extends AbstractMessagingService {
private final List<CriticalWorker> registeredWorkers = new CopyOnWriteArrayList<>();
- CriticalLazyStripedExecutor(String nodeName, String poolName, CriticalWorkerRegistry workerRegistry) {
+ CriticalLazyStripedExecutors(String nodeName, String poolName, CriticalWorkerRegistry workerRegistry) {
this.nodeName = nodeName;
this.poolName = poolName;
this.workerRegistry = workerRegistry;
}
@Override
- protected ExecutorService newSingleThreadExecutor(int stripeIndex) {
- ThreadFactory threadFactory = IgniteThreadFactory.create(nodeName, poolName + "-" + stripeIndex, LOG, NOTHING_ALLOWED);
- CriticalSingleThreadExecutor executor = new CriticalSingleThreadExecutor(threadFactory);
+ protected StripedExecutor newStripedExecutor(int executorIndex) {
+ int stripeCount = stripeCountForIndex(executorIndex);
- workerRegistry.register(executor);
+ ThreadFactory threadFactory = IgniteThreadFactory.create(nodeName, poolName + "-" + executorIndex, LOG, NOTHING_ALLOWED);
+ CriticalStripedThreadPoolExecutor executor = new CriticalStripedThreadPoolExecutor(stripeCount, threadFactory, false, 0);
- registeredWorkers.add(executor);
+ for (CriticalWorker worker : executor.workers()) {
+ workerRegistry.register(worker);
+ registeredWorkers.add(worker);
+ }
return executor;
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/LazyStripedExecutor.java b/modules/network/src/main/java/org/apache/ignite/internal/network/LazyStripedExecutors.java
similarity index 63%
rename from modules/network/src/main/java/org/apache/ignite/internal/network/LazyStripedExecutor.java
rename to modules/network/src/main/java/org/apache/ignite/internal/network/LazyStripedExecutors.java
index 46cf6db14a..5d90c5070b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/LazyStripedExecutor.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/LazyStripedExecutors.java
@@ -17,94 +17,77 @@
package org.apache.ignite.internal.network;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import java.util.Objects;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.IntStream;
import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.thread.StripedExecutor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
/**
- * Lazy striped executor indexed with short type. A thread is created on first execution with an index and remains active forever.
+ * Lazy collection of striped executors indexed with short type. A striped executor is created on first execution with an index and remains
+ * active forever (until this executor collection is closed).
*
* <p>After having been stopped, it never executes anything.
*/
-abstract class LazyStripedExecutor implements ManuallyCloseable {
+abstract class LazyStripedExecutors implements ManuallyCloseable {
private static final Executor NO_OP_EXECUTOR = task -> {};
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
private final AtomicBoolean closed = new AtomicBoolean();
- private final AtomicReferenceArray<ExecutorService> array = new AtomicReferenceArray<>(Short.MAX_VALUE + 1);
-
- /**
- * Executes a command on a stripe with the given index. If the executor is stopped, does nothing.
- *
- * @param index Index of the stripe.
- */
- public void execute(short index, Runnable command) {
- assert index >= 0 : "Index is negative: " + index;
-
- if (!busyLock.enterBusy()) {
- return;
- }
-
- try {
- executorFor(index).execute(command);
- } finally {
- busyLock.leaveBusy();
- }
- }
+ private final AtomicReferenceArray<StripedExecutor> array = new AtomicReferenceArray<>(Short.MAX_VALUE + 1);
/**
* Executes a command on a stripe with the given index. If the executor is stopped, returns a special executor that executes nothing.
*
- * @param index Index of the stripe.
+ * @param executorIndex Index of the stripe.
*/
- public Executor stripeFor(short index) {
- assert index >= 0 : "Index is negative: " + index;
+ public Executor executorFor(short executorIndex, int stripeIndex) {
+ assert executorIndex >= 0 : "Executor index is negative: " + executorIndex;
if (!busyLock.enterBusy()) {
return NO_OP_EXECUTOR;
}
try {
- return executorFor(index);
+ return stripedExecutorFor(executorIndex).stripeExecutor(stripeIndex);
} finally {
busyLock.leaveBusy();
}
}
- private Executor executorFor(short index) {
- ExecutorService existing = array.get(index);
+ private StripedExecutor stripedExecutorFor(short executorIndex) {
+ StripedExecutor existing = array.get(executorIndex);
if (existing != null) {
return existing;
}
synchronized (array) {
- existing = array.get(index);
+ existing = array.get(executorIndex);
if (existing != null) {
return existing;
}
- ExecutorService newExecutor = newSingleThreadExecutor(index);
+ StripedExecutor newExecutor = newStripedExecutor(executorIndex);
- array.set(index, newExecutor);
+ array.set(executorIndex, newExecutor);
return newExecutor;
}
}
/**
- * Creates a new single thread executor to serve a stripe.
+ * Creates a new striped thread pool to serve an index.
*
- * @param stripeIndex Stripe index for which the executor is being created.
+ * @param executorIndex Executor index for which the executor is being created.
*/
- protected abstract ExecutorService newSingleThreadExecutor(int stripeIndex);
+ protected abstract StripedExecutor newStripedExecutor(int executorIndex);
@Override
public void close() {
@@ -120,7 +103,7 @@ abstract class LazyStripedExecutor implements ManuallyCloseable {
.mapToObj(array::get)
.filter(Objects::nonNull)
.parallel()
- .forEach(executorService -> IgniteUtils.shutdownAndAwaitTermination(executorService, 10, TimeUnit.SECONDS));
+ .forEach(executorService -> IgniteUtils.shutdownAndAwaitTermination(executorService, 10, SECONDS));
}
/**
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
index 461f546b69..d78ca5160f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.network.recovery;
import static java.util.Collections.emptyList;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -121,7 +122,7 @@ class HandshakeManagerUtils {
private static EventLoop eventLoopForKey(ChannelKey channelKey, ChannelEventLoopsSource eventLoopsSource) {
List<EventLoop> eventLoops = eventLoopsSource.channelEventLoops();
- int index = (channelKey.hashCode() & Integer.MAX_VALUE) % eventLoops.size();
+ int index = safeAbs(channelKey.hashCode()) % eventLoops.size();
return eventLoops.get(index);
}
diff --git a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
index 20f64f1904..531c61642c 100644
--- a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
+++ b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.worker;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -31,8 +34,14 @@ public class CriticalSingleThreadExecutor extends ThreadPoolExecutor implements
private volatile Thread lastSeenThread;
private volatile long heartbeatNanos = NOT_MONITORED;
+ /** Constructor. */
public CriticalSingleThreadExecutor(ThreadFactory threadFactory) {
- super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);
+ this(0, SECONDS, new LinkedBlockingQueue<>(), threadFactory);
+ }
+
+ /** Constructor. */
+ public CriticalSingleThreadExecutor(long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+ super(1, 1, keepAliveTime, unit, workQueue, threadFactory);
}
@Override
diff --git a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
new file mode 100644
index 0000000000..107595b5e2
--- /dev/null
+++ b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.worker;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.stream.Collectors.toUnmodifiableList;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import org.apache.ignite.internal.thread.AbstractStripedThreadPoolExecutor;
+import org.apache.ignite.internal.thread.StripedExecutor;
+import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+
+/**
+ * Same as {@link StripedThreadPoolExecutor}, but each stripe is a critical worker monitored for being blocked.
+ */
+public class CriticalStripedThreadPoolExecutor extends AbstractStripedThreadPoolExecutor<ExecutorService> implements StripedExecutor {
+ private final List<CriticalWorker> workers;
+
+ /**
+ * Create a blockage-monitored striped thread pool.
+ *
+ * @param concurrencyLvl Concurrency level (number of stripes).
+ * @param threadFactory Factory used to create threads.
+ * @param allowCoreThreadTimeOut Sets the policy governing whether core threads may time out and terminate if no tasks arrive within the
+ * keep-alive time.
+ * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ */
+ public CriticalStripedThreadPoolExecutor(
+ int concurrencyLvl,
+ ThreadFactory threadFactory,
+ boolean allowCoreThreadTimeOut,
+ long keepAliveTime
+ ) {
+ this(createExecutors(concurrencyLvl, threadFactory, allowCoreThreadTimeOut, keepAliveTime));
+ }
+
+ private CriticalStripedThreadPoolExecutor(CriticalSingleThreadExecutor[] executors) {
+ super(executors);
+
+ this.workers = Arrays.stream(executors)
+ .map(CriticalWorker.class::cast)
+ .collect(toUnmodifiableList());
+ }
+
+ private static CriticalSingleThreadExecutor[] createExecutors(
+ int concurrencyLvl,
+ ThreadFactory threadFactory,
+ boolean allowCoreThreadTimeOut,
+ long keepAliveTime
+ ) {
+ CriticalSingleThreadExecutor[] execs = new CriticalSingleThreadExecutor[concurrencyLvl];
+
+ for (int i = 0; i < concurrencyLvl; i++) {
+ CriticalSingleThreadExecutor executor = new CriticalSingleThreadExecutor(
+ keepAliveTime,
+ MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ threadFactory
+ );
+
+ executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+
+ execs[i] = executor;
+ }
+
+ return execs;
+ }
+
+ /**
+ * Returns workers corresponding to this thread pool.
+ */
+ public Collection<CriticalWorker> workers() {
+ return workers;
+ }
+}