This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b7a8ff334b IGNITE-17465 Backport JRaft improvements up to 1.3.9 - 
Fixes #1049.
b7a8ff334b is described below

commit b7a8ff334b46a781704daf66512040509434052f
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Feb 20 15:28:40 2023 +0300

    IGNITE-17465 Backport JRaft improvements up to 1.3.9 - Fixes #1049.
    
    Signed-off-by: Alexey Scherbakov <[email protected]>
---
 .../ignite/raft/jraft/core/ItCliServiceTest.java   |  18 +++
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  |   5 +
 .../org/apache/ignite/raft/jraft/CliService.java   |  11 ++
 .../java/org/apache/ignite/raft/jraft/Node.java    |  15 +-
 .../ignite/raft/jraft/closure/JoinableClosure.java |   7 +-
 .../ignite/raft/jraft/core/CliServiceImpl.java     | 109 ++++++-------
 .../ignite/raft/jraft/core/FSMCallerImpl.java      |   2 +
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  52 +++---
 .../raft/jraft/core/ReadOnlyServiceImpl.java       |  25 ++-
 .../apache/ignite/raft/jraft/core/Replicator.java  | 177 +++++++++++++++------
 .../apache/ignite/raft/jraft/entity/Checksum.java  |  18 +++
 .../apache/ignite/raft/jraft/entity/LogEntry.java  |  22 +--
 .../org/apache/ignite/raft/jraft/entity/Task.java  |  14 +-
 .../ignite/raft/jraft/rpc/InvokeContext.java       |   4 +
 .../rpc/impl/core/DefaultRaftClientService.java    |  26 ++-
 .../raft/jraft/storage/impl/LogManagerImpl.java    |   6 +-
 .../snapshot/local/LocalSnapshotStorage.java       |   2 +-
 .../jraft/storage/snapshot/remote/CopySession.java |   4 +
 .../apache/ignite/raft/jraft/util/SegmentList.java |   4 +-
 .../raft/jraft/util/StorageOptionsFactory.java     |  11 +-
 .../org/apache/ignite/raft/jraft/util/Utils.java   |  38 +++++
 .../ignite/raft/jraft/core/ReplicatorTest.java     |   4 +-
 .../ignite/raft/jraft/entity/LogEntryTest.java     |   5 +-
 .../entity/codec/BaseLogEntryCodecFactoryTest.java |  23 ++-
 .../ItRaftCommandLeftInLogUntilRestartTest.java    |   4 +-
 25 files changed, 410 insertions(+), 196 deletions(-)

diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
index 931b155021..82589b73ae 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
@@ -23,6 +23,7 @@ import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.raft.jraft.core.TestCluster.ELECTION_TIMEOUT_MILLIS;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -271,6 +272,23 @@ public class ItCliServiceTest {
         sleep(1000);
         assertEquals(Collections.singletonList(learner3.getPeerId()), 
cliService.getLearners(groupId, conf));
         assertTrue(cliService.getAliveLearners(groupId, conf).isEmpty());
+
+        TestPeer learner4 = new TestPeer(testInfo, TestUtils.INIT_PORT + 
LEARNER_PORT_STEP + 4);
+        assertTrue(cluster.startLearner(learner4));
+
+        cliService.addLearners(groupId, conf, 
Collections.singletonList(learner4.getPeerId()));
+        sleep(1000);
+        assertEquals(1, cliService.getAliveLearners(groupId, conf).size());
+        assertTrue(cliService.learner2Follower(groupId, conf, 
learner4.getPeerId()).isOk());
+
+        sleep(1000);
+        List<PeerId> currentLearners = cliService.getAliveLearners(groupId, 
conf);
+        assertFalse(currentLearners.contains(learner4.getPeerId()));
+
+        List<PeerId> currentFollowers = cliService.getPeers(groupId, conf);
+        assertTrue(currentFollowers.contains(learner4.getPeerId()));
+
+        cluster.stop(learner4.getPeerId());
     }
 
     @Test
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index b65a3f3652..0066219717 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -549,6 +549,11 @@ public class ItNodeTest {
             LOG.info("Replicator has been created {} {}", peer, val);
         }
 
+        @Override
+        public void stateChanged(final PeerId peer, final ReplicatorState 
newState) {
+            LOG.info("Replicator {} state is changed into {}.", peer, 
newState);
+        }
+
         /** {@inheritDoc} */
         @Override
         public void onError(PeerId peer, Status status) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/CliService.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/CliService.java
