This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ac7a6fd3c5d IGNITE-24484 RAFT heartbeat coalescing (#5221)
ac7a6fd3c5d is described below
commit ac7a6fd3c5d3f9b5adbc3ba3e422206dd16c9de0
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Mar 14 11:22:28 2025 +0300
IGNITE-24484 RAFT heartbeat coalescing (#5221)
---
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 21 +--
.../apache/ignite/raft/jraft/core/TestCluster.java | 13 +-
.../internal/raft/server/impl/JraftServerImpl.java | 17 +--
.../org/apache/ignite/raft/jraft/JRaftUtils.java | 13 ++
.../org/apache/ignite/raft/jraft/NodeManager.java | 162 ++++++++++++++++++++-
.../apache/ignite/raft/jraft/RaftGroupService.java | 14 +-
.../apache/ignite/raft/jraft/RaftMessageGroup.java | 6 +
.../apache/ignite/raft/jraft/core/NodeImpl.java | 2 +-
.../ignite/raft/jraft/option/NodeOptions.java | 31 +++-
.../rpc/{RpcClient.java => NetworkInvoker.java} | 34 +----
.../apache/ignite/raft/jraft/rpc/RpcClient.java | 26 +---
.../apache/ignite/raft/jraft/rpc/RpcRequests.java | 10 ++
.../raft/jraft/rpc/impl/AbstractClientService.java | 15 +-
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 11 +-
.../impl/core/AppendEntriesRequestProcessor.java | 9 +-
.../rpc/impl/core/DefaultRaftClientService.java | 66 ++++++++-
.../rpc/impl/core/HeartbeatRequestProcessor.java | 91 ++++++++++++
.../ignite/raft/jraft/rpc/IgniteRpcTest.java | 3 +-
.../impl/cli/AbstractCliRequestProcessorTest.java | 2 +-
.../ignite/raft/jraft/test/MockAsyncContext.java | 2 +-
.../ignite/raft/jraft/rpc/TestIgniteRpcServer.java | 5 +-
...ItTxDistributedTestThreeNodesThreeReplicas.java | 7 +-
22 files changed, 437 insertions(+), 123 deletions(-)
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 358c6f01f1c..8676a147bee 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -144,7 +144,6 @@ import org.apache.ignite.raft.jraft.rpc.RpcServer;
import org.apache.ignite.raft.jraft.rpc.TestIgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
-import org.apache.ignite.raft.jraft.rpc.impl.core.DefaultRaftClientService;
import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
@@ -2897,12 +2896,6 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
log.info("Leader stopped.");
assertTrue(cluster.getNode(peers.get(2).getPeerId()).isInstallingSnapshot());
-
- // Wait 30 seconds to check if snapshot is still installing.
- Thread.sleep(30_000);
-
- // Even after 30 seconds (in reality, forever), the snapshot is still
installing.
-
assertTrue(cluster.getNode(peers.get(2).getPeerId()).isInstallingSnapshot());
}
/**
@@ -3234,6 +3227,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
assertThat(log2.startAsync(startComponentContext),
willCompleteSuccessfully());
nodeOpts.setServiceFactory(new IgniteJraftServiceFactory(log2));
nodeOpts.setFsm(fsm);
+ nodeOpts.setNodeManager(new NodeManager(null));
RaftGroupService service = createService("test", peer, nodeOpts,
List.of());
@@ -3281,6 +3275,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
DefaultLogStorageFactory log2 = new DefaultLogStorageFactory(path);
assertThat(log2.startAsync(startComponentContext),
willCompleteSuccessfully());
nodeOpts.setServiceFactory(new IgniteJraftServiceFactory(log2));
+ nodeOpts.setNodeManager(new NodeManager(null));
RaftGroupService service = createService("test", peer, nodeOpts,
List.of());
@@ -4271,12 +4266,9 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
sendTestTaskAndWait(leader);
cluster.ensureSame();
- DefaultRaftClientService rpcService = (DefaultRaftClientService)
leader.getRpcClientService();
- RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
-
AtomicInteger cnt = new AtomicInteger();
- rpcClientEx.blockMessages((msg, nodeId) -> {
+
cluster.getServer(leader.getLeaderId()).getNodeOptions().getNodeManager().blockMessages((msg,
nodeId) -> {
assertTrue(msg instanceof RpcRequests.AppendEntriesRequest);
if (cnt.get() >= 2)
@@ -4552,6 +4544,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
options.setServiceFactory(new IgniteJraftServiceFactory(log));
options.setLogUri("test");
+ options.setNodeManager(new NodeManager(null));
return options;
}
@@ -4645,8 +4638,6 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
.map(p -> new NetworkAddress(TestUtils.getLocalAddress(),
p.getPort()))
.collect(toList());
- var nodeManager = new NodeManager();
-
ClusterService clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
peer.getPort(),
@@ -4657,7 +4648,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
executors.add(requestExecutor);
- IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService,
nodeManager, nodeOptions, requestExecutor);
+ IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService,
nodeOptions, requestExecutor);
nodeOptions.setRpcClient(new IgniteRpcClient(clusterService));
@@ -4665,7 +4656,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
assertThat(clusterService.startAsync(new ComponentContext()),
willCompleteSuccessfully());
- var service = new RaftGroupService(groupId, peer.getPeerId(),
nodeOptions, rpcServer, nodeManager) {
+ var service = new RaftGroupService(groupId, peer.getPeerId(),
nodeOptions, rpcServer) {
@Override public synchronized void shutdown() {
rpcServer.shutdown();
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 1e88427040a..919e47c3b34 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -260,9 +260,11 @@ public class TestCluster {
));
}
- NodeManager nodeManager = new NodeManager();
-
ClusterService clusterService = clusterService(testInfo,
peer.getPort(), new StaticNodeFinder(addressList));
+ NodeManager nodeManager = new NodeManager(clusterService);
+
+ nodeOptions.setScheduler(JRaftUtils.createScheduler(nodeOptions));
+ nodeOptions.setNodeManager(nodeManager);
var rpcClient = new IgniteRpcClient(clusterService);
@@ -272,16 +274,19 @@ public class TestCluster {
ExecutorService requestExecutor =
JRaftUtils.createRequestExecutor(nodeOptions);
- var rpcServer = new TestIgniteRpcServer(clusterService,
nodeManager, nodeOptions, requestExecutor);
+ var rpcServer = new TestIgniteRpcServer(clusterService,
nodeOptions, requestExecutor);
assertThat(clusterService.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+ nodeManager.init(nodeOptions);
+
if (optsClo != null)
optsClo.accept(peer.getPeerId(), nodeOptions);
RaftGroupService server = new RaftGroupService(this.name,
peer.getPeerId(),
- nodeOptions, rpcServer, nodeManager) {
+ nodeOptions, rpcServer) {
@Override public synchronized void shutdown() {
+ nodeManager.shutdown();
// This stop order is consistent with JRaftServerImpl
rpcServer.shutdown();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 7ede33d53ee..6414991eb03 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -137,9 +137,6 @@ public class JraftServerImpl implements RaftServer {
* needed to prevent concurrent start of the same raft group. */
private final List<Object> startGroupInProgressMonitors;
- /** Node manager. */
- private final NodeManager nodeManager;
-
/** Options. */
private final NodeOptions opts;
@@ -180,7 +177,6 @@ public class JraftServerImpl implements RaftServer {
) {
this.service = service;
this.groupStoragesContextResolver = groupStoragesContextResolver;
- this.nodeManager = new NodeManager();
this.groupStoragesDestructionIntents = groupStoragesDestructionIntents;
this.opts = opts;
@@ -277,6 +273,10 @@ public class JraftServerImpl implements RaftServer {
opts.setSnapshotTimer(JRaftUtils.createTimer(opts,
"JRaft-SnapshotTimer"));
}
+ if (opts.getNodeManager() == null) {
+ opts.setNodeManager(new NodeManager(service));
+ }
+
requestExecutor = Executors.newFixedThreadPool(
opts.getRaftRpcThreadPoolSize(),
IgniteThreadFactory.create(opts.getServerName(),
"JRaft-Request-Processor", LOG, PROCESS_RAFT_REQ)
@@ -284,7 +284,7 @@ public class JraftServerImpl implements RaftServer {
rpcServer = new IgniteRpcServer(
service,
- nodeManager,
+ opts.getNodeManager(),
opts.getRaftMessagesFactory(),
requestExecutor,
serviceEventInterceptor,
@@ -353,6 +353,7 @@ public class JraftServerImpl implements RaftServer {
}
rpcServer.init(null);
+ opts.getNodeManager().init(opts);
return
completeRaftGroupStoragesDestruction(componentContext.executor());
}
@@ -363,6 +364,7 @@ public class JraftServerImpl implements RaftServer {
assert nodes.isEmpty() : IgniteStringFormatter.format("Raft nodes {}
are still running on the Ignite node {}", nodes.keySet(),
service.topologyService().localMember().name());
+ opts.getNodeManager().shutdown();
rpcServer.shutdown();
if (opts.getfSMCallerExecutorDisruptor() != null) {
@@ -516,9 +518,7 @@ public class JraftServerImpl implements RaftServer {
nodeOptions.setInitialConf(new Configuration(peerIds, learnerIds));
- IgniteRpcClient client = new IgniteRpcClient(service);
-
- nodeOptions.setRpcClient(client);
+ nodeOptions.setRpcClient(new IgniteRpcClient(service));
nodeOptions.setExternallyEnforcedConfigIndex(groupOptions.externallyEnforcedConfigIndex());
@@ -527,7 +527,6 @@ public class JraftServerImpl implements RaftServer {
PeerId.fromPeer(nodeId.peer()),
nodeOptions,
rpcServer,
- nodeManager,
groupOptions.ownFsmCallerExecutorDisruptorConfig()
);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
index 218a7ddfb49..524781cd451 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
@@ -35,6 +35,7 @@ import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.option.BootstrapOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RpcOptions;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
import org.apache.ignite.raft.jraft.util.StringUtils;
import org.apache.ignite.raft.jraft.util.ThreadPoolUtil;
import org.apache.ignite.raft.jraft.util.Utils;
@@ -259,4 +260,16 @@ public final class JRaftUtils {
private JRaftUtils() {
}
+
+ /**
+ * Is determined whether an append request is heartbeat.
+ *
+ * @param request Append entry request.
+ * @return True if the request is heartbeat.
+ */
+ public static boolean isHeartbeatRequest(AppendEntriesRequest request) {
+ // No entries and no data means a true heartbeat request.
+ // TODO refactor, adds a new flag field?
https://issues.apache.org/jira/browse/IGNITE-14832
+ return request.entriesList() == null && request.data() == null;
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java
index eb90f4e6d5a..4ebc3c180df 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java
@@ -14,24 +14,159 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.raft.jraft;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiPredicate;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.raft.jraft.core.Scheduler;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.CoalescedHeartbeatRequestBuilder;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RpcClient;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.CoalescedHeartbeatResponse;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
/**
* Raft nodes manager.
*/
-public class NodeManager {
+public class NodeManager implements Lifecycle<NodeOptions> {
+ private static final IgniteLogger LOG =
Loggers.forClass(NodeManager.class);
+
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
private final ConcurrentMap<NodeId, Node> nodeMap = new
ConcurrentHashMap<>();
private final ConcurrentMap<String, List<Node>> groupMap = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<PeerId, Queue<Object[]>> coalesced = new
ConcurrentHashMap<>();
+
+ /** Node options. */
+ private NodeOptions options;
+ /** Task scheduler. */
+ private Scheduler scheduler;
+ /** Rpc client. */
+ private final RpcClient rpcClient;
+ /** Message factory. */
+ private RaftMessagesFactory messagesFactory;
+ /** Predicate to block a heartbeat messages. */
+ private BiPredicate<Message, PeerId> blockPred;
+
+ public NodeManager(ClusterService service) {
+ rpcClient = new IgniteRpcClient(service);
+ }
+
+ @Override
+ public boolean init(NodeOptions opts) {
+ options = opts;
+ scheduler = opts.getScheduler();
+ messagesFactory = opts.getRaftMessagesFactory();
+
+ // TODO: IGNITE-24789 Single trigger for all RAFT heartbeat in node.
+ scheduler.schedule(this::onSentHeartbeat ,
opts.getElectionTimeoutMs(), TimeUnit.MILLISECONDS);
+
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ stopGuard.compareAndSet(false, true);
+
+ rpcClient.shutdown();
+ }
+
+ public void blockMessages(BiPredicate<Message, PeerId> predicate) {
+ this.blockPred = predicate;
+ }
+
+ public void stopBlock() {
+ this.blockPred = null;
+ }
+
+ /**
+ * Sends a heartbeat request.
+ */
+ private void onSentHeartbeat() {
+ for (PeerId remote : coalesced.keySet()) {
+ coalesced.computeIfPresent(remote, (peer, queue) -> {
+ if (!queue.isEmpty()) {
+ CoalescedHeartbeatRequestBuilder builder =
messagesFactory.coalescedHeartbeatRequest();
+ ArrayList<AppendEntriesRequest> list = new ArrayList<>();
+ builder.messages(list);
+
+ Object[] req;
+
+ List<CompletableFuture<Message>> futs = new ArrayList<>();
+ ArrayList<Object[]> blocked = new ArrayList<>();
+
+ while ((req = queue.poll()) != null) {
+ var msg = (AppendEntriesRequest) req[0];
+
+ if (blockPred != null && blockPred.test(msg, peer)) {
+ blocked.add(req);
+
+ continue;
+ }
+
+ builder.messages().add(msg);
+
+ futs.add((CompletableFuture<Message>) req[1]);
+ }
+
+ queue.addAll(blocked);
+
+ try {
+ rpcClient.invokeAsync(peer, builder.build(), null,
(result, err) -> {
+ if (err != null) {
+ for (CompletableFuture<Message> fut : futs) {
+ fut.completeExceptionally(err);
+ }
+
+ return;
+ }
+
+ CoalescedHeartbeatResponse resp =
(CoalescedHeartbeatResponse) result;
+
+ assert resp.messages().size() == futs.size();
+
+ int i = 0;
+ for (Message message : resp.messages()) {
+ futs.get(i++).complete(message); // Future
completion will trigger callbacks.
+ }
+ }, options.getElectionTimeoutMs() / 2);
+ } catch (Exception e) {
+ LOG.error("Failed to send heartbeat message to remote
node [remote={}].", e, peer);
+
+ for (CompletableFuture<Message> fut : futs) {
+ fut.completeExceptionally(e);
+ }
+ }
+ }
+
+ return queue;
+ });
+ }
+
+ if (!stopGuard.get()) {
+ scheduler.schedule(this::onSentHeartbeat,
options.getElectionTimeoutMs() / 4, TimeUnit.MILLISECONDS);
+ }
+ }
/**
* Adds a node.
@@ -68,11 +203,21 @@ public class NodeManager {
*/
public boolean remove(final Node node) {
if (this.nodeMap.remove(node.getNodeId(), node)) {
- final List<Node> nodes = this.groupMap.get(node.getGroupId());
+ PeerId peerId = node.getNodeId().getPeerId();
+
+ for (PeerId remote : coalesced.keySet()) {
+ if (remote.equals(peerId.getConsistentId())) {
+ coalesced.remove(remote);
+ }
+ }
+
+ List<Node> nodes = this.groupMap.get(node.getGroupId());
+
if (nodes != null) {
return nodes.remove(node);
}
}
+
return false;
}
@@ -96,4 +241,17 @@ public class NodeManager {
public List<Node> getAllNodes() {
return
this.groupMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
}
+
+ public CompletableFuture<Message> enqueue(PeerId to, Message request) {
+ CompletableFuture<Message> fut = new CompletableFuture<>();
+
+ coalesced.computeIfAbsent(to, k -> new ConcurrentLinkedQueue<>())
+ .add(new Object[]{request, fut});
+
+ return fut;
+ }
+
+ public ConcurrentMap<PeerId, Queue<Object[]>> getCoalesced() {
+ return coalesced;
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
index 260fcb4ef29..206e4f3ec0b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
@@ -16,14 +16,13 @@
*/
package org.apache.ignite.raft.jraft;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration;
-import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.option.NodeOptions;
-import org.apache.ignite.raft.jraft.option.RpcOptions;
import org.apache.ignite.raft.jraft.rpc.RpcServer;
import org.apache.ignite.raft.jraft.util.StringUtils;
import org.jetbrains.annotations.Nullable;
@@ -74,16 +73,14 @@ public class RaftGroupService {
* @param serverId Server id.
* @param nodeOptions Node options.
* @param rpcServer RPC server.
- * @param nodeManager Node manager.
*/
public RaftGroupService(
final String groupId,
final PeerId serverId,
final NodeOptions nodeOptions,
- final RpcServer rpcServer,
- final NodeManager nodeManager
+ final RpcServer rpcServer
) {
- this(groupId, serverId, nodeOptions, rpcServer, nodeManager, null);
+ this(groupId, serverId, nodeOptions, rpcServer, null);
}
/**
@@ -91,7 +88,6 @@ public class RaftGroupService {
* @param serverId Server id.
* @param nodeOptions Node options.
* @param rpcServer RPC server.
- * @param nodeManager Node manager.
* @param ownFsmCallerExecutorDisruptorConfig Configuration own striped
disruptor for FSMCaller service of raft node, {@code null}
* means use shared disruptor.
*/
@@ -100,15 +96,13 @@ public class RaftGroupService {
final PeerId serverId,
final NodeOptions nodeOptions,
final RpcServer rpcServer,
- final NodeManager nodeManager,
@Nullable RaftNodeDisruptorConfiguration
ownFsmCallerExecutorDisruptorConfig
) {
- super();
this.groupId = groupId;
this.serverId = serverId;
this.nodeOptions = nodeOptions;
this.rpcServer = rpcServer;
- this.nodeManager = nodeManager;
+ this.nodeManager = nodeOptions.getNodeManager();
this.ownFsmCallerExecutorDisruptorConfig =
ownFsmCallerExecutorDisruptorConfig;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
index dccd5f84665..ba420b21555 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
@@ -172,6 +172,12 @@ public class RaftMessageGroup {
/** */
public static final short SM_ERROR_RESPONSE = 3014;
+
+ /** */
+ public static final short COALESCED_HEARTBEAT_REQUEST = 3015;
+
+ /** */
+ public static final short COALESCED_HEARTBEAT_RESPONSE = 3016;
}
/**
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index dfe59a561f2..b0fff753eb2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -1763,7 +1763,7 @@ public class NodeImpl implements Node, RaftServerService {
/**
* ReadIndex response closure
*/
- private static class ReadIndexHeartbeatResponseClosure extends
RpcResponseClosureAdapter<AppendEntriesResponse> {
+ public static class ReadIndexHeartbeatResponseClosure extends
RpcResponseClosureAdapter<AppendEntriesResponse> {
final ReadIndexResponseBuilder respBuilder;
final RpcResponseClosure<ReadIndexResponse> closure;
final int quorum;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index decf522e573..0c2e0d8db30 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -17,16 +17,16 @@
package org.apache.ignite.raft.jraft.option;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.metrics.sources.RaftMetricSource;
import org.apache.ignite.internal.raft.JraftGroupEventsListener;
import org.apache.ignite.internal.raft.Marshaller;
import
org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
-import org.apache.ignite.raft.jraft.Node;import
org.apache.ignite.raft.jraft.StateMachine;
+import org.apache.ignite.raft.jraft.NodeManager;
+import org.apache.ignite.raft.jraft.StateMachine;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.core.ElectionPriority;
import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
@@ -43,7 +43,8 @@ import org.apache.ignite.raft.jraft.util.StringUtils;
import org.apache.ignite.raft.jraft.util.TimeoutStrategy;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
-import org.apache.ignite.raft.jraft.util.timer.Timer;import
org.jetbrains.annotations.Nullable;
+import org.apache.ignite.raft.jraft.util.timer.Timer;
+import org.jetbrains.annotations.Nullable;
/**
* Node options.
@@ -273,6 +274,9 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
private RaftMetricSource raftMetrics;
+ /** Node manager. */
+ private NodeManager nodeManager;
+
/**
* Externally enforced config index.
*
@@ -309,6 +313,24 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
this.raftMetrics = raftMetrics;
}
+ /**
+ * Gets a node manager.
+ *
+ * @return Node manager.
+ */
+ public NodeManager getNodeManager() {
+ return nodeManager;
+ }
+
+ /**
+ * Sets a node manager.
+ *
+ * @param nodeManager Node manager.
+ */
+ public void setNodeManager(NodeManager nodeManager) {
+ this.nodeManager = nodeManager;
+ }
+
/**
* @return Stripe count.
*/
@@ -738,6 +760,7 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
nodeOptions.setStripes(this.getStripes());
nodeOptions.setLogStripesCount(this.getLogStripesCount());
nodeOptions.setLogYieldStrategy(this.isLogYieldStrategy());
+ nodeOptions.setNodeManager(this.getNodeManager());
return nodeOptions;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/NetworkInvoker.java
similarity index 63%
copy from
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
copy to
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/NetworkInvoker.java
index 339ba177775..3aa4f090fac 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/NetworkInvoker.java
@@ -14,36 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.raft.jraft.rpc;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.network.TopologyEventHandler;
-import org.apache.ignite.raft.jraft.Lifecycle;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RemotingException;
-import org.apache.ignite.raft.jraft.option.RpcOptions;
import org.jetbrains.annotations.Nullable;
/**
- *
+ * Rpc invocation.
*/
-public interface RpcClient extends Lifecycle<RpcOptions> {
- /**
- * Check connection for given address.
- *
- * @param peerId target peer ID.
- * @return true if there is a connection and the connection is active and
writable.
- * @deprecated // TODO asch remove IGNITE-14832
- */
- boolean checkConnection(PeerId peerId);
-
- /**
- * Register a connect event listener for the handler.
- *
- * @param handler The handler.
- */
- void registerConnectEventListener(TopologyEventHandler handler);
-
+public interface NetworkInvoker {
/**
* Asynchronous invocation with a callback.
*
@@ -56,10 +38,10 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
* @return The future.
*/
CompletableFuture<Message> invokeAsync(
- PeerId peerId,
- Object request,
- @Nullable InvokeContext ctx,
- InvokeCallback callback,
- long timeoutMs
+ PeerId peerId,
+ Object request,
+ @Nullable InvokeContext ctx,
+ InvokeCallback callback,
+ long timeoutMs
) throws InterruptedException, RemotingException;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
index 339ba177775..e1d01222f00 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
@@ -16,18 +16,15 @@
*/
package org.apache.ignite.raft.jraft.rpc;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.raft.jraft.Lifecycle;
import org.apache.ignite.raft.jraft.entity.PeerId;
-import org.apache.ignite.raft.jraft.error.RemotingException;
import org.apache.ignite.raft.jraft.option.RpcOptions;
-import org.jetbrains.annotations.Nullable;
/**
- *
+ * RPC client.
*/
-public interface RpcClient extends Lifecycle<RpcOptions> {
+public interface RpcClient extends Lifecycle<RpcOptions>, NetworkInvoker {
/**
* Check connection for given address.
*
@@ -43,23 +40,4 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
* @param handler The handler.
*/
void registerConnectEventListener(TopologyEventHandler handler);
-
- /**
- * Asynchronous invocation with a callback.
- *
- * @param peerId target peer ID
- * @param request request object
- * @param ctx invoke context
- * @param callback invoke callback
- * @param timeoutMs timeout millisecond
- *
- * @return The future.
- */
- CompletableFuture<Message> invokeAsync(
- PeerId peerId,
- Object request,
- @Nullable InvokeContext ctx,
- InvokeCallback callback,
- long timeoutMs
- ) throws InterruptedException, RemotingException;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
index 01d535a8bfe..a39e5ac8de5 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
@@ -245,4 +245,14 @@ public final class RpcRequests {
boolean success();
}
+
+ @Transferable(value =
RaftMessageGroup.RpcRequestsMessageGroup.COALESCED_HEARTBEAT_REQUEST)
+ public interface CoalescedHeartbeatRequest extends Message {
+ Collection<AppendEntriesRequest> messages();
+ }
+
+ @Transferable(value =
RaftMessageGroup.RpcRequestsMessageGroup.COALESCED_HEARTBEAT_RESPONSE)
+ public interface CoalescedHeartbeatResponse extends Message {
+ Collection<Message> messages();
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
index fdd78616ba7..97b2bbd68bd 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
@@ -21,16 +21,17 @@ import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFu
import java.net.ConnectException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;import
java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.internal.raft.PeerUnavailableException;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.InvokeTimeoutException;
@@ -41,6 +42,7 @@ import org.apache.ignite.raft.jraft.rpc.ClientService;
import org.apache.ignite.raft.jraft.rpc.InvokeCallback;
import org.apache.ignite.raft.jraft.rpc.InvokeContext;
import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.NetworkInvoker;
import org.apache.ignite.raft.jraft.rpc.RpcClient;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
@@ -152,7 +154,7 @@ public abstract class AbstractClientService implements
ClientService, TopologyEv
.build();
CompletableFuture<Message> fut =
- invokeWithDone(peerId, req, null, null,
rpcOptions.getRpcConnectTimeoutMs(), rpcExecutor);
+ invokeWithDone(peerId, req, null, null,
rpcOptions.getRpcConnectTimeoutMs(), rpcExecutor, this.rpcClient);
return fut.thenApply(msg -> {
ErrorResponse resp = (ErrorResponse) msg;
@@ -178,20 +180,19 @@ public abstract class AbstractClientService implements
ClientService, TopologyEv
public <T extends Message> CompletableFuture<Message> invokeWithDone(final
PeerId peerId, final Message request,
final RpcResponseClosure<T> done, final int timeoutMs,
final Executor rpcExecutor) {
- return invokeWithDone(peerId, request, null, done, timeoutMs,
rpcExecutor);
+ return invokeWithDone(peerId, request, null, done, timeoutMs,
rpcExecutor, this.rpcClient);
}
public <T extends Message> CompletableFuture<Message> invokeWithDone(final
PeerId peerId, final Message request,
final InvokeContext ctx,
final RpcResponseClosure<T> done, final int timeoutMs) {
- return invokeWithDone(peerId, request, ctx, done, timeoutMs,
this.rpcExecutor);
+ return invokeWithDone(peerId, request, ctx, done, timeoutMs,
this.rpcExecutor, this.rpcClient);
}
public <T extends Message> CompletableFuture<Message> invokeWithDone(final
PeerId peerId, final Message request,
final InvokeContext ctx,
final RpcResponseClosure<T> done, final int timeoutMs,
- final Executor rpcExecutor) {
- final RpcClient rc = this.rpcClient;
+ final Executor rpcExecutor, NetworkInvoker rc) {
final FutureImpl<Message> future = new FutureImpl<>();
final Executor currExecutor = rpcExecutor != null ? rpcExecutor :
this.rpcExecutor;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index f5b397deea2..85c9bb289e2 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -18,6 +18,7 @@ package org.apache.ignite.raft.jraft.rpc.impl;
import static
org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ;
import static
org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequestsExecutor;
+
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,14 +27,14 @@ import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.raft.server.impl.RaftServiceEventInterceptor;
-import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.network.TopologyEventHandler;
+import org.apache.ignite.internal.raft.server.impl.RaftServiceEventInterceptor;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.RaftMessageGroup;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -55,6 +56,7 @@ import
org.apache.ignite.raft.jraft.rpc.impl.cli.TransferLeaderRequestProcessor;
import
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestInterceptor;
import
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.GetFileRequestProcessor;
+import org.apache.ignite.raft.jraft.rpc.impl.core.HeartbeatRequestProcessor;
import
org.apache.ignite.raft.jraft.rpc.impl.core.InstallSnapshotRequestProcessor;
import
org.apache.ignite.raft.jraft.rpc.impl.core.InterceptingAppendEntriesRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.ReadIndexRequestProcessor;
@@ -110,6 +112,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
registerProcessor(new PingRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new TimeoutNowRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new ReadIndexRequestProcessor(rpcExecutor,
raftMessagesFactory));
+ registerProcessor(new HeartbeatRequestProcessor(rpcExecutor,
raftMessagesFactory));
// raft native cli service
registerProcessor(new AddPeerRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new RemovePeerRequestProcessor(rpcExecutor,
raftMessagesFactory));
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
index e24f7e0f689..ca2610349ab 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
@@ -22,6 +22,7 @@ import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -376,12 +377,6 @@ public class AppendEntriesRequestProcessor extends
NodeRequestProcessor<AppendEn
return getOrCreatePeerRequestContext(groupId, pair,
node).getAndIncrementSequence();
}
- private boolean isHeartbeatRequest(final AppendEntriesRequest request) {
- // No entries and no data means a true heartbeat request.
- // TODO refactor, adds a new flag field?
https://issues.apache.org/jira/browse/IGNITE-14832
- return request.entriesList() == null && request.data() == null;
- }
-
@Override
public Message processRequest0(final RaftServerService service, final
AppendEntriesRequest request,
final RpcRequestClosure done) {
@@ -392,7 +387,7 @@ public class AppendEntriesRequestProcessor extends
NodeRequestProcessor<AppendEn
final String groupId = request.groupId();
final PeerPair pair = pairOf(request.peerId(), request.serverId());
- boolean isHeartbeat = isHeartbeatRequest(request);
+ boolean isHeartbeat = JRaftUtils.isHeartbeatRequest(request);
int reqSequence = -1;
if (!isHeartbeat) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
index 9f476c1413b..0f653af3300 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
@@ -19,10 +19,16 @@ package org.apache.ignite.raft.jraft.rpc.impl.core;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.raft.jraft.JRaftUtils;
+import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.Status;
+import
org.apache.ignite.raft.jraft.core.NodeImpl.ReadIndexHeartbeatResponseClosure;
import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.InvokeTimeoutException;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.error.RemotingException;
import org.apache.ignite.raft.jraft.option.NodeOptions;
@@ -44,6 +50,7 @@ import
org.apache.ignite.raft.jraft.rpc.RpcRequests.TimeoutNowRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.TimeoutNowResponse;
import org.apache.ignite.raft.jraft.rpc.RpcResponseClosure;
import org.apache.ignite.raft.jraft.rpc.impl.AbstractClientService;
+import org.apache.ignite.raft.jraft.util.Utils;
/**
* Raft rpc service.
@@ -92,15 +99,72 @@ public class DefaultRaftClientService extends
AbstractClientService implements R
// Assign an executor in round-robin fasion.
final Executor executor =
this.appendEntriesExecutorMap.computeIfAbsent(peerId,
- k -> nodeOptions.getStripedExecutor().next());
+ k -> nodeOptions.getStripedExecutor().next());
if (connect(peerId)) { // Replicator should be started asynchronously
by node joined event.
+ // TODO: IGNITE-24788 Configurable coalescing for RAFT heartbeat.
+ if (JRaftUtils.isHeartbeatRequest(request) &&
!isReadIndexRequest(done)) {
+ return sendHeartbeat(peerId, request, timeoutMs, done,
executor);
+ }
+
return invokeWithDone(peerId, request, done, timeoutMs, executor);
}
return onConnectionFail(executor, request, done, peerId);
}
+ /**
+ * Accumulates heartbeat messages to send them into the batch request.
+ *
+ * @param peerId Remote peer id.
+ * @param request Request.
+ * @param timeoutMs Timeout.
+ * @param done Done callback.
+ * @param executor Executor where the done callback is executed.
+ * @return A future with response.
+ */
+ private Future<Message> sendHeartbeat(
+ PeerId peerId,
+ AppendEntriesRequest request,
+ int timeoutMs,
+ RpcResponseClosure<AppendEntriesResponse> done,
+ Executor executor
+ ) {
+ NodeManager nodeManager = this.nodeOptions.getNodeManager();
+
+ return invokeWithDone(
+ peerId,
+ request,
+ null,
+ done,
+ timeoutMs,
+ executor,
+ (peerId1, request1, ctx, callback, timeoutMs1) ->
+ nodeManager.enqueue(peerId, (Message)
request1).whenComplete((res, err) -> {
+ if (err instanceof ExecutionException) {
+ err = new RemotingException(err);
+ } else if (err instanceof TimeoutException) //
Translate timeout exception.
+ {
+ err = new InvokeTimeoutException();
+ }
+
+ Throwable finalErr = err;
+
+ // Avoid deadlocks if a closure has completed in
the same thread.
+ Utils.runInThread(callback.executor(), () ->
callback.complete(res, finalErr));
+ })
+ );
+ }
+
+ /**
+ * Checks whether it is a read index request or not.
+ * @param doneClosure Done closure.
+ * @return True if the read index request.
+ */
+ private static boolean
isReadIndexRequest(RpcResponseClosure<AppendEntriesResponse> doneClosure) {
+ return doneClosure instanceof ReadIndexHeartbeatResponseClosure;
+ }
+
@Override
public Future<Message> getFile(final PeerId peerId, final GetFileRequest
request, final int timeoutMs,
final RpcResponseClosure<GetFileResponse> done) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/HeartbeatRequestProcessor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/HeartbeatRequestProcessor.java
new file mode 100644
index 00000000000..53d0a67a345
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/HeartbeatRequestProcessor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.rpc.impl.core;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+import org.apache.ignite.raft.jraft.Node;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.CoalescedHeartbeatResponseBuilder;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RaftServerService;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestProcessor;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.CoalescedHeartbeatRequest;
+
+/**
+ * Heartbeat request processor.
+ */
+public class HeartbeatRequestProcessor extends
RpcRequestProcessor<CoalescedHeartbeatRequest> {
+
+ /**
+ * Constructor.
+ *
+ * @param executor Executor.
+ * @param msgFactory Message factory.
+ */
+ public HeartbeatRequestProcessor(Executor executor, RaftMessagesFactory
msgFactory) {
+ super(executor, msgFactory);
+ }
+
+ @Override
+ public Message processRequest(CoalescedHeartbeatRequest request,
RpcRequestClosure done) {
+ CoalescedHeartbeatResponseBuilder builder =
msgFactory().coalescedHeartbeatResponse();
+ builder.messages(new ArrayList<>());
+
+ for (AppendEntriesRequest message : request.messages()) {
+ PeerId peerId = PeerId.parsePeer(message.peerId());
+
+ Message msg;
+
+ if (peerId == null) {
+ msg = RaftRpcFactory.DEFAULT.newResponse(
+ msgFactory(),
+ RaftError.EINVAL,
+ "Fail to parse peerId: %s",
+ message.peerId());
+ } else {
+ Node node =
done.getRpcCtx().getNodeManager().get(message.groupId(), peerId);
+
+ if (node == null) {
+ msg = RaftRpcFactory.DEFAULT.newResponse(
+ msgFactory(),
+ RaftError.ENOENT, "Peer id not found: %s, group:
%s", message.peerId(),
+ message.groupId());
+ } else {
+ RaftServerService svc = (RaftServerService) node;
+
+ msg = svc.handleAppendEntriesRequest(message, null);
+ }
+ }
+
+ builder.messages().add(msg);
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public String interest() {
+ return CoalescedHeartbeatRequest.class.getName();
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index e6594731edf..ad40007064d 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -78,10 +78,11 @@ public class IgniteRpcTest extends AbstractRpcTest {
);
NodeOptions nodeOptions = new NodeOptions();
+ nodeOptions.setNodeManager(new NodeManager(service));
requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions);
- var server = new TestIgniteRpcServer(service, new NodeManager(),
nodeOptions, requestExecutor) {
+ var server = new TestIgniteRpcServer(service, nodeOptions,
requestExecutor) {
@Override public void shutdown() {
super.shutdown();
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AbstractCliRequestProcessorTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AbstractCliRequestProcessorTest.java
index 26c43e2eb8f..ef10c3612f6 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AbstractCliRequestProcessorTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AbstractCliRequestProcessorTest.java
@@ -45,7 +45,7 @@ public abstract class AbstractCliRequestProcessorTest<T
extends Message> extends
private final String groupId = "test";
private final String peerIdStr = "localhost:8081";
protected MockAsyncContext asyncContext;
- protected NodeManager nodeManager = new NodeManager();
+ protected NodeManager nodeManager = new NodeManager(null);
protected RaftMessagesFactory msgFactory = new RaftMessagesFactory();
public abstract T createRequest(String groupId, PeerId peerId);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/MockAsyncContext.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/MockAsyncContext.java
index 81c8a3cbd60..afce52a38ae 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/MockAsyncContext.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/MockAsyncContext.java
@@ -30,7 +30,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcContext;
*/
public class MockAsyncContext implements RpcContext {
private Object responseObject;
- private NodeManager nodeManager = new NodeManager();
+ private NodeManager nodeManager = new NodeManager(null);
private ClusterNode sender = new ClusterNodeImpl(
UUID.randomUUID(),
diff --git
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 259f6f4113b..7f41d7f161c 100644
---
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -34,15 +34,14 @@ import org.apache.ignite.raft.messages.TestMessageGroup;
public class TestIgniteRpcServer extends IgniteRpcServer {
/**
* @param clusterService Cluster service.
- * @param nodeManager Node manager.
* @param nodeOptions Node options.
* @param requestExecutor Requests executor.
*/
- public TestIgniteRpcServer(ClusterService clusterService, NodeManager
nodeManager, NodeOptions nodeOptions,
+ public TestIgniteRpcServer(ClusterService clusterService, NodeOptions
nodeOptions,
ExecutorService requestExecutor) {
super(
clusterService,
- nodeManager,
+ nodeOptions.getNodeManager(),
nodeOptions.getRaftMessagesFactory(),
requestExecutor,
new RaftServiceEventInterceptor(),
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
index 66d90b25172..f4ad5118aaf 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
@@ -81,18 +81,19 @@ public class ItTxDistributedTestThreeNodesThreeReplicas
extends TxAbstractTest {
if (msg instanceof RpcRequests.AppendEntriesRequest) {
RpcRequests.AppendEntriesRequest tmp = (AppendEntriesRequest)
msg;
- if (tmp.entriesList() != null && !tmp.entriesList().isEmpty())
{
+ if (tmp.entriesList() != null && !tmp.entriesList().isEmpty()
&& tmp.data() != null) {
return true;
}
}
return false;
});
+ ReadWriteTransactionImpl tx = (ReadWriteTransactionImpl)
igniteTransactions.begin();
+ CompletableFuture<Void> fut = accounts.recordView().upsertAsync(tx,
makeValue(1, 100.));
+
assertTrue(IgniteTestUtils.waitForCondition(() ->
server.blockedMessages(new RaftNodeId(groupId, leader)).size() == 2, 10000),
"Failed to wait for blocked messages");
- ReadWriteTransactionImpl tx = (ReadWriteTransactionImpl)
igniteTransactions.begin();
- CompletableFuture<Void> fut = accounts.recordView().upsertAsync(tx,
makeValue(1, 100.));
// Update must complete now despite the blocked replication protocol.
assertTrue(IgniteTestUtils.waitForCondition(fut::isDone, 5_000), "The
update future is not completed within timeout");