This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f6cab2c3b9 IGNITE-22214 Add persistence and recovery for an idempotent
command cache (#3800)
f6cab2c3b9 is described below
commit f6cab2c3b9c5d894e449fb18add800ec23662e04
Author: Alexander Lapin <[email protected]>
AuthorDate: Thu May 23 17:48:36 2024 +0300
IGNITE-22214 Add persistence and recovery for an idempotent command cache
(#3800)
Co-authored-by: Denis Chudov <[email protected]>
---
check-rules/spotbugs-excludes.xml | 15 ++
.../catalog/storage/UpdateLogImplTest.java | 6 +-
.../apache/ignite/internal/util/ArrayUtils.java | 19 ++
.../DistributionZoneManagerAlterFilterTest.java | 2 +-
...istributionZoneManagerScaleUpScaleDownTest.java | 4 +-
.../DistributionZoneRebalanceEngineTest.java | 14 +-
.../ignite/internal/metastorage/CommandId.java | 3 +-
.../impl/ItIdempotentCommandCacheTest.java | 196 ++++++++++++++++-----
.../ItMetaStorageMultipleNodesAbstractTest.java | 2 +-
.../metastorage/impl/ItMetaStorageServiceTest.java | 8 +-
.../metastorage/server/KeyValueStorage.java | 17 +-
.../metastorage/server/WatchProcessor.java | 16 +-
.../server/persistence/RocksDbKeyValueStorage.java | 32 +++-
.../server/raft/MetaStorageListener.java | 1 +
.../server/raft/MetaStorageWriteHandler.java | 46 ++++-
.../server/BasicOperationsKeyValueStorageTest.java | 76 ++++----
.../server/SimpleInMemoryKeyValueStorage.java | 36 +++-
.../DistributedConfigurationStorageTest.java | 10 +-
18 files changed, 391 insertions(+), 112 deletions(-)
diff --git a/check-rules/spotbugs-excludes.xml
b/check-rules/spotbugs-excludes.xml
index 27dec94abb..e7b4c71ba5 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -217,6 +217,21 @@
<Field name="LEASE_PREFIX"/>
</Or>
</Match>
+ <Match>
+ <!-- Public byte array constants, not expected to be modified. -->
+ <Bug pattern="MS_MUTABLE_ARRAY"/>
+ <Class
name="org.apache.ignite.internal.metastorage.server.KeyValueStorage"/>
+ <Or>
+ <Field name="INVOKE_RESULT_FALSE_BYTES"/>
+ <Field name="INVOKE_RESULT_TRUE_BYTES"/>
+ </Or>
+ </Match>
+ <Match>
+ <!-- Public byte array constants, not expected to be modified. -->
+ <Bug pattern="MS_MUTABLE_ARRAY"/>
+ <Class
name="org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler"/>
+ <Field name="IDEMPOTENT_COMMAND_PREFIX_BYTES"/>
+ </Match>
<Match>
<Bug pattern="IT_NO_SUCH_ELEMENT"/>
<Class
name="org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage$ReadOnlyScanCursor"/>
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index d412f3085c..81c3603ebe 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -252,12 +252,14 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest {
assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion +
2)), willBe(true));
List<Integer> expectedVersions = List.of(startVersion, startVersion +
1, startVersion + 2);
- List<Long> expectedTokens = List.of(revisionBefore + 1, revisionBefore
+ 2, revisionBefore + 3);
+ List<Long> expectedTokens = List.of(revisionBefore + 1, revisionBefore
+ 5, revisionBefore + 6);
+ // No-op operation also increases the revision.
+ int revisionsUpdateCount = 6;
// wait till necessary revision is applied
assertTrue(
waitForCondition(
- () -> metastore.appliedRevision() -
expectedVersions.size() == revisionBefore,
+ () -> metastore.appliedRevision() -
revisionsUpdateCount == revisionBefore,
TimeUnit.SECONDS.toMillis(5)
)
);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
index bc08178ce6..8bd8cfdac4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
@@ -337,6 +337,25 @@ public final class ArrayUtils {
return newArr;
}
+ /**
+ * Concatenates the given bytes to the end of a byte array.
+ *
+ * <p>When {@code nullOrEmpty(arr)} holds, the {@code bytes} array itself is returned (not a copy), so the caller and the
+ * result share the same array. Otherwise a new array is returned that holds the contents of {@code arr} followed by
+ * the contents of {@code bytes}; neither input array is modified.
+ *
+ * @param arr Array to append to, may be {@code null}.
+ * @param bytes One or more bytes to append.
+ * @return Concatenated array.
+ */
+ public static byte[] concat(@Nullable byte[] arr, byte... bytes) {
+ if (nullOrEmpty(arr)) {
+ return bytes;
+ }
+
+ byte[] newArr = Arrays.copyOf(arr, arr.length + bytes.length);
+
+ System.arraycopy(bytes, 0, newArr, arr.length, bytes.length);
+
+ return newArr;
+ }
+
/**
* Removes an element from an array with decrementing the array itself.
*
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
index 1e0efb9296..6ba28d7cb8 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
@@ -221,7 +221,7 @@ public class DistributionZoneManagerAlterFilterTest extends
BaseDistributionZone
topology.putNode(e);
}
return invocation.callRealMethod();
- }).when(keyValueStorage).invoke(any(), any());
+ }).when(keyValueStorage).invoke(any(), any(), any());
// Check that node E, that was added while filter's altering, is not
propagated to data nodes.
assertDataNodesFromManager(distributionZoneManager,
metaStorageManager::appliedRevision, catalogManager::latestCatalogVersion,
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
index 292f797067..e5c601ec3d 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
@@ -453,7 +453,7 @@ public class DistributionZoneManagerScaleUpScaleDownTest
extends BaseDistributio
}
return invocation.callRealMethod();
- }).when(keyValueStorage).invoke(any(), any());
+ }).when(keyValueStorage).invoke(any(), any(), any());
topology.putNode(NODE_1);
@@ -503,7 +503,7 @@ public class DistributionZoneManagerScaleUpScaleDownTest
extends BaseDistributio
}
return invocation.callRealMethod();
- }).when(keyValueStorage).invoke(any(), any());
+ }).when(keyValueStorage).invoke(any(), any(), any());
topology.removeNodes(Set.of(NODE_1));
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index d729aa0777..5fd90cf73e 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -299,7 +299,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
checkAssignments(zoneNodes, RebalanceUtil::pendingPartAssignmentsKey);
- verify(keyValueStorage, timeout(1000).times(8)).invoke(any(), any());
+ verify(keyValueStorage, timeout(1000).times(8)).invoke(any(), any(),
any());
}
@Test
@@ -322,7 +322,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
checkAssignments(zoneNodes, RebalanceUtil::pendingPartAssignmentsKey);
- verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any(),
any());
nodes = Set.of("node3", "node4", "node5");
@@ -333,7 +333,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
checkAssignments(zoneNodes, RebalanceUtil::plannedPartAssignmentsKey);
- verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any());
+ verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any(),
any());
}
@Test
@@ -358,7 +358,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
checkAssignments(zoneNodes, RebalanceUtil::pendingPartAssignmentsKey);
- verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any(),
any());
Set<String> emptyNodes = emptySet();
@@ -369,7 +369,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
checkAssignments(zoneNodes, RebalanceUtil::plannedPartAssignmentsKey);
- verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any(),
any());
}
@Test
@@ -392,7 +392,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
checkAssignments(zoneNodes, RebalanceUtil::pendingPartAssignmentsKey);
- verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any(),
any());
Set<String> nodes2 = Set.of("node3", "node4", "node5");
@@ -404,7 +404,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
assertNull(keyValueStorage.get(RebalanceUtil.plannedPartAssignmentsKey(partId).bytes()).value());
- verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any());
+ verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any(),
any());
}
@Test
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CommandId.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CommandId.java
index 2870735bf7..db276400c4 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CommandId.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CommandId.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.metastorage;
+import java.io.Serializable;
import java.util.UUID;
import org.apache.ignite.internal.metastorage.dsl.MetaStorageMessageGroup;
import org.apache.ignite.internal.network.NetworkMessage;
@@ -27,7 +28,7 @@ import
org.apache.ignite.internal.network.annotations.Transferable;
* would be unique cluster-wide.
*/
@Transferable(MetaStorageMessageGroup.COMMAND_ID)
-public interface CommandId extends NetworkMessage {
+public interface CommandId extends NetworkMessage, Serializable {
UUID nodeId();
long counter();
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index ce87c643dc..2ced907a77 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.metastorage.impl;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -27,6 +29,7 @@ import static
org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -55,10 +58,13 @@ import
org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.IdempotentCommand;
import org.apache.ignite.internal.metastorage.command.InvokeCommand;
import
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
import org.apache.ignite.internal.metastorage.command.SyncTimeCommand;
import
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
@@ -83,6 +89,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
/**
* Integration tests for idempotency of {@link
org.apache.ignite.internal.metastorage.command.IdempotentCommand}.
@@ -93,6 +101,22 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
private static final int NODES_COUNT = 2;
+ private static final ByteArray TEST_KEY = new
ByteArray("key".getBytes(StandardCharsets.UTF_8));
+ private static final byte[] TEST_VALUE =
"value".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] ANOTHER_VALUE =
"another".getBytes(StandardCharsets.UTF_8);
+
+ private static final ByteArray TEST_KEY_2 = new
ByteArray("key2".getBytes(StandardCharsets.UTF_8));
+ private static final byte[] TEST_VALUE_2 =
"value2".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] ANOTHER_VALUE_2 =
"another2".getBytes(StandardCharsets.UTF_8);
+
+ private static final int YIELD_RESULT = 10;
+ private static final int ANOTHER_YIELD_RESULT = 20;
+
+ @InjectConfiguration("mock.responseTimeout = 100")
+ private RaftConfiguration raftConfiguration;
+ @InjectConfiguration("mock.idleSyncTimeInterval = 100")
+ private MetaStorageConfiguration metaStorageConfiguration;
+
private List<Node> nodes;
private static class Node implements AutoCloseable {
@@ -164,7 +188,9 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
}
void start(CompletableFuture<Set<String>> metaStorageNodesFut) {
-
when(cmgManager.metaStorageNodes()).thenReturn(metaStorageNodesFut);
+ if (metaStorageNodesFut != null) {
+
when(cmgManager.metaStorageNodes()).thenReturn(metaStorageNodesFut);
+ }
assertThat(startAsync(clusterService, raftManager,
metaStorageManager), willCompleteSuccessfully());
}
@@ -203,26 +229,8 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
}
@BeforeEach
- void setUp(
- TestInfo testInfo,
- @InjectConfiguration("mock.responseTimeout = 100")
RaftConfiguration raftConfiguration,
- @InjectConfiguration("mock.idleSyncTimeInterval = 100")
MetaStorageConfiguration metaStorageConfiguration
- ) {
- nodes = new ArrayList<>();
-
- for (int i = 0; i < NODES_COUNT; i++) {
- Node node = new Node(testInfo, raftConfiguration,
metaStorageConfiguration, workDir, i);
- nodes.add(node);
- }
-
- Set<String> nodeNames = nodes.stream().map(n ->
n.clusterService.nodeName()).collect(toSet());
- CompletableFuture<Set<String>> metaStorageNodesFut = new
CompletableFuture<>();
-
- nodes.forEach(n -> n.start(metaStorageNodesFut));
-
- metaStorageNodesFut.complete(nodeNames);
-
- nodes.forEach(Node::deployWatches);
+ void setUp(TestInfo testInfo) {
+ startCluster(testInfo);
}
@AfterEach
@@ -232,10 +240,6 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
@Test
public void testIdempotentInvoke() throws InterruptedException {
- ByteArray testKey = new
ByteArray("key".getBytes(StandardCharsets.UTF_8));
- byte[] testValue = "value".getBytes(StandardCharsets.UTF_8);
- byte[] anotherValue = "another".getBytes(StandardCharsets.UTF_8);
-
AtomicInteger writeActionReqCount = new AtomicInteger();
CompletableFuture<Void> retryBlockingFuture = new
CompletableFuture<>();
@@ -274,9 +278,13 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
MetaStorageManager metaStorageManager = leader.metaStorageManager;
- CompletableFuture<Boolean> fut =
metaStorageManager.invoke(notExists(testKey), put(testKey, testValue),
put(testKey, anotherValue));
+ CompletableFuture<Boolean> fut = metaStorageManager.invoke(
+ notExists(TEST_KEY),
+ put(TEST_KEY, TEST_VALUE),
+ put(TEST_KEY, ANOTHER_VALUE)
+ );
- assertTrue(waitForCondition(() ->
leader.checkValueInStorage(testKey.bytes(), testValue), 10_000));
+ assertTrue(waitForCondition(() ->
leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE), 10_000));
log.info("Test: value appeared in storage.");
@@ -290,28 +298,15 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
log.info("Test: invoke complete.");
assertTrue(fut.join());
- assertTrue(leader.checkValueInStorage(testKey.bytes(), testValue));
+ assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE));
}
@Test
public void testIdempotentInvokeAfterLeaderChange() {
- ByteArray testKey = new
ByteArray("key".getBytes(StandardCharsets.UTF_8));
- byte[] testValue = "value".getBytes(StandardCharsets.UTF_8);
- byte[] anotherValue = "another".getBytes(StandardCharsets.UTF_8);
+ InvokeCommand invokeCommand = (InvokeCommand)
buildKeyNotExistsInvokeCommand(TEST_KEY, TEST_VALUE, ANOTHER_VALUE);
RaftGroupService raftClient = raftClient();
- HybridClock clock = new HybridClockImpl();
- CommandIdGenerator commandIdGenerator = new CommandIdGenerator(() ->
UUID.randomUUID().toString());
-
- InvokeCommand invokeCommand = CMD_FACTORY.invokeCommand()
- .condition(notExists(testKey))
- .success(List.of(put(testKey, testValue)))
- .failure(List.of(put(testKey, anotherValue)))
- .initiatorTimeLong(clock.nowLong())
- .id(commandIdGenerator.newId())
- .build();
-
CompletableFuture<Boolean> fut = raftClient.run(invokeCommand);
Node currentLeader = leader(raftClient);
@@ -319,7 +314,7 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
assertThat(fut, willCompleteSuccessfully());
assertTrue(fut.join());
- assertTrue(currentLeader.checkValueInStorage(testKey.bytes(),
testValue));
+ assertTrue(currentLeader.checkValueInStorage(TEST_KEY.bytes(),
TEST_VALUE));
Node newLeader = nodes.stream()
.filter(n ->
!n.clusterService.nodeName().equals(currentLeader.clusterService.nodeName()))
@@ -334,8 +329,53 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
assertThat(futAfterLeaderChange, willCompleteSuccessfully());
assertTrue(futAfterLeaderChange.join());
- assertTrue(currentLeader.checkValueInStorage(testKey.bytes(),
testValue));
- assertTrue(newLeader.checkValueInStorage(testKey.bytes(), testValue));
+ assertTrue(currentLeader.checkValueInStorage(TEST_KEY.bytes(),
TEST_VALUE));
+ assertTrue(newLeader.checkValueInStorage(TEST_KEY.bytes(),
TEST_VALUE));
+ }
+
+ /**
+ * Checks that the result of an idempotent command is still served after a snapshot and a full cluster restart:
+ * the command is run once, a snapshot is requested on every node, all nodes are stopped and started again, and the very
+ * same command is run a second time. The second run must return the same result as the first one.
+ *
+ * @param idempotentCommand Command under test, supplied by {@code idempotentCommandProvider}.
+ * @param testInfo Test info, needed to start the cluster again after the restart.
+ * @throws Exception If waiting for a command result fails.
+ */
+ @ParameterizedTest
+ @MethodSource("idempotentCommandProvider")
+ public void testIdempotentCacheRestoreFromSnapshot(IdempotentCommand idempotentCommand, TestInfo testInfo) throws Exception {
+ RaftGroupService raftClient = raftClient();
+ Node leader = leader(raftClient);
+
+ // Initial idempotent command run.
+ CompletableFuture<Object> commandProcessingResultFuture = raftClient.run(idempotentCommand);
+ assertThat(commandProcessingResultFuture, willCompleteSuccessfully());
+ Object commandProcessingResult = commandProcessingResultFuture.get();
+ if (idempotentCommand instanceof InvokeCommand) {
+ assertTrue((Boolean) commandProcessingResult);
+ assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE));
+ } else {
+ assertEquals(YIELD_RESULT, ((StatementResult) commandProcessingResult).getAsInt());
+ assertTrue(leader.checkValueInStorage(TEST_KEY_2.bytes(), TEST_VALUE_2));
+ }
+
+ // Do the snapshot.
+ // NOTE(review): the value returned by snapshot(...) is discarded here; if it is asynchronous, confirm that the
+ // snapshot is guaranteed to be finished before the nodes are stopped below.
+ nodes.forEach(n -> raftClient().snapshot(new Peer(n.clusterService.nodeName())));
+
+ // Restart nodes in order to trigger idempotent volatile cache initialization from snapshot.
+ for (Node node : nodes) {
+ node.stop();
+ }
+
+ // Restart cluster.
+ startCluster(testInfo);
+
+ leader = leader(raftClient());
+
+ // Run same idempotent command one more time and check that condition wasn't re-evaluated, but was retrieved
+ // from the cache instead.
+ CompletableFuture<Object> commandProcessingResultFuture2 = raftClient().run(idempotentCommand);
+ assertThat(commandProcessingResultFuture2, willCompleteSuccessfully());
+ Object commandProcessingResult2 = commandProcessingResultFuture2.get();
+ if (idempotentCommand instanceof InvokeCommand) {
+ assertTrue((Boolean) commandProcessingResult2);
+ assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE));
+ } else {
+ // Assert on the result of the SECOND run: checking the first result again would not verify the restored cache.
+ assertEquals(YIELD_RESULT, ((StatementResult) commandProcessingResult2).getAsInt());
+ assertTrue(leader.checkValueInStorage(TEST_KEY_2.bytes(), TEST_VALUE_2));
+ }
+ }
private Node leader(RaftGroupService raftClient) {
@@ -365,4 +405,70 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
throw new RuntimeException(e);
}
}
+
+ static List<IdempotentCommand> idempotentCommandProvider() {
+ return List.of(
+ buildKeyNotExistsInvokeCommand(TEST_KEY, TEST_VALUE,
ANOTHER_VALUE),
+ buildKeyNotExistsMultiInvokeCommand(TEST_KEY_2, TEST_VALUE_2,
ANOTHER_VALUE_2, YIELD_RESULT, ANOTHER_YIELD_RESULT)
+ );
+ }
+
+ private static IdempotentCommand buildKeyNotExistsInvokeCommand(
+ ByteArray testKey,
+ byte[] testValue,
+ byte[] anotherValue
+ ) {
+ HybridClock clock = new HybridClockImpl();
+ CommandIdGenerator commandIdGenerator = new CommandIdGenerator(() ->
UUID.randomUUID().toString());
+
+ return CMD_FACTORY.invokeCommand()
+ .condition(notExists(testKey))
+ .success(List.of(put(testKey, testValue)))
+ .failure(List.of(put(testKey, anotherValue)))
+ .initiatorTimeLong(clock.nowLong())
+ .id(commandIdGenerator.newId())
+ .build();
+ }
+
+ private static IdempotentCommand buildKeyNotExistsMultiInvokeCommand(
+ ByteArray testKey,
+ byte[] testValue,
+ byte[] anotherValue,
+ int testYieldResult,
+ int anotherYieldResult
+ ) {
+ HybridClock clock = new HybridClockImpl();
+ CommandIdGenerator commandIdGenerator = new CommandIdGenerator(() ->
UUID.randomUUID().toString());
+
+ Iif iif = iif(
+ notExists(testKey),
+ ops(put(testKey, testValue)).yield(testYieldResult),
+ ops(put(testKey, anotherValue)).yield(anotherYieldResult)
+ );
+
+ return CMD_FACTORY.multiInvokeCommand()
+ .id(commandIdGenerator.newId())
+ .iif(iif)
+ .safeTimeLong(clock.now().longValue())
+ .initiatorTimeLong(clock.now().longValue())
+ .build();
+ }
+
+ private void startCluster(TestInfo testInfo) {
+ nodes = new ArrayList<>();
+
+ for (int i = 0; i < NODES_COUNT; i++) {
+ Node node = new Node(testInfo, raftConfiguration,
metaStorageConfiguration, workDir, i);
+ nodes.add(node);
+ }
+
+ Set<String> nodeNames = nodes.stream().map(n ->
n.clusterService.nodeName()).collect(toSet());
+ CompletableFuture<Set<String>> metaStorageNodesFut = new
CompletableFuture<>();
+
+ nodes.forEach(n -> n.start(metaStorageNodesFut));
+
+ metaStorageNodesFut.complete(nodeNames);
+
+ nodes.forEach(Node::deployWatches);
+ }
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 8d7fa1b987..557ad690f1 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -325,7 +325,7 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
var expectedEntryEvent = new EntryEvent(
new EntryImpl(key.bytes(), value, 1, 1),
- new EntryImpl(key.bytes(), newValue, 2, 2)
+ new EntryImpl(key.bytes(), newValue, 2, 3)
);
assertThat(awaitFuture, willBe(expectedEntryEvent));
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index 75f1318cb0..0f0ea61c60 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -607,11 +607,11 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
var ifCaptor = ArgumentCaptor.forClass(If.class);
- when(node.mockStorage.invoke(any(),
any())).thenReturn(ops().yield(true).result());
+ when(node.mockStorage.invoke(any(), any(),
any())).thenReturn(ops().yield(true).result(), null, null);
assertTrue(node.metaStorageService.invoke(iif).get().getAsBoolean());
- verify(node.mockStorage).invoke(ifCaptor.capture(), any());
+ verify(node.mockStorage).invoke(ifCaptor.capture(), any(), any());
var resultIf = ifCaptor.getValue();
@@ -656,7 +656,7 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
byte[] expVal = {2};
- when(node.mockStorage.invoke(any(), any(), any(),
any())).thenReturn(true);
+ when(node.mockStorage.invoke(any(), any(), any(), any(),
any())).thenReturn(true);
Condition condition = Conditions.notExists(expKey);
@@ -672,7 +672,7 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
ArgumentCaptor<Collection<Operation>> failureCaptor =
ArgumentCaptor.forClass(Collection.class);
- verify(node.mockStorage).invoke(conditionCaptor.capture(),
successCaptor.capture(), failureCaptor.capture(), any());
+ verify(node.mockStorage).invoke(conditionCaptor.capture(),
successCaptor.capture(), failureCaptor.capture(), any(), any());
assertArrayEquals(expKey.bytes(), conditionCaptor.getValue().key());
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 414aabc03c..25c2324ba9 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -24,11 +24,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -36,6 +38,9 @@ import org.jetbrains.annotations.Nullable;
* Defines key/value storage interface.
*/
public interface KeyValueStorage extends ManuallyCloseable {
+ byte[] INVOKE_RESULT_TRUE_BYTES = {ByteUtils.booleanToByte(true)};
+ byte[] INVOKE_RESULT_FALSE_BYTES = {ByteUtils.booleanToByte(false)};
+
/**
* Starts the given storage, allocating the necessary resources.
*/
@@ -142,20 +147,28 @@ public interface KeyValueStorage extends
ManuallyCloseable {
* @param success Success operations.
* @param failure Failure operations.
* @param opTs Operation's timestamp.
+ * @param commandId Command Id.
* @return Result of test condition.
*/
- boolean invoke(Condition condition, Collection<Operation> success,
Collection<Operation> failure, HybridTimestamp opTs);
+ boolean invoke(
+ Condition condition,
+ Collection<Operation> success,
+ Collection<Operation> failure,
+ HybridTimestamp opTs,
+ CommandId commandId
+ );
/**
* Invoke, which supports nested conditional statements with left and
right branches of execution.
*
* @param iif {@link If} statement to invoke
* @param opTs Operation's timestamp.
+ * @param commandId Command Id.
* @return execution result
* @see If
* @see StatementResult
*/
- StatementResult invoke(If iif, HybridTimestamp opTs);
+ StatementResult invoke(If iif, HybridTimestamp opTs, CommandId commandId);
/**
* Returns cursor by entries which correspond to the given keys range.
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 5036fc8c82..d24d683488 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -21,10 +21,12 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;
import static
org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@@ -171,8 +173,18 @@ public class WatchProcessor implements ManuallyCloseable {
// Revision must be the same for all entries.
long newRevision = updatedEntries.get(0).revision();
+ List<Entry> filteredUpdatedEntries =
updatedEntries.stream()
+ .filter(entry ->
+ entry.key().length <=
IDEMPOTENT_COMMAND_PREFIX_BYTES.length
+ ||
+ entry.key().length >
IDEMPOTENT_COMMAND_PREFIX_BYTES.length
+ &&
!ByteBuffer.wrap(entry.key(), 0, IDEMPOTENT_COMMAND_PREFIX_BYTES.length)
+
.equals(ByteBuffer.wrap(IDEMPOTENT_COMMAND_PREFIX_BYTES)))
+ .collect(Collectors.toList());
+
// Collect all the events for each watch.
- CompletableFuture<List<WatchAndEvents>>
watchesAndEventsFuture = collectWatchesAndEvents(updatedEntries, newRevision);
+ CompletableFuture<List<WatchAndEvents>>
watchesAndEventsFuture =
+ collectWatchesAndEvents(filteredUpdatedEntries,
newRevision);
return watchesAndEventsFuture
.thenComposeAsync(watchAndEvents -> {
@@ -194,7 +206,7 @@ public class WatchProcessor implements ManuallyCloseable {
);
notificationFuture.whenComplete((unused, e) ->
{
- maybeLogLongProcessing(updatedEntries,
startTimeNanos);
+
maybeLogLongProcessing(filteredUpdatedEntries, startTimeNanos);
if (e != null) {
failureProcessor.process(new
FailureContext(CRITICAL_ERROR, e));
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 7a4102656e..f0fc6fab2a 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -32,6 +32,7 @@ import static
org.apache.ignite.internal.metastorage.server.persistence.StorageC
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION;
+import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
@@ -65,12 +66,15 @@ import java.util.function.LongConsumer;
import java.util.function.Predicate;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
@@ -89,6 +93,7 @@ import
org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -660,7 +665,13 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
@Override
- public boolean invoke(Condition condition, Collection<Operation> success,
Collection<Operation> failure, HybridTimestamp opTs) {
+ public boolean invoke(
+ Condition condition,
+ Collection<Operation> success,
+ Collection<Operation> failure,
+ HybridTimestamp opTs,
+ CommandId commandId
+ ) {
rwLock.writeLock().lock();
try {
@@ -668,7 +679,12 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
boolean branch = condition.test(entries);
- Collection<Operation> ops = branch ? success : failure;
+ Collection<Operation> ops = branch ? new ArrayList<>(success) :
new ArrayList<>(failure);
+
+ ops.add(Operations.put(
+ new
ByteArray(ArrayUtils.concat(IDEMPOTENT_COMMAND_PREFIX_BYTES,
ByteUtils.toBytes(commandId))),
+ branch ? INVOKE_RESULT_TRUE_BYTES :
INVOKE_RESULT_FALSE_BYTES)
+ );
applyOperations(ops, opTs);
@@ -681,7 +697,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
@Override
- public StatementResult invoke(If iif, HybridTimestamp opTs) {
+ public StatementResult invoke(If iif, HybridTimestamp opTs, CommandId
commandId) {
rwLock.writeLock().lock();
try {
@@ -703,7 +719,14 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
if (branch.isTerminal()) {
Update update = branch.update();
- applyOperations(update.operations(), opTs);
+ Collection<Operation> ops = new
ArrayList<>(update.operations());
+
+ ops.add(Operations.put(
+ new
ByteArray(ArrayUtils.concat(IDEMPOTENT_COMMAND_PREFIX_BYTES,
ByteUtils.toBytes(commandId))),
+ update.result().result())
+ );
+
+ applyOperations(ops, opTs);
return update.result();
} else {
@@ -1415,7 +1438,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
UpdatedEntries copy = updatedEntries.transfer();
assert copy.ts != null;
-
watchProcessor.notifyWatches(copy.updatedEntries, copy.ts);
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 43cf5624e0..a49e594983 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -167,6 +167,7 @@ public class MetaStorageListener implements
RaftGroupListener, BeforeApplyHandle
@Override
public boolean onSnapshotLoad(Path path) {
storage.restoreSnapshot(path);
+ writeHandler.onSnapshotLoad();
return true;
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index a664c2254c..95c3685fb1 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -17,7 +17,11 @@
package org.apache.ignite.internal.metastorage.server.raft;
+import static java.util.Arrays.copyOfRange;
+import static org.apache.ignite.internal.util.ByteUtils.byteToBoolean;
+
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,6 +30,7 @@ import
org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.CommandId;
+import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.command.IdempotentCommand;
import org.apache.ignite.internal.metastorage.command.InvokeCommand;
import org.apache.ignite.internal.metastorage.command.MetaStorageWriteCommand;
@@ -38,6 +43,7 @@ import
org.apache.ignite.internal.metastorage.command.SyncTimeCommand;
import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
import org.apache.ignite.internal.metastorage.dsl.ConditionType;
import org.apache.ignite.internal.metastorage.dsl.Iif;
+import org.apache.ignite.internal.metastorage.dsl.MetaStorageMessagesFactory;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.metastorage.dsl.Statement.IfStatement;
import org.apache.ignite.internal.metastorage.dsl.Statement.UpdateStatement;
@@ -55,6 +61,8 @@ import
org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
/**
@@ -64,6 +72,10 @@ public class MetaStorageWriteHandler {
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(MetaStorageWriteHandler.class);
+ public static final byte[] IDEMPOTENT_COMMAND_PREFIX_BYTES =
"icp.".getBytes(StandardCharsets.UTF_8);
+
+ private static final MetaStorageMessagesFactory MSG_FACTORY = new
MetaStorageMessagesFactory();
+
private final KeyValueStorage storage;
private final ClusterTimeImpl clusterTime;
@@ -173,11 +185,11 @@ public class MetaStorageWriteHandler {
} else if (command instanceof InvokeCommand) {
InvokeCommand cmd = (InvokeCommand) command;
- clo.result(storage.invoke(toCondition(cmd.condition()),
cmd.success(), cmd.failure(), opTime));
+ clo.result(storage.invoke(toCondition(cmd.condition()),
cmd.success(), cmd.failure(), opTime, cmd.id()));
} else if (command instanceof MultiInvokeCommand) {
MultiInvokeCommand cmd = (MultiInvokeCommand) command;
- clo.result(storage.invoke(toIf(cmd.iif()), opTime));
+ clo.result(storage.invoke(toIf(cmd.iif()), opTime, cmd.id()));
} else if (command instanceof SyncTimeCommand) {
storage.advanceSafeTime(command.safeTime());
@@ -310,6 +322,36 @@ public class MetaStorageWriteHandler {
return false;
}
+ /**
+ * The callback that is called right after storage is updated with a
snapshot.
+ */
+ void onSnapshotLoad() {
+ byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
+ byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
+
+ Cursor<Entry> cursor = storage.range(keyFrom, keyTo);
+ // It's fine to lose original command start time - in that case we
will store the entry a little bit longer than necessary.
+ HybridTimestamp now = clusterTime.now();
+
+ try (cursor) {
+ for (Entry entry : cursor) {
+ if (!entry.tombstone()) {
+ byte[] commandIdBytes = copyOfRange(entry.key(),
IDEMPOTENT_COMMAND_PREFIX_BYTES.length, entry.key().length);
+ CommandId commandId = ByteUtils.fromBytes(commandIdBytes);
+
+ Serializable result;
+ if (entry.value().length == 1) {
+ result = byteToBoolean(entry.value()[0]);
+ } else {
+ result =
MSG_FACTORY.statementResult().result(entry.value()).build();
+ }
+
+ idempotentCommandCache.put(commandId, new
IdempotentCommandCachedResult(result, now));
+ }
+ }
+ }
+ }
+
private static class IdempotentCommandCachedResult {
@Nullable
final Serializable result;
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 4c53d34fbc..d6b695729a 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -47,6 +47,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
import org.apache.ignite.internal.metastorage.server.ValueCondition.Type;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -690,7 +692,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
// "Success" branch is applied.
assertTrue(branch);
assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
+ assertEquals(4, storage.updateCounter());
Entry e1 = storage.get(key1);
@@ -743,7 +745,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
// "Failure" branch is applied.
assertFalse(branch);
assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
+ assertEquals(4, storage.updateCounter());
Entry e1 = storage.get(key1);
@@ -796,7 +798,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
// "Success" branch is applied.
assertTrue(branch);
assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
+ assertEquals(4, storage.updateCounter());
Entry e1 = storage.get(key1);
@@ -849,7 +851,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
// "Failure" branch is applied.
assertFalse(branch);
assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
+ assertEquals(4, storage.updateCounter());
Entry e1 = storage.get(key1);
@@ -902,7 +904,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
// "Success" branch is applied.
assertTrue(branch);
assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
+ assertEquals(4, storage.updateCounter());
Entry e1 = storage.get(key1);
@@ -958,7 +960,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
// "Failure" branch is applied.
assertFalse(branch);
assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
+ assertEquals(4, storage.updateCounter());
Entry e1 = storage.get(key1);
@@ -1011,7 +1013,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
// "Success" branch is applied.
assertTrue(branch);
assertEquals(3, storage.revision());
- assertEquals(3, storage.updateCounter());
+ assertEquals(4, storage.updateCounter());
Entry e1 = storage.get(key1);
@@ -1063,7 +1065,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
// "Failure" branch is applied.
assertFalse(branch);
assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
+ assertEquals(3, storage.updateCounter());
Entry e1 = storage.get(key1);
@@ -1115,7 +1117,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
// "Success" branch is applied.
assertTrue(branch);
assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
+ assertEquals(3, storage.updateCounter());
Entry e1 = storage.get(key1);
@@ -1168,7 +1170,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
// "Failure" branch is applied.
assertFalse(branch);
assertEquals(3, storage.revision());
- assertEquals(3, storage.updateCounter());
+ assertEquals(4, storage.updateCounter());
Entry e1 = storage.get(key1);
@@ -1224,7 +1226,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
// "Success" branch is applied.
assertTrue(branch);
assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
+ assertEquals(4, storage.updateCounter());
Entry e1 = storage.get(key1);
@@ -1280,7 +1282,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
// "Failure" branch is applied.
assertFalse(branch);
assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
+ assertEquals(4, storage.updateCounter());
Entry e1 = storage.get(key1);
@@ -1333,8 +1335,8 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
assertTrue(branch);
// No updates.
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
+ assertEquals(2, storage.revision());
+ assertEquals(2, storage.updateCounter());
// Put.
branch = invokeOnMs(
@@ -1348,16 +1350,15 @@ public abstract class
BasicOperationsKeyValueStorageTest extends AbstractKeyValu
assertTrue(branch);
- // +1 for revision, +2 for update counter.
- assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
+ assertEquals(3, storage.revision());
+ assertEquals(5, storage.updateCounter());
Entry e2 = storage.get(key2);
assertFalse(e2.empty());
assertFalse(e2.tombstone());
- assertEquals(2, e2.revision());
- assertEquals(2, e2.updateCounter());
+ assertEquals(3, e2.revision());
+ assertEquals(3, e2.updateCounter());
assertArrayEquals(key2, e2.key());
assertArrayEquals(val2, e2.value());
@@ -1365,8 +1366,8 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
assertFalse(e3.empty());
assertFalse(e3.tombstone());
- assertEquals(2, e3.revision());
- assertEquals(3, e3.updateCounter());
+ assertEquals(3, e3.revision());
+ assertEquals(4, e3.updateCounter());
assertArrayEquals(key3, e3.key());
assertArrayEquals(val3, e3.value());
@@ -1382,24 +1383,23 @@ public abstract class
BasicOperationsKeyValueStorageTest extends AbstractKeyValu
assertTrue(branch);
- // +1 for revision, +2 for update counter.
- assertEquals(3, storage.revision());
- assertEquals(5, storage.updateCounter());
+ assertEquals(4, storage.revision());
+ assertEquals(8, storage.updateCounter());
e2 = storage.get(key2);
assertFalse(e2.empty());
assertTrue(e2.tombstone());
- assertEquals(3, e2.revision());
- assertEquals(4, e2.updateCounter());
+ assertEquals(4, e2.revision());
+ assertEquals(6, e2.updateCounter());
assertArrayEquals(key2, e2.key());
e3 = storage.get(key3);
assertFalse(e3.empty());
assertTrue(e3.tombstone());
- assertEquals(3, e3.revision());
- assertEquals(5, e3.updateCounter());
+ assertEquals(4, e3.revision());
+ assertEquals(7, e3.updateCounter());
assertArrayEquals(key3, e3.key());
}
@@ -1463,7 +1463,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
assertEquals(1, branch.getAsInt());
assertEquals(4, storage.revision());
- assertEquals(4, storage.updateCounter());
+ assertEquals(5, storage.updateCounter());
Entry e1 = storage.get(key1);
assertEquals(4, e1.revision());
@@ -1541,7 +1541,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
assertEquals(2, branch.getAsInt());
assertEquals(5, storage.revision());
- assertEquals(6, storage.updateCounter());
+ assertEquals(7, storage.updateCounter());
Entry e1 = storage.get(key1);
assertEquals(5, e1.revision());
@@ -1610,7 +1610,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
assertEquals(3, branch.getAsInt());
assertEquals(3, storage.revision());
- assertEquals(3, storage.updateCounter());
+ assertEquals(4, storage.updateCounter());
Entry e1 = storage.get(key1);
assertEquals(1, e1.revision());
@@ -2176,10 +2176,20 @@ public abstract class
BasicOperationsKeyValueStorageTest extends AbstractKeyValu
}
private boolean invokeOnMs(Condition condition, Collection<Operation>
success, Collection<Operation> failure) {
- return storage.invoke(condition, success, failure,
HybridTimestamp.MIN_VALUE);
+ return storage.invoke(
+ condition,
+ success,
+ failure,
+ HybridTimestamp.MIN_VALUE,
+ new CommandIdGenerator(() ->
UUID.randomUUID().toString()).newId()
+ );
}
private StatementResult invokeOnMs(If iif) {
- return storage.invoke(iif, HybridTimestamp.MIN_VALUE);
+ return storage.invoke(
+ iif,
+ HybridTimestamp.MIN_VALUE,
+ new CommandIdGenerator(() ->
UUID.randomUUID().toString()).newId()
+ );
}
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index f130c60819..e5ad163d2d 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.metastorage.server;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
+import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static org.apache.ignite.lang.ErrorGroups.MetaStorage.OP_EXECUTION_ERR;
@@ -42,14 +43,19 @@ import java.util.function.LongConsumer;
import java.util.function.Predicate;
import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
import org.apache.ignite.internal.metastorage.impl.EntryImpl;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -231,13 +237,26 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
@Override
- public boolean invoke(Condition condition, Collection<Operation> success,
Collection<Operation> failure, HybridTimestamp opTs) {
+ public boolean invoke(
+ Condition condition,
+ Collection<Operation> success,
+ Collection<Operation> failure,
+ HybridTimestamp opTs,
+ CommandId commandId
+ ) {
synchronized (mux) {
Collection<Entry> e = getAll(Arrays.asList(condition.keys()));
boolean branch = condition.test(e.toArray(new Entry[]{}));
- Collection<Operation> ops = branch ? success : failure;
+ Collection<Operation> ops = branch ? new ArrayList<>(success) :
new ArrayList<>(failure);
+
+ // In case of in-memory storage, there's no sense in "persisting"
the invoke result; however, the same operations as in the persistent storage
+ // are added in order to have matching revision counts across
all storages.
+ ops.add(Operations.put(
+ new
ByteArray(ArrayUtils.concat(IDEMPOTENT_COMMAND_PREFIX_BYTES,
ByteUtils.toBytes(commandId))),
+ branch ? INVOKE_RESULT_TRUE_BYTES :
INVOKE_RESULT_FALSE_BYTES)
+ );
long curRev = rev + 1;
@@ -274,7 +293,7 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
@Override
- public StatementResult invoke(If iif, HybridTimestamp opTs) {
+ public StatementResult invoke(If iif, HybridTimestamp opTs, CommandId
commandId) {
synchronized (mux) {
If currIf = iif;
while (true) {
@@ -287,7 +306,16 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
boolean modified = false;
- for (Operation op : branch.update().operations()) {
+ Collection<Operation> ops = new
ArrayList<>(branch.update().operations());
+
+ // In case of in-memory storage, there's no sense in
"persisting" the invoke result; however, the same operations as in the persistent
+ // storage are added in order to have matching
revision counts across all storages.
+ ops.add(Operations.put(
+ new
ByteArray(ArrayUtils.concat(IDEMPOTENT_COMMAND_PREFIX_BYTES,
ByteUtils.toBytes(commandId))),
+ branch.update().result().result())
+ );
+
+ for (Operation op : ops) {
switch (op.type()) {
case PUT:
doPut(op.key(), op.value(), curRev);
diff --git
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
index b3c0dcc838..74304e9d8a 100644
---
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
+++
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
@@ -27,12 +27,14 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collection;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
+import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
import org.apache.ignite.internal.metastorage.server.Condition;
import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
@@ -84,7 +86,13 @@ public class DistributedConfigurationStorageTest extends
ConfigurationStorageTes
Collection<Operation> success = invocation.getArgument(1);
Collection<Operation> failure = invocation.getArgument(2);
- boolean invokeResult =
metaStorage.invoke(toServerCondition(condition), success, failure,
HybridTimestamp.MIN_VALUE);
+ boolean invokeResult = metaStorage.invoke(
+ toServerCondition(condition),
+ success,
+ failure,
+ HybridTimestamp.MIN_VALUE,
+ new CommandIdGenerator(() ->
UUID.randomUUID().toString()).newId()
+ );
return CompletableFuture.completedFuture(invokeResult);
});