This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ac7a6fd3c5d IGNITE-24484 RAFT heartbeat coalescing (#5221)
ac7a6fd3c5d is described below

commit ac7a6fd3c5d3f9b5adbc3ba3e422206dd16c9de0
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Mar 14 11:22:28 2025 +0300

    IGNITE-24484 RAFT heartbeat coalescing (#5221)
---
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  |  21 +--
 .../apache/ignite/raft/jraft/core/TestCluster.java |  13 +-
 .../internal/raft/server/impl/JraftServerImpl.java |  17 +--
 .../org/apache/ignite/raft/jraft/JRaftUtils.java   |  13 ++
 .../org/apache/ignite/raft/jraft/NodeManager.java  | 162 ++++++++++++++++++++-
 .../apache/ignite/raft/jraft/RaftGroupService.java |  14 +-
 .../apache/ignite/raft/jraft/RaftMessageGroup.java |   6 +
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |   2 +-
 .../ignite/raft/jraft/option/NodeOptions.java      |  31 +++-
 .../rpc/{RpcClient.java => NetworkInvoker.java}    |  34 +----
 .../apache/ignite/raft/jraft/rpc/RpcClient.java    |  26 +---
 .../apache/ignite/raft/jraft/rpc/RpcRequests.java  |  10 ++
 .../raft/jraft/rpc/impl/AbstractClientService.java |  15 +-
 .../raft/jraft/rpc/impl/IgniteRpcServer.java       |  11 +-
 .../impl/core/AppendEntriesRequestProcessor.java   |   9 +-
 .../rpc/impl/core/DefaultRaftClientService.java    |  66 ++++++++-
 .../rpc/impl/core/HeartbeatRequestProcessor.java   |  91 ++++++++++++
 .../ignite/raft/jraft/rpc/IgniteRpcTest.java       |   3 +-
 .../impl/cli/AbstractCliRequestProcessorTest.java  |   2 +-
 .../ignite/raft/jraft/test/MockAsyncContext.java   |   2 +-
 .../ignite/raft/jraft/rpc/TestIgniteRpcServer.java |   5 +-
 ...ItTxDistributedTestThreeNodesThreeReplicas.java |   7 +-
 22 files changed, 437 insertions(+), 123 deletions(-)

diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 358c6f01f1c..8676a147bee 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -144,7 +144,6 @@ import org.apache.ignite.raft.jraft.rpc.RpcServer;
 import org.apache.ignite.raft.jraft.rpc.TestIgniteRpcServer;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
-import org.apache.ignite.raft.jraft.rpc.impl.core.DefaultRaftClientService;
 import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
 import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
@@ -2897,12 +2896,6 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         log.info("Leader stopped.");
 
         assertTrue(cluster.getNode(peers.get(2).getPeerId()).isInstallingSnapshot());
-
-        // Wait 30 seconds to check if snapshot is still installing.
-        Thread.sleep(30_000);
-
-        // Even after 30 seconds (in reality, forever), the snapshot is still installing.
-        assertTrue(cluster.getNode(peers.get(2).getPeerId()).isInstallingSnapshot());
     }
 
     /**
@@ -3234,6 +3227,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         assertThat(log2.startAsync(startComponentContext), willCompleteSuccessfully());
         nodeOpts.setServiceFactory(new IgniteJraftServiceFactory(log2));
         nodeOpts.setFsm(fsm);
+        nodeOpts.setNodeManager(new NodeManager(null));
 
         RaftGroupService service = createService("test", peer, nodeOpts, List.of());
 
@@ -3281,6 +3275,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         DefaultLogStorageFactory log2 = new DefaultLogStorageFactory(path);
         assertThat(log2.startAsync(startComponentContext), willCompleteSuccessfully());
         nodeOpts.setServiceFactory(new IgniteJraftServiceFactory(log2));
+        nodeOpts.setNodeManager(new NodeManager(null));
 
         RaftGroupService service = createService("test", peer, nodeOpts, List.of());
 
@@ -4271,12 +4266,9 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         sendTestTaskAndWait(leader);
         cluster.ensureSame();
 
-        DefaultRaftClientService rpcService = (DefaultRaftClientService) leader.getRpcClientService();
-        RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
-
         AtomicInteger cnt = new AtomicInteger();
 
-        rpcClientEx.blockMessages((msg, nodeId) -> {
+        cluster.getServer(leader.getLeaderId()).getNodeOptions().getNodeManager().blockMessages((msg, nodeId) -> {
             assertTrue(msg instanceof RpcRequests.AppendEntriesRequest);
 
             if (cnt.get() >= 2)
@@ -4552,6 +4544,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
 
         options.setServiceFactory(new IgniteJraftServiceFactory(log));
         options.setLogUri("test");
+        options.setNodeManager(new NodeManager(null));
 
         return options;
     }
@@ -4645,8 +4638,6 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
             .map(p -> new NetworkAddress(TestUtils.getLocalAddress(), p.getPort()))
             .collect(toList());
 
-        var nodeManager = new NodeManager();
-
         ClusterService clusterService = ClusterServiceTestUtils.clusterService(
                 testInfo,
                 peer.getPort(),
@@ -4657,7 +4648,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
 
         executors.add(requestExecutor);
 
-        IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions, requestExecutor);
+        IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeOptions, requestExecutor);
 
         nodeOptions.setRpcClient(new IgniteRpcClient(clusterService));
 
@@ -4665,7 +4656,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
 
         assertThat(clusterService.startAsync(new ComponentContext()), willCompleteSuccessfully());
 
-        var service = new RaftGroupService(groupId, peer.getPeerId(), nodeOptions, rpcServer, nodeManager) {
+        var service = new RaftGroupService(groupId, peer.getPeerId(), nodeOptions, rpcServer) {
             @Override public synchronized void shutdown() {
                 rpcServer.shutdown();
 
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 1e88427040a..919e47c3b34 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -260,9 +260,11 @@ public class TestCluster {
                 ));
             }
 
-            NodeManager nodeManager = new NodeManager();
-
             ClusterService clusterService = clusterService(testInfo, peer.getPort(), new StaticNodeFinder(addressList));
+            NodeManager nodeManager = new NodeManager(clusterService);
+
+            nodeOptions.setScheduler(JRaftUtils.createScheduler(nodeOptions));
+            nodeOptions.setNodeManager(nodeManager);
 
             var rpcClient = new IgniteRpcClient(clusterService);
 
@@ -272,16 +274,19 @@ public class TestCluster {
 
             ExecutorService requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions);
 
-            var rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions, requestExecutor);
+            var rpcServer = new TestIgniteRpcServer(clusterService, nodeOptions, requestExecutor);
 
             assertThat(clusterService.startAsync(new ComponentContext()), willCompleteSuccessfully());
 
+            nodeManager.init(nodeOptions);
+
             if (optsClo != null)
                 optsClo.accept(peer.getPeerId(), nodeOptions);
 
             RaftGroupService server = new RaftGroupService(this.name, peer.getPeerId(),
-                nodeOptions, rpcServer, nodeManager) {
+                nodeOptions, rpcServer) {
                 @Override public synchronized void shutdown() {
+                    nodeManager.shutdown();
                     // This stop order is consistent with JRaftServerImpl
                     rpcServer.shutdown();
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 7ede33d53ee..6414991eb03 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -137,9 +137,6 @@ public class JraftServerImpl implements RaftServer {
      * needed to prevent concurrent start of the same raft group. */
     private final List<Object> startGroupInProgressMonitors;
 
-    /** Node manager. */
-    private final NodeManager nodeManager;
-
     /** Options. */
     private final NodeOptions opts;
 
@@ -180,7 +177,6 @@ public class JraftServerImpl implements RaftServer {
     ) {
         this.service = service;
         this.groupStoragesContextResolver = groupStoragesContextResolver;
-        this.nodeManager = new NodeManager();
         this.groupStoragesDestructionIntents = groupStoragesDestructionIntents;
 
         this.opts = opts;
@@ -277,6 +273,10 @@ public class JraftServerImpl implements RaftServer {
             opts.setSnapshotTimer(JRaftUtils.createTimer(opts, "JRaft-SnapshotTimer"));
         }
 
+        if (opts.getNodeManager() == null) {
+            opts.setNodeManager(new NodeManager(service));
+        }
+
         requestExecutor = Executors.newFixedThreadPool(
                 opts.getRaftRpcThreadPoolSize(),
                 IgniteThreadFactory.create(opts.getServerName(), "JRaft-Request-Processor", LOG, PROCESS_RAFT_REQ)
@@ -284,7 +284,7 @@ public class JraftServerImpl implements RaftServer {
 
         rpcServer = new IgniteRpcServer(
                 service,
-                nodeManager,
+                opts.getNodeManager(),
                 opts.getRaftMessagesFactory(),
                 requestExecutor,
                 serviceEventInterceptor,
@@ -353,6 +353,7 @@ public class JraftServerImpl implements RaftServer {
         }
 
         rpcServer.init(null);
+        opts.getNodeManager().init(opts);
 
         return completeRaftGroupStoragesDestruction(componentContext.executor());
     }
@@ -363,6 +364,7 @@ public class JraftServerImpl implements RaftServer {
         assert nodes.isEmpty() : IgniteStringFormatter.format("Raft nodes {} are still running on the Ignite node {}", nodes.keySet(),
                 service.topologyService().localMember().name());
 
+        opts.getNodeManager().shutdown();
         rpcServer.shutdown();
 
         if (opts.getfSMCallerExecutorDisruptor() != null) {
@@ -516,9 +518,7 @@ public class JraftServerImpl implements RaftServer {
 
             nodeOptions.setInitialConf(new Configuration(peerIds, learnerIds));
 
-            IgniteRpcClient client = new IgniteRpcClient(service);
-
-            nodeOptions.setRpcClient(client);
+            nodeOptions.setRpcClient(new IgniteRpcClient(service));
 
             nodeOptions.setExternallyEnforcedConfigIndex(groupOptions.externallyEnforcedConfigIndex());
 
@@ -527,7 +527,6 @@ public class JraftServerImpl implements RaftServer {
                     PeerId.fromPeer(nodeId.peer()),
                     nodeOptions,
                     rpcServer,
-                    nodeManager,
                     groupOptions.ownFsmCallerExecutorDisruptorConfig()
             );
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
index 218a7ddfb49..524781cd451 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
@@ -35,6 +35,7 @@ import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.option.BootstrapOptions;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RpcOptions;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
 import org.apache.ignite.raft.jraft.util.StringUtils;
 import org.apache.ignite.raft.jraft.util.ThreadPoolUtil;
 import org.apache.ignite.raft.jraft.util.Utils;
@@ -259,4 +260,16 @@ public final class JRaftUtils {
 
     private JRaftUtils() {
     }
+
+    /**
+     * Is determined whether an append request is heartbeat.
+     *
+     * @param request Append entry request.
+     * @return True if the request is heartbeat.
+     */
+    public static boolean isHeartbeatRequest(AppendEntriesRequest request) {
+        // No entries and no data means a true heartbeat request.
+        // TODO refactor, adds a new flag field? https://issues.apache.org/jira/browse/IGNITE-14832
+        return request.entriesList() == null && request.data() == null;
+    }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java
index eb90f4e6d5a..4ebc3c180df 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java
@@ -14,24 +14,159 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.raft.jraft;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.raft.jraft.core.Scheduler;
 import org.apache.ignite.raft.jraft.entity.NodeId;
 import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.CoalescedHeartbeatRequestBuilder;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RpcClient;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.CoalescedHeartbeatResponse;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
 
 /**
  * Raft nodes manager.
  */
-public class NodeManager {
+public class NodeManager implements Lifecycle<NodeOptions> {
+    private static final IgniteLogger LOG = Loggers.forClass(NodeManager.class);
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
     private final ConcurrentMap<NodeId, Node> nodeMap = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, List<Node>> groupMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<PeerId, Queue<Object[]>> coalesced = new ConcurrentHashMap<>();
+
+    /** Node options. */
+    private NodeOptions options;
+    /** Task scheduler. */
+    private Scheduler scheduler;
+    /** Rpc client. */
+    private final RpcClient rpcClient;
+    /** Message factory. */
+    private RaftMessagesFactory messagesFactory;
+    /** Predicate to block a heartbeat messages. */
+    private BiPredicate<Message, PeerId> blockPred;
+
+    public NodeManager(ClusterService service) {
+        rpcClient = new IgniteRpcClient(service);
+    }
+
+    @Override
+    public boolean init(NodeOptions opts) {
+        options = opts;
+        scheduler = opts.getScheduler();
+        messagesFactory = opts.getRaftMessagesFactory();
+
+        // TODO: IGNITE-24789 Single trigger for all RAFT heartbeat in node.
+        scheduler.schedule(this::onSentHeartbeat , opts.getElectionTimeoutMs(), TimeUnit.MILLISECONDS);
+
+        return true;
+    }
+
+    @Override
+    public void shutdown() {
+        stopGuard.compareAndSet(false, true);
+
+        rpcClient.shutdown();
+    }
+
+    public void blockMessages(BiPredicate<Message, PeerId> predicate) {
+        this.blockPred = predicate;
+    }
+
+    public void stopBlock() {
+        this.blockPred = null;
+    }
+
+    /**
+     * Sends a heartbeat request.
+     */
+    private void onSentHeartbeat() {
+        for (PeerId remote : coalesced.keySet()) {
+            coalesced.computeIfPresent(remote, (peer, queue) -> {
+                if (!queue.isEmpty()) {
+                    CoalescedHeartbeatRequestBuilder builder = messagesFactory.coalescedHeartbeatRequest();
+                    ArrayList<AppendEntriesRequest> list = new ArrayList<>();
+                    builder.messages(list);
+
+                    Object[] req;
+
+                    List<CompletableFuture<Message>> futs = new ArrayList<>();
+                    ArrayList<Object[]> blocked = new ArrayList<>();
+
+                    while ((req = queue.poll()) != null) {
+                        var msg = (AppendEntriesRequest) req[0];
+
+                        if (blockPred != null && blockPred.test(msg, peer)) {
+                            blocked.add(req);
+
+                            continue;
+                        }
+
+                        builder.messages().add(msg);
+
+                        futs.add((CompletableFuture<Message>) req[1]);
+                    }
+
+                    queue.addAll(blocked);
+
+                    try {
+                        rpcClient.invokeAsync(peer, builder.build(), null, (result, err) -> {
+                            if (err != null) {
+                                for (CompletableFuture<Message> fut : futs) {
+                                    fut.completeExceptionally(err);
+                                }
+
+                                return;
+                            }
+
+                            CoalescedHeartbeatResponse resp = (CoalescedHeartbeatResponse) result;
+
+                            assert resp.messages().size() == futs.size();
+
+                            int i = 0;
+                            for (Message message : resp.messages()) {
+                                futs.get(i++).complete(message); // Future completion will trigger callbacks.
+                            }
+                        }, options.getElectionTimeoutMs() / 2);
+                    } catch (Exception e) {
+                        LOG.error("Failed to send heartbeat message to remote node [remote={}].", e, peer);
+
+                        for (CompletableFuture<Message> fut : futs) {
+                            fut.completeExceptionally(e);
+                        }
+                    }
+                }
+
+                return queue;
+            });
+        }
+
+        if (!stopGuard.get()) {
+            scheduler.schedule(this::onSentHeartbeat, options.getElectionTimeoutMs() / 4, TimeUnit.MILLISECONDS);
+        }
+    }
 
     /**
      * Adds a node.
@@ -68,11 +203,21 @@ public class NodeManager {
      */
     public boolean remove(final Node node) {
         if (this.nodeMap.remove(node.getNodeId(), node)) {
-            final List<Node> nodes = this.groupMap.get(node.getGroupId());
+            PeerId peerId = node.getNodeId().getPeerId();
+
+            for (PeerId remote : coalesced.keySet()) {
+                if (remote.equals(peerId.getConsistentId())) {
+                    coalesced.remove(remote);
+                }
+            }
+
+            List<Node> nodes = this.groupMap.get(node.getGroupId());
+
             if (nodes != null) {
                 return nodes.remove(node);
             }
         }
+
         return false;
     }
 
@@ -96,4 +241,17 @@ public class NodeManager {
     public List<Node> getAllNodes() {
         return this.groupMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
     }
+
+    public CompletableFuture<Message> enqueue(PeerId to, Message request) {
+        CompletableFuture<Message> fut = new CompletableFuture<>();
+
+        coalesced.computeIfAbsent(to, k -> new ConcurrentLinkedQueue<>())
+                .add(new Object[]{request, fut});
+
+        return fut;
+    }
+
+    public ConcurrentMap<PeerId, Queue<Object[]>> getCoalesced() {
+        return coalesced;
+    }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
index 260fcb4ef29..206e4f3ec0b 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
@@ -16,14 +16,13 @@
  */
 package org.apache.ignite.raft.jraft;
 
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration;
-import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
-import org.apache.ignite.raft.jraft.option.RpcOptions;
 import org.apache.ignite.raft.jraft.rpc.RpcServer;
 import org.apache.ignite.raft.jraft.util.StringUtils;
 import org.jetbrains.annotations.Nullable;
@@ -74,16 +73,14 @@ public class RaftGroupService {
      * @param serverId Server id.
      * @param nodeOptions Node options.
      * @param rpcServer RPC server.
-     * @param nodeManager Node manager.
      */
     public RaftGroupService(
             final String groupId,
             final PeerId serverId,
             final NodeOptions nodeOptions,
-            final RpcServer rpcServer,
-            final NodeManager nodeManager
+            final RpcServer rpcServer
     ) {
-        this(groupId, serverId, nodeOptions, rpcServer, nodeManager, null);
+        this(groupId, serverId, nodeOptions, rpcServer, null);
     }
 
     /**
@@ -91,7 +88,6 @@ public class RaftGroupService {
      * @param serverId Server id.
      * @param nodeOptions Node options.
      * @param rpcServer RPC server.
-     * @param nodeManager Node manager.
      * @param ownFsmCallerExecutorDisruptorConfig Configuration own striped disruptor for FSMCaller service of raft node, {@code null}
      *      means use shared disruptor.
      */
@@ -100,15 +96,13 @@ public class RaftGroupService {
             final PeerId serverId,
             final NodeOptions nodeOptions,
             final RpcServer rpcServer,
-            final NodeManager nodeManager,
             @Nullable RaftNodeDisruptorConfiguration ownFsmCallerExecutorDisruptorConfig
     ) {
-        super();
         this.groupId = groupId;
         this.serverId = serverId;
         this.nodeOptions = nodeOptions;
         this.rpcServer = rpcServer;
-        this.nodeManager = nodeManager;
+        this.nodeManager = nodeOptions.getNodeManager();
         this.ownFsmCallerExecutorDisruptorConfig = ownFsmCallerExecutorDisruptorConfig;
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
index dccd5f84665..ba420b21555 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
@@ -172,6 +172,12 @@ public class RaftMessageGroup {
 
         /** */
         public static final short SM_ERROR_RESPONSE = 3014;
+
+        /** */
+        public static final short COALESCED_HEARTBEAT_REQUEST = 3015;
+
+        /** */
+        public static final short COALESCED_HEARTBEAT_RESPONSE = 3016;
     }
 
     /**
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index dfe59a561f2..b0fff753eb2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -1763,7 +1763,7 @@ public class NodeImpl implements Node, RaftServerService {
     /**
      * ReadIndex response closure
      */
-    private static class ReadIndexHeartbeatResponseClosure extends RpcResponseClosureAdapter<AppendEntriesResponse> {
+    public static class ReadIndexHeartbeatResponseClosure extends RpcResponseClosureAdapter<AppendEntriesResponse> {
         final ReadIndexResponseBuilder respBuilder;
         final RpcResponseClosure<ReadIndexResponse> closure;
         final int quorum;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index decf522e573..0c2e0d8db30 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -17,16 +17,16 @@
 package org.apache.ignite.raft.jraft.option;
 
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.metrics.sources.RaftMetricSource;
 import org.apache.ignite.internal.raft.JraftGroupEventsListener;
 import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
 import org.apache.ignite.raft.jraft.JRaftServiceFactory;
-import org.apache.ignite.raft.jraft.Node;import org.apache.ignite.raft.jraft.StateMachine;
+import org.apache.ignite.raft.jraft.NodeManager;
+import org.apache.ignite.raft.jraft.StateMachine;
 import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.core.ElectionPriority;
 import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
@@ -43,7 +43,8 @@ import org.apache.ignite.raft.jraft.util.StringUtils;
 import org.apache.ignite.raft.jraft.util.TimeoutStrategy;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
-import org.apache.ignite.raft.jraft.util.timer.Timer;import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.raft.jraft.util.timer.Timer;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Node options.
@@ -273,6 +274,9 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
 
     private RaftMetricSource raftMetrics;
 
+    /** Node manager. */
+    private NodeManager nodeManager;
+
     /**
      * Externally enforced config index.
      *
@@ -309,6 +313,24 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         this.raftMetrics = raftMetrics;
     }
 
+    /**
+     * Gets a node manager.
+     *
+     * @return Node manager.
+     */
+    public NodeManager getNodeManager() {
+        return nodeManager;
+    }
+
+    /**
+     * Sets a node manager.
+     *
+     * @param nodeManager Node manager.
+     */
+    public void setNodeManager(NodeManager nodeManager) {
+        this.nodeManager = nodeManager;
+    }
+
     /**
      * @return Stripe count.
      */
@@ -738,6 +760,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         nodeOptions.setStripes(this.getStripes());
         nodeOptions.setLogStripesCount(this.getLogStripesCount());
         nodeOptions.setLogYieldStrategy(this.isLogYieldStrategy());
+        nodeOptions.setNodeManager(this.getNodeManager());
 
         return nodeOptions;
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/NetworkInvoker.java
similarity index 63%
copy from modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
copy to modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/NetworkInvoker.java
index 339ba177775..3aa4f090fac 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/NetworkInvoker.java
@@ -14,36 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.raft.jraft.rpc;
 
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.network.TopologyEventHandler;
-import org.apache.ignite.raft.jraft.Lifecycle;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.error.RemotingException;
-import org.apache.ignite.raft.jraft.option.RpcOptions;
 import org.jetbrains.annotations.Nullable;
 
 /**
- *
+ * Rpc invocation.
  */
-public interface RpcClient extends Lifecycle<RpcOptions> {
-    /**
-     * Check connection for given address.
-     *
-     * @param peerId target peer ID.
-     * @return true if there is a connection and the connection is active and writable.
-     * @deprecated // TODO asch remove IGNITE-14832
-     */
-    boolean checkConnection(PeerId peerId);
-
-    /**
-     * Register a connect event listener for the handler.
-     *
-     * @param handler The handler.
-     */
-    void registerConnectEventListener(TopologyEventHandler handler);
-
+public interface NetworkInvoker {
     /**
      * Asynchronous invocation with a callback.
      *
@@ -56,10 +38,10 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
      * @return The future.
      */
     CompletableFuture<Message> invokeAsync(
-        PeerId peerId,
-        Object request,
-        @Nullable InvokeContext ctx,
-        InvokeCallback callback,
-        long timeoutMs
+            PeerId peerId,
+            Object request,
+            @Nullable InvokeContext ctx,
+            InvokeCallback callback,
+            long timeoutMs
     ) throws InterruptedException, RemotingException;
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
index 339ba177775..e1d01222f00 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
@@ -16,18 +16,15 @@
  */
 package org.apache.ignite.raft.jraft.rpc;
 
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.TopologyEventHandler;
 import org.apache.ignite.raft.jraft.Lifecycle;
 import org.apache.ignite.raft.jraft.entity.PeerId;
-import org.apache.ignite.raft.jraft.error.RemotingException;
 import org.apache.ignite.raft.jraft.option.RpcOptions;
-import org.jetbrains.annotations.Nullable;
 
 /**
- *
+ * RPC client.
  */
-public interface RpcClient extends Lifecycle<RpcOptions> {
+public interface RpcClient extends Lifecycle<RpcOptions>, NetworkInvoker {
     /**
      * Check connection for given address.
      *
@@ -43,23 +40,4 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
      * @param handler The handler.
      */
     void registerConnectEventListener(TopologyEventHandler handler);
-
-    /**
-     * Asynchronous invocation with a callback.
-     *
-     * @param peerId target peer ID
-     * @param request request object
-     * @param ctx invoke context
-     * @param callback invoke callback
-     * @param timeoutMs timeout millisecond
-     *
-     * @return The future.
-     */
-    CompletableFuture<Message> invokeAsync(
-        PeerId peerId,
-        Object request,
-        @Nullable InvokeContext ctx,
-        InvokeCallback callback,
-        long timeoutMs
-    ) throws InterruptedException, RemotingException;
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
index 01d535a8bfe..a39e5ac8de5 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
@@ -245,4 +245,14 @@ public final class RpcRequests {
 
         boolean success();
     }
+
+    @Transferable(value = 
RaftMessageGroup.RpcRequestsMessageGroup.COALESCED_HEARTBEAT_REQUEST)
+    public interface CoalescedHeartbeatRequest extends Message {
+        Collection<AppendEntriesRequest> messages();
+    }
+
+    @Transferable(value = 
RaftMessageGroup.RpcRequestsMessageGroup.COALESCED_HEARTBEAT_RESPONSE)
+    public interface CoalescedHeartbeatResponse extends Message {
+        Collection<Message> messages();
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
index fdd78616ba7..97b2bbd68bd 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
@@ -21,16 +21,17 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFu
 import java.net.ConnectException;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;import 
java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.TopologyEventHandler;
 import org.apache.ignite.internal.raft.PeerUnavailableException;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.internal.network.TopologyEventHandler;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.error.InvokeTimeoutException;
@@ -41,6 +42,7 @@ import org.apache.ignite.raft.jraft.rpc.ClientService;
 import org.apache.ignite.raft.jraft.rpc.InvokeCallback;
 import org.apache.ignite.raft.jraft.rpc.InvokeContext;
 import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.NetworkInvoker;
 import org.apache.ignite.raft.jraft.rpc.RpcClient;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
@@ -152,7 +154,7 @@ public abstract class AbstractClientService implements 
ClientService, TopologyEv
                 .build();
 
         CompletableFuture<Message> fut =
-                invokeWithDone(peerId, req, null, null, 
rpcOptions.getRpcConnectTimeoutMs(), rpcExecutor);
+                invokeWithDone(peerId, req, null, null, 
rpcOptions.getRpcConnectTimeoutMs(), rpcExecutor, this.rpcClient);
 
         return fut.thenApply(msg -> {
             ErrorResponse resp = (ErrorResponse) msg;
@@ -178,20 +180,19 @@ public abstract class AbstractClientService implements 
ClientService, TopologyEv
     public <T extends Message> CompletableFuture<Message> invokeWithDone(final 
PeerId peerId, final Message request,
         final RpcResponseClosure<T> done, final int timeoutMs,
         final Executor rpcExecutor) {
-        return invokeWithDone(peerId, request, null, done, timeoutMs, 
rpcExecutor);
+        return invokeWithDone(peerId, request, null, done, timeoutMs, 
rpcExecutor, this.rpcClient);
     }
 
     public <T extends Message> CompletableFuture<Message> invokeWithDone(final 
PeerId peerId, final Message request,
         final InvokeContext ctx,
         final RpcResponseClosure<T> done, final int timeoutMs) {
-        return invokeWithDone(peerId, request, ctx, done, timeoutMs, 
this.rpcExecutor);
+        return invokeWithDone(peerId, request, ctx, done, timeoutMs, 
this.rpcExecutor, this.rpcClient);
     }
 
     public <T extends Message> CompletableFuture<Message> invokeWithDone(final 
PeerId peerId, final Message request,
         final InvokeContext ctx,
         final RpcResponseClosure<T> done, final int timeoutMs,
-        final Executor rpcExecutor) {
-        final RpcClient rc = this.rpcClient;
+        final Executor rpcExecutor, NetworkInvoker rc) {
         final FutureImpl<Message> future = new FutureImpl<>();
         final Executor currExecutor = rpcExecutor != null ? rpcExecutor : 
this.rpcExecutor;
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index f5b397deea2..85c9bb289e2 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -18,6 +18,7 @@ package org.apache.ignite.raft.jraft.rpc.impl;
 
 import static 
org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequestsExecutor;
+
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,14 +27,14 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.raft.server.impl.RaftServiceEventInterceptor;
-import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessageHandler;
 import org.apache.ignite.internal.network.TopologyEventHandler;
+import org.apache.ignite.internal.raft.server.impl.RaftServiceEventInterceptor;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.jraft.NodeManager;
 import org.apache.ignite.raft.jraft.RaftMessageGroup;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -55,6 +56,7 @@ import 
org.apache.ignite.raft.jraft.rpc.impl.cli.TransferLeaderRequestProcessor;
 import 
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestInterceptor;
 import 
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor;
 import org.apache.ignite.raft.jraft.rpc.impl.core.GetFileRequestProcessor;
+import org.apache.ignite.raft.jraft.rpc.impl.core.HeartbeatRequestProcessor;
 import 
org.apache.ignite.raft.jraft.rpc.impl.core.InstallSnapshotRequestProcessor;
 import 
org.apache.ignite.raft.jraft.rpc.impl.core.InterceptingAppendEntriesRequestProcessor;
 import org.apache.ignite.raft.jraft.rpc.impl.core.ReadIndexRequestProcessor;
@@ -110,6 +112,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
         registerProcessor(new PingRequestProcessor(rpcExecutor, 
raftMessagesFactory));
         registerProcessor(new TimeoutNowRequestProcessor(rpcExecutor, 
raftMessagesFactory));
         registerProcessor(new ReadIndexRequestProcessor(rpcExecutor, 
raftMessagesFactory));
+        registerProcessor(new HeartbeatRequestProcessor(rpcExecutor, 
raftMessagesFactory));
         // raft native cli service
         registerProcessor(new AddPeerRequestProcessor(rpcExecutor, 
raftMessagesFactory));
         registerProcessor(new RemovePeerRequestProcessor(rpcExecutor, 
raftMessagesFactory));
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
index e24f7e0f689..ca2610349ab 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
@@ -22,6 +22,7 @@ import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.NodeManager;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -376,12 +377,6 @@ public class AppendEntriesRequestProcessor extends 
NodeRequestProcessor<AppendEn
         return getOrCreatePeerRequestContext(groupId, pair, 
node).getAndIncrementSequence();
     }
 
-    private boolean isHeartbeatRequest(final AppendEntriesRequest request) {
-        // No entries and no data means a true heartbeat request.
-        // TODO refactor, adds a new flag field? 
https://issues.apache.org/jira/browse/IGNITE-14832
-        return request.entriesList() == null && request.data() == null;
-    }
-
     @Override
     public Message processRequest0(final RaftServerService service, final 
AppendEntriesRequest request,
         final RpcRequestClosure done) {
@@ -392,7 +387,7 @@ public class AppendEntriesRequestProcessor extends 
NodeRequestProcessor<AppendEn
             final String groupId = request.groupId();
             final PeerPair pair = pairOf(request.peerId(), request.serverId());
 
-            boolean isHeartbeat = isHeartbeatRequest(request);
+            boolean isHeartbeat = JRaftUtils.isHeartbeatRequest(request);
             int reqSequence = -1;
 
             if (!isHeartbeat) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
index 9f476c1413b..0f653af3300 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
@@ -19,10 +19,16 @@ package org.apache.ignite.raft.jraft.rpc.impl.core;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.raft.jraft.JRaftUtils;
+import org.apache.ignite.raft.jraft.NodeManager;
 import org.apache.ignite.raft.jraft.Status;
+import 
org.apache.ignite.raft.jraft.core.NodeImpl.ReadIndexHeartbeatResponseClosure;
 import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.InvokeTimeoutException;
 import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.error.RemotingException;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
@@ -44,6 +50,7 @@ import 
org.apache.ignite.raft.jraft.rpc.RpcRequests.TimeoutNowRequest;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.TimeoutNowResponse;
 import org.apache.ignite.raft.jraft.rpc.RpcResponseClosure;
 import org.apache.ignite.raft.jraft.rpc.impl.AbstractClientService;
+import org.apache.ignite.raft.jraft.util.Utils;
 
 /**
  * Raft rpc service.
@@ -92,15 +99,72 @@ public class DefaultRaftClientService extends 
AbstractClientService implements R
 
         // Assign an executor in round-robin fasion.
         final Executor executor = 
this.appendEntriesExecutorMap.computeIfAbsent(peerId,
-            k -> nodeOptions.getStripedExecutor().next());
+                k -> nodeOptions.getStripedExecutor().next());
 
         if (connect(peerId)) { // Replicator should be started asynchronously 
by node joined event.
+            // TODO: IGNITE-24788 Configurable coalescing for RAFT heartbeat.
+            if (JRaftUtils.isHeartbeatRequest(request) && 
!isReadIndexRequest(done)) {
+                return sendHeartbeat(peerId, request, timeoutMs, done, 
executor);
+            }
+
             return invokeWithDone(peerId, request, done, timeoutMs, executor);
         }
 
         return onConnectionFail(executor, request, done, peerId);
     }
 
+    /**
+     * Sends a heartbeat through {@code NodeManager.enqueue} so that it can be accumulated into a batch request.
+     *
+     * @param peerId Remote peer id.
+     * @param request Heartbeat request to enqueue.
+     * @param timeoutMs Timeout in milliseconds, passed to {@code invokeWithDone}; the enqueue call itself does not use it.
+     * @param done Done callback.
+     * @param executor Executor where the done callback is executed.
+     * @return A future with response.
+     */
+    private Future<Message> sendHeartbeat(
+            PeerId peerId,
+            AppendEntriesRequest request,
+            int timeoutMs,
+            RpcResponseClosure<AppendEntriesResponse> done,
+            Executor executor
+    ) {
+        NodeManager nodeManager = this.nodeOptions.getNodeManager();
+
+        return invokeWithDone(
+                peerId,
+                request,
+                null,
+                done,
+                timeoutMs,
+                executor,
+                (peerId1, request1, ctx, callback, timeoutMs1) ->
+                        nodeManager.enqueue(peerId, (Message) 
request1).whenComplete((res, err) -> {
+                            if (err instanceof ExecutionException) {
+                                err = new RemotingException(err);
+                            } else if (err instanceof TimeoutException) // 
Translate timeout exception.
+                            {
+                                err = new InvokeTimeoutException();
+                            }
+
+                            Throwable finalErr = err;
+
+                            // Avoid deadlocks if a closure has completed in 
the same thread.
+                            Utils.runInThread(callback.executor(), () -> 
callback.complete(res, finalErr));
+                        })
+        );
+    }
+
+    /**
+     * Checks whether the given closure belongs to a read index heartbeat rather than to a regular heartbeat.
+     * @param doneClosure Done closure.
+     * @return {@code true} if the closure is a {@link ReadIndexHeartbeatResponseClosure}, {@code false} otherwise.
+     */
+    private static boolean 
isReadIndexRequest(RpcResponseClosure<AppendEntriesResponse> doneClosure) {
+        return doneClosure instanceof ReadIndexHeartbeatResponseClosure;
+    }
+
     @Override
     public Future<Message> getFile(final PeerId peerId, final GetFileRequest 
request, final int timeoutMs,
         final RpcResponseClosure<GetFileResponse> done) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/HeartbeatRequestProcessor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/HeartbeatRequestProcessor.java
new file mode 100644
index 00000000000..53d0a67a345
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/HeartbeatRequestProcessor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.rpc.impl.core;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+import org.apache.ignite.raft.jraft.Node;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.CoalescedHeartbeatResponseBuilder;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RaftServerService;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestProcessor;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.CoalescedHeartbeatRequest;
+
+/**
+ * Processes {@link CoalescedHeartbeatRequest}: runs each contained heartbeat on its target node and returns all replies in one response.
+ */
+public class HeartbeatRequestProcessor extends 
RpcRequestProcessor<CoalescedHeartbeatRequest> {
+
+    /**
+     * Creates the processor; both arguments are passed to the {@link RpcRequestProcessor} constructor.
+     *
+     * @param executor Executor.
+     * @param msgFactory Message factory, used to build the responses.
+     */
+    public HeartbeatRequestProcessor(Executor executor, RaftMessagesFactory 
msgFactory) {
+        super(executor, msgFactory);
+    }
+
+    @Override
+    public Message processRequest(CoalescedHeartbeatRequest request, 
RpcRequestClosure done) {
+        CoalescedHeartbeatResponseBuilder builder = 
msgFactory().coalescedHeartbeatResponse();
+        builder.messages(new ArrayList<>());
+
+        for (AppendEntriesRequest message : request.messages()) {
+            PeerId peerId = PeerId.parsePeer(message.peerId());
+
+            Message msg;
+
+            if (peerId == null) {
+                msg = RaftRpcFactory.DEFAULT.newResponse(
+                        msgFactory(),
+                        RaftError.EINVAL,
+                        "Fail to parse peerId: %s",
+                        message.peerId());
+            } else {
+                Node node = 
done.getRpcCtx().getNodeManager().get(message.groupId(), peerId);
+
+                if (node == null) {
+                    msg = RaftRpcFactory.DEFAULT.newResponse(
+                            msgFactory(),
+                            RaftError.ENOENT, "Peer id not found: %s, group: 
%s", message.peerId(),
+                            message.groupId());
+                } else {
+                    RaftServerService svc = (RaftServerService) node;
+
+                    msg = svc.handleAppendEntriesRequest(message, null);
+                }
+            }
+
+            builder.messages().add(msg);
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public String interest() {
+        return CoalescedHeartbeatRequest.class.getName();
+    }
+}
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index e6594731edf..ad40007064d 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -78,10 +78,11 @@ public class IgniteRpcTest extends AbstractRpcTest {
         );
 
         NodeOptions nodeOptions = new NodeOptions();
+        nodeOptions.setNodeManager(new NodeManager(service));
 
         requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions);
 
-        var server = new TestIgniteRpcServer(service, new NodeManager(), 
nodeOptions, requestExecutor) {
+        var server = new TestIgniteRpcServer(service, nodeOptions, 
requestExecutor) {
             @Override public void shutdown() {
                 super.shutdown();
 
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AbstractCliRequestProcessorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AbstractCliRequestProcessorTest.java
index 26c43e2eb8f..ef10c3612f6 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AbstractCliRequestProcessorTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/AbstractCliRequestProcessorTest.java
@@ -45,7 +45,7 @@ public abstract class AbstractCliRequestProcessorTest<T 
extends Message> extends
     private final String groupId = "test";
     private final String peerIdStr = "localhost:8081";
     protected MockAsyncContext asyncContext;
-    protected NodeManager nodeManager = new NodeManager();
+    protected NodeManager nodeManager = new NodeManager(null);
     protected RaftMessagesFactory msgFactory = new RaftMessagesFactory();
 
     public abstract T createRequest(String groupId, PeerId peerId);
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/MockAsyncContext.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/MockAsyncContext.java
index 81c8a3cbd60..afce52a38ae 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/MockAsyncContext.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/MockAsyncContext.java
@@ -30,7 +30,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcContext;
  */
 public class MockAsyncContext implements RpcContext {
     private Object responseObject;
-    private NodeManager nodeManager = new NodeManager();
+    private NodeManager nodeManager = new NodeManager(null);
 
     private ClusterNode sender = new ClusterNodeImpl(
             UUID.randomUUID(),
diff --git 
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
 
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 259f6f4113b..7f41d7f161c 100644
--- 
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ 
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -34,15 +34,14 @@ import org.apache.ignite.raft.messages.TestMessageGroup;
 public class TestIgniteRpcServer extends IgniteRpcServer {
     /**
      * @param clusterService Cluster service.
-     * @param nodeManager Node manager.
      * @param nodeOptions Node options.
      * @param requestExecutor Requests executor.
      */
-    public TestIgniteRpcServer(ClusterService clusterService, NodeManager 
nodeManager, NodeOptions nodeOptions,
+    public TestIgniteRpcServer(ClusterService clusterService, NodeOptions 
nodeOptions,
             ExecutorService requestExecutor) {
         super(
                 clusterService,
-                nodeManager,
+                nodeOptions.getNodeManager(),
                 nodeOptions.getRaftMessagesFactory(),
                 requestExecutor,
                 new RaftServiceEventInterceptor(),
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
index 66d90b25172..f4ad5118aaf 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
@@ -81,18 +81,19 @@ public class ItTxDistributedTestThreeNodesThreeReplicas 
extends TxAbstractTest {
             if (msg instanceof RpcRequests.AppendEntriesRequest) {
                 RpcRequests.AppendEntriesRequest tmp = (AppendEntriesRequest) 
msg;
 
-                if (tmp.entriesList() != null && !tmp.entriesList().isEmpty()) 
{
+                if (tmp.entriesList() != null && !tmp.entriesList().isEmpty() 
&& tmp.data() != null) {
                     return true;
                 }
             }
             return false;
         });
 
+        ReadWriteTransactionImpl tx = (ReadWriteTransactionImpl) 
igniteTransactions.begin();
+        CompletableFuture<Void> fut = accounts.recordView().upsertAsync(tx, 
makeValue(1, 100.));
+
         assertTrue(IgniteTestUtils.waitForCondition(() -> 
server.blockedMessages(new RaftNodeId(groupId, leader)).size() == 2, 10000),
                 "Failed to wait for blocked messages");
 
-        ReadWriteTransactionImpl tx = (ReadWriteTransactionImpl) 
igniteTransactions.begin();
-        CompletableFuture<Void> fut = accounts.recordView().upsertAsync(tx, 
makeValue(1, 100.));
         // Update must complete now despite the blocked replication protocol.
         assertTrue(IgniteTestUtils.waitForCondition(fut::isDone, 5_000), "The 
update future is not completed within timeout");
 

Reply via email to