This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 04c159aefc IGNITE-17445 RocksDbKeyValueStorage recreates DB on start, 
so data can't be found until Raft log is replayed (#1066)
04c159aefc is described below

commit 04c159aefc95fcaef728bdedcc6ff2dd4e9333a7
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Sat Sep 17 14:14:42 2022 +0300

    IGNITE-17445 RocksDbKeyValueStorage recreates DB on start, so data can't be 
found until Raft log is replayed (#1066)
---
 .../raft/server/ItJraftCounterServerTest.java      | 143 +++++++++++++--
 .../raft/server/ItSimpleCounterServerTest.java     |   3 +
 .../raft/server/{ => counter}/CounterListener.java |   2 +-
 .../server/{ => counter}/CounterSnapshotFile.java  |   2 +-
 .../raft/server/{ => counter}/GetValueCommand.java |   2 +-
 .../{ => counter}/IncrementAndGetCommand.java      |   2 +-
 .../snasphot/SnapshotInMemoryStorageFactory.java   | 194 +++++++++++++++++++++
 .../TestWriteCommand.java}                         |   8 +-
 .../server/snasphot/UpdateCountRaftListener.java   | 104 +++++++++++
 .../org/apache/ignite/raft/jraft/FSMCaller.java    |   5 +
 .../apache/ignite/raft/jraft/core/BallotBox.java   |   2 -
 .../ignite/raft/jraft/core/FSMCallerImpl.java      |   5 +
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  28 +++
 13 files changed, 474 insertions(+), 26 deletions(-)

diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index 9c504ba046..dbcfd720ff 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -46,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -70,11 +72,18 @@ import org.apache.ignite.raft.client.service.CommandClosure;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
 import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.server.counter.CounterListener;
