This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ced0ebba09 IGNITE-15568 Striped Disruptor doesn't work with JRaft 
event handlers properly (#3811)
ced0ebba09 is described below

commit ced0ebba0969ad1b75ee94ca5a252aef15d97955
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Mon May 27 18:11:43 2024 +0300

    IGNITE-15568 Striped Disruptor doesn't work with JRaft event handlers 
properly (#3811)
---
 .../ignite/raft/jraft/core/FSMCallerImpl.java      |  32 ++-
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  18 +-
 .../raft/jraft/core/ReadOnlyServiceImpl.java       |  23 +-
 .../raft/jraft/disruptor/DisruptorBuilder.java     |   7 +-
 .../{NodeIdAware.java => DisruptorEventType.java}  |  20 +-
 .../ignite/raft/jraft/disruptor/NodeIdAware.java   |  26 +-
 .../raft/jraft/disruptor/StripedDisruptor.java     | 234 ++++++++++--------
 .../raft/jraft/storage/impl/LogManagerImpl.java    |  16 +-
 .../ignite/disruptor/StripedDisruptorTest.java     | 261 ++++++++++++++++++++-
 9 files changed, 469 insertions(+), 168 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index a157eee51c..73843ef562 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -39,6 +39,7 @@ import 
org.apache.ignite.raft.jraft.closure.SaveSnapshotClosure;
 import org.apache.ignite.raft.jraft.closure.TaskClosure;
 import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType;
 import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
 import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.EnumOutter;
@@ -97,10 +98,7 @@ public class FSMCallerImpl implements FSMCaller {
     /**
      * Apply task for disruptor.
      */
-    public static class ApplyTask implements NodeIdAware {
-        /** Raft node id. */
-        NodeId nodeId;
-
+    public static class ApplyTask extends NodeIdAware {
         public TaskType type;
         // union fields
         public long committedIndex;
@@ -111,11 +109,9 @@ public class FSMCallerImpl implements FSMCaller {
         public CountDownLatch shutdownLatch;
 
         @Override
-        public NodeId nodeId() {
-            return nodeId;
-        }
-
         public void reset() {
+            super.reset();
+
             this.type = null;
             this.committedIndex = 0;
             this.term = 0;
@@ -123,7 +119,6 @@ public class FSMCallerImpl implements FSMCaller {
             this.leaderChangeCtx = null;
             this.done = null;
             this.shutdownLatch = null;
-            this.nodeId = null;
         }
     }
 
@@ -205,6 +200,7 @@ public class FSMCallerImpl implements FSMCaller {
 
             Utils.runInThread(this.node.getOptions().getCommonExecutor(), () 
-> this.taskQueue.publishEvent((task, sequence) -> {
                 task.reset();
+
                 task.nodeId = this.nodeId;
                 task.type = TaskType.SHUTDOWN;
                 task.shutdownLatch = latch;
@@ -237,6 +233,8 @@ public class FSMCallerImpl implements FSMCaller {
     public boolean onCommitted(final long committedIndex) {
         return enqueueTask((task, sequence) -> {
             task.nodeId = this.nodeId;
+            task.handler = null;
+            task.evtType = DisruptorEventType.REGULAR;
             task.type = TaskType.COMMITTED;
             task.committedIndex = committedIndex;
         });
@@ -250,6 +248,8 @@ public class FSMCallerImpl implements FSMCaller {
         final CountDownLatch latch = new CountDownLatch(1);
         enqueueTask((task, sequence) -> {
             task.nodeId = this.nodeId;
+            task.handler = null;
+            task.evtType = DisruptorEventType.REGULAR;
             task.type = TaskType.FLUSH;
             task.shutdownLatch = latch;
         });
@@ -260,6 +260,8 @@ public class FSMCallerImpl implements FSMCaller {
     public boolean onSnapshotLoad(final LoadSnapshotClosure done) {
         return enqueueTask((task, sequence) -> {
             task.nodeId = this.nodeId;
+            task.handler = null;
+            task.evtType = DisruptorEventType.REGULAR;
             task.type = TaskType.SNAPSHOT_LOAD;
             task.done = done;
         });
@@ -269,6 +271,8 @@ public class FSMCallerImpl implements FSMCaller {
     public boolean onSnapshotSave(final SaveSnapshotClosure done) {
         return enqueueTask((task, sequence) -> {
             task.nodeId = this.nodeId;
+            task.handler = null;
+            task.evtType = DisruptorEventType.REGULAR;
             task.type = TaskType.SNAPSHOT_SAVE;
             task.done = done;
         });
@@ -278,6 +282,8 @@ public class FSMCallerImpl implements FSMCaller {
     public boolean onLeaderStop(final Status status) {
         return enqueueTask((task, sequence) -> {
             task.nodeId = this.nodeId;
+            task.handler = null;
+            task.evtType = DisruptorEventType.REGULAR;
             task.type = TaskType.LEADER_STOP;
             task.status = new Status(status);
         });
@@ -287,6 +293,8 @@ public class FSMCallerImpl implements FSMCaller {
     public boolean onLeaderStart(final long term) {
         return enqueueTask((task, sequence) -> {
             task.nodeId = this.nodeId;
+            task.handler = null;
+            task.evtType = DisruptorEventType.REGULAR;
             task.type = TaskType.LEADER_START;
             task.term = term;
         });
@@ -296,6 +304,8 @@ public class FSMCallerImpl implements FSMCaller {
     public boolean onStartFollowing(final LeaderChangeContext ctx) {
         return enqueueTask((task, sequence) -> {
             task.nodeId = this.nodeId;
+            task.handler = null;
+            task.evtType = DisruptorEventType.REGULAR;
             task.type = TaskType.START_FOLLOWING;
             task.leaderChangeCtx = new LeaderChangeContext(ctx.getLeaderId(), 
ctx.getTerm(), ctx.getStatus());
         });
@@ -305,6 +315,8 @@ public class FSMCallerImpl implements FSMCaller {
     public boolean onStopFollowing(final LeaderChangeContext ctx) {
         return enqueueTask((task, sequence) -> {
             task.nodeId = this.nodeId;
+            task.handler = null;
+            task.evtType = DisruptorEventType.REGULAR;
             task.type = TaskType.STOP_FOLLOWING;
             task.leaderChangeCtx = new LeaderChangeContext(ctx.getLeaderId(), 
ctx.getTerm(), ctx.getStatus());
         });
@@ -343,6 +355,8 @@ public class FSMCallerImpl implements FSMCaller {
         final OnErrorClosure c = new OnErrorClosure(error);
         return enqueueTask((task, sequence) -> {
             task.nodeId = this.nodeId;
+            task.handler = null;
+            task.evtType = DisruptorEventType.REGULAR;
             task.type = TaskType.ERROR;
             task.done = c;
         });
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index b41929d240..9d836a27b2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -65,7 +65,8 @@ import 
org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
 import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
-import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
+import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType
+;import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
 import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.Ballot;
 import org.apache.ignite.raft.jraft.entity.EnumOutter;
@@ -259,22 +260,16 @@ public class NodeImpl implements Node, RaftServerService {
     /**
      * Node service event.
      */
-    public static class LogEntryAndClosure implements NodeIdAware {
-        /** Raft node id. */
-        NodeId nodeId;
-
+    public static class LogEntryAndClosure extends NodeIdAware {
         LogEntry entry;
         Closure done;
         long expectedTerm;
         CountDownLatch shutdownLatch;
 
         @Override
-        public NodeId nodeId() {
-            return nodeId;
-        }
-
         public void reset() {
-            this.nodeId = null;
+            super.reset();
+
             this.entry = null;
             this.done = null;
             this.expectedTerm = 0;
@@ -1849,6 +1844,7 @@ public class NodeImpl implements Node, RaftServerService {
 
         final EventTranslator<LogEntryAndClosure> translator = (event, 
sequence) -> {
             event.reset();
+
             event.nodeId = getNodeId();
             event.done = task.getDone();
             event.entry = entry;
@@ -3128,6 +3124,8 @@ public class NodeImpl implements Node, RaftServerService {
                     Utils.runInThread(this.getOptions().getCommonExecutor(),
                         () -> this.applyQueue.publishEvent((event, sequence) 
-> {
                             event.nodeId = getNodeId();
+                            event.handler = null;
+                            event.evtType = DisruptorEventType.REGULAR;
                             event.shutdownLatch = latch;
                         }));
                 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
index f97eea2ae2..4914833ce9 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
@@ -36,6 +36,7 @@ import 
org.apache.ignite.raft.jraft.FSMCaller.LastAppliedLogIndexListener;
 import org.apache.ignite.raft.jraft.ReadOnlyService;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
+import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType;
 import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
 import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.NodeId;
@@ -86,27 +87,21 @@ public class ReadOnlyServiceImpl implements 
ReadOnlyService, LastAppliedLogIndex
 
     private static final IgniteLogger LOG = 
Loggers.forClass(ReadOnlyServiceImpl.class);
 
-    public static class ReadIndexEvent implements NodeIdAware {
-        /** Raft node id. */
-        NodeId nodeId;
-
+    public static class ReadIndexEvent extends NodeIdAware {
         Bytes requestContext;
         ReadIndexClosure done;
         CountDownLatch shutdownLatch;
         long startTime;
 
-        private void reset() {
-            this.nodeId = null;
+        @Override
+        public void reset() {
+            super.reset();
+
             this.requestContext = null;
             this.done = null;
             this.shutdownLatch = null;
             this.startTime = 0L;
         }
-
-        @Override
-        public NodeId nodeId() {
-            return nodeId;
-        }
     }
 
     private static class ReadIndexEventFactory implements 
EventFactory<ReadIndexEvent> {
@@ -309,6 +304,8 @@ public class ReadOnlyServiceImpl implements 
ReadOnlyService, LastAppliedLogIndex
         Utils.runInThread(this.node.getOptions().getCommonExecutor(),
             () -> this.readIndexQueue.publishEvent((event, sequence) -> {
                 event.nodeId = this.node.getNodeId();
+                event.handler = null;
+                event.evtType = DisruptorEventType.REGULAR;
                 event.shutdownLatch = this.shutdownLatch;
             }));
     }
@@ -332,6 +329,8 @@ public class ReadOnlyServiceImpl implements 
ReadOnlyService, LastAppliedLogIndex
         try {
             EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
                 event.nodeId = this.node.getNodeId();
+                event.handler = null;
+                event.evtType = DisruptorEventType.REGULAR;
                 event.done = closure;
                 event.requestContext = new Bytes(reqCtx);
                 event.startTime = Utils.monotonicMs();
@@ -417,6 +416,8 @@ public class ReadOnlyServiceImpl implements 
ReadOnlyService, LastAppliedLogIndex
         final CountDownLatch latch = new CountDownLatch(1);
         this.readIndexQueue.publishEvent((task, sequence) -> {
             task.nodeId = this.node.getNodeId();
+            task.handler = null;
+            task.evtType = DisruptorEventType.REGULAR;
             task.shutdownLatch = latch;
         });
         latch.await();
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
index a35392eaa2..bcc799d8bc 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
@@ -14,14 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.raft.jraft.disruptor;
 
-import java.util.concurrent.ThreadFactory;
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
+import java.util.concurrent.ThreadFactory;
 import org.apache.ignite.raft.jraft.util.Requires;
 
 /**
@@ -91,8 +92,6 @@ public class DisruptorBuilder<T> {
         Requires.requireNonNull(this.eventFactory, "Event factory not set");
         Requires.requireNonNull(this.threadFactory, "Thread factory not set");
 
-        return new Disruptor<>(this.eventFactory, this.ringBufferSize, 
this.threadFactory, this.producerType,
-            this.waitStrategy);
+        return new Disruptor<>(this.eventFactory, this.ringBufferSize, 
this.threadFactory, this.producerType, this.waitStrategy);
     }
-
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorEventType.java
similarity index 65%
copy from 
modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
copy to 
modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorEventType.java
index fd91823dee..62040606c2 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorEventType.java
@@ -17,17 +17,21 @@
 
 package org.apache.ignite.raft.jraft.disruptor;
 
+import com.lmax.disruptor.EventHandler;
 import org.apache.ignite.raft.jraft.entity.NodeId;
 
 /**
- * Interface provides Raft node id.
- * It allows to determine a stripe in Striped disruptor.
+ * There are different types of striped disruptor events. The disruptor uses 
some events for technical purposes,
+  * so it is necessary to distinguish types.
  */
-public interface NodeIdAware {
+public enum DisruptorEventType {
     /**
-     * Gets a Raft node id.
-     *
-     * @return Raft node id.
-     */
-    NodeId nodeId();
+    * This event type matches the technical striped disruptor event, look at 
{@link StripedDisruptor#subscribe( NodeId, EventHandler)}.
+    */
+    SUBSCRIBE,
+
+    /**
+    * This event type matches the regular event in the striped disruptor.
+    */
+    REGULAR
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
index fd91823dee..8ebe955a53 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
@@ -17,17 +17,35 @@
 
 package org.apache.ignite.raft.jraft.disruptor;
 
+import static 
org.apache.ignite.raft.jraft.disruptor.DisruptorEventType.REGULAR;
+import com.lmax.disruptor.EventHandler;
 import org.apache.ignite.raft.jraft.entity.NodeId;
 
 /**
- * Interface provides Raft node id.
- * It allows to determine a stripe in Striped disruptor.
+ * Interface provides Raft node id. It allows to determine a stripe in Striped 
disruptor.
  */
-public interface NodeIdAware {
+public abstract class NodeIdAware {
+    /** Raft node id. */
+    public NodeId nodeId;
+
+    /** The event handler is used to {@link DisruptorEventType#SUBSCRIBE} in 
other cases, it should be {@code null}. */
+    public EventHandler<NodeIdAware> handler;
+
+    /** Disruptor event type. */
+    public DisruptorEventType evtType;
+
     /**
      * Gets a Raft node id.
      *
      * @return Raft node id.
      */
-    NodeId nodeId();
+    public NodeId nodeId() {
+        return nodeId;
+    }
+
+    public void reset() {
+        nodeId = null;
+        handler = null;
+        evtType = REGULAR;
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
index 0010003dce..d3eea69136 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -14,9 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.raft.jraft.disruptor;
 
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.raft.jraft.disruptor.DisruptorEventType.SUBSCRIBE;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventFactory;
@@ -27,6 +29,8 @@ import com.lmax.disruptor.YieldingWaitStrategy;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -40,15 +44,25 @@ import org.apache.ignite.raft.jraft.entity.NodeId;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Stripe Disruptor is a set of queues which process several independent 
groups in one queue (in the stripe).
- * It makes fewer threads that the groups and gives the same sequential 
guaranties and a close performance.
+ * Stripe Disruptor is a set of queues which process several independent 
groups in one queue (in the stripe). It makes fewer threads that
+ * the groups and gives the same sequential guaranties and a close performance.
  *
  * @param <T> Event type. This event should implement {@link NodeIdAware} 
interface.
  */
 public class StripedDisruptor<T extends NodeIdAware> {
+    /**
+     * It is an id that does not represent any node to batch events in one 
stripe although {@link NodeId} may vary.
+     * This is a cached event in case the disruptor supports batching,
+     * because the {@link DisruptorEventType#SUBSCRIBE} event might be a 
finale one and have to be handled.
+     */
+    private final NodeId FAKE_NODE_ID = new NodeId(null, null);
+
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(StripedDisruptor.class);
 
+    /** The map stores a matching node id for the stripe that sends messages 
to the node. */
+    private ConcurrentHashMap<NodeId, Integer> stripeMapper = new 
ConcurrentHashMap<>();
+
     /** The counter is used to generate the next stripe to subscribe to in 
order to be a round-robin hash. */
     private final AtomicInteger incrementalCounter = new AtomicInteger();
 
@@ -58,10 +72,10 @@ public class StripedDisruptor<T extends NodeIdAware> {
     /** Array of Ring buffer. It placed according to disruptors in the array. 
*/
     private final RingBuffer<T>[] queues;
 
-    /** Disruptor event handler array. It placed according to disruptors in 
the array.*/
+    /** Disruptor event handler array. It placed according to disruptors in 
the array. */
     private final ArrayList<StripeEntryHandler> eventHandlers;
 
-    /** Disruptor error handler array. It placed according to disruptors in 
the array.*/
+    /** Disruptor error handler array. It placed according to disruptors in 
the array. */
     private final ArrayList<StripeExceptionHandler> exceptionHandlers;
 
     /** Amount of stripes. */
@@ -72,12 +86,8 @@ public class StripedDisruptor<T extends NodeIdAware> {
 
     private final DisruptorMetrics metrics;
 
-    /**
-     * If {@code false}, this stripe will always pass {@code true} into {@link 
EventHandler#onEvent(Object, long, boolean)}.
-     * Otherwise, the data will be provided with batches.
-     */
-    // TODO: IGNITE-15568 endOfBatch should be set to true to prevent caching 
tasks until IGNITE-15568 has fixed.
-    private final boolean supportsBatches;
+    /** If it is true, the disruptor batch shares across all subscribers. 
Otherwise, the batch sends for each one. */
+    private final boolean sharedStripe;
 
     /**
      * @param nodeName Name of the Ignite node.
@@ -85,8 +95,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
      * @param bufferSize Buffer size for each Disruptor.
      * @param eventFactory Event factory for the Striped disruptor.
      * @param stripes Amount of stripes.
-     * @param supportsBatches If {@code false}, this stripe will always pass 
{@code true} into
-     *      {@link EventHandler#onEvent(Object, long, boolean)}. Otherwise, 
the data will be provided with batches.
+     * @param sharedStripe If it is true, the disruptor batch shares across 
all subscribers. Otherwise, the batch sends for each one.
      * @param useYieldStrategy If {@code true}, the yield strategy is to be 
used, otherwise the blocking strategy.
      * @param metrics Metrics.
      */
@@ -96,7 +105,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
             int bufferSize,
             EventFactory<T> eventFactory,
             int stripes,
-            boolean supportsBatches,
+            boolean sharedStripe,
             boolean useYieldStrategy,
             @Nullable DisruptorMetrics metrics
     ) {
@@ -107,7 +116,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
                 bufferSize,
                 eventFactory,
                 stripes,
-                supportsBatches,
+                sharedStripe,
                 useYieldStrategy,
                 metrics
         );
@@ -120,8 +129,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
      * @param bufferSize Buffer size for each Disruptor.
      * @param eventFactory Event factory for the Striped disruptor.
      * @param stripes Amount of stripes.
-     * @param supportsBatches If {@code false}, this stripe will always pass 
{@code true} into
-     *      {@link EventHandler#onEvent(Object, long, boolean)}. Otherwise, 
the data will be provided with batches.
+     * @param sharedStripe If it is true, the disruptor batch shares across 
all subscribers. Otherwise, the batch sends for each one.
      * @param useYieldStrategy If {@code true}, the yield strategy is to be 
used, otherwise the blocking strategy.
      * @param raftMetrics Metrics.
      */
@@ -132,7 +140,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
             int bufferSize,
             EventFactory<T> eventFactory,
             int stripes,
-            boolean supportsBatches,
+            boolean sharedStripe,
             boolean useYieldStrategy,
             @Nullable DisruptorMetrics raftMetrics
     ) {
@@ -142,19 +150,19 @@ public class StripedDisruptor<T extends NodeIdAware> {
         exceptionHandlers = new ArrayList<>(stripes);
         this.stripes = stripes;
         this.name = NamedThreadFactory.threadPrefix(nodeName, poolName);
-        this.supportsBatches = supportsBatches;
+        this.sharedStripe = sharedStripe;
         this.metrics = raftMetrics;
 
         for (int i = 0; i < stripes; i++) {
             String stripeName = format("{}_stripe_{}", poolName, i);
 
             Disruptor<T> disruptor = DisruptorBuilder.<T>newInstance()
-                .setRingBufferSize(bufferSize)
-                .setEventFactory(eventFactory)
-                .setThreadFactory(threadFactorySupplier.apply(nodeName, 
stripeName))
-                .setProducerType(ProducerType.MULTI)
-                .setWaitStrategy(useYieldStrategy ? new YieldingWaitStrategy() 
: new BlockingWaitStrategy())
-                .build();
+                    .setRingBufferSize(bufferSize)
+                    .setEventFactory(eventFactory)
+                    .setThreadFactory(threadFactorySupplier.apply(nodeName, 
stripeName))
+                    .setProducerType(ProducerType.MULTI)
+                    .setWaitStrategy(useYieldStrategy ? new 
YieldingWaitStrategy() : new BlockingWaitStrategy())
+                    .build();
 
             eventHandlers.add(new StripeEntryHandler(i));
             exceptionHandlers.add(new StripeExceptionHandler(name));
@@ -172,8 +180,9 @@ public class StripedDisruptor<T extends NodeIdAware> {
      */
     public void shutdown() {
         for (int i = 0; i < stripes; i++) {
-            if (disruptors[i] != null)
+            if (disruptors[i] != null) {
                 disruptors[i].shutdown();
+            }
 
             // Help GC to collect unused resources.
             queues[i] = null;
@@ -185,8 +194,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
     }
 
     /**
-     * Subscribes an event handler to one stripe of the Striped disruptor.
-     * The stripe is determined by a group id.
+     * Subscribes an event handler to one stripe of the Striped disruptor. The 
stripe is determined by a group id.
      *
      * @param nodeId Node id.
      * @param handler Event handler for the group specified.
@@ -197,8 +205,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
     }
 
     /**
-     * Subscribes an event handler and a exception handler to one stripe of 
the Striped disruptor.
-     * The stripe is determined by a group id.
+     * Subscribes an event handler and a exception handler to one stripe of 
the Striped disruptor. The stripe is determined by a group id.
      *
      * @param nodeId Node id.
      * @param handler Event handler for the group specified.
@@ -210,10 +217,19 @@ public class StripedDisruptor<T extends NodeIdAware> {
 
         int stripeId = nextStripeToSubscribe();
 
-        eventHandlers.get(stripeId).subscribe(nodeId, handler);
+        stripeMapper.put(nodeId, stripeId);
 
-        if (exceptionHandler != null)
+        queues[stripeId].publishEvent((event, sequence) -> {
+            event.reset();
+
+            event.evtType = SUBSCRIBE;
+            event.nodeId = nodeId;
+            event.handler = (EventHandler<NodeIdAware>) handler;
+        });
+
+        if (exceptionHandler != null) {
             exceptionHandlers.get(stripeId).subscribe(nodeId, 
exceptionHandler);
+        }
 
         return queues[stripeId];
     }
@@ -228,29 +244,33 @@ public class StripedDisruptor<T extends NodeIdAware> {
 
         assert stripeId != -1 : "The replication group has not subscribed yet 
[nodeId=" + nodeId + "].";
 
-        eventHandlers.get(stripeId).unsubscribe(nodeId);
+        stripeMapper.remove(nodeId);
+
+        queues[stripeId].publishEvent((event, sequence) -> {
+            event.reset();
+
+            event.evtType = SUBSCRIBE;
+            event.nodeId = nodeId;
+            event.handler = null;
+        });
+
         exceptionHandlers.get(stripeId).unsubscribe(nodeId);
     }
 
     /**
-     * If the replication group is already subscribed, this method determines 
a stripe by a node id and returns a stripe number.
-     * If the replication group did not subscribed yet, this method returns 
{@code -1};
+     * If the replication group is already subscribed, this method determines 
a stripe by a node id and returns a stripe number. If the
+     * replication group did not subscribed yet, this method returns {@code 
-1};
      *
      * @param nodeId Node id.
      * @return Stripe of the Striped disruptor.
      */
     public int getStripe(NodeId nodeId) {
-        for (StripeEntryHandler handler : eventHandlers) {
-            if (handler.isSubscribed(nodeId)) {
-                return handler.stripeId;
-            }
-        }
-
-        return -1;
+        return stripeMapper.getOrDefault(nodeId, -1);
     }
 
     /**
      * Generates the next stripe number in a round-robin manner.
+     *
      * @return The stripe number.
      */
     private int nextStripeToSubscribe() {
@@ -258,28 +278,16 @@ public class StripedDisruptor<T extends NodeIdAware> {
     }
 
     /**
-     * Determines a Disruptor queue by a group id.
-     *
-     * @param nodeId Node id.
-     * @return Disruptor queue appropriate to the group.
-     */
-    public RingBuffer<T> queue(NodeId nodeId) {
-        int stripeId = getStripe(nodeId);
-
-        assert stripeId != -1 : "The replication group has not subscribed yet 
[nodeId=" + nodeId + "].";
-
-        return queues[stripeId];
-    }
-
-    /**
-     * Event handler for stripe of the Striped disruptor.
-     * It routs an event to the event handler for a group.
+     * Event handler for stripe of the Striped disruptor. It routes an event 
to the event handler for a group.
      */
     private class StripeEntryHandler implements EventHandler<T> {
-        private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers = 
new ConcurrentHashMap<>();
+        private final Map<NodeId, EventHandler<T>> subscribers = new 
HashMap<>();
+
+        /** The cache is used to correct handling the disruptor batch. */
+        private final Map<NodeId, T> eventCache = new HashMap<>();
 
-        /** Size of the batch that is currently being handled. */
-        private int currentBatchSize = 0;
+        /** Current batch sizes. */
+        private final Map<NodeId, Integer> currentBatchSizes = new HashMap<>();
 
         /** Stripe id. */
         private final int stripeId;
@@ -291,62 +299,75 @@ public class StripedDisruptor<T extends NodeIdAware> {
             this.stripeId = stripeId;
         }
 
-        /**
-         * Checks the replication group is subscribed to this stripe or not.
-         * @param nodeId Replication group node id.
-         * @return True if the group is subscribed, false otherwise.
-         */
-        public boolean isSubscribed(NodeId nodeId) {
-            return subscribers.containsKey(nodeId);
-        }
+        /** {@inheritDoc} */
+        @Override
+        public void onEvent(T event, long sequence, boolean endOfBatch) throws 
Exception {
+            if (event.evtType == SUBSCRIBE) {
+                if (event.handler == null) {
+                    subscribers.remove(event.nodeId());
+                } else {
+                    subscribers.put(event.nodeId(), (EventHandler<T>) 
event.handler);
+                }
+            } else {
+                internalBatching(event, sequence);
+            }
 
-        /**
-         * Subscribes a group to appropriate events for it.
-         *
-         * @param nodeId Node id.
-         * @param handler Event handler for the group specified.
-         */
-        void subscribe(NodeId nodeId, EventHandler<T> handler) {
-            subscribers.put(nodeId, handler);
+            if (endOfBatch) {
+                for (Map.Entry<NodeId, T> grpEvent : eventCache.entrySet()) {
+                    EventHandler<T> grpHandler = 
subscribers.get(grpEvent.getValue().nodeId());
+
+                    if (grpHandler != null) {
+                        if (metrics != null && metrics.enabled()) {
+                            metrics.hitToStripe(stripeId);
+
+                            
metrics.addBatchSize(currentBatchSizes.getOrDefault(grpEvent.getKey(), 0) + 1);
+                        }
+
+                        grpHandler.onEvent(grpEvent.getValue(), sequence, 
true);
+                    }
+                }
+
+                currentBatchSizes.clear();
+                eventCache.clear();
+            }
         }
 
         /**
-         * Unsubscribes a group for any event.
+         * Processes the event with intermediate cache to batch internally for 
each subscriber for the stripe.
          *
-         * @param nodeId Node id.
+         * @param event Disruptor event to process.
+         * @param sequence Number in the sequence of the element.
+         * @throws Exception Throw when some handler fails.
          */
-        void unsubscribe(NodeId nodeId) {
-            subscribers.remove(nodeId);
-        }
+        private void internalBatching(T event, long sequence) throws Exception 
{
+            NodeId pushNodeId = sharedStripe ? FAKE_NODE_ID : event.nodeId();
 
-        /** {@inheritDoc} */
-        @Override public void onEvent(T event, long sequence, boolean 
endOfBatch) throws Exception {
-            EventHandler<T> handler = subscribers.get(event.nodeId());
+            T prevEvent = eventCache.put(pushNodeId, event);
 
-            // TODO: IGNITE-20536 Need to add assert that handler is not null 
and to implement a no-op handler.
-            if (handler != null) {
-                if (metrics != null && metrics.enabled()) {
-                    metrics.hitToStripe(stripeId);
+            if (prevEvent != null) {
+                EventHandler<T> grpHandler = 
subscribers.get(prevEvent.nodeId());
+
+                if (grpHandler != null) {
+                    if (metrics != null && metrics.enabled()) {
+                        metrics.hitToStripe(stripeId);
 
-                    if (endOfBatch) {
-                        metrics.addBatchSize(currentBatchSize + 1);
+                        currentBatchSizes.compute(pushNodeId, (nodeId, cnt) -> 
{
+                            if (cnt == null) {
+                                return 1;
+                            }
 
-                        currentBatchSize = 0;
-                    } else {
-                        currentBatchSize ++;
+                            return cnt + 1;
+                        });
                     }
-                }
 
-                handler.onEvent(event, sequence, endOfBatch || 
subscribers.size() > 1 && !supportsBatches);
-            } else {
-                LOG.warn(format("Group of the event is unsupported [nodeId={}, 
event={}]", event.nodeId(), event));
+                    grpHandler.onEvent(prevEvent, sequence, false);
+                }
             }
         }
     }
 
     /**
-     * Striped disruptor exception handler.
-     * It prints into log when an exception has occurred and route it to the 
handler for group.
+     * Striped disruptor exception handler. It prints into log when an 
exception has occurred and route it to the handler for group.
      */
     private class StripeExceptionHandler implements ExceptionHandler<T> {
         /** Name of the Disruptor instance. */
@@ -383,31 +404,36 @@ public class StripedDisruptor<T extends NodeIdAware> {
         }
 
         /** {@inheritDoc} */
-        @Override public void handleOnStartException(Throwable ex) {
+        @Override
+        public void handleOnStartException(Throwable ex) {
             LOG.error("Fail to start disruptor [name={}]", ex, name);
         }
 
         /** {@inheritDoc} */
-        @Override public void handleOnShutdownException(Throwable ex) {
+        @Override
+        public void handleOnShutdownException(Throwable ex) {
             LOG.error("Fail to shutdown disruptor [name={}]", ex, name);
 
         }
 
         /** {@inheritDoc} */
-        @Override public void handleEventException(Throwable ex, long 
sequence, T event) {
+        @Override
+        public void handleEventException(Throwable ex, long sequence, T event) 
{
             NodeId nodeId = event.nodeId();
 
             BiConsumer<T, Throwable> handler = nodeId == null ? null : 
subscribers.get(nodeId);
 
             LOG.error("Handle disruptor event error [name={}, event={}, 
hasHandler={}]", ex, name, event, handler != null);
 
-            if (handler != null)
+            if (handler != null) {
                 handler.accept(event, ex);
+            }
         }
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
+    @Override
+    public String toString() {
         return format("{} [name={}]", StripedDisruptor.class.getSimpleName(), 
name);
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index 49e72c4f1f..7952f03ff3 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
 import org.apache.ignite.raft.jraft.core.NodeMetrics;
+import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType;
 import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
 import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
@@ -103,20 +104,14 @@ public class LogManagerImpl implements LogManager {
         LAST_LOG_ID // get last log id
     }
 
-    public static class StableClosureEvent implements NodeIdAware {
-        /** Raft node id. */
-        NodeId nodeId;
-
+    public static class StableClosureEvent extends NodeIdAware {
         StableClosure done;
         EventType type;
 
         @Override
-        public NodeId nodeId() {
-            return nodeId;
-        }
+        public void reset() {
+            super.reset();
 
-        void reset() {
-            this.nodeId = null;
             this.done = null;
             this.type = null;
         }
@@ -219,6 +214,7 @@ public class LogManagerImpl implements LogManager {
         this.shutDownLatch = new CountDownLatch(1);
         Utils.runInThread(nodeOptions.getCommonExecutor(), () -> 
this.diskQueue.publishEvent((event, sequence) -> {
             event.reset();
+
             event.nodeId = this.nodeId;
             event.type = EventType.SHUTDOWN;
         }));
@@ -336,6 +332,7 @@ public class LogManagerImpl implements LogManager {
             // publish event out of lock
             this.diskQueue.publishEvent((event, sequence) -> {
               event.reset();
+
               event.nodeId = this.nodeId;
               event.type = EventType.OTHER;
               event.done = done;
@@ -362,6 +359,7 @@ public class LogManagerImpl implements LogManager {
         }
         this.diskQueue.publishEvent((event, sequence) -> {
             event.reset();
+
             event.nodeId = this.nodeId;
             event.type = type;
             event.done = done;
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
index a6c78dc8b8..cda112fd14 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
@@ -17,15 +17,25 @@
 
 package org.apache.ignite.disruptor;
 
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
 import com.lmax.disruptor.RingBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -34,6 +44,8 @@ import 
org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.NodeId;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Tests for striped disruptor.
@@ -70,11 +82,15 @@ public class StripedDisruptorTest extends 
IgniteAbstractTest {
             int finalInt = i;
 
             taskQueue1.tryPublishEvent((event, sequence) -> {
+                event.reset();
+
                 event.nodeId = nodeId1;
                 event.num = finalInt;
             });
 
             taskQueue2.tryPublishEvent((event, sequence) -> {
+                event.reset();
+
                 event.nodeId = nodeId2;
                 event.num = finalInt;
             });
@@ -117,6 +133,8 @@ public class StripedDisruptorTest extends 
IgniteAbstractTest {
             int finalInt = i;
 
             taskQueue.publishEvent((event, sequence) -> {
+                event.reset();
+
                 event.nodeId = nodeId;
                 event.num = finalInt;
             });
@@ -174,6 +192,234 @@ public class StripedDisruptorTest extends 
IgniteAbstractTest {
         }
     }
 
+    @Test
+    public void testOneSubscriberBatching() throws Exception {
+        Random random = new Random();
+
+        int stripes = random.nextInt(20) + 1;
+
+        StripedDisruptor<NodeIdAwareTestObj> disruptor = new 
StripedDisruptor<>("test", "test-disruptor",
+                16384,
+                NodeIdAwareTestObj::new,
+                stripes,
+                false,
+                false,
+                null);
+
+        HashMap<NodeId, RingBuffer<NodeIdAwareTestObj>> queues = new 
HashMap<>();
+        GroupAwareTestObjHandler[] handlers = new 
GroupAwareTestObjHandler[stripes];
+        NodeId[] nodesIds = new NodeId[stripes];
+
+        for (int i = 0; i < stripes; i++) {
+            GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();
+
+            var nodeId = new NodeId("grp", new PeerId(String.valueOf(i)));
+
+            queues.put(nodeId, disruptor.subscribe(nodeId, handler));
+            handlers[i] = handler;
+            nodesIds[i] = nodeId;
+        }
+
+        int batchSize = random.nextInt(50) + 1;
+
+        for (NodeId nodeId : nodesIds) {
+            EventTranslator<NodeIdAwareTestObj>[] eventTranslators = new 
EventTranslator[batchSize];
+
+            for (int i = 0; i < batchSize; i++) {
+                int finalI = i;
+
+                eventTranslators[i] = (event, sequence) -> {
+                    event.reset();
+
+                    event.nodeId = nodeId;
+                    event.num = finalI;
+                };
+            }
+
+            queues.get(nodeId).publishEvents(eventTranslators);
+        }
+
+        assertTrue(IgniteTestUtils.waitForCondition(() -> {
+            for (GroupAwareTestObjHandler handler : handlers) {
+                if (handler.applied != batchSize || handler.batchesApplied != 
1) {
+                    return false;
+                }
+            }
+
+            return true;
+        }, 10_000));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testMultipleSubscriberBatching(boolean supportBatching) throws 
Exception {
+        Random random = new Random();
+
+        int totalHandlers = random.nextInt(20) + 1;
+
+        StripedDisruptor<NodeIdAwareTestObj> disruptor = new 
StripedDisruptor<>("test", "test-disruptor",
+                16384,
+                NodeIdAwareTestObj::new,
+                1,
+                supportBatching,
+                false,
+                null);
+
+        RingBuffer<NodeIdAwareTestObj> queue = null;
+        GroupAwareTestObjHandler[] handlers = new 
GroupAwareTestObjHandler[totalHandlers];
+        NodeId[] nodesIds = new NodeId[totalHandlers];
+
+        for (int i = 0; i < totalHandlers; i++) {
+            GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();
+
+            var nodeId = new NodeId("grp", new PeerId(String.valueOf(i)));
+
+            // Any queue can use here, because the striped disruptor has the 
only stripe.
+            queue = disruptor.subscribe(nodeId, handler);
+
+            handlers[i] = handler;
+            nodesIds[i] = nodeId;
+
+            assertEquals(0, disruptor.getStripe(nodeId));
+        }
+
+        int batchSize = random.nextInt(50) + 1;
+
+        EventTranslator<NodeIdAwareTestObj>[] eventTranslators = new 
EventTranslator[totalHandlers * batchSize];
+
+        for (int i = 0; i < totalHandlers * batchSize; i++) {
+            int finalI = i;
+
+            eventTranslators[i] = (event, sequence) -> {
+                event.reset();
+
+                event.nodeId = nodesIds[finalI % totalHandlers];
+                event.num = finalI;
+            };
+        }
+
+        queue.publishEvents(eventTranslators);
+
+        if (supportBatching) {
+            assertTrue(IgniteTestUtils.waitForCondition(() -> {
+                int batchCommited = 0;
+
+                for (GroupAwareTestObjHandler handler : handlers) {
+                    if (handler.applied == batchSize && handler.batchesApplied 
== 1) {
+                        batchCommited++;
+                    }
+                }
+
+                return batchCommited == 1;
+            }, 10_000));
+        } else {
+            assertTrue(IgniteTestUtils.waitForCondition(() -> {
+                for (GroupAwareTestObjHandler handler : handlers) {
+                    if (handler.applied != batchSize || handler.batchesApplied 
!= 1) {
+                        return false;
+                    }
+                }
+
+                return true;
+            }, 10_000));
+        }
+    }
+
+    @Test
+    public void testConcurrentSubscribe() throws Exception {
+        Random random = new Random();
+
+        int totalHandlers = random.nextInt(20) + 1;
+
+        StripedDisruptor<NodeIdAwareTestObj> disruptor = new 
StripedDisruptor<>("test", "test-disruptor",
+                16384,
+                NodeIdAwareTestObj::new,
+                1,
+                false,
+                false,
+                null);
+
+        RingBuffer<NodeIdAwareTestObj> queue = null;
+        GroupAwareTestObjHandler[] handlers = new 
GroupAwareTestObjHandler[totalHandlers];
+        NodeId[] nodesIds = new NodeId[totalHandlers];
+
+        for (int i = 0; i < totalHandlers; i++) {
+            GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();
+
+            var nodeId = new NodeId("grp", new PeerId(String.valueOf(i)));
+
+            // Any queue can use here, because the striped disruptor has the 
only stripe.
+            queue = disruptor.subscribe(nodeId, handler);
+
+            handlers[i] = handler;
+            nodesIds[i] = nodeId;
+
+            assertEquals(0, disruptor.getStripe(nodeId));
+        }
+
+        AtomicBoolean stop = new AtomicBoolean();
+
+        int unstableSubscriberNumber = random.nextInt(totalHandlers);
+        NodeId unstableSubscriber = nodesIds[unstableSubscriberNumber];
+        GroupAwareTestObjHandler unstableSubscriberHandler = 
handlers[unstableSubscriberNumber];
+
+        CompletableFuture<Void> stopTreadCompleted = 
IgniteTestUtils.runAsync(() -> {
+            while (!stop.get()) {
+                if (disruptor.getStripe(unstableSubscriber) == -1) {
+                    disruptor.subscribe(unstableSubscriber, 
unstableSubscriberHandler);
+                } else {
+                    disruptor.unsubscribe(unstableSubscriber);
+                }
+            }
+        });
+
+        Map<NodeId, Integer> appliedMap = new HashMap<>(totalHandlers);
+
+        for (int iter = 0; iter < 100_000; iter++) {
+            int batchSize = random.nextInt(10) + 1;
+
+            List<EventTranslator<NodeIdAwareTestObj>> eventTranslators = 
random.ints(
+                    batchSize,
+                    0,
+                    totalHandlers
+            ).mapToObj(value -> {
+                appliedMap.compute(nodesIds[value], (nodeId, count) -> {
+                    if (count == null) {
+                        return 1;
+                    }
+                    return count + 1;
+                });
+
+                return (EventTranslator<NodeIdAwareTestObj>) (event, sequence) 
-> {
+                    event.reset();
+
+                    event.nodeId = nodesIds[value];
+                    event.num = value;
+                };
+            }).collect(Collectors.toList());
+
+            queue.publishEvents(eventTranslators.toArray(new 
EventTranslator[0]));
+        }
+
+        stop.set(true);
+
+        assertThat(stopTreadCompleted, willCompleteSuccessfully());
+
+        assertTrue(IgniteTestUtils.waitForCondition(() -> {
+            for (int i = 0; i < totalHandlers; i++) {
+                if (i == unstableSubscriberNumber) {
+                    continue;
+                }
+
+                if (handlers[i].applied != 
appliedMap.getOrDefault(nodesIds[i], 0)) {
+                    return false;
+                }
+            }
+
+            return true;
+        }, 10_000));
+    }
+
     /** Group event handler. */
     private static class GroupAwareTestObjHandler implements 
EventHandler<NodeIdAwareTestObj> {
         /** This is a container for the batch events. */
@@ -182,6 +428,9 @@ public class StripedDisruptorTest extends 
IgniteAbstractTest {
         /** Counter of applied events. */
         int applied = 0;
 
+        /** Amount of applied batches. */
+        int batchesApplied = 0;
+
         /** {@inheritDoc} */
         @Override
         public void onEvent(NodeIdAwareTestObj event, long sequence, boolean 
endOfBatch) {
@@ -190,6 +439,8 @@ public class StripedDisruptorTest extends 
IgniteAbstractTest {
             if (endOfBatch) {
                 applied += batch.size();
 
+                batchesApplied++;
+
                 batch.clear();
             }
         }
@@ -198,16 +449,8 @@ public class StripedDisruptorTest extends 
IgniteAbstractTest {
     /**
      * Group aware object implementation to test the striped disruptor.
      */
-    private static class NodeIdAwareTestObj implements NodeIdAware {
-        /** Node id. */
-        NodeId nodeId;
-
+    private static class NodeIdAwareTestObj extends NodeIdAware {
         /** Any integer number. */
         int num;
-
-        @Override
-        public NodeId nodeId() {
-            return nodeId;
-        }
     }
 }

Reply via email to