This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 04c159aefc IGNITE-17445 RocksDbKeyValueStorage recreates DB on start,
so data can't be found until Raft log is replayed (#1066)
04c159aefc is described below
commit 04c159aefc95fcaef728bdedcc6ff2dd4e9333a7
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Sat Sep 17 14:14:42 2022 +0300
IGNITE-17445 RocksDbKeyValueStorage recreates DB on start, so data can't be
found until Raft log is replayed (#1066)
---
.../raft/server/ItJraftCounterServerTest.java | 143 +++++++++++++--
.../raft/server/ItSimpleCounterServerTest.java | 3 +
.../raft/server/{ => counter}/CounterListener.java | 2 +-
.../server/{ => counter}/CounterSnapshotFile.java | 2 +-
.../raft/server/{ => counter}/GetValueCommand.java | 2 +-
.../{ => counter}/IncrementAndGetCommand.java | 2 +-
.../snasphot/SnapshotInMemoryStorageFactory.java | 194 +++++++++++++++++++++
.../TestWriteCommand.java} | 8 +-
.../server/snasphot/UpdateCountRaftListener.java | 104 +++++++++++
.../org/apache/ignite/raft/jraft/FSMCaller.java | 5 +
.../apache/ignite/raft/jraft/core/BallotBox.java | 2 -
.../ignite/raft/jraft/core/FSMCallerImpl.java | 5 +
.../apache/ignite/raft/jraft/core/NodeImpl.java | 28 +++
13 files changed, 474 insertions(+), 26 deletions(-)
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index 9c504ba046..dbcfd720ff 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -36,6 +36,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -46,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -70,11 +72,18 @@ import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.server.counter.CounterListener;
+import org.apache.ignite.raft.server.counter.GetValueCommand;
+import org.apache.ignite.raft.server.counter.IncrementAndGetCommand;
+import org.apache.ignite.raft.server.snasphot.SnapshotInMemoryStorageFactory;
+import org.apache.ignite.raft.server.snasphot.TestWriteCommand;
+import org.apache.ignite.raft.server.snasphot.UpdateCountRaftListener;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -161,6 +170,16 @@ class ItJraftCounterServerTest extends
RaftServerAbstractTest {
protected void after() throws Exception {
super.after();
+ shutdownCluster();
+
+ IgniteUtils.shutdownAndAwaitTermination(executor, 10,
TimeUnit.SECONDS);
+
+ TestUtils.assertAllJraftThreadsStopped();
+
+ LOG.info(">>>>>>>>>>>>>>> End test method: {}",
testInfo.getTestMethod().orElseThrow().getName());
+ }
+
+ private void shutdownCluster() throws Exception {
LOG.info("Start client shutdown");
Iterator<RaftGroupService> iterClients = clients.iterator();
@@ -173,6 +192,8 @@ class ItJraftCounterServerTest extends
RaftServerAbstractTest {
client.shutdown();
}
+ clients.clear();
+
LOG.info("Start server shutdown servers={}", servers.size());
Iterator<JraftServerImpl> iterSrv = servers.iterator();
@@ -193,20 +214,15 @@ class ItJraftCounterServerTest extends
RaftServerAbstractTest {
server.stop();
}
- IgniteUtils.shutdownAndAwaitTermination(executor, 10,
TimeUnit.SECONDS);
-
- TestUtils.assertAllJraftThreadsStopped();
-
- LOG.info(">>>>>>>>>>>>>>> End test method: {}",
testInfo.getTestMethod().orElseThrow().getName());
+ servers.clear();
}
/**
* Starts server.
*
- * @param idx The index.
- * @param clo Init closure.
+ * @param idx The index.
+ * @param clo Init closure.
* @param cons Node options updater.
- *
* @return Raft server instance.
*/
private JraftServerImpl startServer(int idx, Consumer<RaftServer> clo,
Consumer<NodeOptions> cons) {
@@ -277,6 +293,8 @@ class ItJraftCounterServerTest extends
RaftServerAbstractTest {
startClient(COUNTER_GROUP_1);
}
+
+
/**
* Checks that the number of Disruptor threads does not depend on count
started RAFT nodes.
*/
@@ -656,9 +674,12 @@ class ItJraftCounterServerTest extends
RaftServerAbstractTest {
/** Tests if a starting a new group in shared pools mode doesn't increases
timer threads count. */
@Test
public void testTimerThreadsCount() {
- JraftServerImpl srv0 = startServer(0, x -> {}, opts ->
opts.setTimerPoolSize(1));
- JraftServerImpl srv1 = startServer(1, x -> {}, opts ->
opts.setTimerPoolSize(1));
- JraftServerImpl srv2 = startServer(2, x -> {}, opts ->
opts.setTimerPoolSize(1));
+ JraftServerImpl srv0 = startServer(0, x -> {
+ }, opts -> opts.setTimerPoolSize(1));
+ JraftServerImpl srv1 = startServer(1, x -> {
+ }, opts -> opts.setTimerPoolSize(1));
+ JraftServerImpl srv2 = startServer(2, x -> {
+ }, opts -> opts.setTimerPoolSize(1));
waitForTopology(srv0.clusterService(), 3, 5_000);
@@ -672,7 +693,8 @@ class ItJraftCounterServerTest extends
RaftServerAbstractTest {
for (int i = 0; i < groupsCnt; i++) {
int finalI = i;
futs.add(svc.submit(new Runnable() {
- @Override public void run() {
+ @Override
+ public void run() {
String grp = "counter" + finalI;
srv0.startRaftGroup(grp, listenerFactory.get(),
INITIAL_CONF, defaults());
@@ -709,6 +731,95 @@ class ItJraftCounterServerTest extends
RaftServerAbstractTest {
"All timer threads: " + timerThreads.toString());
}
+ /**
+ * The test shows that all committed updates are applied automatically after a RAFT group restart.
+ * Actual data is available to read directly from the state storage (not the state machine) right
+ * after the RAFT node has started.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testApplyUpdatesOutOfSnapshot() throws Exception {
+ // Per-node update counters plus the maps that play the role of snapshot data and snapshot metadata storages.
+ HashMap<Integer, AtomicInteger> counters = new HashMap<>(3);
+ HashMap<Path, Integer> snapshotDataStorage = new HashMap<>(3);
+ HashMap<String, SnapshotMeta> snapshotMetaStorage = new HashMap<>(3);
+
+ for (int i = 0; i < 3; i++) {
+ AtomicInteger counter;
+
+ counters.put(i, counter = new AtomicInteger());
+
+ startServer(i, raftServer -> {
+ raftServer.startRaftGroup("test_raft_group", new
UpdateCountRaftListener(counter, snapshotDataStorage), INITIAL_CONF,
+ defaults().snapshotStorageFactory(new
SnapshotInMemoryStorageFactory(snapshotMetaStorage)));
+ }, opts -> {});
+ }
+
+ var raftClient = startClient("test_raft_group");
+
+ raftClient.refreshMembers(true).get();
+ var peers = raftClient.peers();
+
+ // First update, then a snapshot on the first peer once counter 0 has reached 1.
+ raftClient.run(new TestWriteCommand());
+
+ assertTrue(TestUtils.waitForCondition(() -> counters.get(0).get() ==
1, 10_000));
+
+ raftClient.snapshot(peers.get(0)).get();
+
+ // Second update, then a snapshot on the second peer once counter 1 has reached 2.
+ raftClient.run(new TestWriteCommand());
+
+ assertTrue(TestUtils.waitForCondition(() -> counters.get(1).get() ==
2, 10_000));
+
+ raftClient.snapshot(peers.get(1)).get();
+
+ // Third update is not followed by any snapshot request in this test.
+ raftClient.run(new TestWriteCommand());
+
+ for (AtomicInteger counter : counters.values()) {
+ assertTrue(TestUtils.waitForCondition(() -> counter.get() == 3,
10_000));
+ }
+
+ // Stop all clients and servers; the maps declared above are reused for the restarted nodes.
+ shutdownCluster();
+
+ Path peer0SnapPath = snapshotPath(peers.get(0).address());
+ Path peer1SnapPath = snapshotPath(peers.get(1).address());
+ Path peer2SnapPath = snapshotPath(peers.get(2).address());
+
+ // Snapshot data: value 1 for the first peer, value 2 for the second one, nothing for the third one.
+ assertEquals(1, snapshotDataStorage.get(peer0SnapPath));
+ assertEquals(2, snapshotDataStorage.get(peer1SnapPath));
+ assertNull(snapshotDataStorage.get(peer2SnapPath));
+
+ // Snapshot metadata must exist exactly for the peers that were asked to take a snapshot.
+ assertNotNull(snapshotMetaStorage.get(peer0SnapPath.toString()));
+ assertNotNull(snapshotMetaStorage.get(peer1SnapPath.toString()));
+ assertNull(snapshotMetaStorage.get(peer2SnapPath.toString()));
+
+ for (int i = 0; i < 3; i++) {
+ var counter = counters.get(i);
+
+ startServer(i, raftServer -> {
+ // Reset the counter before the group is started again, so the final value comes from the restarted group only.
+ counter.set(0);
+
+ raftServer.startRaftGroup("test_raft_group", new
UpdateCountRaftListener(counter, snapshotDataStorage), INITIAL_CONF,
+ defaults().snapshotStorageFactory(new
SnapshotInMemoryStorageFactory(snapshotMetaStorage)));
+ }, opts -> {});
+ }
+
+ // Checked without waiting: every counter is expected to be 3 as soon as the servers have been started.
+ for (AtomicInteger counter : counters.values()) {
+ assertEquals(3, counter.get());
+ }
+ }
+
+ /**
+  * Resolves the snapshot directory of the RAFT node that is identified by the given peer address.
+  *
+  * @param peerAddress Raft node peer address.
+  * @return Path to snapshot.
+  */
+ private Path snapshotPath(NetworkAddress peerAddress) {
+     // The node directory is derived from the offset of the peer port from the base port.
+     String nodeDir = "node" + (peerAddress.port() - PORT);
+     String groupDir = "test_raft_group" + "_" + peerAddress.toString().replace(':', '_');
+
+     return dataPath.resolve(nodeDir).resolve(groupDir).resolve("snapshot");
+ }
+
/**
* Returns {@code true} if thread is related to timers.
*
@@ -832,8 +943,8 @@ class ItJraftCounterServerTest extends
RaftServerAbstractTest {
* Applies increments.
*
* @param client The client
- * @param start Start element.
- * @param stop Stop element.
+ * @param start Start element.
+ * @param stop Stop element.
* @return The counter value.
* @throws Exception If failed.
*/
@@ -863,8 +974,8 @@ class ItJraftCounterServerTest extends
RaftServerAbstractTest {
* Validates state machine.
*
* @param expected Expected value.
- * @param server The server.
- * @param groupId Group id.
+ * @param server The server.
+ * @param groupId Group id.
* @return Validation result.
*/
private static boolean validateStateMachine(long expected, JraftServerImpl
server, String groupId) {
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
index 679585ffbf..07ee77475a 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
@@ -42,6 +42,9 @@ import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.server.counter.CounterListener;
+import org.apache.ignite.raft.server.counter.GetValueCommand;
+import org.apache.ignite.raft.server.counter.IncrementAndGetCommand;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/CounterListener.java
similarity index 98%
rename from
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
rename to
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/CounterListener.java
index 37b99107ae..c88543437e 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/CounterListener.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.server;
+package org.apache.ignite.raft.server.counter;
import java.io.File;
import java.io.IOException;
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterSnapshotFile.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/CounterSnapshotFile.java
similarity index 98%
rename from
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterSnapshotFile.java
rename to
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/CounterSnapshotFile.java
index d94e11c5a4..feb9346b74 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterSnapshotFile.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/CounterSnapshotFile.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.server;
+package org.apache.ignite.raft.server.counter;
import java.io.File;
import java.io.IOException;
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/GetValueCommand.java
similarity index 95%
copy from
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
copy to
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/GetValueCommand.java
index 86028a1d44..01056cd886 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/GetValueCommand.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.server;
+package org.apache.ignite.raft.server.counter;
import org.apache.ignite.raft.client.ReadCommand;
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java
similarity index 96%
rename from
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java
rename to
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java
index 709ce6dbfe..672536beb7 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.server;
+package org.apache.ignite.raft.server.counter;
import org.apache.ignite.raft.client.WriteCommand;
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/SnapshotInMemoryStorageFactory.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/SnapshotInMemoryStorageFactory.java
new file mode 100644
index 0000000000..6f329b7ab5
--- /dev/null
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/SnapshotInMemoryStorageFactory.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server.snasphot;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.option.SnapshotCopierOptions;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
+import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
+
+/**
+ * The factory for a snapshot storage that keeps snapshot metadata in the provided map.
+ */
+public class SnapshotInMemoryStorageFactory implements SnapshotStorageFactory {
+ /** The map to store snapshot metadata, keyed by the snapshot storage URI. */
+ private final Map<String, SnapshotMeta> metaStorage;
+
+ /**
+ * The constructor.
+ *
+ * @param metaStorage Map storage for snapshot metadata.
+ */
+ public SnapshotInMemoryStorageFactory(Map<String, SnapshotMeta>
metaStorage) {
+ this.metaStorage = metaStorage;
+ }
+
+ @Override
+ public SnapshotStorage createSnapshotStorage(String uri, RaftOptions
raftOptions) {
+ return new SnapshotStorage() {
+ @Override
+ public boolean setFilterBeforeCopyRemote() {
+ return false;
+ }
+
+ @Override
+ public SnapshotWriter create() {
+ return new SnapshotWriter() {
+ @Override
+ public boolean saveMeta(SnapshotMeta meta) {
+ metaStorage.put(uri, meta);
+ return true;
+ }
+
+ @Override
+ public boolean addFile(String fileName, Message fileMeta) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean removeFile(String fileName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close(boolean keepDataOnError) throws
IOException {
+ // No-op.
+ }
+
+ @Override
+ public void close() throws IOException {
+ // No-op.
+ }
+
+ @Override
+ public boolean init(Void opts) {
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ // No-op.
+ }
+
+ @Override
+ public String getPath() {
+ return uri;
+ }
+
+ @Override
+ public Set<String> listFiles() {
+ // No files in the snapshot.
+ return Set.of();
+ }
+
+ @Override
+ public Message getFileMeta(String fileName) {
+ // No files in the snapshot.
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public SnapshotReader open() {
+ var snapMeta = metaStorage.get(uri);
+
+ if (snapMeta == null) {
+ return null;
+ }
+
+ return new SnapshotReader() {
+ @Override
+ public SnapshotMeta load() {
+ return snapMeta;
+ }
+
+ @Override
+ public String generateURIForCopy() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // No-op.
+ }
+
+ @Override
+ public boolean init(Void opts) {
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ // No-op.
+ }
+
+ @Override
+ public String getPath() {
+ return uri;
+ }
+
+ @Override
+ public Set<String> listFiles() {
+ // No files in the snapshot.
+ return Set.of();
+ }
+
+ @Override
+ public Message getFileMeta(String fileName) {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public SnapshotReader copyFrom(String uri, SnapshotCopierOptions
opts) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SnapshotCopier startToCopyFrom(String uri,
SnapshotCopierOptions opts) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setSnapshotThrottle(SnapshotThrottle snapshotThrottle)
{
+ // No-op.
+ }
+
+ @Override
+ public boolean init(Void opts) {
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ // No-op.
+ }
+ };
+ }
+}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/TestWriteCommand.java
similarity index 82%
rename from
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
rename to
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/TestWriteCommand.java
index 86028a1d44..d0d702a082 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/TestWriteCommand.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.server;
+package org.apache.ignite.raft.server.snasphot;
-import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
/**
- * Get a value command.
+ * Test write command.
*/
-public class GetValueCommand implements ReadCommand {
+public class TestWriteCommand implements WriteCommand {
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java
new file mode 100644
index 0000000000..188443684f
--- /dev/null
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server.snasphot;
+
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The RAFT state machine that counts applied write commands and stores the result into the
+ * {@link AtomicInteger} passed through the constructor.
+ * The only write command it expects is {@link TestWriteCommand}.
+ */
+public class UpdateCountRaftListener implements RaftGroupListener {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(UpdateCountRaftListener.class);
+
+    /** Counter of received updates. */
+    private final AtomicInteger counter;
+
+    /** Storage to persist a state of the listener on snapshot. */
+    private final Map<Path, Integer> snapshotDataStorage;
+
+    /**
+     * The constructor.
+     *
+     * @param counter Counter to store amount of updates.
+     * @param snapshotDataStorage Storage for a snapshot.
+     */
+    public UpdateCountRaftListener(AtomicInteger counter, Map<Path, Integer> snapshotDataStorage) {
+        this.counter = counter;
+        this.snapshotDataStorage = snapshotDataStorage;
+    }
+
+    /** Read commands are not supported: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+        throw new UnsupportedOperationException("Read command is not supported for the RAFT listener.");
+    }
+
+    /** Increments the counter once per received write command and completes each closure with a {@code null} result. */
+    @Override
+    public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+        while (iterator.hasNext()) {
+            CommandClosure<WriteCommand> clo = iterator.next();
+
+            assert clo.command() instanceof TestWriteCommand;
+
+            int current = counter.incrementAndGet();
+
+            LOG.info("Increment value [curVal={}]", current);
+
+            clo.result(null);
+        }
+    }
+
+    /** Stores the current counter value under the snapshot path and reports success to the closure. */
+    @Override
+    public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+        snapshotDataStorage.put(path, counter.get());
+
+        doneClo.accept(null);
+    }
+
+    /** Restores the counter from the value stored for the path, or resets it to zero when nothing is stored. */
+    @Override
+    public boolean onSnapshotLoad(Path path) {
+        counter.set(snapshotDataStorage.getOrDefault(path, 0));
+
+        return true;
+    }
+
+    @Override
+    public void onShutdown() {
+        // No-op.
+    }
+
+    @Override
+    public @Nullable CompletableFuture<Void> onBeforeApply(Command command) {
+        return null;
+    }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/FSMCaller.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/FSMCaller.java
index 3d96cfc7dc..273d5f51a1 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/FSMCaller.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/FSMCaller.java
@@ -46,6 +46,11 @@ public interface FSMCaller extends
Lifecycle<FSMCallerOptions>, Describer {
*/
void addLastAppliedLogIndexListener(final LastAppliedLogIndexListener
listener);
+ /**
+ * Removes a LastAppliedLogIndexListener.
+ *
+ * @param listener Listener to remove.
+ */
+ void removeLastAppliedLogIndexListener(final LastAppliedLogIndexListener
listener);
+
/**
* Called when log entry committed
*
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
index fd084f377f..820ed2cb62 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
@@ -175,8 +175,6 @@ public class BallotBox implements
Lifecycle<BallotBoxOptions>, Describer {
return false;
}
this.pendingIndex = newPendingIndex;
- //TODO Fix it properly:
https://issues.apache.org/jira/browse/IGNITE-17445
- this.lastCommittedIndex = newPendingIndex - 1;
this.closureQueue.resetFirstIndex(newPendingIndex);
return true;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index 82858da3e9..b95f4ca175 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -215,6 +215,11 @@ public class FSMCallerImpl implements FSMCaller {
this.lastAppliedLogIndexListeners.add(listener);
}
+ @Override
+ public void removeLastAppliedLogIndexListener(final
LastAppliedLogIndexListener listener) {
+ // Removes the listener from the collection of last-applied-index listeners.
+ this.lastAppliedLogIndexListeners.remove(listener);
+ }
+
private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
if (this.shutdownLatch != null) {
// Shutting down
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 8f7d69178b..8db8fe7a4f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.FSMCaller;
+import org.apache.ignite.raft.jraft.FSMCaller.LastAppliedLogIndexListener;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
@@ -1033,6 +1034,33 @@ public class NodeImpl implements Node, RaftServerService
{
// Adds metric registry to RPC service.
this.options.setMetricRegistry(this.metrics.getMetricRegistry());
+ // Treat everything that is already in the log as committed and block the start
+ // until the state machine has applied it.
+ long commitIdx = logManager.getLastLogIndex();
+
+ if (commitIdx > fsmCaller.getLastAppliedIndex()) {
+     CountDownLatch applyCommitLatch = new CountDownLatch(1);
+
+     LastAppliedLogIndexListener lnsr = lastAppliedLogIndex -> {
+         if (lastAppliedLogIndex >= commitIdx) {
+             applyCommitLatch.countDown();
+         }
+     };
+
+     // Register the listener before triggering the apply, so the notification cannot be missed.
+     fsmCaller.addLastAppliedLogIndexListener(lnsr);
+
+     try {
+         fsmCaller.onCommitted(commitIdx);
+
+         applyCommitLatch.await();
+     } catch (InterruptedException e) {
+         // Restore the interruption status so that callers can observe it.
+         Thread.currentThread().interrupt();
+
+         LOG.error("Fail to apply committed updates.", e);
+
+         return false;
+     } finally {
+         // The listener is needed only for this wait; remove it on every exit path.
+         fsmCaller.removeLastAppliedLogIndexListener(lnsr);
+     }
+ }
+
if (!this.rpcClientService.init(this.options)) {
LOG.error("Fail to init rpc service.");
return false;