This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 654a11a4a0 IGNITE-22128 Balancing partitions across stripes (#3690)
654a11a4a0 is described below

commit 654a11a4a00919ce2b3ff4ca0e59e83377124dea
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri May 3 13:25:49 2024 +0300

    IGNITE-22128 Balancing partitions across stripes (#3690)
---
 .../raft/storage/impl/StripeAwareLogManager.java   | 21 ++++---
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  7 ++-
 .../raft/jraft/disruptor/StripedDisruptor.java     | 69 ++++++++++++++++++----
 .../raft/jraft/option/LogManagerOptions.java       | 11 ++++
 .../ignite/disruptor/StripedDisruptorTest.java     | 50 ++++++++++++++++
 5 files changed, 131 insertions(+), 27 deletions(-)

diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
index 4c7efa1ab5..66c77c3c71 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
@@ -45,7 +45,7 @@ public class StripeAwareLogManager extends LogManagerImpl {
     private LogStorage logStorage;
 
     /** Stripe, that corresponds to the current log storage instance. */
-    private final Stripe stripe;
+    private Stripe stripe;
 
     /** Size threshold of log entries list, that will trigger the flush upon 
the excess. */
     private int maxAppendBufferSize;
@@ -56,15 +56,6 @@ public class StripeAwareLogManager extends LogManagerImpl {
      */
     private boolean sharedLogStorage;
 
-    /**
-     * Constructor.
-     *
-     * @param stripe Stripe that corresponds to a worker thread in {@link 
LogManagerOptions#getLogManagerDisruptor()}.
-     */
-    public StripeAwareLogManager(Stripe stripe) {
-        this.stripe = stripe;
-    }
-
     @Override
     public boolean init(LogManagerOptions opts) {
         LogStorage logStorage = opts.getLogStorage();
@@ -73,7 +64,15 @@ public class StripeAwareLogManager extends LogManagerImpl {
         this.logStorage = logStorage;
         this.maxAppendBufferSize = 
opts.getRaftOptions().getMaxAppendBufferSize();
 
-        return super.init(opts);
+        boolean isInitSuccessfully = super.init(opts);
+
+        int stripe = 
opts.getLogManagerDisruptor().getStripe(opts.getNode().getNodeId());
+
+        assert stripe != -1;
+
+        this.stripe = opts.getLogStripes().get(stripe);
+
+        return isInitSuccessfully;
     }
 
     /**
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 963d25b6eb..b41929d240 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -611,9 +611,9 @@ public class NodeImpl implements Node, RaftServerService {
     private boolean initLogStorage() {
         Requires.requireNonNull(this.fsmCaller, "Null fsm caller");
         this.logStorage = 
this.serviceFactory.createLogStorage(this.options.getLogUri(), 
this.raftOptions);
-        int stripe = options.getLogManagerDisruptor().getStripe(getNodeId());
-        this.logManager = new 
StripeAwareLogManager(options.getLogStripes().get(stripe));
-        final LogManagerOptions opts = new LogManagerOptions();
+        this.logManager = new StripeAwareLogManager();
+
+        LogManagerOptions opts = new LogManagerOptions();
         
opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
         opts.setLogStorage(this.logStorage);
         opts.setConfigurationManager(this.configManager);
@@ -622,6 +622,7 @@ public class NodeImpl implements Node, RaftServerService {
         opts.setNodeMetrics(this.metrics);
         opts.setRaftOptions(this.raftOptions);
         opts.setLogManagerDisruptor(options.getLogManagerDisruptor());
+        opts.setLogStripes(options.getLogStripes());
 
         return this.logManager.init(opts);
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
index bb27f01514..0010003dce 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -29,6 +29,7 @@ import com.lmax.disruptor.dsl.ProducerType;
 import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -48,6 +49,9 @@ public class StripedDisruptor<T extends NodeIdAware> {
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(StripedDisruptor.class);
 
+    /** The counter is used to generate the next stripe to subscribe to in 
order to be a round-robin hash. */
+    private final AtomicInteger incrementalCounter = new AtomicInteger();
+
     /** Array of disruptors. Each Disruptor in the appropriate stripe. */
     private final Disruptor<T>[] disruptors;
 
@@ -152,7 +156,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
                 .setWaitStrategy(useYieldStrategy ? new YieldingWaitStrategy() 
: new BlockingWaitStrategy())
                 .build();
 
-            eventHandlers.add(new StripeEntryHandler());
+            eventHandlers.add(new StripeEntryHandler(i));
             exceptionHandlers.add(new StripeExceptionHandler(name));
 
             disruptor.handleEventsWith(eventHandlers.get(i));
@@ -202,12 +206,16 @@ public class StripedDisruptor<T extends NodeIdAware> {
      * @return Disruptor queue appropriate to the group.
      */
     public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> handler, 
BiConsumer<T, Throwable> exceptionHandler) {
-        eventHandlers.get(getStripe(nodeId)).subscribe(nodeId, handler);
+        assert getStripe(nodeId) == -1 : "The double subscriber for the one 
replication group [nodeId=" + nodeId + "].";
+
+        int stripeId = nextStripeToSubscribe();
+
+        eventHandlers.get(stripeId).subscribe(nodeId, handler);
 
         if (exceptionHandler != null)
-            exceptionHandlers.get(getStripe(nodeId)).subscribe(nodeId, 
exceptionHandler);
+            exceptionHandlers.get(stripeId).subscribe(nodeId, 
exceptionHandler);
 
-        return queues[getStripe(nodeId)];
+        return queues[stripeId];
     }
 
     /**
@@ -216,18 +224,37 @@ public class StripedDisruptor<T extends NodeIdAware> {
      * @param nodeId Node id.
      */
     public void unsubscribe(NodeId nodeId) {
-        eventHandlers.get(getStripe(nodeId)).unsubscribe(nodeId);
-        exceptionHandlers.get(getStripe(nodeId)).unsubscribe(nodeId);
+        int stripeId = getStripe(nodeId);
+
+        assert stripeId != -1 : "The replication group has not subscribed yet 
[nodeId=" + nodeId + "].";
+
+        eventHandlers.get(stripeId).unsubscribe(nodeId);
+        exceptionHandlers.get(stripeId).unsubscribe(nodeId);
     }
 
     /**
-     * Determines a stripe by a node id and returns a stripe number.
+     * If the replication group is already subscribed, this method determines 
a stripe by a node id and returns a stripe number.
+     * If the replication group did not subscribed yet, this method returns 
{@code -1};
      *
      * @param nodeId Node id.
      * @return Stripe of the Striped disruptor.
      */
     public int getStripe(NodeId nodeId) {
-        return Math.abs(nodeId.hashCode() % stripes);
+        for (StripeEntryHandler handler : eventHandlers) {
+            if (handler.isSubscribed(nodeId)) {
+                return handler.stripeId;
+            }
+        }
+
+        return -1;
+    }
+
+    /**
+     * Generates the next stripe number in a round-robin manner.
+     * @return The stripe number.
+     */
+    private int nextStripeToSubscribe() {
+        return Math.abs(incrementalCounter.getAndIncrement() % stripes);
     }
 
     /**
@@ -237,7 +264,11 @@ public class StripedDisruptor<T extends NodeIdAware> {
      * @return Disruptor queue appropriate to the group.
      */
     public RingBuffer<T> queue(NodeId nodeId) {
-        return queues[getStripe(nodeId)];
+        int stripeId = getStripe(nodeId);
+
+        assert stripeId != -1 : "The replication group has not subscribed yet 
[nodeId=" + nodeId + "].";
+
+        return queues[stripeId];
     }
 
     /**
@@ -245,16 +276,28 @@ public class StripedDisruptor<T extends NodeIdAware> {
      * It routs an event to the event handler for a group.
      */
     private class StripeEntryHandler implements EventHandler<T> {
-        private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers;
+        private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers = 
new ConcurrentHashMap<>();
 
         /** Size of the batch that is currently being handled. */
         private int currentBatchSize = 0;
 
+        /** Stripe id. */
+        private final int stripeId;
+
         /**
          * The constructor.
          */
-        StripeEntryHandler() {
-            subscribers = new ConcurrentHashMap<>();
+        StripeEntryHandler(int stripeId) {
+            this.stripeId = stripeId;
+        }
+
+        /**
+         * Checks the replication group is subscribed to this stripe or not.
+         * @param nodeId Replication group node id.
+         * @return True if the group is subscribed, false otherwise.
+         */
+        public boolean isSubscribed(NodeId nodeId) {
+            return subscribers.containsKey(nodeId);
         }
 
         /**
@@ -283,7 +326,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
             // TODO: IGNITE-20536 Need to add assert that handler is not null 
and to implement a no-op handler.
             if (handler != null) {
                 if (metrics != null && metrics.enabled()) {
-                    metrics.hitToStripe(getStripe(event.nodeId()));
+                    metrics.hitToStripe(stripeId);
 
                     if (endOfBatch) {
                         metrics.addBatchSize(currentBatchSize + 1);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
index fb859f37d8..53640ab7cc 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
@@ -16,6 +16,8 @@
  */
 package org.apache.ignite.raft.jraft.option;
 
+import java.util.List;
+import 
org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
 import org.apache.ignite.raft.jraft.FSMCaller;
 import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
@@ -38,6 +40,15 @@ public class LogManagerOptions {
     private NodeMetrics nodeMetrics;
     private LogEntryCodecFactory logEntryCodecFactory = 
LogEntryV1CodecFactory.getInstance();
     private StripedDisruptor<LogManagerImpl.StableClosureEvent> 
logManagerDisruptor;
+    private List<Stripe> logStripes;
+
+    public void setLogStripes(List<Stripe> logStripes) {
+            this.logStripes = logStripes;
+    }
+
+    public List<Stripe> getLogStripes() {
+        return this.logStripes;
+    }
 
     public StripedDisruptor<LogManagerImpl.StableClosureEvent> 
getLogManagerDisruptor() {
         return logManagerDisruptor;
diff --git a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
index 8105dff254..06dd0614b8 100644
--- a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.ignite.disruptor;
 
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
 import java.util.ArrayList;
+import java.util.Random;
+import java.util.UUID;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -124,6 +127,53 @@ public class StripedDisruptorTest extends IgniteAbstractTest {
         disruptor.shutdown();
     }
 
+    /**
+     * Checks the distribution of subscribed handlers across stripes.
+     * The distribution algorithm has to distribute handlers as evenly as 
possible using the round-robin algorithm.
+     */
+    @Test
+    public void tesDistributionHandlers() {
+        Random random = new Random();
+
+        int stripes = random.nextInt(20);
+
+        StripedDisruptor<NodeIdAwareTestObj> disruptor = new 
StripedDisruptor<>("test", "test-disruptor",
+                16384,
+                NodeIdAwareTestObj::new,
+                stripes,
+                false,
+                false,
+                null);
+
+        int handlers = random.nextInt(100);
+
+        log.info("Handlers will be distributed across stripes [handlers={}, 
stripes={}]", handlers, stripes);
+
+        int[] distribution = new int[stripes];
+
+        for (int i = 0; i < handlers; i++) {
+            GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();
+
+            var nodeId = new NodeId("grp", new 
PeerId(UUID.randomUUID().toString()));
+
+            disruptor.subscribe(nodeId, handler);
+
+            int stripe = disruptor.getStripe(nodeId);
+
+            assertNotEquals(-1, stripe);
+
+            distribution[stripe]++;
+        }
+
+        log.info("Result distribution [distribution={}]", distribution);
+
+        int reference = distribution[0];
+
+        for (int i = 1; i < stripes; i++) {
+            assertTrue(distribution[i] == reference || distribution[i] + 1 == 
reference || distribution[i] - 1 == reference);
+        }
+    }
+
     /** Group event handler. */
     private static class GroupAwareTestObjHandler implements 
EventHandler<NodeIdAwareTestObj> {
         /** This is a container for the batch events. */

Reply via email to