This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f6cab2c3b9 IGNITE-22214 Add persistence and recovery for an idempotent 
command cache (#3800)
f6cab2c3b9 is described below

commit f6cab2c3b9c5d894e449fb18add800ec23662e04
Author: Alexander Lapin <[email protected]>
AuthorDate: Thu May 23 17:48:36 2024 +0300

    IGNITE-22214 Add persistence and recovery for an idempotent command cache 
(#3800)
    
    Co-authored-by: Denis Chudov <[email protected]>
---
 check-rules/spotbugs-excludes.xml                  |  15 ++
 .../catalog/storage/UpdateLogImplTest.java         |   6 +-
 .../apache/ignite/internal/util/ArrayUtils.java    |  19 ++
 .../DistributionZoneManagerAlterFilterTest.java    |   2 +-
 ...istributionZoneManagerScaleUpScaleDownTest.java |   4 +-
 .../DistributionZoneRebalanceEngineTest.java       |  14 +-
 .../ignite/internal/metastorage/CommandId.java     |   3 +-
 .../impl/ItIdempotentCommandCacheTest.java         | 196 ++++++++++++++++-----
 .../ItMetaStorageMultipleNodesAbstractTest.java    |   2 +-
 .../metastorage/impl/ItMetaStorageServiceTest.java |   8 +-
 .../metastorage/server/KeyValueStorage.java        |  17 +-
 .../metastorage/server/WatchProcessor.java         |  16 +-
 .../server/persistence/RocksDbKeyValueStorage.java |  32 +++-
 .../server/raft/MetaStorageListener.java           |   1 +
 .../server/raft/MetaStorageWriteHandler.java       |  46 ++++-
 .../server/BasicOperationsKeyValueStorageTest.java |  76 ++++----
 .../server/SimpleInMemoryKeyValueStorage.java      |  36 +++-
 .../DistributedConfigurationStorageTest.java       |  10 +-
 18 files changed, 391 insertions(+), 112 deletions(-)

diff --git a/check-rules/spotbugs-excludes.xml 
b/check-rules/spotbugs-excludes.xml
index 27dec94abb..e7b4c71ba5 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -217,6 +217,21 @@
       <Field name="LEASE_PREFIX"/>
     </Or>
   </Match>
+  <Match>
+    <!-- Public byte array constants, not expected to be modified. -->
+    <Bug pattern="MS_MUTABLE_ARRAY"/>
+    <Class 
name="org.apache.ignite.internal.metastorage.server.KeyValueStorage"/>
+    <Or>
+      <Field name="INVOKE_RESULT_FALSE_BYTES"/>
+      <Field name="INVOKE_RESULT_TRUE_BYTES"/>
+    </Or>
+  </Match>
+  <Match>
+    <!-- Public byte array constants, not expected to be modified. -->
+    <Bug pattern="MS_MUTABLE_ARRAY"/>
+    <Class 
name="org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler"/>
+    <Field name="IDEMPOTENT_COMMAND_PREFIX_BYTES"/>
+  </Match>
   <Match>
     <Bug pattern="IT_NO_SUCH_ELEMENT"/>
     <Class 
name="org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage$ReadOnlyScanCursor"/>
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index d412f3085c..81c3603ebe 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -252,12 +252,14 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest {
         assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion + 
2)), willBe(true));
 
         List<Integer> expectedVersions = List.of(startVersion, startVersion + 
1, startVersion + 2);
-        List<Long> expectedTokens = List.of(revisionBefore + 1, revisionBefore 
+ 2, revisionBefore + 3);
+        List<Long> expectedTokens = List.of(revisionBefore + 1, revisionBefore 
+ 5, revisionBefore + 6);
 
+        // No-op operation also increases the revision.
+        int revisionsUpdateCount = 6;
         // wait till necessary revision is applied
         assertTrue(
                 waitForCondition(
-                        () -> metastore.appliedRevision() - 
expectedVersions.size() == revisionBefore,
+                        () -> metastore.appliedRevision() - 
revisionsUpdateCount == revisionBefore,
                         TimeUnit.SECONDS.toMillis(5)
                 )
         );
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
index bc08178ce6..8bd8cfdac4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
@@ -337,6 +337,25 @@ public final class ArrayUtils {
         return newArr;
     }
 