index 2974e3944d..1897b0e356 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/CliService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/CliService.java
@@ -87,6 +87,17 @@ public interface CliService extends Lifecycle<CliOptions> {
      */
     Status removeLearners(final String groupId, final Configuration conf, 
final List<PeerId> learners);
 
+    /**
+     * Converts the specified learner to a follower of |conf|.
+     * Returns OK status on success.
+     *
+     * @param groupId  the raft group id
+     * @param conf     current configuration
+     * @param learner  learner peer
+     * @return operation status
+     */
+    Status learner2Follower(final String groupId, final Configuration conf, 
final PeerId learner);
+
     /**
      * Update learners set in the replicating group which consists of |conf|. 
return OK status when success.
      *
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
index 0592d6d3dc..078729af46 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
@@ -21,6 +21,7 @@ import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
 import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.core.NodeMetrics;
 import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.core.State;
 import org.apache.ignite.raft.jraft.entity.NodeId;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.entity.Task;
@@ -77,13 +78,6 @@ public interface Node extends Lifecycle<NodeOptions>, 
Describer {
      */
     boolean isLeader(final boolean blocking);
 
-    /**
-     * Shutdown local replica node.
-     *
-     * @param done callback
-     */
-    void shutdown(final Closure done);
-
     /**
      * Block the thread until the node is successfully stopped.
      *
@@ -310,6 +304,13 @@ public interface Node extends Lifecycle<NodeOptions>, 
Describer {
      */
     int getNodeTargetPriority();
 
+    /**
+     * Get the node's state.
+     *
+     * @return node's state.
+     */
+    State getNodeState();
+
     /**
      * Get the node's current term.
      *
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/closure/JoinableClosure.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/closure/JoinableClosure.java
index 1e763059e7..d81ae74b5d 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/closure/JoinableClosure.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/closure/JoinableClosure.java
@@ -37,8 +37,11 @@ public class JoinableClosure implements Closure {
 
     @Override
     public void run(final Status status) {
-        this.closure.run(status);
-        latch.countDown();
+        try {
+            this.closure.run(status);
+        } finally {
+            latch.countDown();
+        }
     }
 
     public void join() throws InterruptedException {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
index 02c656b40e..43d5e58c0a 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
@@ -20,6 +20,7 @@ import static java.util.stream.Collectors.toList;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -93,6 +94,36 @@ public class CliServiceImpl implements CliService {
         this.cliClientService = null;
     }
 
+    private void recordConfigurationChange(final String groupId, final 
Collection<String> oldPeersList,
+                                           final Collection<String> 
newPeersList) {
+        final Configuration oldConf = new Configuration();
+        for (final String peerIdStr : oldPeersList) {
+            final PeerId oldPeer = new PeerId();
+            oldPeer.parse(peerIdStr);
+            oldConf.addPeer(oldPeer);
+        }
+        final Configuration newConf = new Configuration();
+        for (final String peerIdStr : newPeersList) {
+            final PeerId newPeer = new PeerId();
+            newPeer.parse(peerIdStr);
+            newConf.addPeer(newPeer);
+        }
+        LOG.info("Configuration of replication group {} changed from {} to 
{}.", groupId, oldConf, newConf);
+    }
+
+    private Status checkLeaderAndConnect(final String groupId, final 
Configuration conf, final PeerId leaderId) {
+        final Status st = getLeader(groupId, conf, leaderId);
+        if (!st.isOk()) {
+            return st;
+        }
+
+        if (!this.cliClientService.connect(leaderId)) {
+            return new Status(-1, "Fail to init channel to leader %s", 
leaderId);
+        }
+
+        return Status.OK();
+    }
+
     @Override
     public Status addPeer(final String groupId, final Configuration conf, 
final PeerId peer) {
         Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
@@ -100,15 +131,11 @@ public class CliServiceImpl implements CliService {
         Requires.requireNonNull(peer, "Null peer");
 
         final PeerId leaderId = new PeerId();
-        final Status st = getLeader(groupId, conf, leaderId);
+        final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
         if (!st.isOk()) {
             return st;
         }
 
-        if (!this.cliClientService.connect(leaderId)) {
-            return new Status(-1, "Fail to init channel to leader %s", 
leaderId);
-        }
-
         AddPeerRequest req = cliOptions.getRaftMessagesFactory()
             .addPeerRequest()
             .groupId(groupId)
@@ -120,20 +147,7 @@ public class CliServiceImpl implements CliService {
             final Message result = this.cliClientService.addPeer(leaderId, 
req, null).get();
             if (result instanceof AddPeerResponse) {
                 final AddPeerResponse resp = (AddPeerResponse) result;
-                final Configuration oldConf = new Configuration();
-                for (final String peerIdStr : resp.oldPeersList()) {
-                    final PeerId oldPeer = new PeerId();
-                    oldPeer.parse(peerIdStr);
-                    oldConf.addPeer(oldPeer);
-                }
-                final Configuration newConf = new Configuration();
-                for (final String peerIdStr : resp.newPeersList()) {
-                    final PeerId newPeer = new PeerId();
-                    newPeer.parse(peerIdStr);
-                    newConf.addPeer(newPeer);
-                }
-
-                LOG.info("Configuration of replication group {} changed from 
{} to {}.", groupId, oldConf, newConf);
+                recordConfigurationChange(groupId, resp.oldPeersList(), 
resp.newPeersList());
                 return Status.OK();
             }
             else {
@@ -159,15 +173,11 @@ public class CliServiceImpl implements CliService {
         Requires.requireTrue(!peer.isEmpty(), "Removing peer is blank");
 
         final PeerId leaderId = new PeerId();
-        final Status st = getLeader(groupId, conf, leaderId);
+        final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
         if (!st.isOk()) {
             return st;
         }
 
-        if (!this.cliClientService.connect(leaderId)) {
-            return new Status(-1, "Fail to init channel to leader %s", 
leaderId);
-        }
-
         RemovePeerRequest req = cliOptions.getRaftMessagesFactory()
             .removePeerRequest()
             .groupId(groupId)
@@ -179,20 +189,7 @@ public class CliServiceImpl implements CliService {
             final Message result = this.cliClientService.removePeer(leaderId, 
req, null).get();
             if (result instanceof RemovePeerResponse) {
                 final RemovePeerResponse resp = (RemovePeerResponse) result;
-                final Configuration oldConf = new Configuration();
-                for (final String peerIdStr : resp.oldPeersList()) {
-                    final PeerId oldPeer = new PeerId();
-                    oldPeer.parse(peerIdStr);
-                    oldConf.addPeer(oldPeer);
-                }
-                final Configuration newConf = new Configuration();
-                for (final String peerIdStr : resp.newPeersList()) {
-                    final PeerId newPeer = new PeerId();
-                    newPeer.parse(peerIdStr);
-                    newConf.addPeer(newPeer);
-                }
-
-                LOG.info("Configuration of replication group {} changed from 
{} to {}", groupId, oldConf, newConf);
+                recordConfigurationChange(groupId, resp.oldPeersList(), 
resp.newPeersList());
                 return Status.OK();
             }
             else {
@@ -213,15 +210,11 @@ public class CliServiceImpl implements CliService {
         Requires.requireNonNull(newPeers, "Null new peers");
 
         final PeerId leaderId = new PeerId();
-        final Status st = getLeader(groupId, conf, leaderId);
+        final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
         if (!st.isOk()) {
             return st;
         }
 
-        if (!this.cliClientService.connect(leaderId)) {
-            return new Status(-1, "Fail to init channel to leader %s", 
leaderId);
-        }
-
         ChangePeersRequest req = cliOptions.getRaftMessagesFactory()
             .changePeersRequest()
             .groupId(groupId)
@@ -233,20 +226,7 @@ public class CliServiceImpl implements CliService {
             final Message result = this.cliClientService.changePeers(leaderId, 
req, null).get();
             if (result instanceof ChangePeersResponse) {
                 final ChangePeersResponse resp = (ChangePeersResponse) result;
-                final Configuration oldConf = new Configuration();
-                for (final String peerIdStr : resp.oldPeersList()) {
-                    final PeerId oldPeer = new PeerId();
-                    oldPeer.parse(peerIdStr);
-                    oldConf.addPeer(oldPeer);
-                }
-                final Configuration newConf = new Configuration();
-                for (final String peerIdStr : resp.newPeersList()) {
-                    final PeerId newPeer = new PeerId();
-                    newPeer.parse(peerIdStr);
-                    newConf.addPeer(newPeer);
-                }
-
-                LOG.info("Configuration of replication group {} changed from 
{} to {}", groupId, oldConf, newConf);
+                recordConfigurationChange(groupId, resp.oldPeersList(), 
resp.newPeersList());
                 return Status.OK();
             }
             else {
@@ -386,6 +366,15 @@ public class CliServiceImpl implements CliService {
         }
     }
 
+    @Override
+    public Status learner2Follower(final String groupId, final Configuration 
conf, final PeerId learner) {
+        Status status = removeLearners(groupId, conf, Arrays.asList(learner));
+        if (status.isOk()) {
+            status = addPeer(groupId, conf, new 
PeerId(learner.getConsistentId()));
+        }
+        return status;
+    }
+
     @Override
     public Status resetLearners(final String groupId, final Configuration 
conf, final List<PeerId> learners) {
         checkLearnersOpParams(groupId, conf, learners);
@@ -424,15 +413,11 @@ public class CliServiceImpl implements CliService {
         Requires.requireNonNull(peer, "Null peer");
 
         final PeerId leaderId = new PeerId();
-        final Status st = getLeader(groupId, conf, leaderId);
+        final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
         if (!st.isOk()) {
             return st;
         }
 
-        if (!this.cliClientService.connect(leaderId)) {
-            return new Status(-1, "Fail to init channel to leader %s", 
leaderId);
-        }
-
         TransferLeaderRequest rb = cliOptions.getRaftMessagesFactory()
             .transferLeaderRequest()
             .groupId(groupId)
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index be0f8207d9..9456e17565 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -378,6 +378,7 @@ public class FSMCallerImpl implements FSMCaller {
             if (task.committedIndex > maxCommittedIndex) {
                 maxCommittedIndex = task.committedIndex;
             }
+            task.reset();
         }
         else {
             if (maxCommittedIndex >= 0) {
@@ -438,6 +439,7 @@ public class FSMCallerImpl implements FSMCaller {
             }
             finally {
                 this.nodeMetrics.recordLatency(task.type.metricName(), 
Utils.monotonicMs() - startMs);
+                task.reset();
             }
         }
         try {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 4e2a3e5e14..4899a10ece 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -181,7 +181,6 @@ public class NodeImpl implements Node, RaftServerService {
     private BallotBox ballotBox;
     private SnapshotExecutor snapshotExecutor;
     private ReplicatorGroup replicatorGroup;
-    private final List<Closure> shutdownContinuations = new ArrayList<>();
     private RaftClientService rpcClientService;
     private ReadOnlyService readOnlyService;
 
@@ -286,7 +285,7 @@ public class NodeImpl implements Node, RaftServerService {
             if (event.shutdownLatch != null) {
                 if (!this.tasks.isEmpty()) {
                     executeApplyingTasks(this.tasks);
-                    this.tasks.clear();
+                    reset();
                 }
                 event.shutdownLatch.countDown();
                 return;
@@ -295,9 +294,16 @@ public class NodeImpl implements Node, RaftServerService {
             this.tasks.add(event);
             if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() 
|| endOfBatch) {
                 executeApplyingTasks(this.tasks);
-                this.tasks.clear();
+                reset();
             }
         }
+
+        private void reset() {
+            for (final LogEntryAndClosure task : tasks) {
+                task.reset();
+            }
+            this.tasks.clear();
+        }
     }
 
     /**
@@ -1557,18 +1563,21 @@ public class NodeImpl implements Node, 
RaftServerService {
                         final Status st = new Status(RaftError.EPERM, 
"expected_term=%d doesn't match current_term=%d",
                             task.expectedTerm, this.currTerm);
                         
Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.done, st);
+                        task.reset();
                     }
                     continue;
                 }
                 if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
                     this.conf.isStable() ? null : this.conf.getOldConf(), 
task.done)) {
                     
Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.done, new 
Status(RaftError.EINTERNAL, "Fail to append task."));
+                    task.reset();
                     continue;
                 }
                 // set task entry info before adding to list.
                 task.entry.getId().setTerm(this.currTerm);
                 task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
                 entries.add(task.entry);
+                task.reset();
             }
             this.logManager.appendEntries(entries, new 
LeaderStableClosure(entries));
             // update conf.first
@@ -2625,12 +2634,8 @@ public class NodeImpl implements Node, RaftServerService 
{
     }
 
     private void afterShutdown() {
-        List<Closure> savedDoneList = null;
         this.writeLock.lock();
         try {
-            if (!this.shutdownContinuations.isEmpty()) {
-                savedDoneList = new ArrayList<>(this.shutdownContinuations);
-            }
             if (this.logStorage != null) {
                 this.logStorage.shutdown();
             }
@@ -2639,11 +2644,6 @@ public class NodeImpl implements Node, RaftServerService 
{
         finally {
             this.writeLock.unlock();
         }
-        if (savedDoneList != null) {
-            for (final Closure closure : savedDoneList) {
-                
Utils.runClosureInThread(this.getOptions().getCommonExecutor(), closure);
-            }
-        }
     }
 
     @Override
@@ -2678,11 +2678,6 @@ public class NodeImpl implements Node, RaftServerService 
{
         }
     }
 
-    @Override
-    public void shutdown() {
-        shutdown(null);
-    }
-
     public void onConfigurationChangeDone(final long term) {
         this.writeLock.lock();
         try {
@@ -3001,7 +2996,7 @@ public class NodeImpl implements Node, RaftServerService {
     }
 
     @Override
-    public void shutdown(final Closure done) {
+    public void shutdown() {
         this.writeLock.lock();
         try {
             LOG.info("Node {} shutdown, currTerm={} state={}.", getNodeId(), 
this.currTerm, this.state);
@@ -3047,20 +3042,6 @@ public class NodeImpl implements Node, RaftServerService 
{
                         }));
                 }
             }
-
-            if (this.state != State.STATE_SHUTDOWN) {
-                if (done != null) {
-                    this.shutdownContinuations.add(done);
-                }
-                return;
-            }
-
-            // This node is down, it's ok to invoke done right now. Don't 
invoke this
-            // in place to avoid the dead writeLock issue when done.Run() is 
going to acquire
-            // a writeLock which is already held by the caller
-            if (done != null) {
-                
Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done);
-            }
         }
         finally {
             this.writeLock.unlock();
@@ -3775,6 +3756,11 @@ public class NodeImpl implements Node, RaftServerService 
{
         return this.targetPriority;
     }
 
+    @Override
+    public State getNodeState() {
+        return this.state;
+    }
+
     @Override
     public void describe(final Printer out) {
         // node
@@ -3835,8 +3821,8 @@ public class NodeImpl implements Node, RaftServerService {
         this.ballotBox.describe(out);
 
         // snapshotExecutor
-        out.println("snapshotExecutor: ");
         if (this.snapshotExecutor != null) {
+            out.println("snapshotExecutor: ");
             this.snapshotExecutor.describe(out);
         }
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
index 0ceb11796b..e6aae9f426 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
@@ -97,6 +97,14 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, 
LastAppliedLogIndex
         CountDownLatch shutdownLatch;
         long startTime;
 
+        private void reset() {
+            this.nodeId = null;
+            this.requestContext = null;
+            this.done = null;
+            this.shutdownLatch = null;
+            this.startTime = 0L;
+        }
+
         @Override
         public NodeId nodeId() {
             return nodeId;
@@ -120,7 +128,7 @@ public class ReadOnlyServiceImpl implements 
ReadOnlyService, LastAppliedLogIndex
             throws Exception {
             if (newEvent.shutdownLatch != null) {
                 executeReadIndexEvents(this.events);
-                this.events.clear();
+                reset();
                 newEvent.shutdownLatch.countDown();
                 return;
             }
@@ -128,8 +136,15 @@ public class ReadOnlyServiceImpl implements 
ReadOnlyService, LastAppliedLogIndex
             this.events.add(newEvent);
             if (this.events.size() >= 
ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
                 executeReadIndexEvents(this.events);
-                this.events.clear();
+                reset();
+            }
+        }
+
+        private void reset() {
+            for (final ReadIndexEvent event : this.events) {
+                event.reset();
             }
+            this.events.clear();
         }
     }
 
@@ -233,12 +248,14 @@ public class ReadOnlyServiceImpl implements 
ReadOnlyService, LastAppliedLogIndex
     private void resetPendingStatusError(final Status st) {
         this.lock.lock();
         try {
-            for (final List<ReadIndexStatus> statuses : 
this.pendingNotifyStatus.values()) {
+            final Iterator<List<ReadIndexStatus>> it = 
this.pendingNotifyStatus.values().iterator();
+            while (it.hasNext()) {
+                final List<ReadIndexStatus> statuses = it.next();
                 for (final ReadIndexStatus status : statuses) {
                     reportError(status, st);
                 }
+                it.remove();
             }
-            this.pendingNotifyStatus.clear();
         }
         finally {
             this.lock.unlock();
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index 68224939f3..baecbe8e36 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ignite.raft.jraft.core;
 
+import static com.codahale.metrics.MetricRegistry.name;
 import static java.util.stream.Collectors.toList;
 
 import com.codahale.metrics.Gauge;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.closure.CatchUpClosure;
+import 
org.apache.ignite.raft.jraft.core.Replicator.ReplicatorStateListener.ReplicatorState;
 import org.apache.ignite.raft.jraft.entity.EntryMetaBuilder;
 import org.apache.ignite.raft.jraft.entity.EnumOutter;
 import org.apache.ignite.raft.jraft.entity.LogEntry;
@@ -86,6 +88,7 @@ public class Replicator implements ThreadId.OnError {
     private long timeoutNowIndex;
     private volatile long lastRpcSendTimestamp;
     private volatile long heartbeatCounter = 0;
+    private volatile long probeCounter = 0;
     private volatile long appendEntriesCounter = 0;
     private volatile long installSnapshotCounter = 0;
     protected Stat statInfo = new Stat();
@@ -122,6 +125,8 @@ public class Replicator implements ThreadId.OnError {
     // Pending response queue;
     private final PriorityQueue<RpcResponse> pendingResponses = new 
PriorityQueue<>(50);
 
+    private final String metricName;
+
     private int getAndIncrementReqSeq() {
         final int prev = this.reqSeq;
         this.reqSeq++;
@@ -141,9 +146,10 @@ public class Replicator implements ThreadId.OnError {
     }
 
     /**
-     * Replicator state
+     * Replicator internal state
      */
     public enum State {
+        Created,
         Probe, // probe follower state
         Snapshot, // installing snapshot to follower
         Replicate, // replicate logs normally
@@ -158,6 +164,8 @@ public class Replicator implements ThreadId.OnError {
         this.timerManager = replicatorOptions.getTimerManager();
         this.raftOptions = raftOptions;
         this.rpcService = replicatorOptions.getRaftRpcService();
+        this.metricName = getReplicatorMetricName(replicatorOptions);
+        setState(State.Created);
     }
 
     /**
@@ -180,6 +188,7 @@ public class Replicator implements ThreadId.OnError {
             gauges.put("next-index", (Gauge<Long>) () -> this.r.nextIndex);
             gauges.put("heartbeat-times", (Gauge<Long>) () -> 
this.r.heartbeatCounter);
             gauges.put("install-snapshot-times", (Gauge<Long>) () -> 
this.r.installSnapshotCounter);
+            gauges.put("probe-times", (Gauge<Long>) () -> this.r.probeCounter);
             gauges.put("append-entries-times", (Gauge<Long>) () -> 
this.r.appendEntriesCounter);
             return gauges;
         }
@@ -198,7 +207,8 @@ public class Replicator implements ThreadId.OnError {
     enum ReplicatorEvent {
         CREATED, // created
         ERROR, // error
-        DESTROYED // destroyed
+        DESTROYED , // destroyed
+        STATE_CHANGED; // state changed.
     }
 
     /**
@@ -206,6 +216,27 @@ public class Replicator implements ThreadId.OnError {
      * when replicator created, destroyed or had some errors.
      */
     public interface ReplicatorStateListener {
+        /**
+         * Represents state changes in the replicator.
+         */
+        enum ReplicatorState {
+            /**
+             * The replicator is created.
+             */
+            CREATED,
+            /**
+             * The replicator is destroyed.
+             */
+            DESTROYED,
+            /**
+             * The replicator begins to do its job (replicating logs or 
installing snapshot).
+             */
+            ONLINE,
+            /**
+             * The replicator is suspended by raft error or lost connection.
+             */
+            OFFLINE
+        }
 
         /**
          * Called when this replicator has been created.
@@ -228,10 +259,22 @@ public class Replicator implements ThreadId.OnError {
          * @param peer replicator related peerId
          */
         void onDestroyed(final PeerId peer);
+
+        /**
+         * Called when the replicator state is changed. See {@link 
ReplicatorState}
+         * @param peer the replicator's peer id.
+         * @param newState the new replicator state.
+         */
+        default void stateChanged(final PeerId peer, final ReplicatorState 
newState) {}
+    }
+
+    private static void notifyReplicatorStatusListener(final Replicator 
replicator, final ReplicatorEvent event,
+            final Status status) {
+        notifyReplicatorStatusListener(replicator, event, status, null);
     }
 
     /**
-     * Notify replicator event(such as created, error, destroyed) to 
replicatorStateListener which is implemented by
+     * Notify replicator event (such as created, error, destroyed) to 
replicatorStateListener which is implemented by
      * users.
      *
      * @param replicator replicator object
@@ -239,7 +282,7 @@ public class Replicator implements ThreadId.OnError {
      * @param status replicator's error detailed status
      */
     private static void notifyReplicatorStatusListener(final Replicator 
replicator, final ReplicatorEvent event,
-        final Status status) {
+        final Status status, final ReplicatorState newState) {
         final ReplicatorOptions replicatorOpts = 
Requires.requireNonNull(replicator.getOpts(), "replicatorOptions");
         final Node node = Requires.requireNonNull(replicatorOpts.getNode(), 
"node");
         final PeerId peer = 
Requires.requireNonNull(replicatorOpts.getPeerId(), "peer");
@@ -259,6 +302,8 @@ public class Replicator implements ThreadId.OnError {
                         case DESTROYED:
                             
Utils.runInThread(replicatorOpts.getCommonExecutor(), () -> 
listener.onDestroyed(peer));
                             break;
+                        case STATE_CHANGED:
+                            
Utils.runInThread(replicatorOpts.getCommonExecutor(), () ->  
listener.stateChanged(peer, newState));
                         default:
                             break;
                     }
@@ -387,14 +432,36 @@ public class Replicator implements ThreadId.OnError {
         return this.inflights;
     }
 
-    @OnlyForTest
     State getState() {
         return this.state;
     }
 
-    @OnlyForTest
     void setState(final State state) {
+        State oldState = this.state;
         this.state = state;
+
+        if (oldState != state) {
+            ReplicatorState newState = null;
+            switch (state) {
+                case Created:
+                    newState = ReplicatorState.CREATED;
+                    break;
+                case Replicate:
+                case Snapshot:
+                    newState = ReplicatorState.ONLINE;
+                    break;
+                case Probe:
+                    newState = ReplicatorState.OFFLINE;
+                    break;
+                case Destroyed:
+                    newState = ReplicatorState.DESTROYED;
+                    break;
+            }
+
+            if (newState != null) {
+                notifyReplicatorStatusListener(this, 
ReplicatorEvent.STATE_CHANGED, null, newState);
+            }
+        }
     }
 
     @OnlyForTest
@@ -481,7 +548,7 @@ public class Replicator implements ThreadId.OnError {
         final int seq, final Future<Message> rpcInfly) {
         this.rpcInFly = new Inflight(reqType, startIndex, count, size, seq, 
rpcInfly);
         this.inflights.add(this.rpcInFly);
-        this.nodeMetrics.recordSize("replicate-inflights-count", 
this.inflights.size());
+        this.nodeMetrics.recordSize(name(this.metricName, 
"replicate-inflights-count"), this.inflights.size());
     }
 
     /**
@@ -522,22 +589,27 @@ public class Replicator implements ThreadId.OnError {
     }
 
     void installSnapshot() {
-        if (this.state == State.Snapshot) {
+        if (getState() == State.Snapshot) {
             LOG.warn("Replicator {} is installing snapshot, ignore the new 
request.", this.options.getPeerId());
-            this.id.unlock();
+            unlockId();
             return;
         }
         boolean doUnlock = true;
+        if (!rpcService.connect(options.getPeerId())) {
+            LOG.error("Fail to check install snapshot connection to peer={}, 
give up to send install snapshot request.", options.getPeerId());
+            block(Utils.nowMs(), RaftError.EHOSTDOWN.getNumber());
+            return;
+        }
         try {
             Requires.requireTrue(this.reader == null,
                 "Replicator %s already has a snapshot reader, current state is 
%s", this.options.getPeerId(),
-                this.state);
+                getState());
             this.reader = this.options.getSnapshotStorage().open();
             if (this.reader == null) {
                 final NodeImpl node = this.options.getNode();
                 final RaftException error = new 
RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
                 error.setStatus(new Status(RaftError.EIO, "Fail to open 
snapshot"));
-                this.id.unlock();
+                unlockId();
                 doUnlock = false;
                 node.onError(error);
                 return;
@@ -548,7 +620,7 @@ public class Replicator implements ThreadId.OnError {
                 final RaftException error = new 
RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
                 error.setStatus(new Status(RaftError.EIO, "Fail to generate 
uri for snapshot reader"));
                 releaseReader();
-                this.id.unlock();
+                unlockId();
                 doUnlock = false;
                 node.onError(error);
                 return;
@@ -560,7 +632,7 @@ public class Replicator implements ThreadId.OnError {
                 final RaftException error = new 
RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
                 error.setStatus(new Status(RaftError.EIO, "Fail to load meta 
from %s", snapshotPath));
                 releaseReader();
-                this.id.unlock();
+                unlockId();
                 doUnlock = false;
                 node.onError(error);
                 return;
@@ -580,7 +652,7 @@ public class Replicator implements ThreadId.OnError {
             this.statInfo.lastLogIncluded = meta.lastIncludedIndex();
             this.statInfo.lastTermIncluded = meta.lastIncludedTerm();
 
-            this.state = State.Snapshot;
+            setState(State.Snapshot);
             // noinspection NonAtomicOperationOnVolatileField
             this.installSnapshotCounter++;
             final long monotonicSendTimeMs = Utils.monotonicMs();
@@ -598,7 +670,7 @@ public class Replicator implements ThreadId.OnError {
         }
         finally {
             if (doUnlock) {
-                this.id.unlock();
+                unlockId();
             }
         }
     }
@@ -643,7 +715,7 @@ public class Replicator implements ThreadId.OnError {
         if (!success) {
             //should reset states
             r.resetInflights();
-            r.state = State.Probe;
+            r.setState(State.Probe);
             r.block(Utils.nowMs(), status.getCode());
             return false;
         }
@@ -653,7 +725,7 @@ public class Replicator implements ThreadId.OnError {
             r.sendTimeoutNow(false, false);
         }
         // id is unlock in _send_entriesheartbeatCounter
-        r.state = State.Replicate;
+        r.setState(State.Replicate);
         return true;
     }
 
@@ -714,8 +786,8 @@ public class Replicator implements ThreadId.OnError {
                 this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
                 this.statInfo.firstLogIndex = this.nextIndex;
                 this.statInfo.lastLogIndex = this.nextIndex - 1;
-                this.appendEntriesCounter++;
-                this.state = State.Probe;
+                this.probeCounter++;
+                setState(State.Probe);
                 final int stateVersion = this.version;
                 final int seq = getAndIncrementReqSeq();
                 final Future<Message> rpcFuture = 
this.rpcService.appendEntries(this.options.getPeerId(),
@@ -733,7 +805,7 @@ public class Replicator implements ThreadId.OnError {
                 .getNodeId(), this.options.getPeerId(), 
this.options.getTerm(), request.committedIndex());
         }
         finally {
-            this.id.unlock();
+            unlockId();
         }
     }
 
@@ -798,9 +870,8 @@ public class Replicator implements ThreadId.OnError {
         final MetricRegistry metricRegistry = 
opts.getNode().getNodeMetrics().getMetricRegistry();
         if (metricRegistry != null) {
             try {
-                final String replicatorMetricName = 
getReplicatorMetricName(opts);
-                if (!metricRegistry.getNames().contains(replicatorMetricName)) 
{
-                    metricRegistry.register(replicatorMetricName, new 
ReplicatorMetricSet(opts, r));
+                if (!metricRegistry.getNames().contains(r.metricName)) {
+                    metricRegistry.register(r.metricName, new 
ReplicatorMetricSet(opts, r));
                 }
             }
             catch (final IllegalArgumentException e) {
@@ -817,11 +888,11 @@ public class Replicator implements ThreadId.OnError {
         r.lastRpcSendTimestamp = Utils.monotonicMs();
         r.startHeartbeatTimer(Utils.nowMs());
         // id.unlock in sendEmptyEntries
-        r.sendEmptyEntries(false);
+        r.sendProbeRequest();
         return r.id;
     }
 
-    private static String getReplicatorMetricName(final ReplicatorOptions 
opts) {
+    private String getReplicatorMetricName(final ReplicatorOptions opts) {
         return "replicator-" + opts.getNode().getGroupId() + "/" + 
opts.getPeerId();
     }
 
@@ -853,7 +924,7 @@ public class Replicator implements ThreadId.OnError {
 
     @Override
     public String toString() {
-        return "Replicator [state=" + this.state + ", statInfo=" + 
this.statInfo + ", peerId="
+        return "Replicator [state=" + getState() + ", statInfo=" + 
this.statInfo + ", peerId="
             + this.options.getPeerId() + ", type=" + 
this.options.getReplicatorType() + "]";
     }
 
@@ -903,7 +974,7 @@ public class Replicator implements ThreadId.OnError {
             // _next_index otherwise the replicator is likely waits in         
   executor.shutdown();
             // _wait_more_entries and no further logs would be replicated even 
if the
             // last_index of this followers is less than |next_index - 1|
-            r.sendEmptyEntries(false);
+            r.sendProbeRequest();
         }
         else if (errCode != RaftError.ESTOP.getNumber()) {
             // id is unlock in _send_entries
@@ -929,7 +1000,7 @@ public class Replicator implements ThreadId.OnError {
         // fine now.
         if (this.blockTimer != null) {
             // already in blocking state,return immediately.
-            this.id.unlock();
+            unlockId();
             return;
         }
         final long dueTime = startTimeMs + 
this.options.getDynamicHeartBeatTimeoutMs();
@@ -938,13 +1009,13 @@ public class Replicator implements ThreadId.OnError {
             this.blockTimer = this.timerManager.schedule(() -> 
onBlockTimeout(this.id, this.options.getCommonExecutor()),
                 dueTime - Utils.nowMs(), TimeUnit.MILLISECONDS);
             this.statInfo.runningState = RunningState.BLOCKING;
-            this.id.unlock();
+            unlockId();
         }
         catch (final Exception e) {
             this.blockTimer = null;
             LOG.error("Fail to add timer", e);
             // id unlock in sendEmptyEntries.
-            sendEmptyEntries(false);
+            sendProbeRequest();
         }
     }
 
@@ -1066,9 +1137,9 @@ public class Replicator implements ThreadId.OnError {
         // Unregister replicator metric set
         if (this.nodeMetrics.isEnabled()) {
             this.nodeMetrics.getMetricRegistry() //
-                
.removeMatching(MetricFilter.startsWith(getReplicatorMetricName(this.options)));
+                .removeMatching(MetricFilter.startsWith(this.metricName));
         }
-        this.state = State.Destroyed;
+        setState(State.Destroyed);
         notifyReplicatorStatusListener((Replicator) savedId.getData(), 
ReplicatorEvent.DESTROYED);
         savedId.unlockAndDestroy();
         // Avoid nulling id because it's used to sync replicator state on 
destroy.
@@ -1117,7 +1188,7 @@ public class Replicator implements ThreadId.OnError {
                         .append(status);
                     LOG.debug(sb.toString());
                 }
-                r.state = State.Probe;
+                r.setState(State.Probe);
                 notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, 
status);
                 if (++r.consecutiveErrorTimes % 10 == 0) {
                     LOG.warn("Fail to issue RPC to {}, 
consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
@@ -1154,7 +1225,7 @@ public class Replicator implements ThreadId.OnError {
                 }
                 LOG.warn("Heartbeat to peer {} failure, try to send a probe 
request.", r.options.getPeerId());
                 doUnlock = false;
-                r.sendEmptyEntries(false);
+                r.sendProbeRequest();
                 r.startHeartbeatTimer(startTimeMs);
                 return;
             }
@@ -1201,8 +1272,8 @@ public class Replicator implements ThreadId.OnError {
             LOG.warn("Too many pending responses {} for replicator {}, 
maxReplicatorInflightMsgs={}",
                 holdingQueue.size(), r.options.getPeerId(), 
r.raftOptions.getMaxReplicatorInflightMsgs());
             r.resetInflights();
-            r.state = State.Probe;
-            r.sendEmptyEntries(false);
+            r.setState(State.Probe);
+            r.sendProbeRequest();
             return;
         }
 
@@ -1255,7 +1326,7 @@ public class Replicator implements ThreadId.OnError {
                         "Replicator {} response sequence out of order, expect 
{}, but it is {}, reset state to try again.",
                         r, inflight.seq, queuedPipelinedResponse.seq);
                     r.resetInflights();
-                    r.state = State.Probe;
+                    r.setState(State.Probe);
                     continueSendEntries = false;
                     r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
                     return;
@@ -1320,9 +1391,9 @@ public class Replicator implements ThreadId.OnError {
                 "Replicator {} received invalid AppendEntriesResponse, 
in-flight startIndex={}, request prevLogIndex={}, reset the replicator state 
and probe again.",
                 r, inflight.startIndex, request.prevLogIndex());
             r.resetInflights();
-            r.state = State.Probe;
+            r.setState(State.Probe);
             // unlock id in sendEmptyEntries
-            r.sendEmptyEntries(false);
+            r.sendProbeRequest();
             return false;
         }
         // record metrics
@@ -1365,7 +1436,7 @@ public class Replicator implements ThreadId.OnError {
                     r.consecutiveErrorTimes, status);
             }
             r.resetInflights();
-            r.state = State.Probe;
+            r.setState(State.Probe);
             // unlock in in block
             r.block(startTimeMs, status.getCode());
             return false;
@@ -1416,7 +1487,7 @@ public class Replicator implements ThreadId.OnError {
                 }
             }
             // dummy_id is unlock in _send_heartbeat
-            r.sendEmptyEntries(false);
+            r.sendProbeRequest();
             return false;
         }
         if (isLogDebugEnabled) {
@@ -1426,7 +1497,7 @@ public class Replicator implements ThreadId.OnError {
         // success
         if (response.term() != r.options.getTerm()) {
             r.resetInflights();
-            r.state = State.Probe;
+            r.setState(State.Probe);
             LOG.error("Fail, response term {} dismatch, expect term {}", 
response.term(), r.options.getTerm());
             id.unlock();
             return false;
@@ -1446,7 +1517,7 @@ public class Replicator implements ThreadId.OnError {
             }
         }
 
-        r.state = State.Replicate;
+        r.setState(State.Replicate);
         r.blockTimer = null;
         r.nextIndex += entriesSize;
         r.hasSucceeded = true;
@@ -1497,7 +1568,7 @@ public class Replicator implements ThreadId.OnError {
             this.statInfo.runningState = RunningState.IDLE;
         }
         finally {
-            this.id.unlock();
+            unlockId();
         }
     }
 
@@ -1527,7 +1598,7 @@ public class Replicator implements ThreadId.OnError {
         }
         finally {
             if (doUnlock) {
-                this.id.unlock();
+                unlockId();
             }
         }
 
@@ -1606,6 +1677,7 @@ public class Replicator implements ThreadId.OnError {
         final long monotonicSendTimeMs = Utils.monotonicMs();
         final int seq = getAndIncrementReqSeq();
 
+        this.appendEntriesCounter++;
         Future<Message> rpcFuture = null;
         try {
             rpcFuture = 
this.rpcService.appendEntries(this.options.getPeerId(), request, -1,
@@ -1647,6 +1719,10 @@ public class Replicator implements ThreadId.OnError {
         r.sendEmptyEntries(true, closure);
     }
 
+    private void sendProbeRequest() {
+        sendEmptyEntries(false);
+    }
+
     private static void sendHeartbeat(final ThreadId id) {
         final Replicator r = (Replicator) id.lock();
         if (r == null) {
@@ -1683,7 +1759,7 @@ public class Replicator implements ThreadId.OnError {
         }
         finally {
             if (unlockId) {
-                this.id.unlock();
+                unlockId();
             }
         }
 
@@ -1793,7 +1869,7 @@ public class Replicator implements ThreadId.OnError {
         // Register log_index so that _on_rpc_return trigger
         // _send_timeout_now if _next_index reaches log_index
         this.timeoutNowIndex = logIndex;
-        this.id.unlock();
+        unlockId();
         return true;
     }
 
@@ -1830,4 +1906,11 @@ public class Replicator implements ThreadId.OnError {
         return nextIdx;
     }
 
+    private void unlockId() {
+        if (this.id == null) {
+            return;
+        }
+        this.id.unlock();
+    }
+
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Checksum.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Checksum.java
index 87d148e072..19c3519afc 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Checksum.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Checksum.java
@@ -16,6 +16,8 @@
  */
 package org.apache.ignite.raft.jraft.entity;
 
+import java.util.Collection;
+
 /**
  * Checksum for entity.
  */
@@ -38,4 +40,20 @@ public interface Checksum {
     default long checksum(final long v1, final long v2) {
         return v1 ^ v2;
     }
+
+    /**
+     * Folds the checksum of every factor into {@code v} and returns the result.
+     *
+     * @param factors checksum collection
+     * @param v origin checksum
+     * @return checksum value
+     */
+    default long checksum(final Collection<? extends Checksum> factors, long 
v) {
+        if (factors != null && !factors.isEmpty()) {
+            for (final Checksum factor : factors) {
+                v = checksum(v, factor.checksum());
+            }
+        }
+        return v;
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
index e9b010c85d..8022e24bd1 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
@@ -17,7 +17,6 @@
 package org.apache.ignite.raft.jraft.entity;
 
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.List;
 import org.apache.ignite.raft.jraft.util.CrcUtil;
 
@@ -25,6 +24,8 @@ import org.apache.ignite.raft.jraft.util.CrcUtil;
  * A replica log entry.
  */
 public class LogEntry implements Checksum {
+    public static final ByteBuffer EMPTY_DATA = ByteBuffer.wrap(new byte[0]);
+
     /** entry type */
     private EnumOutter.EntryType type;
     /** log id with index/term */
@@ -38,7 +39,7 @@ public class LogEntry implements Checksum {
     /** log entry old learners */
     private List<PeerId> oldLearners;
     /** entry data */
-    private ByteBuffer data;
+    private ByteBuffer data = EMPTY_DATA;
     /** checksum for log entry */
     private long checksum;
     /** true when the log has checksum **/
@@ -77,25 +78,16 @@ public class LogEntry implements Checksum {
     @Override
     public long checksum() {
         long c = checksum(this.type.getNumber(), this.id.checksum());
-        c = checksumPeers(this.peers, c);
-        c = checksumPeers(this.oldPeers, c);
-        c = checksumPeers(this.learners, c);
-        c = checksumPeers(this.oldLearners, c);
+        c = checksum(this.peers, c);
+        c = checksum(this.oldPeers, c);
+        c = checksum(this.learners, c);
+        c = checksum(this.oldLearners, c);
         if (this.data != null && this.data.hasRemaining()) {
             c = checksum(c, CrcUtil.crc64(this.data));
         }
         return c;
     }
 
-    private long checksumPeers(final Collection<PeerId> peers, long c) {
-        if (peers != null && !peers.isEmpty()) {
-            for (final PeerId peer : peers) {
-                c = checksum(c, peer.checksum());
-            }
-        }
-        return c;
-    }
-
     /**
      * Returns whether the log entry has a checksum.
      *
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Task.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Task.java
index 89663a005e..9574acc381 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Task.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Task.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.ignite.raft.jraft.Closure;
 import org.apache.ignite.raft.jraft.closure.JoinableClosure;
+import org.apache.ignite.raft.jraft.util.Requires;
 
 /**
  * Basic message structure of jraft, contains:
@@ -39,7 +40,7 @@ public class Task implements Serializable {
     private static final long serialVersionUID = 2971309899898274575L;
 
     /** Associated  task data */
-    private ByteBuffer data;
+    private ByteBuffer data = LogEntry.EMPTY_DATA;;
     /** task closure, called when the data is successfully committed to the 
raft group or failures happen. */
     private Closure done;
     /**
@@ -55,7 +56,7 @@ public class Task implements Serializable {
     /**
      * Creates a task with data/done.
      */
-    public Task(ByteBuffer data, Closure done) {
+    public Task(final ByteBuffer data, final Closure done) {
         super();
         this.data = data;
         this.done = done;
@@ -64,7 +65,7 @@ public class Task implements Serializable {
     /**
      * Creates a task with data/done/expectedTerm.
      */
-    public Task(ByteBuffer data, Closure done, long expectedTerm) {
+    public Task(final ByteBuffer data, final Closure done, final long 
expectedTerm) {
         super();
         this.data = data;
         this.done = done;
@@ -75,7 +76,8 @@ public class Task implements Serializable {
         return this.data;
     }
 
-    public void setData(ByteBuffer data) {
+    public void setData(final ByteBuffer data) {
+        Requires.requireNonNull(data, "data should not be null, you can use 
LogEntry.EMPTY_DATA instead.");
         this.data = data;
     }
 
@@ -83,7 +85,7 @@ public class Task implements Serializable {
         return this.done;
     }
 
-    public void setDone(Closure done) {
+    public void setDone(final Closure done) {
         this.done = done;
     }
 
@@ -91,7 +93,7 @@ public class Task implements Serializable {
         return this.expectedTerm;
     }
 
-    public void setExpectedTerm(long expectedTerm) {
+    public void setExpectedTerm(final long expectedTerm) {
         this.expectedTerm = expectedTerm;
     }
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/InvokeContext.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/InvokeContext.java
index 720260b0a4..a854fc34b3 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/InvokeContext.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/InvokeContext.java
@@ -31,6 +31,10 @@ public class InvokeContext {
         return this.ctx.put(key, value);
     }
 
+    public Object putIfAbsent(final String key, final Object value) {
+        return this.ctx.putIfAbsent(key, value);
+    }
+
     public <T> T get(final String key) {
         return (T) this.ctx.get(key);
     }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
index a954cf6453..9f476c1413 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
@@ -67,13 +67,23 @@ public class DefaultRaftClientService extends 
AbstractClientService implements R
     @Override
     public Future<Message> preVote(final PeerId peerId, final 
RequestVoteRequest request,
         final RpcResponseClosure<RequestVoteResponse> done) {
-        return invokeWithDone(peerId, request, done, 
this.nodeOptions.getElectionTimeoutMs());
+
+        if (connect(peerId)) {
+            return invokeWithDone(peerId, request, done, 
this.nodeOptions.getElectionTimeoutMs());
+        }
+
+        return onConnectionFail(rpcExecutor, request, done, peerId);
     }
 
     @Override
     public Future<Message> requestVote(final PeerId peerId, final 
RequestVoteRequest request,
         final RpcResponseClosure<RequestVoteResponse> done) {
-        return invokeWithDone(peerId, request, done, 
this.nodeOptions.getElectionTimeoutMs());
+
+        if (connect(peerId)) {
+            return invokeWithDone(peerId, request, done, 
this.nodeOptions.getElectionTimeoutMs());
+        }
+
+        return onConnectionFail(rpcExecutor, request, done, peerId);
     }
 
     @Override
@@ -88,7 +98,7 @@ public class DefaultRaftClientService extends 
AbstractClientService implements R
             return invokeWithDone(peerId, request, done, timeoutMs, executor);
         }
 
-        return failedFuture(executor, request, done, peerId);
+        return onConnectionFail(executor, request, done, peerId);
     }
 
     @Override
@@ -109,7 +119,7 @@ public class DefaultRaftClientService extends 
AbstractClientService implements R
             return invokeWithDone(peerId, request, done, 
this.rpcOptions.getRpcInstallSnapshotTimeout());
         }
 
-        return failedFuture(rpcExecutor, request, done, peerId);
+        return onConnectionFail(rpcExecutor, request, done, peerId);
     }
 
     @Override
@@ -131,22 +141,22 @@ public class DefaultRaftClientService extends 
AbstractClientService implements R
      * @param peerId The Peer ID.
      * @return The future.
      */
-    private Future<Message> failedFuture(Executor executor, Message request, 
RpcResponseClosure<?> done, PeerId peerId) {
+    private Future<Message> onConnectionFail(Executor executor, Message 
request, RpcResponseClosure<?> done, PeerId peerId) {
         // fail-fast when no connection
         final CompletableFuture<Message> future = new CompletableFuture<>();
 
         executor.execute(() -> {
+            final String fmt = "Check connection[%s] fail and try to create 
new one";
             if (done != null) {
                 try {
-                    done.run(new Status(RaftError.EINTERNAL, "Check 
connection[%s] fail and try to create new one", peerId));
+                    done.run(new Status(RaftError.EINTERNAL, fmt, peerId));
                 }
                 catch (final Throwable t) {
                     LOG.error("Fail to run RpcResponseClosure, the request is 
{}.", t, request);
                 }
             }
 
-            future.completeExceptionally(new RemotingException("Check 
connection[" +
-                peerId + "] fail and try to create new one"));
+            future.completeExceptionally(new 
RemotingException(String.format(fmt, peerId)));
         });
 
         return future;
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index 2b898dfc52..906b573fa7 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -510,9 +510,13 @@ public class LogManagerImpl implements LogManager {
                 this.lastId = this.ab.flush();
                 setDiskId(this.lastId);
                 LogManagerImpl.this.shutDownLatch.countDown();
+                event.reset();
                 return;
             }
             final StableClosure done = event.done;
+            final EventType eventType = event.type;
+
+            event.reset();
 
             if (done.getEntries() != null && !done.getEntries().isEmpty()) {
                 this.ab.append(done);
@@ -520,7 +524,7 @@ public class LogManagerImpl implements LogManager {
             else {
                 this.lastId = this.ab.flush();
                 boolean ret = true;
-                switch (event.type) {
+                switch (eventType) {
                     case LAST_LOG_ID:
                         ((LastLogIdClosure) 
done).setLastLogId(this.lastId.copy());
                         break;
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
index 6838439205..97f08d6a46 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
@@ -201,7 +201,7 @@ public class LocalSnapshotStorage implements 
SnapshotStorage {
                 }
             }
             catch (final IOException e) {
-                LOG.error("Fail to sync writer {}.", writer.getPath());
+                LOG.error("Fail to sync writer {}.", writer.getPath(), e);
                 ret = RaftError.EIO.getNumber();
                 ioe = e;
                 break;
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySession.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySession.java
index cf36af4436..fe4d756d1c 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySession.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySession.java
@@ -109,6 +109,10 @@ public class CopySession implements Session {
             if (!this.finished) {
                 Utils.closeQuietly(this.outputStream);
             }
+            if (null != this.destBuf) {
+                this.destBuf.recycle();
+                this.destBuf = null;
+            }
         }
         finally {
             this.lock.unlock();
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SegmentList.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SegmentList.java
index 1911f9091a..44d02912a5 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SegmentList.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SegmentList.java
@@ -21,11 +21,11 @@ import java.util.Collection;
 import java.util.function.Predicate;
 
 /**
- * A list implementation based on segments.Only supports removing elements 
from start or end. The list keep the elements
+ * A list implementation based on segments. Only supports removing elements 
from start or end. The list keeps the elements
  * in a segment list, every segment contains at most 128 elements.
  *
  * [segment, segment, segment ...] /                 |                    \ 
segment             segment
- * segment [0, 1 ...  127]    [128, 129 ... 255]    [256, 1 ... 383]
+ * segment [0, 1 ... 127]    [128, 129 ... 255]    [256, 257 ... 383]
  */
 public class SegmentList<T> {
     private static final int SEGMENT_SHIFT = 7;
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/StorageOptionsFactory.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/StorageOptionsFactory.java
index ae168aefb0..220e331fc1 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/StorageOptionsFactory.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/StorageOptionsFactory.java
@@ -113,11 +113,20 @@ public final class StorageOptionsFactory {
         // If true, missing column families will be automatically created.
         opts.setCreateMissingColumnFamilies(true);
 
-        // Number of open files that can be used by the DB.  You may need to 
increase
+        // Number of open files that can be used by the DB. You may need to 
increase
         // this if your database has a large working set. Value -1 means files 
opened
         // are always kept open.
         opts.setMaxOpenFiles(-1);
 
+        // Limit the number of LOG files kept. Once that number is exceeded, RocksDB will 
delete the oldest LOG
+        // files automatically.
+        opts.setKeepLogFileNum(100);
+
+        // Limit the total size of WALs. Once the WALs exceed this size, RocksDB will 
start
+        // forcing the flush of column families to allow deletion of some of the 
oldest WALs.
+        // We make it 1 GiB by default.
+        opts.setMaxTotalWalSize(1 << 30);
+
         // The maximum number of concurrent background compactions. The 
default is 1,
         // but to fully utilize your CPU and storage you might want to 
increase this
         // to approximately number of cores in the system.
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
index 6b1c208763..0a27cc4894 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
@@ -23,8 +23,11 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.StandardOpenOption;
@@ -368,6 +371,41 @@ public final class Utils {
         }
     }
 
+    /**
+     * Unmaps the given {@code MappedByteBuffer}.
+     * See 
https://stackoverflow.com/questions/2972986/how-to-unmap-a-file-from-memory-mapped-using-filechannel-in-java
+     */
+    public static void unmap(final MappedByteBuffer cb) {
+        // JavaSpecVer: 1.6, 1.7, 1.8, 9, 10
+        final boolean isOldJDK = 
System.getProperty("java.specification.version", "99").startsWith("1.");
+        try {
+            if (isOldJDK) {
+                final Method cleaner = cb.getClass().getMethod("cleaner");
+                cleaner.setAccessible(true);
+                final Method clean = 
Class.forName("sun.misc.Cleaner").getMethod("clean");
+                clean.setAccessible(true);
+                clean.invoke(cleaner.invoke(cb));
+            } else {
+                Class unsafeClass;
+                try {
+                    unsafeClass = Class.forName("sun.misc.Unsafe");
+                } catch (final Exception ex) {
+                    // jdk.internal.misc.Unsafe doesn't yet have an 
invokeCleaner() method,
+                    // but that method should be added if sun.misc.Unsafe is 
removed.
+                    unsafeClass = Class.forName("jdk.internal.misc.Unsafe");
+                }
+                final Method clean = unsafeClass.getMethod("invokeCleaner", 
ByteBuffer.class);
+                clean.setAccessible(true);
+                final Field theUnsafeField = 
unsafeClass.getDeclaredField("theUnsafe");
+                theUnsafeField.setAccessible(true);
+                final Object theUnsafe = theUnsafeField.get(null);
+                clean.invoke(theUnsafe, cb);
+            }
+        } catch (final Exception ex) {
+            LOG.error("Fail to un-mapped segment file.", ex);
+        }
+    }
+
     public static String getString(final byte[] bs, final int off, final int 
len) {
         return new String(bs, off, len, StandardCharsets.UTF_8);
     }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
index 40bda369e7..5399bfda20 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
@@ -184,10 +184,10 @@ public class ReplicatorTest {
         assertNotNull(r);
         assertSame(r.getOpts(), this.opts);
         Set<String> metrics = 
this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
-        assertEquals(6, metrics.size());
+        assertEquals(7, metrics.size());
         r.destroy();
         metrics = 
this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
-        assertEquals(1, metrics.size());
+        assertEquals(0, metrics.size());
     }
 
     private Replicator getReplicator() {
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
index cab5683a92..4b7b5e70e3 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
@@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class LogEntryTest {
@@ -36,7 +37,7 @@ public class LogEntryTest {
         LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
         entry.setId(new LogId(100, 3));
         entry.setPeers(Arrays.asList(new PeerId("localhost", 99, 1), new 
PeerId("localhost", 100, 2)));
-        assertNull(entry.getData());
+        assertSame(LogEntry.EMPTY_DATA, entry.getData());
         assertNull(entry.getOldPeers());
 
         DefaultLogEntryCodecFactory factory = 
DefaultLogEntryCodecFactory.getInstance();
@@ -55,7 +56,7 @@ public class LogEntryTest {
         assertEquals(2, nentry.getPeers().size());
         assertEquals("localhost:99:1", nentry.getPeers().get(0).toString());
         assertEquals("localhost:100:2", nentry.getPeers().get(1).toString());
-        assertNull(nentry.getData());
+        assertSame(LogEntry.EMPTY_DATA, entry.getData());
         assertNull(nentry.getOldPeers());
     }
 
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/BaseLogEntryCodecFactoryTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/BaseLogEntryCodecFactoryTest.java
index cf190abdf6..6e067c4ceb 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/BaseLogEntryCodecFactoryTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/BaseLogEntryCodecFactoryTest.java
@@ -30,6 +30,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -47,6 +48,24 @@ public abstract class BaseLogEntryCodecFactoryTest {
 
     protected abstract LogEntryCodecFactory newFactory();
 
+    @Test
+    public void testEmptyData() {
+        LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
+        entry.setId(new LogId(100, 3));
+        entry.setPeers(Arrays.asList(new PeerId("localhost", 99, 1), new 
PeerId("localhost", 100, 2)));
+        entry.setData(ByteBuffer.allocate(0));
+
+        byte[] content = this.encoder.encode(entry);
+
+        assertNotNull(content);
+        assertTrue(content.length > 0);
+
+        LogEntry nentry = this.decoder.decode(content);
+        assertNotNull(nentry);
+        assertNotNull(nentry.getData());
+        assertEquals(0, nentry.getData().remaining());
+    }
+
     @Test
     public void testEncodeDecodeEmpty() {
         try {
@@ -65,7 +84,7 @@ public abstract class BaseLogEntryCodecFactoryTest {
         LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
         entry.setId(new LogId(100, 3));
         entry.setPeers(Arrays.asList(new PeerId("localhost", 99, 1), new 
PeerId("localhost", 100, 2)));
-        assertNull(entry.getData());
+        assertSame(LogEntry.EMPTY_DATA, entry.getData());
         assertNull(entry.getOldPeers());
 
         byte[] content = this.encoder.encode(entry);
@@ -82,7 +101,7 @@ public abstract class BaseLogEntryCodecFactoryTest {
         assertEquals(2, nentry.getPeers().size());
         assertEquals("localhost:99:1", nentry.getPeers().get(0).toString());
         assertEquals("localhost:100:2", nentry.getPeers().get(1).toString());
-        assertNull(nentry.getData());
+        assertSame(LogEntry.EMPTY_DATA, nentry.getData());
         assertNull(nentry.getOldPeers());
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 3eea7b73c1..f1699b41ab 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -247,9 +247,11 @@ public class ItRaftCommandLeftInLogUntilRestartTest 
extends AbstractBasicIntegra
                         return;
                     }
 
+                    long idx = event.committedIndex;
+
                     handler.onEvent(event, sequence, endOfBatch);
 
-                    appliedIndex.set(event.committedIndex);
+                    appliedIndex.set(idx);
                 }, exceptionHandler);
             }
         });

Reply via email to