+import org.apache.ignite.raft.server.counter.GetValueCommand;
+import org.apache.ignite.raft.server.counter.IncrementAndGetCommand;
+import org.apache.ignite.raft.server.snasphot.SnapshotInMemoryStorageFactory;
+import org.apache.ignite.raft.server.snasphot.TestWriteCommand;
+import org.apache.ignite.raft.server.snasphot.UpdateCountRaftListener;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -161,6 +170,16 @@ class ItJraftCounterServerTest extends 
RaftServerAbstractTest {
     protected void after() throws Exception {
         super.after();
 
+        shutdownCluster();
+
+        IgniteUtils.shutdownAndAwaitTermination(executor, 10, 
TimeUnit.SECONDS);
+
+        TestUtils.assertAllJraftThreadsStopped();
+
+        LOG.info(">>>>>>>>>>>>>>> End test method: {}", 
testInfo.getTestMethod().orElseThrow().getName());
+    }
+
+    private void shutdownCluster() throws Exception {
         LOG.info("Start client shutdown");
 
         Iterator<RaftGroupService> iterClients = clients.iterator();
@@ -173,6 +192,8 @@ class ItJraftCounterServerTest extends 
RaftServerAbstractTest {
             client.shutdown();
         }
 
+        clients.clear();
+
         LOG.info("Start server shutdown servers={}", servers.size());
 
         Iterator<JraftServerImpl> iterSrv = servers.iterator();
@@ -193,20 +214,15 @@ class ItJraftCounterServerTest extends 
RaftServerAbstractTest {
             server.stop();
         }
 
-        IgniteUtils.shutdownAndAwaitTermination(executor, 10, 
TimeUnit.SECONDS);
-
-        TestUtils.assertAllJraftThreadsStopped();
-
-        LOG.info(">>>>>>>>>>>>>>> End test method: {}", 
testInfo.getTestMethod().orElseThrow().getName());
+        servers.clear();
     }
 
     /**
      * Starts server.
      *
-     * @param idx The index.
-     * @param clo Init closure.
+     * @param idx  The index.
+     * @param clo  Init closure.
      * @param cons Node options updater.
-     *
      * @return Raft server instance.
      */
     private JraftServerImpl startServer(int idx, Consumer<RaftServer> clo, 
Consumer<NodeOptions> cons) {
@@ -277,6 +293,8 @@ class ItJraftCounterServerTest extends 
RaftServerAbstractTest {
         startClient(COUNTER_GROUP_1);
     }
 
+
+
     /**
      * Checks that the number of Disruptor threads does not depend on  count 
started RAFT nodes.
      */
@@ -656,9 +674,12 @@ class ItJraftCounterServerTest extends 
RaftServerAbstractTest {
     /** Tests if a starting a new group in shared pools mode doesn't increases 
timer threads count. */
     @Test
     public void testTimerThreadsCount() {
-        JraftServerImpl srv0 = startServer(0, x -> {}, opts -> 
opts.setTimerPoolSize(1));
-        JraftServerImpl srv1 = startServer(1, x -> {}, opts -> 
opts.setTimerPoolSize(1));
-        JraftServerImpl srv2 = startServer(2, x -> {}, opts -> 
opts.setTimerPoolSize(1));
+        JraftServerImpl srv0 = startServer(0, x -> {
+        }, opts -> opts.setTimerPoolSize(1));
+        JraftServerImpl srv1 = startServer(1, x -> {
+        }, opts -> opts.setTimerPoolSize(1));
+        JraftServerImpl srv2 = startServer(2, x -> {
+        }, opts -> opts.setTimerPoolSize(1));
 
         waitForTopology(srv0.clusterService(), 3, 5_000);
 
@@ -672,7 +693,8 @@ class ItJraftCounterServerTest extends 
RaftServerAbstractTest {
             for (int i = 0; i < groupsCnt; i++) {
                 int finalI = i;
                 futs.add(svc.submit(new Runnable() {
-                    @Override public void run() {
+                    @Override
+                    public void run() {
                         String grp = "counter" + finalI;
 
                         srv0.startRaftGroup(grp, listenerFactory.get(), 
INITIAL_CONF, defaults());
@@ -709,6 +731,95 @@ class ItJraftCounterServerTest extends 
RaftServerAbstractTest {
                 "All timer threads: " + timerThreads.toString());
     }
 
+    /**
+     * The test shows that all committed updates are applied after a RAFT 
group restart automatically.
+     * Actual data is available to read from the state storage (not the state 
machine) directly just after the RAFT node has started.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testApplyUpdatesOutOfSnapshot() throws Exception {
+        HashMap<Integer, AtomicInteger> counters = new HashMap<>(3);
+        HashMap<Path, Integer> snapshotDataStorage = new HashMap<>(3);
+        HashMap<String, SnapshotMeta> snapshotMetaStorage = new HashMap<>(3);
+
+        for (int i = 0; i < 3; i++) {
+            AtomicInteger counter;
+
+            counters.put(i, counter = new AtomicInteger());
+
+            startServer(i, raftServer -> {
+                raftServer.startRaftGroup("test_raft_group", new 
UpdateCountRaftListener(counter, snapshotDataStorage), INITIAL_CONF,
+                        defaults().snapshotStorageFactory(new 
SnapshotInMemoryStorageFactory(snapshotMetaStorage)));
+            }, opts -> {});
+        }
+
+        var raftClient = startClient("test_raft_group");
+
+        raftClient.refreshMembers(true).get();
+        var peers = raftClient.peers();
+
+        raftClient.run(new TestWriteCommand());
+
+        assertTrue(TestUtils.waitForCondition(() -> counters.get(0).get() == 
1, 10_000));
+
+        raftClient.snapshot(peers.get(0)).get();
+
+        raftClient.run(new TestWriteCommand());
+
+        assertTrue(TestUtils.waitForCondition(() -> counters.get(1).get() == 
2, 10_000));
+
+        raftClient.snapshot(peers.get(1)).get();
+
+        raftClient.run(new TestWriteCommand());
+
+        for (AtomicInteger counter : counters.values()) {
+            assertTrue(TestUtils.waitForCondition(() -> counter.get() == 3, 
10_000));
+        }
+
+        shutdownCluster();
+
+        Path peer0SnapPath = snapshotPath(peers.get(0).address());
+        Path peer1SnapPath = snapshotPath(peers.get(1).address());
+        Path peer2SnapPath = snapshotPath(peers.get(2).address());
+
+        assertEquals(1, snapshotDataStorage.get(peer0SnapPath));
+        assertEquals(2, snapshotDataStorage.get(peer1SnapPath));
+        assertNull(snapshotDataStorage.get(peer2SnapPath));
+
+        assertNotNull(snapshotMetaStorage.get(peer0SnapPath.toString()));
+        assertNotNull(snapshotMetaStorage.get(peer1SnapPath.toString()));
+        assertNull(snapshotMetaStorage.get(peer2SnapPath.toString()));
+
+        for (int i = 0; i < 3; i++) {
+            var counter = counters.get(i);
+
+            startServer(i, raftServer -> {
+                counter.set(0);
+
+                raftServer.startRaftGroup("test_raft_group", new 
UpdateCountRaftListener(counter, snapshotDataStorage), INITIAL_CONF,
+                        defaults().snapshotStorageFactory(new 
SnapshotInMemoryStorageFactory(snapshotMetaStorage)));
+            }, opts -> {});
+        }
+
+        for (AtomicInteger counter : counters.values()) {
+            assertEquals(3, counter.get());
+        }
+    }
+
+    /**
+     * Builds a snapshot path by the peer address of RAFT node.
+     *
+     * @param peerAddress Raft node peer address.
+     * @return Path to snapshot.
+     */
+    private Path snapshotPath(NetworkAddress peerAddress) {
+        int nodeId = peerAddress.port() - PORT;
+
+        return dataPath.resolve("node" + nodeId).resolve("test_raft_group" + 
"_" + peerAddress.toString().replace(':', '_'))
+                .resolve("snapshot");
+    }
+
     /**
      * Returns {@code true} if thread is related to timers.
      *
@@ -832,8 +943,8 @@ class ItJraftCounterServerTest extends 
RaftServerAbstractTest {
      * Applies increments.
      *
      * @param client The client
-     * @param start Start element.
-     * @param stop Stop element.
+     * @param start  Start element.
+     * @param stop   Stop element.
      * @return The counter value.
      * @throws Exception If failed.
      */
@@ -863,8 +974,8 @@ class ItJraftCounterServerTest extends 
RaftServerAbstractTest {
      * Validates state machine.
      *
      * @param expected Expected value.
-     * @param server The server.
-     * @param groupId Group id.
+     * @param server   The server.
+     * @param groupId  Group id.
      * @return Validation result.
      */
     private static boolean validateStateMachine(long expected, JraftServerImpl 
server, String groupId) {
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
index 679585ffbf..07ee77475a 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
@@ -42,6 +42,9 @@ import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.server.counter.CounterListener;
+import org.apache.ignite.raft.server.counter.GetValueCommand;
+import org.apache.ignite.raft.server.counter.IncrementAndGetCommand;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/CounterListener.java
similarity index 98%
rename from 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
rename to 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/CounterListener.java
index 37b99107ae..c88543437e 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/CounterListener.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.raft.server;
+package org.apache.ignite.raft.server.counter;
 
 import java.io.File;
 import java.io.IOException;
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterSnapshotFile.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/CounterSnapshotFile.java
similarity index 98%
rename from 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterSnapshotFile.java
rename to 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/CounterSnapshotFile.java
index d94e11c5a4..feb9346b74 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterSnapshotFile.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/CounterSnapshotFile.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.raft.server;
+package org.apache.ignite.raft.server.counter;
 
 import java.io.File;
 import java.io.IOException;
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/GetValueCommand.java
similarity index 95%
copy from 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
copy to 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/GetValueCommand.java
index 86028a1d44..01056cd886 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/GetValueCommand.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.raft.server;
+package org.apache.ignite.raft.server.counter;
 
 import org.apache.ignite.raft.client.ReadCommand;
 
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java
similarity index 96%
rename from 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java
rename to 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java
index 709ce6dbfe..672536beb7 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.raft.server;
+package org.apache.ignite.raft.server.counter;
 
 import org.apache.ignite.raft.client.WriteCommand;
 
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/SnapshotInMemoryStorageFactory.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/SnapshotInMemoryStorageFactory.java
new file mode 100644
index 0000000000..6f329b7ab5
--- /dev/null
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/SnapshotInMemoryStorageFactory.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server.snasphot;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.option.SnapshotCopierOptions;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
+import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
+
+/**
+ * The factory for a snapshot storage, which is persisted in the map.
+ */
+public class SnapshotInMemoryStorageFactory implements SnapshotStorageFactory {
+    /** The map that stores snapshot metadata, keyed by the snapshot storage URI. */
+    private final Map<String, SnapshotMeta> metaStorage;
+
+    /**
+     * The constructor.
+     *
+     * @param metaStorage Map storage for snapshot metadata.
+     */
+    public SnapshotInMemoryStorageFactory(Map<String, SnapshotMeta> 
metaStorage) {
+        this.metaStorage = metaStorage;
+    }
+
+    @Override
+    public SnapshotStorage createSnapshotStorage(String uri, RaftOptions 
raftOptions) {
+        return new SnapshotStorage() {
+            @Override
+            public boolean setFilterBeforeCopyRemote() {
+                return false;
+            }
+
+            @Override
+            public SnapshotWriter create() {
+                return new SnapshotWriter() {
+                    @Override
+                    public boolean saveMeta(SnapshotMeta meta) {
+                        metaStorage.put(uri, meta);
+                        return true;
+                    }
+
+                    @Override
+                    public boolean addFile(String fileName, Message fileMeta) {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public boolean removeFile(String fileName) {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public void close(boolean keepDataOnError) throws 
IOException {
+                        // No-op.
+                    }
+
+                    @Override
+                    public void close() throws IOException {
+                        // No-op.
+                    }
+
+                    @Override
+                    public boolean init(Void opts) {
+                        return true;
+                    }
+
+                    @Override
+                    public void shutdown() {
+                        // No-op.
+                    }
+
+                    @Override
+                    public String getPath() {
+                        return uri;
+                    }
+
+                    @Override
+                    public Set<String> listFiles() {
+                        // No files in the snapshot.
+                        return Set.of();
+                    }
+
+                    @Override
+                    public Message getFileMeta(String fileName) {
+                        // No files in the snapshot.
+                        return null;
+                    }
+                };
+            }
+
+            @Override
+            public SnapshotReader open() {
+                var snapMeta = metaStorage.get(uri);
+
+                if (snapMeta == null) {
+                    return null;
+                }
+
+                return new SnapshotReader() {
+                    @Override
+                    public SnapshotMeta load() {
+                        return snapMeta;
+                    }
+
+                    @Override
+                    public String generateURIForCopy() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public void close() throws IOException {
+                        // No-op.
+                    }
+
+                    @Override
+                    public boolean init(Void opts) {
+                        return true;
+                    }
+
+                    @Override
+                    public void shutdown() {
+                        // No-op.
+                    }
+
+                    @Override
+                    public String getPath() {
+                        return uri;
+                    }
+
+                    @Override
+                    public Set<String> listFiles() {
+                        // No files in the snapshot.
+                        return Set.of();
+                    }
+
+                    @Override
+                    public Message getFileMeta(String fileName) {
+                        return null;
+                    }
+                };
+            }
+
+            @Override
+            public SnapshotReader copyFrom(String uri, SnapshotCopierOptions 
opts) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public SnapshotCopier startToCopyFrom(String uri, 
SnapshotCopierOptions opts) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void setSnapshotThrottle(SnapshotThrottle snapshotThrottle) 
{
+
+            }
+
+            @Override
+            public boolean init(Void opts) {
+                return true;
+            }
+
+            @Override
+            public void shutdown() {
+                // No-op.
+            }
+        };
+    }
+}
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/TestWriteCommand.java
similarity index 82%
rename from 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
rename to 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/TestWriteCommand.java
index 86028a1d44..d0d702a082 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/TestWriteCommand.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.raft.server;
+package org.apache.ignite.raft.server.snasphot;
 
-import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
 
 /**
- * Get a value command.
+ * Test write command.
  */
-public class GetValueCommand implements ReadCommand {
+public class TestWriteCommand implements WriteCommand {
 }
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java
new file mode 100644
index 0000000000..188443684f
--- /dev/null
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server.snasphot;
+
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The RAFT state machine counts applied write commands and stores the result 
into {@link java.util.concurrent.atomic.AtomicInteger} that is
+ * passed through the constructor.
+ * See {@link TestWriteCommand}.
+ */
+public class UpdateCountRaftListener implements RaftGroupListener {
+    /** The logger. */
+    private static IgniteLogger LOG = 
Loggers.forClass(UpdateCountRaftListener.class);
+
+    /** Counter of received updates. */
+    private final AtomicInteger counter;
+
+    /** Storage to persist a state of the listener on snapshot. */
+    private final Map<Path, Integer> snapshotDataStorage;
+
+    /**
+     * The constructor.
+     *
+     * @param counter             Counter to store amount of updates.
+     * @param snapshotDataStorage Storage for a snapshot.
+     */
+    public UpdateCountRaftListener(AtomicInteger counter, Map<Path, Integer> 
snapshotDataStorage) {
+        this.counter = counter;
+        this.snapshotDataStorage = snapshotDataStorage;
+    }
+
+    @Override
+    public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+        throw new UnsupportedOperationException("Read command is not supported 
for the RAFT listener.");
+    }
+
+    @Override
+    public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+        while (iterator.hasNext()) {
+            CommandClosure<WriteCommand> clo = iterator.next();
+
+            assert clo.command() instanceof TestWriteCommand;
+
+            int current = counter.incrementAndGet();
+
+            LOG.info("Increment value [curVal={}]", current);
+
+            clo.result(null);
+        }
+    }
+
+    @Override
+    public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+        snapshotDataStorage.put(path, counter.get());
+
+        doneClo.accept(null);
+    }
+
+    @Override
+    public boolean onSnapshotLoad(Path path) {
+        counter.set(snapshotDataStorage.getOrDefault(path, 0));
+
+        return true;
+    }
+
+    @Override
+    public void onShutdown() {
+    }
+
+    @Override
+    public @Nullable CompletableFuture<Void> onBeforeApply(Command command) {
+        return null;
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/FSMCaller.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/FSMCaller.java
index 3d96cfc7dc..273d5f51a1 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/FSMCaller.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/FSMCaller.java
@@ -46,6 +46,11 @@ public interface FSMCaller extends 
Lifecycle<FSMCallerOptions>, Describer {
      */
     void addLastAppliedLogIndexListener(final LastAppliedLogIndexListener 
listener);
 
+    /**
+     * Removes a LastAppliedLogIndexListener.
+     */
+    void removeLastAppliedLogIndexListener(final LastAppliedLogIndexListener 
listener);
+
     /**
      * Called when log entry committed
      *
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
index fd084f377f..820ed2cb62 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
@@ -175,8 +175,6 @@ public class BallotBox implements 
Lifecycle<BallotBoxOptions>, Describer {
                 return false;
             }
             this.pendingIndex = newPendingIndex;
-            //TODO Fix it properly: 
https://issues.apache.org/jira/browse/IGNITE-17445
-            this.lastCommittedIndex = newPendingIndex - 1;
             this.closureQueue.resetFirstIndex(newPendingIndex);
             return true;
         }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index 82858da3e9..b95f4ca175 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -215,6 +215,11 @@ public class FSMCallerImpl implements FSMCaller {
         this.lastAppliedLogIndexListeners.add(listener);
     }
 
+    @Override
+    public void removeLastAppliedLogIndexListener(final 
LastAppliedLogIndexListener listener) {
+        this.lastAppliedLogIndexListeners.remove(listener);
+    }
+
     private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
         if (this.shutdownLatch != null) {
             // Shutting down
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 8f7d69178b..8db8fe7a4f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.jraft.Closure;
 import org.apache.ignite.raft.jraft.FSMCaller;
+import org.apache.ignite.raft.jraft.FSMCaller.LastAppliedLogIndexListener;
 import org.apache.ignite.raft.jraft.JRaftServiceFactory;
 import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.Node;
@@ -1033,6 +1034,33 @@ public class NodeImpl implements Node, RaftServerService 
{
         // Adds metric registry to RPC service.
         this.options.setMetricRegistry(this.metrics.getMetricRegistry());
 
+        // Apply entries up to the last log index and wait until the state machine catches up.
+        long commitIdx = logManager.getLastLogIndex();
+
+        if (commitIdx > fsmCaller.getLastAppliedIndex()) {
+            CountDownLatch applyCommitLatch = new CountDownLatch(1);
+
+            LastAppliedLogIndexListener lnsr = lastAppliedLogIndex -> {
+                if (lastAppliedLogIndex >= commitIdx) {
+                    applyCommitLatch.countDown();
+                }
+            };
+
+            fsmCaller.addLastAppliedLogIndexListener(lnsr);
+
+            fsmCaller.onCommitted(commitIdx);
+
+            try {
+                applyCommitLatch.await();
+
+                fsmCaller.removeLastAppliedLogIndexListener(lnsr);
+            } catch (InterruptedException e) {
+                LOG.error("Fail to apply committed updates.", e);
+
+                return false;
+            }
+        }
+
         if (!this.rpcClientService.init(this.options)) {
             LOG.error("Fail to init rpc service.");
             return false;

Reply via email to