+    /**
+     * Concatenates elements to an array.
+     *
+     * @param arr Array.
+     * @param bytes One or more elements.
+     * @return Concatenated array.
+     */
+    public static byte[] concat(@Nullable byte[] arr, byte... bytes) {
+        if (nullOrEmpty(arr)) {
+            return bytes;
+        }
+
+        byte[] newArr = Arrays.copyOf(arr, arr.length + bytes.length);
+
+        System.arraycopy(bytes, 0, newArr, arr.length, bytes.length);
+
+        return newArr;
+    }
+
     /**
      * Removes an element from an array with decrementing the array itself.
      *
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
index 1e0efb9296..6ba28d7cb8 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
@@ -221,7 +221,7 @@ public class DistributionZoneManagerAlterFilterTest extends 
BaseDistributionZone
                 topology.putNode(e);
             }
             return invocation.callRealMethod();
-        }).when(keyValueStorage).invoke(any(), any());
+        }).when(keyValueStorage).invoke(any(), any(), any());
 
         // Check that node E, that was added while filter's altering, is not 
propagated to data nodes.
         assertDataNodesFromManager(distributionZoneManager, 
metaStorageManager::appliedRevision, catalogManager::latestCatalogVersion,
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
index 292f797067..e5c601ec3d 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
@@ -453,7 +453,7 @@ public class DistributionZoneManagerScaleUpScaleDownTest 
extends BaseDistributio
             }
 
             return invocation.callRealMethod();
-        }).when(keyValueStorage).invoke(any(), any());
+        }).when(keyValueStorage).invoke(any(), any(), any());
 
         topology.putNode(NODE_1);
 
@@ -503,7 +503,7 @@ public class DistributionZoneManagerScaleUpScaleDownTest 
extends BaseDistributio
             }
 
             return invocation.callRealMethod();
-        }).when(keyValueStorage).invoke(any(), any());
+        }).when(keyValueStorage).invoke(any(), any(), any());
 
         topology.removeNodes(Set.of(NODE_1));
 
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index d729aa0777..5fd90cf73e 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -299,7 +299,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         checkAssignments(zoneNodes, RebalanceUtil::pendingPartAssignmentsKey);
 
-        verify(keyValueStorage, timeout(1000).times(8)).invoke(any(), any());
+        verify(keyValueStorage, timeout(1000).times(8)).invoke(any(), any(), 
any());
     }
 
     @Test
@@ -322,7 +322,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         checkAssignments(zoneNodes, RebalanceUtil::pendingPartAssignmentsKey);
 
-        verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any(), 
any());
 
         nodes = Set.of("node3", "node4", "node5");
 
@@ -333,7 +333,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         checkAssignments(zoneNodes, RebalanceUtil::plannedPartAssignmentsKey);
 
-        verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any());
+        verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any(), 
any());
     }
 
     @Test
@@ -358,7 +358,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         checkAssignments(zoneNodes, RebalanceUtil::pendingPartAssignmentsKey);
 
-        verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any(), 
any());
 
         Set<String> emptyNodes = emptySet();
 
@@ -369,7 +369,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         checkAssignments(zoneNodes, RebalanceUtil::plannedPartAssignmentsKey);
 
-        verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any(), 
any());
     }
 
     @Test
@@ -392,7 +392,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         checkAssignments(zoneNodes, RebalanceUtil::pendingPartAssignmentsKey);
 
-        verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any(), 
any());
 
         Set<String> nodes2 = Set.of("node3", "node4", "node5");
 
@@ -404,7 +404,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         
assertNull(keyValueStorage.get(RebalanceUtil.plannedPartAssignmentsKey(partId).bytes()).value());
 
-        verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any());
+        verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any(), 
any());
     }
 
     @Test
diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CommandId.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CommandId.java
index 2870735bf7..db276400c4 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CommandId.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CommandId.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.metastorage;
 
+import java.io.Serializable;
 import java.util.UUID;
 import org.apache.ignite.internal.metastorage.dsl.MetaStorageMessageGroup;
 import org.apache.ignite.internal.network.NetworkMessage;
@@ -27,7 +28,7 @@ import 
org.apache.ignite.internal.network.annotations.Transferable;
  * would be unique cluster-wide.
  */
 @Transferable(MetaStorageMessageGroup.COMMAND_ID)
