This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ced0ebba09 IGNITE-15568 Striped Disruptor doesn't work with JRaft
event handlers properly (#3811)
ced0ebba09 is described below
commit ced0ebba0969ad1b75ee94ca5a252aef15d97955
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Mon May 27 18:11:43 2024 +0300
IGNITE-15568 Striped Disruptor doesn't work with JRaft event handlers
properly (#3811)
---
.../ignite/raft/jraft/core/FSMCallerImpl.java | 32 ++-
.../apache/ignite/raft/jraft/core/NodeImpl.java | 18 +-
.../raft/jraft/core/ReadOnlyServiceImpl.java | 23 +-
.../raft/jraft/disruptor/DisruptorBuilder.java | 7 +-
.../{NodeIdAware.java => DisruptorEventType.java} | 20 +-
.../ignite/raft/jraft/disruptor/NodeIdAware.java | 26 +-
.../raft/jraft/disruptor/StripedDisruptor.java | 234 ++++++++++--------
.../raft/jraft/storage/impl/LogManagerImpl.java | 16 +-
.../ignite/disruptor/StripedDisruptorTest.java | 261 ++++++++++++++++++++-
9 files changed, 469 insertions(+), 168 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index a157eee51c..73843ef562 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -39,6 +39,7 @@ import
org.apache.ignite.raft.jraft.closure.SaveSnapshotClosure;
import org.apache.ignite.raft.jraft.closure.TaskClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType;
import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
@@ -97,10 +98,7 @@ public class FSMCallerImpl implements FSMCaller {
/**
* Apply task for disruptor.
*/
- public static class ApplyTask implements NodeIdAware {
- /** Raft node id. */
- NodeId nodeId;
-
+ public static class ApplyTask extends NodeIdAware {
public TaskType type;
// union fields
public long committedIndex;
@@ -111,11 +109,9 @@ public class FSMCallerImpl implements FSMCaller {
public CountDownLatch shutdownLatch;
@Override
- public NodeId nodeId() {
- return nodeId;
- }
-
public void reset() {
+ super.reset();
+
this.type = null;
this.committedIndex = 0;
this.term = 0;
@@ -123,7 +119,6 @@ public class FSMCallerImpl implements FSMCaller {
this.leaderChangeCtx = null;
this.done = null;
this.shutdownLatch = null;
- this.nodeId = null;
}
}
@@ -205,6 +200,7 @@ public class FSMCallerImpl implements FSMCaller {
Utils.runInThread(this.node.getOptions().getCommonExecutor(), ()
-> this.taskQueue.publishEvent((task, sequence) -> {
task.reset();
+
task.nodeId = this.nodeId;
task.type = TaskType.SHUTDOWN;
task.shutdownLatch = latch;
@@ -237,6 +233,8 @@ public class FSMCallerImpl implements FSMCaller {
public boolean onCommitted(final long committedIndex) {
return enqueueTask((task, sequence) -> {
task.nodeId = this.nodeId;
+ task.handler = null;
+ task.evtType = DisruptorEventType.REGULAR;
task.type = TaskType.COMMITTED;
task.committedIndex = committedIndex;
});
@@ -250,6 +248,8 @@ public class FSMCallerImpl implements FSMCaller {
final CountDownLatch latch = new CountDownLatch(1);
enqueueTask((task, sequence) -> {
task.nodeId = this.nodeId;
+ task.handler = null;
+ task.evtType = DisruptorEventType.REGULAR;
task.type = TaskType.FLUSH;
task.shutdownLatch = latch;
});
@@ -260,6 +260,8 @@ public class FSMCallerImpl implements FSMCaller {
public boolean onSnapshotLoad(final LoadSnapshotClosure done) {
return enqueueTask((task, sequence) -> {
task.nodeId = this.nodeId;
+ task.handler = null;
+ task.evtType = DisruptorEventType.REGULAR;
task.type = TaskType.SNAPSHOT_LOAD;
task.done = done;
});
@@ -269,6 +271,8 @@ public class FSMCallerImpl implements FSMCaller {
public boolean onSnapshotSave(final SaveSnapshotClosure done) {
return enqueueTask((task, sequence) -> {
task.nodeId = this.nodeId;
+ task.handler = null;
+ task.evtType = DisruptorEventType.REGULAR;
task.type = TaskType.SNAPSHOT_SAVE;
task.done = done;
});
@@ -278,6 +282,8 @@ public class FSMCallerImpl implements FSMCaller {
public boolean onLeaderStop(final Status status) {
return enqueueTask((task, sequence) -> {
task.nodeId = this.nodeId;
+ task.handler = null;
+ task.evtType = DisruptorEventType.REGULAR;
task.type = TaskType.LEADER_STOP;
task.status = new Status(status);
});
@@ -287,6 +293,8 @@ public class FSMCallerImpl implements FSMCaller {
public boolean onLeaderStart(final long term) {
return enqueueTask((task, sequence) -> {
task.nodeId = this.nodeId;
+ task.handler = null;
+ task.evtType = DisruptorEventType.REGULAR;
task.type = TaskType.LEADER_START;
task.term = term;
});
@@ -296,6 +304,8 @@ public class FSMCallerImpl implements FSMCaller {
public boolean onStartFollowing(final LeaderChangeContext ctx) {
return enqueueTask((task, sequence) -> {
task.nodeId = this.nodeId;
+ task.handler = null;
+ task.evtType = DisruptorEventType.REGULAR;
task.type = TaskType.START_FOLLOWING;
task.leaderChangeCtx = new LeaderChangeContext(ctx.getLeaderId(),
ctx.getTerm(), ctx.getStatus());
});
@@ -305,6 +315,8 @@ public class FSMCallerImpl implements FSMCaller {
public boolean onStopFollowing(final LeaderChangeContext ctx) {
return enqueueTask((task, sequence) -> {
task.nodeId = this.nodeId;
+ task.handler = null;
+ task.evtType = DisruptorEventType.REGULAR;
task.type = TaskType.STOP_FOLLOWING;
task.leaderChangeCtx = new LeaderChangeContext(ctx.getLeaderId(),
ctx.getTerm(), ctx.getStatus());
});
@@ -343,6 +355,8 @@ public class FSMCallerImpl implements FSMCaller {
final OnErrorClosure c = new OnErrorClosure(error);
return enqueueTask((task, sequence) -> {
task.nodeId = this.nodeId;
+ task.handler = null;
+ task.evtType = DisruptorEventType.REGULAR;
task.type = TaskType.ERROR;
task.done = c;
});
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index b41929d240..9d836a27b2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -65,7 +65,8 @@ import
org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
-import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
+import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType
+;import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.Ballot;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
@@ -259,22 +260,16 @@ public class NodeImpl implements Node, RaftServerService {
/**
* Node service event.
*/
- public static class LogEntryAndClosure implements NodeIdAware {
- /** Raft node id. */
- NodeId nodeId;
-
+ public static class LogEntryAndClosure extends NodeIdAware {
LogEntry entry;
Closure done;
long expectedTerm;
CountDownLatch shutdownLatch;
@Override
- public NodeId nodeId() {
- return nodeId;
- }
-
public void reset() {
- this.nodeId = null;
+ super.reset();
+
this.entry = null;
this.done = null;
this.expectedTerm = 0;
@@ -1849,6 +1844,7 @@ public class NodeImpl implements Node, RaftServerService {
final EventTranslator<LogEntryAndClosure> translator = (event,
sequence) -> {
event.reset();
+
event.nodeId = getNodeId();
event.done = task.getDone();
event.entry = entry;
@@ -3128,6 +3124,8 @@ public class NodeImpl implements Node, RaftServerService {
Utils.runInThread(this.getOptions().getCommonExecutor(),
() -> this.applyQueue.publishEvent((event, sequence)
-> {
event.nodeId = getNodeId();
+ event.handler = null;
+ event.evtType = DisruptorEventType.REGULAR;
event.shutdownLatch = latch;
}));
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
index f97eea2ae2..4914833ce9 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
@@ -36,6 +36,7 @@ import
org.apache.ignite.raft.jraft.FSMCaller.LastAppliedLogIndexListener;
import org.apache.ignite.raft.jraft.ReadOnlyService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
+import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType;
import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.NodeId;
@@ -86,27 +87,21 @@ public class ReadOnlyServiceImpl implements
ReadOnlyService, LastAppliedLogIndex
private static final IgniteLogger LOG =
Loggers.forClass(ReadOnlyServiceImpl.class);
- public static class ReadIndexEvent implements NodeIdAware {
- /** Raft node id. */
- NodeId nodeId;
-
+ public static class ReadIndexEvent extends NodeIdAware {
Bytes requestContext;
ReadIndexClosure done;
CountDownLatch shutdownLatch;
long startTime;
- private void reset() {
- this.nodeId = null;
+ @Override
+ public void reset() {
+ super.reset();
+
this.requestContext = null;
this.done = null;
this.shutdownLatch = null;
this.startTime = 0L;
}
-
- @Override
- public NodeId nodeId() {
- return nodeId;
- }
}
private static class ReadIndexEventFactory implements
EventFactory<ReadIndexEvent> {
@@ -309,6 +304,8 @@ public class ReadOnlyServiceImpl implements
ReadOnlyService, LastAppliedLogIndex
Utils.runInThread(this.node.getOptions().getCommonExecutor(),
() -> this.readIndexQueue.publishEvent((event, sequence) -> {
event.nodeId = this.node.getNodeId();
+ event.handler = null;
+ event.evtType = DisruptorEventType.REGULAR;
event.shutdownLatch = this.shutdownLatch;
}));
}
@@ -332,6 +329,8 @@ public class ReadOnlyServiceImpl implements
ReadOnlyService, LastAppliedLogIndex
try {
EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
event.nodeId = this.node.getNodeId();
+ event.handler = null;
+ event.evtType = DisruptorEventType.REGULAR;
event.done = closure;
event.requestContext = new Bytes(reqCtx);
event.startTime = Utils.monotonicMs();
@@ -417,6 +416,8 @@ public class ReadOnlyServiceImpl implements
ReadOnlyService, LastAppliedLogIndex
final CountDownLatch latch = new CountDownLatch(1);
this.readIndexQueue.publishEvent((task, sequence) -> {
task.nodeId = this.node.getNodeId();
+ task.handler = null;
+ task.evtType = DisruptorEventType.REGULAR;
task.shutdownLatch = latch;
});
latch.await();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
index a35392eaa2..bcc799d8bc 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
@@ -14,14 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.raft.jraft.disruptor;
-import java.util.concurrent.ThreadFactory;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
+import java.util.concurrent.ThreadFactory;
import org.apache.ignite.raft.jraft.util.Requires;
/**
@@ -91,8 +92,6 @@ public class DisruptorBuilder<T> {
Requires.requireNonNull(this.eventFactory, "Event factory not set");
Requires.requireNonNull(this.threadFactory, "Thread factory not set");
- return new Disruptor<>(this.eventFactory, this.ringBufferSize,
this.threadFactory, this.producerType,
- this.waitStrategy);
+ return new Disruptor<>(this.eventFactory, this.ringBufferSize,
this.threadFactory, this.producerType, this.waitStrategy);
}
-
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorEventType.java
similarity index 65%
copy from
modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
copy to
modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorEventType.java
index fd91823dee..62040606c2 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorEventType.java
@@ -17,17 +17,21 @@
package org.apache.ignite.raft.jraft.disruptor;
+import com.lmax.disruptor.EventHandler;
import org.apache.ignite.raft.jraft.entity.NodeId;
/**
- * Interface provides Raft node id.
- * It allows to determine a stripe in Striped disruptor.
+ * There are different types of striped disruptor events. The disruptor uses
some events for technical purposes,
+ * so it is necessary to distinguish types.
*/
-public interface NodeIdAware {
+public enum DisruptorEventType {
/**
- * Gets a Raft node id.
- *
- * @return Raft node id.
- */
- NodeId nodeId();
+ * This event type matches the technical striped disruptor event; see
{@link StripedDisruptor#subscribe(NodeId, EventHandler)}.
+ */
+ SUBSCRIBE,
+
+ /**
+ * This event type matches the regular event in the striped disruptor.
+ */
+ REGULAR
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
index fd91823dee..8ebe955a53 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/NodeIdAware.java
@@ -17,17 +17,35 @@
package org.apache.ignite.raft.jraft.disruptor;
+import static
org.apache.ignite.raft.jraft.disruptor.DisruptorEventType.REGULAR;
+import com.lmax.disruptor.EventHandler;
import org.apache.ignite.raft.jraft.entity.NodeId;
/**
- * Interface provides Raft node id.
- * It allows to determine a stripe in Striped disruptor.
+ * Base class for striped disruptor events that provides a Raft node id. It allows determining a stripe in the Striped
disruptor.
*/
-public interface NodeIdAware {
+public abstract class NodeIdAware {
+ /** Raft node id. */
+ public NodeId nodeId;
+
+ /** The event handler is used only by {@link DisruptorEventType#SUBSCRIBE} events (a {@code null} handler there unsubscribes the node); in
other cases, it should be {@code null}. */
+ public EventHandler<NodeIdAware> handler;
+
+ /** Disruptor event type. */
+ public DisruptorEventType evtType;
+
/**
* Gets a Raft node id.
*
* @return Raft node id.
*/
- NodeId nodeId();
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ public void reset() {
+ nodeId = null;
+ handler = null;
+ evtType = REGULAR;
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
index 0010003dce..d3eea69136 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -14,9 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.raft.jraft.disruptor;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.raft.jraft.disruptor.DisruptorEventType.SUBSCRIBE;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
@@ -27,6 +29,8 @@ import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@@ -40,15 +44,25 @@ import org.apache.ignite.raft.jraft.entity.NodeId;
import org.jetbrains.annotations.Nullable;
/**
- * Stripe Disruptor is a set of queues which process several independent
groups in one queue (in the stripe).
- * It makes fewer threads that the groups and gives the same sequential
guaranties and a close performance.
+ * Stripe Disruptor is a set of queues which process several independent
groups in one queue (in the stripe). It uses fewer threads than
+ * there are groups and gives the same sequential guarantees and a close performance.
*
* @param <T> Event type. This event should implement {@link NodeIdAware}
interface.
*/
public class StripedDisruptor<T extends NodeIdAware> {
+ /**
+ * An id that does not represent any real node. It is used to batch events in one
stripe although their {@link NodeId} may vary.
+ * The latest event is cached in case the disruptor supports batching,
+ * because the {@link DisruptorEventType#SUBSCRIBE} event might be the
final one in a batch and the cached event still has to be handled.
+ */
+ private final NodeId FAKE_NODE_ID = new NodeId(null, null);
+
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(StripedDisruptor.class);
+ /** The map stores a matching node id for the stripe that sends messages
to the node. */
+ private ConcurrentHashMap<NodeId, Integer> stripeMapper = new
ConcurrentHashMap<>();
+
/** The counter is used to generate the next stripe to subscribe to in
order to be a round-robin hash. */
private final AtomicInteger incrementalCounter = new AtomicInteger();
@@ -58,10 +72,10 @@ public class StripedDisruptor<T extends NodeIdAware> {
/** Array of Ring buffer. It placed according to disruptors in the array.
*/
private final RingBuffer<T>[] queues;
- /** Disruptor event handler array. It placed according to disruptors in
the array.*/
+ /** Disruptor event handler array. It placed according to disruptors in
the array. */
private final ArrayList<StripeEntryHandler> eventHandlers;
- /** Disruptor error handler array. It placed according to disruptors in
the array.*/
+ /** Disruptor error handler array. It placed according to disruptors in
the array. */
private final ArrayList<StripeExceptionHandler> exceptionHandlers;
/** Amount of stripes. */
@@ -72,12 +86,8 @@ public class StripedDisruptor<T extends NodeIdAware> {
private final DisruptorMetrics metrics;
- /**
- * If {@code false}, this stripe will always pass {@code true} into {@link
EventHandler#onEvent(Object, long, boolean)}.
- * Otherwise, the data will be provided with batches.
- */
- // TODO: IGNITE-15568 endOfBatch should be set to true to prevent caching
tasks until IGNITE-15568 has fixed.
- private final boolean supportsBatches;
+ /** If {@code true}, the disruptor batch is shared across all subscribers.
Otherwise, a separate batch is sent to each one. */
+ private final boolean sharedStripe;
/**
* @param nodeName Name of the Ignite node.
@@ -85,8 +95,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
* @param bufferSize Buffer size for each Disruptor.
* @param eventFactory Event factory for the Striped disruptor.
* @param stripes Amount of stripes.
- * @param supportsBatches If {@code false}, this stripe will always pass
{@code true} into
- * {@link EventHandler#onEvent(Object, long, boolean)}. Otherwise,
the data will be provided with batches.
+ * @param sharedStripe If {@code true}, the disruptor batch is shared across
all subscribers. Otherwise, a separate batch is sent to each one.
* @param useYieldStrategy If {@code true}, the yield strategy is to be
used, otherwise the blocking strategy.
* @param metrics Metrics.
*/
@@ -96,7 +105,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
int bufferSize,
EventFactory<T> eventFactory,
int stripes,
- boolean supportsBatches,
+ boolean sharedStripe,
boolean useYieldStrategy,
@Nullable DisruptorMetrics metrics
) {
@@ -107,7 +116,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
bufferSize,
eventFactory,
stripes,
- supportsBatches,
+ sharedStripe,
useYieldStrategy,
metrics
);
@@ -120,8 +129,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
* @param bufferSize Buffer size for each Disruptor.
* @param eventFactory Event factory for the Striped disruptor.
* @param stripes Amount of stripes.
- * @param supportsBatches If {@code false}, this stripe will always pass
{@code true} into
- * {@link EventHandler#onEvent(Object, long, boolean)}. Otherwise,
the data will be provided with batches.
+ * @param sharedStripe If {@code true}, the disruptor batch is shared across
all subscribers. Otherwise, a separate batch is sent to each one.
* @param useYieldStrategy If {@code true}, the yield strategy is to be
used, otherwise the blocking strategy.
* @param raftMetrics Metrics.
*/
@@ -132,7 +140,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
int bufferSize,
EventFactory<T> eventFactory,
int stripes,
- boolean supportsBatches,
+ boolean sharedStripe,
boolean useYieldStrategy,
@Nullable DisruptorMetrics raftMetrics
) {
@@ -142,19 +150,19 @@ public class StripedDisruptor<T extends NodeIdAware> {
exceptionHandlers = new ArrayList<>(stripes);
this.stripes = stripes;
this.name = NamedThreadFactory.threadPrefix(nodeName, poolName);
- this.supportsBatches = supportsBatches;
+ this.sharedStripe = sharedStripe;
this.metrics = raftMetrics;
for (int i = 0; i < stripes; i++) {
String stripeName = format("{}_stripe_{}", poolName, i);
Disruptor<T> disruptor = DisruptorBuilder.<T>newInstance()
- .setRingBufferSize(bufferSize)
- .setEventFactory(eventFactory)
- .setThreadFactory(threadFactorySupplier.apply(nodeName,
stripeName))
- .setProducerType(ProducerType.MULTI)
- .setWaitStrategy(useYieldStrategy ? new YieldingWaitStrategy()
: new BlockingWaitStrategy())
- .build();
+ .setRingBufferSize(bufferSize)
+ .setEventFactory(eventFactory)
+ .setThreadFactory(threadFactorySupplier.apply(nodeName,
stripeName))
+ .setProducerType(ProducerType.MULTI)
+ .setWaitStrategy(useYieldStrategy ? new
YieldingWaitStrategy() : new BlockingWaitStrategy())
+ .build();
eventHandlers.add(new StripeEntryHandler(i));
exceptionHandlers.add(new StripeExceptionHandler(name));
@@ -172,8 +180,9 @@ public class StripedDisruptor<T extends NodeIdAware> {
*/
public void shutdown() {
for (int i = 0; i < stripes; i++) {
- if (disruptors[i] != null)
+ if (disruptors[i] != null) {
disruptors[i].shutdown();
+ }
// Help GC to collect unused resources.
queues[i] = null;
@@ -185,8 +194,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
}
/**
- * Subscribes an event handler to one stripe of the Striped disruptor.
- * The stripe is determined by a group id.
+ * Subscribes an event handler to one stripe of the Striped disruptor. The
stripe is determined by a group id.
*
* @param nodeId Node id.
* @param handler Event handler for the group specified.
@@ -197,8 +205,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
}
/**
- * Subscribes an event handler and a exception handler to one stripe of
the Striped disruptor.
- * The stripe is determined by a group id.
+ * Subscribes an event handler and an exception handler to one stripe of
the Striped disruptor. The stripe is determined by a group id.
*
* @param nodeId Node id.
* @param handler Event handler for the group specified.
@@ -210,10 +217,19 @@ public class StripedDisruptor<T extends NodeIdAware> {
int stripeId = nextStripeToSubscribe();
- eventHandlers.get(stripeId).subscribe(nodeId, handler);
+ stripeMapper.put(nodeId, stripeId);
- if (exceptionHandler != null)
+ queues[stripeId].publishEvent((event, sequence) -> {
+ event.reset();
+
+ event.evtType = SUBSCRIBE;
+ event.nodeId = nodeId;
+ event.handler = (EventHandler<NodeIdAware>) handler;
+ });
+
+ if (exceptionHandler != null) {
exceptionHandlers.get(stripeId).subscribe(nodeId,
exceptionHandler);
+ }
return queues[stripeId];
}
@@ -228,29 +244,33 @@ public class StripedDisruptor<T extends NodeIdAware> {
assert stripeId != -1 : "The replication group has not subscribed yet
[nodeId=" + nodeId + "].";
- eventHandlers.get(stripeId).unsubscribe(nodeId);
+ stripeMapper.remove(nodeId);
+
+ queues[stripeId].publishEvent((event, sequence) -> {
+ event.reset();
+
+ event.evtType = SUBSCRIBE;
+ event.nodeId = nodeId;
+ event.handler = null;
+ });
+
exceptionHandlers.get(stripeId).unsubscribe(nodeId);
}
/**
- * If the replication group is already subscribed, this method determines
a stripe by a node id and returns a stripe number.
- * If the replication group did not subscribed yet, this method returns
{@code -1};
+ * If the replication group is already subscribed, this method determines
a stripe by a node id and returns a stripe number. If the
+ * replication group has not subscribed yet, this method returns {@code
-1}.
*
* @param nodeId Node id.
* @return Stripe of the Striped disruptor.
*/
public int getStripe(NodeId nodeId) {
- for (StripeEntryHandler handler : eventHandlers) {
- if (handler.isSubscribed(nodeId)) {
- return handler.stripeId;
- }
- }
-
- return -1;
+ return stripeMapper.getOrDefault(nodeId, -1);
}
/**
* Generates the next stripe number in a round-robin manner.
+ *
* @return The stripe number.
*/
private int nextStripeToSubscribe() {
@@ -258,28 +278,16 @@ public class StripedDisruptor<T extends NodeIdAware> {
}
/**
- * Determines a Disruptor queue by a group id.
- *
- * @param nodeId Node id.
- * @return Disruptor queue appropriate to the group.
- */
- public RingBuffer<T> queue(NodeId nodeId) {
- int stripeId = getStripe(nodeId);
-
- assert stripeId != -1 : "The replication group has not subscribed yet
[nodeId=" + nodeId + "].";
-
- return queues[stripeId];
- }
-
- /**
- * Event handler for stripe of the Striped disruptor.
- * It routs an event to the event handler for a group.
+ * Event handler for stripe of the Striped disruptor. It routes an event
to the event handler for a group.
*/
private class StripeEntryHandler implements EventHandler<T> {
- private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers =
new ConcurrentHashMap<>();
+ private final Map<NodeId, EventHandler<T>> subscribers = new
HashMap<>();
+
+ /** The cache is used to correctly handle the disruptor batch. */
+ private final Map<NodeId, T> eventCache = new HashMap<>();
- /** Size of the batch that is currently being handled. */
- private int currentBatchSize = 0;
+ /** Current batch sizes. */
+ private final Map<NodeId, Integer> currentBatchSizes = new HashMap<>();
/** Stripe id. */
private final int stripeId;
@@ -291,62 +299,75 @@ public class StripedDisruptor<T extends NodeIdAware> {
this.stripeId = stripeId;
}
- /**
- * Checks the replication group is subscribed to this stripe or not.
- * @param nodeId Replication group node id.
- * @return True if the group is subscribed, false otherwise.
- */
- public boolean isSubscribed(NodeId nodeId) {
- return subscribers.containsKey(nodeId);
- }
+ /** {@inheritDoc} */
+ @Override
+ public void onEvent(T event, long sequence, boolean endOfBatch) throws
Exception {
+ if (event.evtType == SUBSCRIBE) {
+ if (event.handler == null) {
+ subscribers.remove(event.nodeId());
+ } else {
+ subscribers.put(event.nodeId(), (EventHandler<T>)
event.handler);
+ }
+ } else {
+ internalBatching(event, sequence);
+ }
- /**
- * Subscribes a group to appropriate events for it.
- *
- * @param nodeId Node id.
- * @param handler Event handler for the group specified.
- */
- void subscribe(NodeId nodeId, EventHandler<T> handler) {
- subscribers.put(nodeId, handler);
+ if (endOfBatch) {
+ for (Map.Entry<NodeId, T> grpEvent : eventCache.entrySet()) {
+ EventHandler<T> grpHandler =
subscribers.get(grpEvent.getValue().nodeId());
+
+ if (grpHandler != null) {
+ if (metrics != null && metrics.enabled()) {
+ metrics.hitToStripe(stripeId);
+
+
metrics.addBatchSize(currentBatchSizes.getOrDefault(grpEvent.getKey(), 0) + 1);
+ }
+
+ grpHandler.onEvent(grpEvent.getValue(), sequence,
true);
+ }
+ }
+
+ currentBatchSizes.clear();
+ eventCache.clear();
+ }
}
/**
- * Unsubscribes a group for any event.
+ * Processes the event with intermediate cache to batch internally for
each subscriber for the stripe.
*
- * @param nodeId Node id.
+ * @param event Disruptor event to process.
+ * @param sequence Number in the sequence of the element.
+ * @throws Exception Throw when some handler fails.
*/
- void unsubscribe(NodeId nodeId) {
- subscribers.remove(nodeId);
- }
+ private void internalBatching(T event, long sequence) throws Exception
{
+ NodeId pushNodeId = sharedStripe ? FAKE_NODE_ID : event.nodeId();
- /** {@inheritDoc} */
- @Override public void onEvent(T event, long sequence, boolean
endOfBatch) throws Exception {
- EventHandler<T> handler = subscribers.get(event.nodeId());
+ T prevEvent = eventCache.put(pushNodeId, event);
- // TODO: IGNITE-20536 Need to add assert that handler is not null
and to implement a no-op handler.
- if (handler != null) {
- if (metrics != null && metrics.enabled()) {
- metrics.hitToStripe(stripeId);
+ if (prevEvent != null) {
+ EventHandler<T> grpHandler =
subscribers.get(prevEvent.nodeId());
+
+ if (grpHandler != null) {
+ if (metrics != null && metrics.enabled()) {
+ metrics.hitToStripe(stripeId);
- if (endOfBatch) {
- metrics.addBatchSize(currentBatchSize + 1);
+ currentBatchSizes.compute(pushNodeId, (nodeId, cnt) ->
{
+ if (cnt == null) {
+ return 1;
+ }
- currentBatchSize = 0;
- } else {
- currentBatchSize ++;
+ return cnt + 1;
+ });
}
- }
- handler.onEvent(event, sequence, endOfBatch ||
subscribers.size() > 1 && !supportsBatches);
- } else {
- LOG.warn(format("Group of the event is unsupported [nodeId={},
event={}]", event.nodeId(), event));
+ grpHandler.onEvent(prevEvent, sequence, false);
+ }
}
}
}
/**
- * Striped disruptor exception handler.
- * It prints into log when an exception has occurred and route it to the
handler for group.
+ * Striped disruptor exception handler. It logs an exception when it
occurs and routes it to the handler for the group.
*/
private class StripeExceptionHandler implements ExceptionHandler<T> {
/** Name of the Disruptor instance. */
@@ -383,31 +404,36 @@ public class StripedDisruptor<T extends NodeIdAware> {
}
/** {@inheritDoc} */
- @Override public void handleOnStartException(Throwable ex) {
+ @Override
+ public void handleOnStartException(Throwable ex) {
LOG.error("Fail to start disruptor [name={}]", ex, name);
}
/** {@inheritDoc} */
- @Override public void handleOnShutdownException(Throwable ex) {
+ @Override
+ public void handleOnShutdownException(Throwable ex) {
LOG.error("Fail to shutdown disruptor [name={}]", ex, name);
}
/** {@inheritDoc} */
- @Override public void handleEventException(Throwable ex, long
sequence, T event) {
+ @Override
+ public void handleEventException(Throwable ex, long sequence, T event)
{
NodeId nodeId = event.nodeId();
BiConsumer<T, Throwable> handler = nodeId == null ? null :
subscribers.get(nodeId);
LOG.error("Handle disruptor event error [name={}, event={},
hasHandler={}]", ex, name, event, handler != null);
- if (handler != null)
+ if (handler != null) {
handler.accept(event, ex);
+ }
}
}
/** {@inheritDoc} */
- @Override public String toString() {
+ @Override
+ public String toString() {
return format("{} [name={}]", StripedDisruptor.class.getSimpleName(),
name);
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index 49e72c4f1f..7952f03ff3 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
import org.apache.ignite.raft.jraft.core.NodeMetrics;
+import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType;
import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
@@ -103,20 +104,14 @@ public class LogManagerImpl implements LogManager {
LAST_LOG_ID // get last log id
}
- public static class StableClosureEvent implements NodeIdAware {
- /** Raft node id. */
- NodeId nodeId;
-
+ public static class StableClosureEvent extends NodeIdAware {
StableClosure done;
EventType type;
@Override
- public NodeId nodeId() {
- return nodeId;
- }
+ public void reset() {
+ super.reset();
- void reset() {
- this.nodeId = null;
this.done = null;
this.type = null;
}
@@ -219,6 +214,7 @@ public class LogManagerImpl implements LogManager {
this.shutDownLatch = new CountDownLatch(1);
Utils.runInThread(nodeOptions.getCommonExecutor(), () ->
this.diskQueue.publishEvent((event, sequence) -> {
event.reset();
+
event.nodeId = this.nodeId;
event.type = EventType.SHUTDOWN;
}));
@@ -336,6 +332,7 @@ public class LogManagerImpl implements LogManager {
// publish event out of lock
this.diskQueue.publishEvent((event, sequence) -> {
event.reset();
+
event.nodeId = this.nodeId;
event.type = EventType.OTHER;
event.done = done;
@@ -362,6 +359,7 @@ public class LogManagerImpl implements LogManager {
}
this.diskQueue.publishEvent((event, sequence) -> {
event.reset();
+
event.nodeId = this.nodeId;
event.type = type;
event.done = done;
diff --git
a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
index a6c78dc8b8..cda112fd14 100644
---
a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
@@ -17,15 +17,25 @@
package org.apache.ignite.disruptor;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -34,6 +44,8 @@ import
org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
/**
* Tests for striped disruptor.
@@ -70,11 +82,15 @@ public class StripedDisruptorTest extends
IgniteAbstractTest {
int finalInt = i;
taskQueue1.tryPublishEvent((event, sequence) -> {
+ event.reset();
+
event.nodeId = nodeId1;
event.num = finalInt;
});
taskQueue2.tryPublishEvent((event, sequence) -> {
+ event.reset();
+
event.nodeId = nodeId2;
event.num = finalInt;
});
@@ -117,6 +133,8 @@ public class StripedDisruptorTest extends
IgniteAbstractTest {
int finalInt = i;
taskQueue.publishEvent((event, sequence) -> {
+ event.reset();
+
event.nodeId = nodeId;
event.num = finalInt;
});
@@ -174,6 +192,234 @@ public class StripedDisruptorTest extends
IgniteAbstractTest {
}
}
+ @Test
+ public void testOneSubscriberBatching() throws Exception {
+ Random random = new Random();
+
+ int stripes = random.nextInt(20) + 1;
+
+ StripedDisruptor<NodeIdAwareTestObj> disruptor = new
StripedDisruptor<>("test", "test-disruptor",
+ 16384,
+ NodeIdAwareTestObj::new,
+ stripes,
+ false,
+ false,
+ null);
+
+ HashMap<NodeId, RingBuffer<NodeIdAwareTestObj>> queues = new
HashMap<>();
+ GroupAwareTestObjHandler[] handlers = new
GroupAwareTestObjHandler[stripes];
+ NodeId[] nodesIds = new NodeId[stripes];
+
+ for (int i = 0; i < stripes; i++) {
+ GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();
+
+ var nodeId = new NodeId("grp", new PeerId(String.valueOf(i)));
+
+ queues.put(nodeId, disruptor.subscribe(nodeId, handler));
+ handlers[i] = handler;
+ nodesIds[i] = nodeId;
+ }
+
+ int batchSize = random.nextInt(50) + 1;
+
+ for (NodeId nodeId : nodesIds) {
+ EventTranslator<NodeIdAwareTestObj>[] eventTranslators = new
EventTranslator[batchSize];
+
+ for (int i = 0; i < batchSize; i++) {
+ int finalI = i;
+
+ eventTranslators[i] = (event, sequence) -> {
+ event.reset();
+
+ event.nodeId = nodeId;
+ event.num = finalI;
+ };
+ }
+
+ queues.get(nodeId).publishEvents(eventTranslators);
+ }
+
+ assertTrue(IgniteTestUtils.waitForCondition(() -> {
+ for (GroupAwareTestObjHandler handler : handlers) {
+ if (handler.applied != batchSize || handler.batchesApplied !=
1) {
+ return false;
+ }
+ }
+
+ return true;
+ }, 10_000));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testMultipleSubscriberBatching(boolean supportBatching) throws
Exception {
+ Random random = new Random();
+
+ int totalHandlers = random.nextInt(20) + 1;
+
+ StripedDisruptor<NodeIdAwareTestObj> disruptor = new
StripedDisruptor<>("test", "test-disruptor",
+ 16384,
+ NodeIdAwareTestObj::new,
+ 1,
+ supportBatching,
+ false,
+ null);
+
+ RingBuffer<NodeIdAwareTestObj> queue = null;
+ GroupAwareTestObjHandler[] handlers = new
GroupAwareTestObjHandler[totalHandlers];
+ NodeId[] nodesIds = new NodeId[totalHandlers];
+
+ for (int i = 0; i < totalHandlers; i++) {
+ GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();
+
+ var nodeId = new NodeId("grp", new PeerId(String.valueOf(i)));
+
+ // Any queue can be used here, because the striped disruptor has
only one stripe.
+ queue = disruptor.subscribe(nodeId, handler);
+
+ handlers[i] = handler;
+ nodesIds[i] = nodeId;
+
+ assertEquals(0, disruptor.getStripe(nodeId));
+ }
+
+ int batchSize = random.nextInt(50) + 1;
+
+ EventTranslator<NodeIdAwareTestObj>[] eventTranslators = new
EventTranslator[totalHandlers * batchSize];
+
+ for (int i = 0; i < totalHandlers * batchSize; i++) {
+ int finalI = i;
+
+ eventTranslators[i] = (event, sequence) -> {
+ event.reset();
+
+ event.nodeId = nodesIds[finalI % totalHandlers];
+ event.num = finalI;
+ };
+ }
+
+ queue.publishEvents(eventTranslators);
+
+ if (supportBatching) {
+ assertTrue(IgniteTestUtils.waitForCondition(() -> {
+ int batchCommited = 0;
+
+ for (GroupAwareTestObjHandler handler : handlers) {
+ if (handler.applied == batchSize && handler.batchesApplied
== 1) {
+ batchCommited++;
+ }
+ }
+
+ return batchCommited == 1;
+ }, 10_000));
+ } else {
+ assertTrue(IgniteTestUtils.waitForCondition(() -> {
+ for (GroupAwareTestObjHandler handler : handlers) {
+ if (handler.applied != batchSize || handler.batchesApplied
!= 1) {
+ return false;
+ }
+ }
+
+ return true;
+ }, 10_000));
+ }
+ }
+
+ @Test
+ public void testConcurrentSubscribe() throws Exception {
+ Random random = new Random();
+
+ int totalHandlers = random.nextInt(20) + 1;
+
+ StripedDisruptor<NodeIdAwareTestObj> disruptor = new
StripedDisruptor<>("test", "test-disruptor",
+ 16384,
+ NodeIdAwareTestObj::new,
+ 1,
+ false,
+ false,
+ null);
+
+ RingBuffer<NodeIdAwareTestObj> queue = null;
+ GroupAwareTestObjHandler[] handlers = new
GroupAwareTestObjHandler[totalHandlers];
+ NodeId[] nodesIds = new NodeId[totalHandlers];
+
+ for (int i = 0; i < totalHandlers; i++) {
+ GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();
+
+ var nodeId = new NodeId("grp", new PeerId(String.valueOf(i)));
+
+ // Any queue can be used here, because the striped disruptor has
only one stripe.
+ queue = disruptor.subscribe(nodeId, handler);
+
+ handlers[i] = handler;
+ nodesIds[i] = nodeId;
+
+ assertEquals(0, disruptor.getStripe(nodeId));
+ }
+
+ AtomicBoolean stop = new AtomicBoolean();
+
+ int unstableSubscriberNumber = random.nextInt(totalHandlers);
+ NodeId unstableSubscriber = nodesIds[unstableSubscriberNumber];
+ GroupAwareTestObjHandler unstableSubscriberHandler =
handlers[unstableSubscriberNumber];
+
+ CompletableFuture<Void> stopTreadCompleted =
IgniteTestUtils.runAsync(() -> {
+ while (!stop.get()) {
+ if (disruptor.getStripe(unstableSubscriber) == -1) {
+ disruptor.subscribe(unstableSubscriber,
unstableSubscriberHandler);
+ } else {
+ disruptor.unsubscribe(unstableSubscriber);
+ }
+ }
+ });
+
+ Map<NodeId, Integer> appliedMap = new HashMap<>(totalHandlers);
+
+ for (int iter = 0; iter < 100_000; iter++) {
+ int batchSize = random.nextInt(10) + 1;
+
+ List<EventTranslator<NodeIdAwareTestObj>> eventTranslators =
random.ints(
+ batchSize,
+ 0,
+ totalHandlers
+ ).mapToObj(value -> {
+ appliedMap.compute(nodesIds[value], (nodeId, count) -> {
+ if (count == null) {
+ return 1;
+ }
+ return count + 1;
+ });
+
+ return (EventTranslator<NodeIdAwareTestObj>) (event, sequence)
-> {
+ event.reset();
+
+ event.nodeId = nodesIds[value];
+ event.num = value;
+ };
+ }).collect(Collectors.toList());
+
+ queue.publishEvents(eventTranslators.toArray(new
EventTranslator[0]));
+ }
+
+ stop.set(true);
+
+ assertThat(stopTreadCompleted, willCompleteSuccessfully());
+
+ assertTrue(IgniteTestUtils.waitForCondition(() -> {
+ for (int i = 0; i < totalHandlers; i++) {
+ if (i == unstableSubscriberNumber) {
+ continue;
+ }
+
+ if (handlers[i].applied !=
appliedMap.getOrDefault(nodesIds[i], 0)) {
+ return false;
+ }
+ }
+
+ return true;
+ }, 10_000));
+ }
+
/** Group event handler. */
private static class GroupAwareTestObjHandler implements
EventHandler<NodeIdAwareTestObj> {
/** This is a container for the batch events. */
@@ -182,6 +428,9 @@ public class StripedDisruptorTest extends
IgniteAbstractTest {
/** Counter of applied events. */
int applied = 0;
+ /** Amount of applied batches. */
+ int batchesApplied = 0;
+
/** {@inheritDoc} */
@Override
public void onEvent(NodeIdAwareTestObj event, long sequence, boolean
endOfBatch) {
@@ -190,6 +439,8 @@ public class StripedDisruptorTest extends
IgniteAbstractTest {
if (endOfBatch) {
applied += batch.size();
+ batchesApplied++;
+
batch.clear();
}
}
@@ -198,16 +449,8 @@ public class StripedDisruptorTest extends
IgniteAbstractTest {
/**
* Group aware object implementation to test the striped disruptor.
*/
- private static class NodeIdAwareTestObj implements NodeIdAware {
- /** Node id. */
- NodeId nodeId;
-
+ private static class NodeIdAwareTestObj extends NodeIdAware {
/** Any integer number. */
int num;
-
- @Override
- public NodeId nodeId() {
- return nodeId;
- }
}
}