This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 56f15cac54 IGNITE-21626 Make each component of the Inbound pool striped (#3353)
56f15cac54 is described below

commit 56f15cac54753938ff5cc537d0cba242c9a34665
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Mar 6 14:44:05 2024 +0400

    IGNITE-21626 Make each component of the Inbound pool striped (#3353)
---
 .../replicator/ReplicationGroupStripes.java        |  2 +-
 .../thread/AbstractStripedThreadPoolExecutor.java  | 12 +--
 .../StripedExecutor.java}                          | 17 ++--
 .../thread/StripedScheduledThreadPoolExecutor.java |  2 +-
 .../internal/thread/StripedThreadPoolExecutor.java |  8 +-
 .../internal/network/DefaultMessagingService.java  | 39 ++++++---
 ...ipedExecutor.java => LazyStripedExecutors.java} | 57 +++++--------
 .../network/recovery/HandshakeManagerUtils.java    |  3 +-
 .../worker/CriticalSingleThreadExecutor.java       | 11 ++-
 .../worker/CriticalStripedThreadPoolExecutor.java  | 96 ++++++++++++++++++++++
 10 files changed, 177 insertions(+), 70 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java
index d7a0f5d28c..840dea3ba2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java
@@ -31,6 +31,6 @@ public class ReplicationGroupStripes {
      * @param stripedExecutor Striped executor from which to take a stripe.
      */
     public static ExecutorService stripeFor(ReplicationGroupId groupId, 
StripedThreadPoolExecutor stripedExecutor) {
-        return stripedExecutor.commandExecutor(groupId.hashCode());
+        return stripedExecutor.stripeExecutor(groupId.hashCode());
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/AbstractStripedThreadPoolExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/AbstractStripedThreadPoolExecutor.java
index edf7f53a5c..e1341a17fc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/AbstractStripedThreadPoolExecutor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/AbstractStripedThreadPoolExecutor.java
@@ -45,7 +45,7 @@ public abstract class AbstractStripedThreadPoolExecutor<E 
extends ExecutorServic
      *
      * @param execs Executors.
      */
-    AbstractStripedThreadPoolExecutor(E[] execs) {
+    public AbstractStripedThreadPoolExecutor(E[] execs) {
         this.execs = execs;
     }
 
@@ -58,13 +58,13 @@ public abstract class AbstractStripedThreadPoolExecutor<E 
extends ExecutorServic
      * @throws NullPointerException If command is null.
      */
     public void execute(Runnable task, int idx) {
-        commandExecutor(idx).execute(task);
+        stripeExecutor(idx).execute(task);
     }
 
     /** {@inheritDoc} */
     @Override
     public void execute(Runnable task) {
-        commandExecutor(random.nextInt(execs.length)).execute(task);
+        stripeExecutor(random.nextInt(execs.length)).execute(task);
     }
 
     /**
@@ -78,7 +78,7 @@ public abstract class AbstractStripedThreadPoolExecutor<E 
extends ExecutorServic
      * @throws NullPointerException If the task is {@code null}.
      */
     public CompletableFuture<?> submit(Runnable task, int idx) {
-        return CompletableFuture.runAsync(task, commandExecutor(idx));
+        return CompletableFuture.runAsync(task, stripeExecutor(idx));
     }
 
     /** {@inheritDoc} */
@@ -199,7 +199,7 @@ public abstract class AbstractStripedThreadPoolExecutor<E 
extends ExecutorServic
      * @param idx Index of executor.
      * @return Executor.
      */
-    public E commandExecutor(int idx) {
+    public E stripeExecutor(int idx) {
         return execs[threadId(idx)];
     }
 
@@ -210,6 +210,8 @@ public abstract class AbstractStripedThreadPoolExecutor<E 
extends ExecutorServic
      * @return Stripped thread ID.
      */
     private int threadId(int idx) {
+        assert idx >= 0 : "Index is negative: " + idx;
+
         return idx < execs.length ? idx : idx % execs.length;
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedExecutor.java
similarity index 56%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/thread/StripedExecutor.java
index d7a0f5d28c..f4da848a6a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupStripes.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedExecutor.java
@@ -15,22 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.replicator;
+package org.apache.ignite.internal.thread;
 
 import java.util.concurrent.ExecutorService;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 
 /**
- * Logic used to calculate a stripe (or its index) to use to handle a 
replication request by its replication group ID.
+ * An executor that has stripes: that is, sub-executors to which work may be 
submitted by index.
  */
-public class ReplicationGroupStripes {
+public interface StripedExecutor extends ExecutorService {
     /**
-     * Returns an executor that will execute requests belonging to the given 
replication group ID.
+     * Returns an executor by an index.
      *
-     * @param groupId ID of the group.
-     * @param stripedExecutor Striped executor from which to take a stripe.
+     * @param idx Index of executor.
+     * @return Executor.
      */
-    public static ExecutorService stripeFor(ReplicationGroupId groupId, 
StripedThreadPoolExecutor stripedExecutor) {
-        return stripedExecutor.commandExecutor(groupId.hashCode());
-    }
+    ExecutorService stripeExecutor(int idx);
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedScheduledThreadPoolExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedScheduledThreadPoolExecutor.java
index c976768e63..813216888c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedScheduledThreadPoolExecutor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedScheduledThreadPoolExecutor.java
@@ -80,7 +80,7 @@ public class StripedScheduledThreadPoolExecutor extends 
AbstractStripedThreadPoo
      * @throws NullPointerException If command or unit is null.
      */
     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit, int idx) {
-        return commandExecutor(idx).schedule(command, delay, unit);
+        return stripeExecutor(idx).schedule(command, delay, unit);
     }
 
     @Override
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedThreadPoolExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedThreadPoolExecutor.java
index 66286f1bd7..6ec29e66f5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedThreadPoolExecutor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/StripedThreadPoolExecutor.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * An {@link ExecutorService} that executes submitted tasks using pooled grid 
threads.
  */
-public class StripedThreadPoolExecutor extends 
AbstractStripedThreadPoolExecutor<ExecutorService> {
+public class StripedThreadPoolExecutor extends 
AbstractStripedThreadPoolExecutor<ExecutorService> implements StripedExecutor {
     /**
      * Create striped thread pool.
      *
@@ -41,7 +41,8 @@ public class StripedThreadPoolExecutor extends 
AbstractStripedThreadPoolExecutor
             int concurrencyLvl,
             ThreadFactory threadFactory,
             boolean allowCoreThreadTimeOut,
-            long keepAliveTime) {
+            long keepAliveTime
+    ) {
         super(createExecutors(concurrencyLvl, threadFactory, 
allowCoreThreadTimeOut, keepAliveTime));
     }
 
@@ -49,7 +50,8 @@ public class StripedThreadPoolExecutor extends 
AbstractStripedThreadPoolExecutor
             int concurrencyLvl,
             ThreadFactory threadFactory,
             boolean allowCoreThreadTimeOut,
-            long keepAliveTime) {
+            long keepAliveTime
+    ) {
         ExecutorService[] execs = new ExecutorService[concurrencyLvl];
 
         for (int i = 0; i < concurrencyLvl; i++) {
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index 8c8d9d1fde..2071209884 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.network.NettyBootstrapFactory.isInNetwo
 import static 
org.apache.ignite.internal.network.serialization.PerSessionSerializationService.createClassDescriptorsMessages;
 import static 
org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
 
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
@@ -36,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -57,8 +57,10 @@ import 
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
 import 
org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
 import org.apache.ignite.internal.thread.ExecutorChooser;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.thread.StripedExecutor;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.worker.CriticalSingleThreadExecutor;
+import org.apache.ignite.internal.worker.CriticalStripedThreadPoolExecutor;
 import org.apache.ignite.internal.worker.CriticalWorker;
 import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
 import org.apache.ignite.lang.IgniteException;
@@ -71,6 +73,12 @@ import org.jetbrains.annotations.TestOnly;
 public class DefaultMessagingService extends AbstractMessagingService {
     private static final IgniteLogger LOG = 
Loggers.forClass(DefaultMessagingService.class);
 
+    /**
+     * Maximum number of stripes in the thread pool in which incoming network 
messages for the {@link ChannelType#DEFAULT} channel
+     * are handled.
+     */
+    private static final int DEFAULT_CHANNEL_INBOUND_WORKERS = 4;
+
     /** Network messages factory. */
     private final NetworkMessagesFactory factory;
 
@@ -100,7 +108,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
     private final CriticalSingleThreadExecutor outboundExecutor;
 
     /** Executors for inbound messages. */
-    private final LazyStripedExecutor inboundExecutors;
+    private final LazyStripedExecutors inboundExecutors;
 
     // TODO: IGNITE-18493 - remove/move this
     @Nullable
@@ -134,7 +142,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
         outboundExecutor = new CriticalSingleThreadExecutor(
                 IgniteThreadFactory.create(nodeName, 
"MessagingService-outbound", LOG, NOTHING_ALLOWED)
         );
-        inboundExecutors = new CriticalLazyStripedExecutor(nodeName, 
"MessagingService-inbound", criticalWorkerRegistry);
+        inboundExecutors = new CriticalLazyStripedExecutors(nodeName, 
"MessagingService-inbound", criticalWorkerRegistry);
     }
 
     /**
@@ -436,7 +444,9 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
     }
 
     private Executor chooseExecutorInInboundPool(InNetworkObject obj) {
-        return inboundExecutors.stripeFor(obj.connectionIndex());
+        int stripeIndex = safeAbs(obj.sender().id().hashCode());
+
+        return inboundExecutors.executorFor(obj.connectionIndex(), 
stripeIndex);
     }
 
     /**
@@ -596,6 +606,10 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
         IgniteUtils.shutdownAndAwaitTermination(outboundExecutor, 10, 
TimeUnit.SECONDS);
     }
 
+    private static int stripeCountForIndex(int executorIndex) {
+        return executorIndex == ChannelType.DEFAULT.id() ? 
DEFAULT_CHANNEL_INBOUND_WORKERS : 1;
+    }
+
     // TODO: IGNITE-18493 - remove/move this
     /**
      * Installs a predicate, it will be consulted with for each message being 
sent; when it returns {@code true}, the
@@ -635,7 +649,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
         return connectionManager;
     }
 
-    private static class CriticalLazyStripedExecutor extends 
LazyStripedExecutor {
+    private static class CriticalLazyStripedExecutors extends 
LazyStripedExecutors {
         private final String nodeName;
         private final String poolName;
 
@@ -643,20 +657,23 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
 
         private final List<CriticalWorker> registeredWorkers = new 
CopyOnWriteArrayList<>();
 
-        CriticalLazyStripedExecutor(String nodeName, String poolName, 
CriticalWorkerRegistry workerRegistry) {
+        CriticalLazyStripedExecutors(String nodeName, String poolName, 
CriticalWorkerRegistry workerRegistry) {
             this.nodeName = nodeName;
             this.poolName = poolName;
             this.workerRegistry = workerRegistry;
         }
 
         @Override
-        protected ExecutorService newSingleThreadExecutor(int stripeIndex) {
-            ThreadFactory threadFactory = IgniteThreadFactory.create(nodeName, 
poolName + "-" + stripeIndex, LOG, NOTHING_ALLOWED);
-            CriticalSingleThreadExecutor executor = new 
CriticalSingleThreadExecutor(threadFactory);
+        protected StripedExecutor newStripedExecutor(int executorIndex) {
+            int stripeCount = stripeCountForIndex(executorIndex);
 
-            workerRegistry.register(executor);
+            ThreadFactory threadFactory = IgniteThreadFactory.create(nodeName, 
poolName + "-" + executorIndex, LOG, NOTHING_ALLOWED);
+            CriticalStripedThreadPoolExecutor executor = new 
CriticalStripedThreadPoolExecutor(stripeCount, threadFactory, false, 0);
 
-            registeredWorkers.add(executor);
+            for (CriticalWorker worker : executor.workers()) {
+                workerRegistry.register(worker);
+                registeredWorkers.add(worker);
+            }
 
             return executor;
         }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/LazyStripedExecutor.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/LazyStripedExecutors.java
similarity index 63%
rename from 
modules/network/src/main/java/org/apache/ignite/internal/network/LazyStripedExecutor.java
rename to 
modules/network/src/main/java/org/apache/ignite/internal/network/LazyStripedExecutors.java
index 46cf6db14a..5d90c5070b 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/LazyStripedExecutor.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/LazyStripedExecutors.java
@@ -17,94 +17,77 @@
 
 package org.apache.ignite.internal.network;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.util.Objects;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.thread.StripedExecutor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 
 /**
- * Lazy striped executor indexed with short type. A thread is created on first 
execution with an index and remains active forever.
+ * Lazy collection of striped executors indexed with short type. A striped 
executor is created on first execution with an index and remains
+ * active forever (until this executor collection is closed).
  *
  * <p>After having been stopped, it never executes anything.
  */
-abstract class LazyStripedExecutor implements ManuallyCloseable {
+abstract class LazyStripedExecutors implements ManuallyCloseable {
     private static final Executor NO_OP_EXECUTOR = task -> {};
 
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
     private final AtomicBoolean closed = new AtomicBoolean();
-    private final AtomicReferenceArray<ExecutorService> array = new 
AtomicReferenceArray<>(Short.MAX_VALUE + 1);
-
-    /**
-     * Executes a command on a stripe with the given index. If the executor is 
stopped, does nothing.
-     *
-     * @param index Index of the stripe.
-     */
-    public void execute(short index, Runnable command) {
-        assert index >= 0 : "Index is negative: " + index;
-
-        if (!busyLock.enterBusy()) {
-            return;
-        }
-
-        try {
-            executorFor(index).execute(command);
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
+    private final AtomicReferenceArray<StripedExecutor> array = new 
AtomicReferenceArray<>(Short.MAX_VALUE + 1);
 
     /**
      * Executes a command on a stripe with the given index. If the executor is 
stopped, returns a special executor that executes nothing.
      *
-     * @param index Index of the stripe.
+     * @param executorIndex Index of the striped executor; {@code stripeIndex} selects the stripe within it.
      */
-    public Executor stripeFor(short index) {
-        assert index >= 0 : "Index is negative: " + index;
+    public Executor executorFor(short executorIndex, int stripeIndex) {
+        assert executorIndex >= 0 : "Executor index is negative: " + 
executorIndex;
 
         if (!busyLock.enterBusy()) {
             return NO_OP_EXECUTOR;
         }
 
         try {
-            return executorFor(index);
+            return 
stripedExecutorFor(executorIndex).stripeExecutor(stripeIndex);
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private Executor executorFor(short index) {
-        ExecutorService existing = array.get(index);
+    private StripedExecutor stripedExecutorFor(short executorIndex) {
+        StripedExecutor existing = array.get(executorIndex);
 
         if (existing != null) {
             return existing;
         }
 
         synchronized (array) {
-            existing = array.get(index);
+            existing = array.get(executorIndex);
             if (existing != null) {
                 return existing;
             }
 
-            ExecutorService newExecutor = newSingleThreadExecutor(index);
+            StripedExecutor newExecutor = newStripedExecutor(executorIndex);
 
-            array.set(index, newExecutor);
+            array.set(executorIndex, newExecutor);
 
             return newExecutor;
         }
     }
 
     /**
-     * Creates a new single thread executor to serve a stripe.
+     * Creates a new striped thread pool to serve an index.
      *
-     * @param stripeIndex Stripe index for which the executor is being created.
+     * @param executorIndex Executor index for which the executor is being 
created.
      */
-    protected abstract ExecutorService newSingleThreadExecutor(int 
stripeIndex);
+    protected abstract StripedExecutor newStripedExecutor(int executorIndex);
 
     @Override
     public void close() {
@@ -120,7 +103,7 @@ abstract class LazyStripedExecutor implements 
ManuallyCloseable {
                 .mapToObj(array::get)
                 .filter(Objects::nonNull)
                 .parallel()
-                .forEach(executorService -> 
IgniteUtils.shutdownAndAwaitTermination(executorService, 10, TimeUnit.SECONDS));
+                .forEach(executorService -> 
IgniteUtils.shutdownAndAwaitTermination(executorService, 10, SECONDS));
     }
 
     /**
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
index 461f546b69..d78ca5160f 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.network.recovery;
 
 import static java.util.Collections.emptyList;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -121,7 +122,7 @@ class HandshakeManagerUtils {
     private static EventLoop eventLoopForKey(ChannelKey channelKey, 
ChannelEventLoopsSource eventLoopsSource) {
         List<EventLoop> eventLoops = eventLoopsSource.channelEventLoops();
 
-        int index = (channelKey.hashCode() & Integer.MAX_VALUE) % 
eventLoops.size();
+        int index = safeAbs(channelKey.hashCode()) % eventLoops.size();
 
         return eventLoops.get(index);
     }
diff --git 
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
 
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
index 20f64f1904..531c61642c 100644
--- 
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
+++ 
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.worker;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -31,8 +34,14 @@ public class CriticalSingleThreadExecutor extends 
ThreadPoolExecutor implements
     private volatile Thread lastSeenThread;
     private volatile long heartbeatNanos = NOT_MONITORED;
 
+    /** Constructor. */
     public CriticalSingleThreadExecutor(ThreadFactory threadFactory) {
-        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), 
threadFactory);
+        this(0, SECONDS, new LinkedBlockingQueue<>(), threadFactory);
+    }
+
+    /** Constructor. */
+    public CriticalSingleThreadExecutor(long keepAliveTime, TimeUnit unit, 
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+        super(1, 1, keepAliveTime, unit, workQueue, threadFactory);
     }
 
     @Override
diff --git 
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
 
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
new file mode 100644
index 0000000000..107595b5e2
--- /dev/null
+++ 
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.worker;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.stream.Collectors.toUnmodifiableList;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import org.apache.ignite.internal.thread.AbstractStripedThreadPoolExecutor;
+import org.apache.ignite.internal.thread.StripedExecutor;
+import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+
+/**
+ * Same as {@link StripedThreadPoolExecutor}, but each stripe is a critical 
worker monitored for being blocked.
+ */
+public class CriticalStripedThreadPoolExecutor extends 
AbstractStripedThreadPoolExecutor<ExecutorService> implements StripedExecutor {
+    private final List<CriticalWorker> workers;
+
+    /**
+     * Create a blockage-monitored striped thread pool.
+     *
+     * @param concurrencyLvl Concurrency level (number of stripes).
+     * @param threadFactory Factory used to create threads.
+     * @param allowCoreThreadTimeOut Sets the policy governing whether core 
threads may time out and terminate if no tasks arrive within the
+     *     keep-alive time.
+     * @param keepAliveTime When the number of threads is greater than the 
core, this is the maximum time that excess idle threads
+     *     will wait for new tasks before terminating, in milliseconds.
+     */
+    public CriticalStripedThreadPoolExecutor(
+            int concurrencyLvl,
+            ThreadFactory threadFactory,
+            boolean allowCoreThreadTimeOut,
+            long keepAliveTime
+    ) {
+        this(createExecutors(concurrencyLvl, threadFactory, 
allowCoreThreadTimeOut, keepAliveTime));
+    }
+
+    private CriticalStripedThreadPoolExecutor(CriticalSingleThreadExecutor[] 
executors) {
+        super(executors);
+
+        this.workers = Arrays.stream(executors)
+                .map(CriticalWorker.class::cast)
+                .collect(toUnmodifiableList());
+    }
+
+    private static CriticalSingleThreadExecutor[] createExecutors(
+            int concurrencyLvl,
+            ThreadFactory threadFactory,
+            boolean allowCoreThreadTimeOut,
+            long keepAliveTime
+    ) {
+        CriticalSingleThreadExecutor[] execs = new 
CriticalSingleThreadExecutor[concurrencyLvl];
+
+        for (int i = 0; i < concurrencyLvl; i++) {
+            CriticalSingleThreadExecutor executor = new 
CriticalSingleThreadExecutor(
+                    keepAliveTime,
+                    MILLISECONDS,
+                    new LinkedBlockingQueue<>(),
+                    threadFactory
+            );
+
+            executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+
+            execs[i] = executor;
+        }
+
+        return execs;
+    }
+
+    /**
+     * Returns workers corresponding to this thread pool.
+     */
+    public Collection<CriticalWorker> workers() {
+        return workers;
+    }
+}

Reply via email to