-public interface CommandId extends NetworkMessage {
+public interface CommandId extends NetworkMessage, Serializable {
     UUID nodeId();
 
     long counter();
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index ce87c643dc..2ced907a77 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -27,6 +29,7 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
 import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -55,10 +58,13 @@ import 
org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.IdempotentCommand;
 import org.apache.ignite.internal.metastorage.command.InvokeCommand;
 import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
 import org.apache.ignite.internal.metastorage.command.SyncTimeCommand;
 import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
@@ -83,6 +89,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 /**
  * Integration tests for idempotency of {@link 
org.apache.ignite.internal.metastorage.command.IdempotentCommand}.
@@ -93,6 +101,22 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
 
     private static final int NODES_COUNT = 2;
 
+    private static final ByteArray TEST_KEY = new 
ByteArray("key".getBytes(StandardCharsets.UTF_8));
+    private static final byte[] TEST_VALUE = 
"value".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] ANOTHER_VALUE = 
"another".getBytes(StandardCharsets.UTF_8);
+
+    private static final ByteArray TEST_KEY_2 = new 
ByteArray("key2".getBytes(StandardCharsets.UTF_8));
+    private static final byte[] TEST_VALUE_2 = 
"value2".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] ANOTHER_VALUE_2 = 
"another2".getBytes(StandardCharsets.UTF_8);
+
+    private static final int YIELD_RESULT = 10;
+    private static final int ANOTHER_YIELD_RESULT = 20;
+
+    @InjectConfiguration("mock.responseTimeout = 100")
+    private RaftConfiguration raftConfiguration;
+    @InjectConfiguration("mock.idleSyncTimeInterval = 100")
+    private MetaStorageConfiguration metaStorageConfiguration;
+
     private List<Node> nodes;
 
     private static class Node implements AutoCloseable {
@@ -164,7 +188,9 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
         }
 
         void start(CompletableFuture<Set<String>> metaStorageNodesFut) {
-            
when(cmgManager.metaStorageNodes()).thenReturn(metaStorageNodesFut);
+            if (metaStorageNodesFut != null) {
+                
when(cmgManager.metaStorageNodes()).thenReturn(metaStorageNodesFut);
+            }
 
             assertThat(startAsync(clusterService, raftManager, 
metaStorageManager), willCompleteSuccessfully());
         }
@@ -203,26 +229,8 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
     }
 
     @BeforeEach
-    void setUp(
-            TestInfo testInfo,
-            @InjectConfiguration("mock.responseTimeout = 100") 
RaftConfiguration raftConfiguration,
-            @InjectConfiguration("mock.idleSyncTimeInterval = 100") 
MetaStorageConfiguration metaStorageConfiguration
-    ) {
-        nodes = new ArrayList<>();
-
-        for (int i = 0; i < NODES_COUNT; i++) {
-            Node node = new Node(testInfo, raftConfiguration, 
metaStorageConfiguration, workDir, i);
-            nodes.add(node);
-        }
-
-        Set<String> nodeNames = nodes.stream().map(n -> 
n.clusterService.nodeName()).collect(toSet());
-        CompletableFuture<Set<String>> metaStorageNodesFut = new 
CompletableFuture<>();
-
-        nodes.forEach(n -> n.start(metaStorageNodesFut));
-
-        metaStorageNodesFut.complete(nodeNames);
-
-        nodes.forEach(Node::deployWatches);
+    void setUp(TestInfo testInfo) {
+        startCluster(testInfo);
     }
 
     @AfterEach
@@ -232,10 +240,6 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
 
     @Test
     public void testIdempotentInvoke() throws InterruptedException {
-        ByteArray testKey = new 
ByteArray("key".getBytes(StandardCharsets.UTF_8));
-        byte[] testValue = "value".getBytes(StandardCharsets.UTF_8);
-        byte[] anotherValue = "another".getBytes(StandardCharsets.UTF_8);
-
         AtomicInteger writeActionReqCount = new AtomicInteger();
         CompletableFuture<Void> retryBlockingFuture = new 
CompletableFuture<>();
 
@@ -274,9 +278,13 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
 
         MetaStorageManager metaStorageManager = leader.metaStorageManager;
 
-        CompletableFuture<Boolean> fut = 
metaStorageManager.invoke(notExists(testKey), put(testKey, testValue), 
put(testKey, anotherValue));
+        CompletableFuture<Boolean> fut = metaStorageManager.invoke(
+                notExists(TEST_KEY),
+                put(TEST_KEY, TEST_VALUE),
+                put(TEST_KEY, ANOTHER_VALUE)
+        );
 
-        assertTrue(waitForCondition(() -> 
leader.checkValueInStorage(testKey.bytes(), testValue), 10_000));
+        assertTrue(waitForCondition(() -> 
leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE), 10_000));
 
         log.info("Test: value appeared in storage.");
 
@@ -290,28 +298,15 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
         log.info("Test: invoke complete.");
 
         assertTrue(fut.join());
-        assertTrue(leader.checkValueInStorage(testKey.bytes(), testValue));
+        assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE));
     }
 
     @Test
     public void testIdempotentInvokeAfterLeaderChange() {
-        ByteArray testKey = new 
ByteArray("key".getBytes(StandardCharsets.UTF_8));
-        byte[] testValue = "value".getBytes(StandardCharsets.UTF_8);
-        byte[] anotherValue = "another".getBytes(StandardCharsets.UTF_8);
+        InvokeCommand invokeCommand = (InvokeCommand) 
buildKeyNotExistsInvokeCommand(TEST_KEY, TEST_VALUE, ANOTHER_VALUE);
 
         RaftGroupService raftClient = raftClient();
 
-        HybridClock clock = new HybridClockImpl();
-        CommandIdGenerator commandIdGenerator = new CommandIdGenerator(() -> 
UUID.randomUUID().toString());
-
-        InvokeCommand invokeCommand = CMD_FACTORY.invokeCommand()
-                .condition(notExists(testKey))
-                .success(List.of(put(testKey, testValue)))
-                .failure(List.of(put(testKey, anotherValue)))
-                .initiatorTimeLong(clock.nowLong())
-                .id(commandIdGenerator.newId())
-                .build();
-
         CompletableFuture<Boolean> fut = raftClient.run(invokeCommand);
 
         Node currentLeader = leader(raftClient);
@@ -319,7 +314,7 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
         assertThat(fut, willCompleteSuccessfully());
         assertTrue(fut.join());
 
-        assertTrue(currentLeader.checkValueInStorage(testKey.bytes(), 
testValue));
+        assertTrue(currentLeader.checkValueInStorage(TEST_KEY.bytes(), 
TEST_VALUE));
 
         Node newLeader = nodes.stream()
                 .filter(n -> 
!n.clusterService.nodeName().equals(currentLeader.clusterService.nodeName()))
@@ -334,8 +329,53 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
         assertThat(futAfterLeaderChange, willCompleteSuccessfully());
         assertTrue(futAfterLeaderChange.join());
 
-        assertTrue(currentLeader.checkValueInStorage(testKey.bytes(), 
testValue));
-        assertTrue(newLeader.checkValueInStorage(testKey.bytes(), testValue));
+        assertTrue(currentLeader.checkValueInStorage(TEST_KEY.bytes(), 
TEST_VALUE));
+        assertTrue(newLeader.checkValueInStorage(TEST_KEY.bytes(), 
TEST_VALUE));
+    }
+
+    @ParameterizedTest
+    @MethodSource("idempotentCommandProvider")
+    public void testIdempotentCacheRestoreFromSnapshot(IdempotentCommand 
idempotentCommand, TestInfo testInfo) throws Exception {
+        RaftGroupService raftClient = raftClient();
+        Node leader = leader(raftClient);
+
+        // Initial idempotent command run.
+        CompletableFuture<Object> commandProcessingResultFuture = 
raftClient.run(idempotentCommand);
+        assertThat(commandProcessingResultFuture, willCompleteSuccessfully());
+        Object commandProcessingResult = commandProcessingResultFuture.get();
+        if (idempotentCommand instanceof InvokeCommand) {
+            assertTrue((Boolean) commandProcessingResult);
+            assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), 
TEST_VALUE));
+        } else {
+            assertEquals(YIELD_RESULT, ((StatementResult) 
commandProcessingResult).getAsInt());
+            assertTrue(leader.checkValueInStorage(TEST_KEY_2.bytes(), 
TEST_VALUE_2));
+
+        }
+
+        // Do the snapshot.
+        nodes.forEach(n -> raftClient().snapshot(new 
Peer(n.clusterService.nodeName())));
+
+        // Restart nodes in order to trigger idempotent volatile cache 
initialization from snapshot.
+        for (Node node : nodes) {
+            node.stop();
+        }
+
+        // Restart cluster.
+        startCluster(testInfo);
+
+        leader = leader(raftClient());
+
+        // Run same idempotent command one more time and check that condition 
wasn't re-evaluated, but was retrieved from the cache instead.
+        CompletableFuture<Object> commandProcessingResultFuture2 = 
raftClient().run(idempotentCommand);
+        assertThat(commandProcessingResultFuture2, willCompleteSuccessfully());
+        Object commandProcessingResult2 = commandProcessingResultFuture2.get();
+        if (idempotentCommand instanceof InvokeCommand) {
+            assertTrue((Boolean) commandProcessingResult2);
+            assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), 
TEST_VALUE));
+        } else {
+            assertEquals(YIELD_RESULT, ((StatementResult) 
commandProcessingResult).getAsInt());
+            assertTrue(leader.checkValueInStorage(TEST_KEY_2.bytes(), 
TEST_VALUE_2));
+        }
     }
 
     private Node leader(RaftGroupService raftClient) {
@@ -365,4 +405,70 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
             throw new RuntimeException(e);
         }
     }
+
+    static List<IdempotentCommand> idempotentCommandProvider() {
+        return List.of(
+                buildKeyNotExistsInvokeCommand(TEST_KEY, TEST_VALUE, 
ANOTHER_VALUE),
+                buildKeyNotExistsMultiInvokeCommand(TEST_KEY_2, TEST_VALUE_2, 
ANOTHER_VALUE_2, YIELD_RESULT, ANOTHER_YIELD_RESULT)
+        );
+    }
+
+    private static IdempotentCommand buildKeyNotExistsInvokeCommand(
+            ByteArray testKey,
+            byte[] testValue,
+            byte[] anotherValue
+    ) {
+        HybridClock clock = new HybridClockImpl();
+        CommandIdGenerator commandIdGenerator = new CommandIdGenerator(() -> 
UUID.randomUUID().toString());
+
+        return CMD_FACTORY.invokeCommand()
+                .condition(notExists(testKey))
+                .success(List.of(put(testKey, testValue)))
+                .failure(List.of(put(testKey, anotherValue)))
+                .initiatorTimeLong(clock.nowLong())
+                .id(commandIdGenerator.newId())
+                .build();
+    }
+
+    private static IdempotentCommand buildKeyNotExistsMultiInvokeCommand(
+            ByteArray testKey,
+            byte[] testValue,
+            byte[] anotherValue,
+            int testYieldResult,
+            int anotherYieldResult
+    ) {
+        HybridClock clock = new HybridClockImpl();
+        CommandIdGenerator commandIdGenerator = new CommandIdGenerator(() -> 
UUID.randomUUID().toString());
+
+        Iif iif = iif(
+                notExists(testKey),
+                ops(put(testKey, testValue)).yield(testYieldResult),
+                ops(put(testKey, anotherValue)).yield(anotherYieldResult)
+        );
+
+        return CMD_FACTORY.multiInvokeCommand()
+                .id(commandIdGenerator.newId())
+                .iif(iif)
+                .safeTimeLong(clock.now().longValue())
+                .initiatorTimeLong(clock.now().longValue())
+                .build();
+    }
+
+    private void startCluster(TestInfo testInfo) {
+        nodes = new ArrayList<>();
+
+        for (int i = 0; i < NODES_COUNT; i++) {
+            Node node = new Node(testInfo, raftConfiguration, 
metaStorageConfiguration, workDir, i);
+            nodes.add(node);
+        }
+
+        Set<String> nodeNames = nodes.stream().map(n -> 
n.clusterService.nodeName()).collect(toSet());
+        CompletableFuture<Set<String>> metaStorageNodesFut = new 
CompletableFuture<>();
+
+        nodes.forEach(n -> n.start(metaStorageNodesFut));
+
+        metaStorageNodesFut.complete(nodeNames);
+
+        nodes.forEach(Node::deployWatches);
+    }
 }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 8d7fa1b987..557ad690f1 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -325,7 +325,7 @@ public abstract class 
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
 
         var expectedEntryEvent = new EntryEvent(
                 new EntryImpl(key.bytes(), value, 1, 1),
-                new EntryImpl(key.bytes(), newValue, 2, 2)
+                new EntryImpl(key.bytes(), newValue, 2, 3)
         );
 
         assertThat(awaitFuture, willBe(expectedEntryEvent));
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index 75f1318cb0..0f0ea61c60 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -607,11 +607,11 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
 
         var ifCaptor = ArgumentCaptor.forClass(If.class);
 
-        when(node.mockStorage.invoke(any(), 
any())).thenReturn(ops().yield(true).result());
+        when(node.mockStorage.invoke(any(), any(), 
any())).thenReturn(ops().yield(true).result(), null, null);
 
         assertTrue(node.metaStorageService.invoke(iif).get().getAsBoolean());
 
-        verify(node.mockStorage).invoke(ifCaptor.capture(), any());
+        verify(node.mockStorage).invoke(ifCaptor.capture(), any(), any());
 
         var resultIf = ifCaptor.getValue();
 
@@ -656,7 +656,7 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
 
         byte[] expVal = {2};
 
-        when(node.mockStorage.invoke(any(), any(), any(), 
any())).thenReturn(true);
+        when(node.mockStorage.invoke(any(), any(), any(), any(), 
any())).thenReturn(true);
 
         Condition condition = Conditions.notExists(expKey);
 
@@ -672,7 +672,7 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
 
         ArgumentCaptor<Collection<Operation>> failureCaptor = 
ArgumentCaptor.forClass(Collection.class);
 
-        verify(node.mockStorage).invoke(conditionCaptor.capture(), 
successCaptor.capture(), failureCaptor.capture(), any());
+        verify(node.mockStorage).invoke(conditionCaptor.capture(), 
successCaptor.capture(), failureCaptor.capture(), any(), any());
 
         assertArrayEquals(expKey.bytes(), conditionCaptor.getValue().key());
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 414aabc03c..25c2324ba9 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -24,11 +24,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.LongConsumer;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
@@ -36,6 +38,9 @@ import org.jetbrains.annotations.Nullable;
  * Defines key/value storage interface.
  */
 public interface KeyValueStorage extends ManuallyCloseable {
+    byte[] INVOKE_RESULT_TRUE_BYTES = {ByteUtils.booleanToByte(true)};
+    byte[] INVOKE_RESULT_FALSE_BYTES = {ByteUtils.booleanToByte(false)};
+
     /**
      * Starts the given storage, allocating the necessary resources.
      */
@@ -142,20 +147,28 @@ public interface KeyValueStorage extends 
ManuallyCloseable {
      * @param success Success operations.
      * @param failure Failure operations.
      * @param opTs Operation's timestamp.
+     * @param commandId Command Id.
      * @return Result of test condition.
      */
-    boolean invoke(Condition condition, Collection<Operation> success, 
Collection<Operation> failure, HybridTimestamp opTs);
+    boolean invoke(
+            Condition condition,
+            Collection<Operation> success,
+            Collection<Operation> failure,
+            HybridTimestamp opTs,
+            CommandId commandId
+    );
 
     /**
      * Invoke, which supports nested conditional statements with left and 
right branches of execution.
      *
      * @param iif {@link If} statement to invoke
      * @param opTs Operation's timestamp.
+     * @param commandId Command Id.
      * @return execution result
      * @see If
      * @see StatementResult
      */
-    StatementResult invoke(If iif, HybridTimestamp opTs);
+    StatementResult invoke(If iif, HybridTimestamp opTs, CommandId commandId);
 
     /**
      * Returns cursor by entries which correspond to the given keys range.
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 5036fc8c82..d24d683488 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -21,10 +21,12 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;
 import static 
org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
@@ -171,8 +173,18 @@ public class WatchProcessor implements ManuallyCloseable {
                     // Revision must be the same for all entries.
                     long newRevision = updatedEntries.get(0).revision();
 
+                    List<Entry> filteredUpdatedEntries = 
updatedEntries.stream()
+                            .filter(entry ->
+                                    entry.key().length <= 
IDEMPOTENT_COMMAND_PREFIX_BYTES.length
+                                            ||
+                                            entry.key().length > 
IDEMPOTENT_COMMAND_PREFIX_BYTES.length
+                                                    && 
!ByteBuffer.wrap(entry.key(), 0, IDEMPOTENT_COMMAND_PREFIX_BYTES.length)
+                                                            
.equals(ByteBuffer.wrap(IDEMPOTENT_COMMAND_PREFIX_BYTES)))
+                            .collect(Collectors.toList());
+
                     // Collect all the events for each watch.
-                    CompletableFuture<List<WatchAndEvents>> 
watchesAndEventsFuture = collectWatchesAndEvents(updatedEntries, newRevision);
+                    CompletableFuture<List<WatchAndEvents>> 
watchesAndEventsFuture =
+                            collectWatchesAndEvents(filteredUpdatedEntries, 
newRevision);
 
                     return watchesAndEventsFuture
                             .thenComposeAsync(watchAndEvents -> {
@@ -194,7 +206,7 @@ public class WatchProcessor implements ManuallyCloseable {
                                         );
 
                                 notificationFuture.whenComplete((unused, e) -> 
{
-                                    maybeLogLongProcessing(updatedEntries, 
startTimeNanos);
+                                    
maybeLogLongProcessing(filteredUpdatedEntries, startTimeNanos);
 
                                     if (e != null) {
                                         failureProcessor.process(new 
FailureContext(CRITICAL_ERROR, e));
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 7a4102656e..f0fc6fab2a 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -32,6 +32,7 @@ import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageC
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION;
+import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
 import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
 import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
@@ -65,12 +66,15 @@ import java.util.function.LongConsumer;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
 import org.apache.ignite.internal.metastorage.dsl.Update;
 import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
@@ -89,6 +93,7 @@ import 
org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -660,7 +665,13 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     }
 
     @Override
-    public boolean invoke(Condition condition, Collection<Operation> success, 
Collection<Operation> failure, HybridTimestamp opTs) {
+    public boolean invoke(
+            Condition condition,
+            Collection<Operation> success,
+            Collection<Operation> failure,
+            HybridTimestamp opTs,
+            CommandId commandId
+    ) {
         rwLock.writeLock().lock();
 
         try {
@@ -668,7 +679,12 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
             boolean branch = condition.test(entries);
 
-            Collection<Operation> ops = branch ? success : failure;
+            Collection<Operation> ops = branch ? new ArrayList<>(success) : 
new ArrayList<>(failure);
+
+            ops.add(Operations.put(
+                    new 
ByteArray(ArrayUtils.concat(IDEMPOTENT_COMMAND_PREFIX_BYTES, 
ByteUtils.toBytes(commandId))),
+                    branch ? INVOKE_RESULT_TRUE_BYTES : 
INVOKE_RESULT_FALSE_BYTES)
+            );
 
             applyOperations(ops, opTs);
 
@@ -681,7 +697,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     }
 
     @Override
-    public StatementResult invoke(If iif, HybridTimestamp opTs) {
+    public StatementResult invoke(If iif, HybridTimestamp opTs, CommandId 
commandId) {
         rwLock.writeLock().lock();
 
         try {
@@ -703,7 +719,14 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
                 if (branch.isTerminal()) {
                     Update update = branch.update();
 
-                    applyOperations(update.operations(), opTs);
+                    Collection<Operation> ops = new 
ArrayList<>(update.operations());
+
+                    ops.add(Operations.put(
+                            new 
ByteArray(ArrayUtils.concat(IDEMPOTENT_COMMAND_PREFIX_BYTES, 
ByteUtils.toBytes(commandId))),
+                            update.result().result())
+                    );
+
+                    applyOperations(ops, opTs);
 
                     return update.result();
                 } else {
@@ -1415,7 +1438,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         UpdatedEntries copy = updatedEntries.transfer();
 
         assert copy.ts != null;
-
         watchProcessor.notifyWatches(copy.updatedEntries, copy.ts);
     }
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 43cf5624e0..a49e594983 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -167,6 +167,7 @@ public class MetaStorageListener implements 
RaftGroupListener, BeforeApplyHandle
     @Override
     public boolean onSnapshotLoad(Path path) {
         storage.restoreSnapshot(path);
+        writeHandler.onSnapshotLoad();
         return true;
     }
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index a664c2254c..95c3685fb1 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -17,7 +17,11 @@
 
 package org.apache.ignite.internal.metastorage.server.raft;
 
+import static java.util.Arrays.copyOfRange;
+import static org.apache.ignite.internal.util.ByteUtils.byteToBoolean;
+
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,6 +30,7 @@ import 
org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.CommandId;
+import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.command.IdempotentCommand;
 import org.apache.ignite.internal.metastorage.command.InvokeCommand;
 import org.apache.ignite.internal.metastorage.command.MetaStorageWriteCommand;
@@ -38,6 +43,7 @@ import 
org.apache.ignite.internal.metastorage.command.SyncTimeCommand;
 import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
 import org.apache.ignite.internal.metastorage.dsl.ConditionType;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
+import org.apache.ignite.internal.metastorage.dsl.MetaStorageMessagesFactory;
 import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
 import org.apache.ignite.internal.metastorage.dsl.Statement.IfStatement;
 import org.apache.ignite.internal.metastorage.dsl.Statement.UpdateStatement;
@@ -55,6 +61,8 @@ import 
org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -64,6 +72,10 @@ public class MetaStorageWriteHandler {
     /** Logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(MetaStorageWriteHandler.class);
 
+    public static final byte[] IDEMPOTENT_COMMAND_PREFIX_BYTES = 
"icp.".getBytes(StandardCharsets.UTF_8);
+
+    private static final MetaStorageMessagesFactory MSG_FACTORY = new 
MetaStorageMessagesFactory();
+
     private final KeyValueStorage storage;
     private final ClusterTimeImpl clusterTime;
 
@@ -173,11 +185,11 @@ public class MetaStorageWriteHandler {
         } else if (command instanceof InvokeCommand) {
             InvokeCommand cmd = (InvokeCommand) command;
 
-            clo.result(storage.invoke(toCondition(cmd.condition()), 
cmd.success(), cmd.failure(), opTime));
+            clo.result(storage.invoke(toCondition(cmd.condition()), 
cmd.success(), cmd.failure(), opTime, cmd.id()));
         } else if (command instanceof MultiInvokeCommand) {
             MultiInvokeCommand cmd = (MultiInvokeCommand) command;
 
-            clo.result(storage.invoke(toIf(cmd.iif()), opTime));
+            clo.result(storage.invoke(toIf(cmd.iif()), opTime, cmd.id()));
         } else if (command instanceof SyncTimeCommand) {
             storage.advanceSafeTime(command.safeTime());
 
@@ -310,6 +322,36 @@ public class MetaStorageWriteHandler {
         return false;
     }
 
+    /**
+     * The callback that is called right after storage is updated with a 
snapshot.
+     */
+    void onSnapshotLoad() {
+        byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
+        byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
+
+        Cursor<Entry> cursor = storage.range(keyFrom, keyTo);
+        // It's fine to lose original command start time - in that case we 
will store the entry a little bit longer that necessary.
+        HybridTimestamp now = clusterTime.now();
+
+        try (cursor) {
+            for (Entry entry : cursor) {
+                if (!entry.tombstone()) {
+                    byte[] commandIdBytes = copyOfRange(entry.key(), 
IDEMPOTENT_COMMAND_PREFIX_BYTES.length, entry.key().length);
+                    CommandId commandId = ByteUtils.fromBytes(commandIdBytes);
+
+                    Serializable result;
+                    if (entry.value().length == 1) {
+                        result = byteToBoolean(entry.value()[0]);
+                    } else {
+                        result = 
MSG_FACTORY.statementResult().result(entry.value()).build();
+                    }
+
+                    idempotentCommandCache.put(commandId, new 
IdempotentCommandCachedResult(result, now));
+                }
+            }
+        }
+    }
+
     private static class IdempotentCommandCachedResult {
         @Nullable
         final Serializable result;
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 4c53d34fbc..d6b695729a 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -47,6 +47,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
 import org.apache.ignite.internal.metastorage.server.ValueCondition.Type;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
@@ -690,7 +692,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         // "Success" branch is applied.
         assertTrue(branch);
         assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
+        assertEquals(4, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
 
@@ -743,7 +745,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         // "Failure" branch is applied.
         assertFalse(branch);
         assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
+        assertEquals(4, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
 
@@ -796,7 +798,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         // "Success" branch is applied.
         assertTrue(branch);
         assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
+        assertEquals(4, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
 
@@ -849,7 +851,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         // "Failure" branch is applied.
         assertFalse(branch);
         assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
+        assertEquals(4, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
 
@@ -902,7 +904,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         // "Success" branch is applied.
         assertTrue(branch);
         assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
+        assertEquals(4, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
 
@@ -958,7 +960,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         // "Failure" branch is applied.
         assertFalse(branch);
         assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
+        assertEquals(4, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
 
@@ -1011,7 +1013,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         // "Success" branch is applied.
         assertTrue(branch);
         assertEquals(3, storage.revision());
-        assertEquals(3, storage.updateCounter());
+        assertEquals(4, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
 
@@ -1063,7 +1065,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         // "Failure" branch is applied.
         assertFalse(branch);
         assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
+        assertEquals(3, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
 
@@ -1115,7 +1117,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         // "Success" branch is applied.
         assertTrue(branch);
         assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
+        assertEquals(3, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
 
@@ -1168,7 +1170,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         // "Failure" branch is applied.
         assertFalse(branch);
         assertEquals(3, storage.revision());
-        assertEquals(3, storage.updateCounter());
+        assertEquals(4, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
 
@@ -1224,7 +1226,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         // "Success" branch is applied.
         assertTrue(branch);
         assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
+        assertEquals(4, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
 
@@ -1280,7 +1282,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         // "Failure" branch is applied.
         assertFalse(branch);
         assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
+        assertEquals(4, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
 
@@ -1333,8 +1335,8 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         assertTrue(branch);
 
         // No updates.
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
 
         // Put.
         branch = invokeOnMs(
@@ -1348,16 +1350,15 @@ public abstract class 
BasicOperationsKeyValueStorageTest extends AbstractKeyValu
 
         assertTrue(branch);
 
-        // +1 for revision, +2 for update counter.
-        assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
+        assertEquals(3, storage.revision());
+        assertEquals(5, storage.updateCounter());
 
         Entry e2 = storage.get(key2);
 
         assertFalse(e2.empty());
         assertFalse(e2.tombstone());
-        assertEquals(2, e2.revision());
-        assertEquals(2, e2.updateCounter());
+        assertEquals(3, e2.revision());
+        assertEquals(3, e2.updateCounter());
         assertArrayEquals(key2, e2.key());
         assertArrayEquals(val2, e2.value());
 
@@ -1365,8 +1366,8 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
 
         assertFalse(e3.empty());
         assertFalse(e3.tombstone());
-        assertEquals(2, e3.revision());
-        assertEquals(3, e3.updateCounter());
+        assertEquals(3, e3.revision());
+        assertEquals(4, e3.updateCounter());
         assertArrayEquals(key3, e3.key());
         assertArrayEquals(val3, e3.value());
 
@@ -1382,24 +1383,23 @@ public abstract class 
BasicOperationsKeyValueStorageTest extends AbstractKeyValu
 
         assertTrue(branch);
 
-        // +1 for revision, +2 for update counter.
-        assertEquals(3, storage.revision());
-        assertEquals(5, storage.updateCounter());
+        assertEquals(4, storage.revision());
+        assertEquals(8, storage.updateCounter());
 
         e2 = storage.get(key2);
 
         assertFalse(e2.empty());
         assertTrue(e2.tombstone());
-        assertEquals(3, e2.revision());
-        assertEquals(4, e2.updateCounter());
+        assertEquals(4, e2.revision());
+        assertEquals(6, e2.updateCounter());
         assertArrayEquals(key2, e2.key());
 
         e3 = storage.get(key3);
 
         assertFalse(e3.empty());
         assertTrue(e3.tombstone());
-        assertEquals(3, e3.revision());
-        assertEquals(5, e3.updateCounter());
+        assertEquals(4, e3.revision());
+        assertEquals(7, e3.updateCounter());
         assertArrayEquals(key3, e3.key());
     }
 
@@ -1463,7 +1463,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         assertEquals(1, branch.getAsInt());
 
         assertEquals(4, storage.revision());
-        assertEquals(4, storage.updateCounter());
+        assertEquals(5, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
         assertEquals(4, e1.revision());
@@ -1541,7 +1541,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         assertEquals(2, branch.getAsInt());
 
         assertEquals(5, storage.revision());
-        assertEquals(6, storage.updateCounter());
+        assertEquals(7, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
         assertEquals(5, e1.revision());
@@ -1610,7 +1610,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         assertEquals(3, branch.getAsInt());
 
         assertEquals(3, storage.revision());
-        assertEquals(3, storage.updateCounter());
+        assertEquals(4, storage.updateCounter());
 
         Entry e1 = storage.get(key1);
         assertEquals(1, e1.revision());
@@ -2176,10 +2176,20 @@ public abstract class 
BasicOperationsKeyValueStorageTest extends AbstractKeyValu
     }
 
     private boolean invokeOnMs(Condition condition, Collection<Operation> 
success, Collection<Operation> failure) {
-        return storage.invoke(condition, success, failure, 
HybridTimestamp.MIN_VALUE);
+        return storage.invoke(
+                condition,
+                success,
+                failure,
+                HybridTimestamp.MIN_VALUE,
+                new CommandIdGenerator(() -> 
UUID.randomUUID().toString()).newId()
+        );
     }
 
     private StatementResult invokeOnMs(If iif) {
-        return storage.invoke(iif, HybridTimestamp.MIN_VALUE);
+        return storage.invoke(
+                iif,
+                HybridTimestamp.MIN_VALUE,
+                new CommandIdGenerator(() -> 
UUID.randomUUID().toString()).newId()
+        );
     }
 }
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index f130c60819..e5ad163d2d 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.metastorage.server;
 import static java.util.stream.Collectors.collectingAndThen;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
+import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
 import static org.apache.ignite.lang.ErrorGroups.MetaStorage.OP_EXECUTION_ERR;
 
@@ -42,14 +43,19 @@ import java.util.function.LongConsumer;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.failure.NoOpFailureProcessor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
 import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
 import org.apache.ignite.internal.metastorage.impl.EntryImpl;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
@@ -231,13 +237,26 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     }
 
     @Override
-    public boolean invoke(Condition condition, Collection<Operation> success, 
Collection<Operation> failure, HybridTimestamp opTs) {
+    public boolean invoke(
+            Condition condition,
+            Collection<Operation> success,
+            Collection<Operation> failure,
+            HybridTimestamp opTs,
+            CommandId commandId
+    ) {
         synchronized (mux) {
             Collection<Entry> e = getAll(Arrays.asList(condition.keys()));
 
             boolean branch = condition.test(e.toArray(new Entry[]{}));
 
-            Collection<Operation> ops = branch ? success : failure;
+            Collection<Operation> ops = branch ? new ArrayList<>(success) : 
new ArrayList<>(failure);
+
+            // In case of in-memory storage, there's no sense in "persisting" 
invoke result, however same persistent source operations
+            // were added in order to have matching revisions count through 
all storages.
+            ops.add(Operations.put(
+                    new 
ByteArray(ArrayUtils.concat(IDEMPOTENT_COMMAND_PREFIX_BYTES, 
ByteUtils.toBytes(commandId))),
+                    branch ? INVOKE_RESULT_TRUE_BYTES : 
INVOKE_RESULT_FALSE_BYTES)
+            );
 
             long curRev = rev + 1;
 
@@ -274,7 +293,7 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     }
 
     @Override
-    public StatementResult invoke(If iif, HybridTimestamp opTs) {
+    public StatementResult invoke(If iif, HybridTimestamp opTs, CommandId 
commandId) {
         synchronized (mux) {
             If currIf = iif;
             while (true) {
@@ -287,7 +306,16 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
 
                     boolean modified = false;
 
-                    for (Operation op : branch.update().operations()) {
+                    Collection<Operation> ops = new 
ArrayList<>(branch.update().operations());
+
+                    // In case of in-memory storage, there's no sense in 
"persisting" invoke result, however same persistent source
+                    // operations were added in order to have matching 
revisions count through all storages.
+                    ops.add(Operations.put(
+                            new 
ByteArray(ArrayUtils.concat(IDEMPOTENT_COMMAND_PREFIX_BYTES, 
ByteUtils.toBytes(commandId))),
+                            branch.update().result().result())
+                    );
+
+                    for (Operation op : ops) {
                         switch (op.type()) {
                             case PUT:
                                 doPut(op.key(), op.value(), curRev);
diff --git 
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
 
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
index b3c0dcc838..74304e9d8a 100644
--- 
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
@@ -27,12 +27,14 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
+import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
 import org.apache.ignite.internal.metastorage.server.Condition;
 import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
@@ -84,7 +86,13 @@ public class DistributedConfigurationStorageTest extends 
ConfigurationStorageTes
             Collection<Operation> success = invocation.getArgument(1);
             Collection<Operation> failure = invocation.getArgument(2);
 
-            boolean invokeResult = 
metaStorage.invoke(toServerCondition(condition), success, failure, 
HybridTimestamp.MIN_VALUE);
+            boolean invokeResult = metaStorage.invoke(
+                    toServerCondition(condition),
+                    success,
+                    failure,
+                    HybridTimestamp.MIN_VALUE,
+                    new CommandIdGenerator(() -> 
UUID.randomUUID().toString()).newId()
+            );
 
             return CompletableFuture.completedFuture(invokeResult);
         });

Reply via email to