This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5cc9b65b7a IGNITE-22914 Store a checksum with each Metastorage
revision (#4528)
5cc9b65b7a is described below
commit 5cc9b65b7a450653160254157a8a282d053cbe8a
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 9 19:55:21 2024 +0400
IGNITE-22914 Store a checksum with each Metastorage revision (#4528)
---
.../internal/metastorage/MetaStorageManager.java | 3 +-
.../ignite/internal/metastorage/dsl/Update.java | 4 +-
.../metastorage/impl/ItMetaStorageServiceTest.java | 11 +-
.../metastorage/command/InvokeCommand.java | 12 +-
.../metastorage/impl/MetaStorageManagerImpl.java | 3 +-
.../metastorage/impl/MetaStorageService.java | 4 +-
.../metastorage/impl/MetaStorageServiceImpl.java | 5 +-
.../ignite/internal/metastorage/server/If.java | 2 +-
.../metastorage/server/KeyValueStorage.java | 12 +-
.../metastorage/server/MetastorageChecksum.java | 218 +++++++++++++++++
.../server/persistence/RocksDbKeyValueStorage.java | 150 +++++++++---
.../persistence/StorageColumnFamilyType.java | 5 +-
.../server/raft/MetaStorageWriteHandler.java | 4 +-
.../server/BasicOperationsKeyValueStorageTest.java | 33 ++-
.../RocksDbCompactionKeyValueStorageTest.java | 17 ++
.../server/RocksDbKeyValueStorageTest.java | 265 +++++++++++++++++++++
.../server/SimpleInMemoryKeyValueStorageTest.java | 5 +
.../server/TestRocksDbKeyValueStorageTest.java | 5 +
.../server/SimpleInMemoryKeyValueStorage.java | 13 +-
.../replicator/ItReplicaLifecycleTest.java | 6 +-
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../storage/DistributedConfigurationStorage.java | 8 +-
.../DistributedConfigurationStorageTest.java | 10 +-
.../ItDisasterRecoveryReconfigurationTest.java | 3 +-
.../table/distributed/TableManagerTest.java | 5 +-
25 files changed, 717 insertions(+), 90 deletions(-)
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index e24cd3877f..4685ff6f3b 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.metastorage;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -325,7 +324,7 @@ public interface MetaStorageManager extends IgniteComponent
{
/**
* Updates an entry for the given key conditionally.
*/
- CompletableFuture<Boolean> invoke(Condition cond, Collection<Operation>
success, Collection<Operation> failure);
+ CompletableFuture<Boolean> invoke(Condition cond, List<Operation> success,
List<Operation> failure);
/**
* Invoke, which supports nested conditional statements.
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/dsl/Update.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/dsl/Update.java
index e59f8b720e..ece29be898 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/dsl/Update.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/dsl/Update.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.metastorage.dsl;
-import java.util.Collection;
+import java.util.List;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
@@ -27,7 +27,7 @@ import
org.apache.ignite.internal.network.annotations.Transferable;
@Transferable(MetaStorageMessageGroup.UPDATE)
public interface Update extends NetworkMessage {
/** Operations. */
- Collection<Operation> operations();
+ List<Operation> operations();
/** Result. */
StatementResult result();
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index c7cec67a67..e04d73c5b7 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -57,7 +57,6 @@ import static org.mockito.Mockito.when;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -693,18 +692,18 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
var conditionCaptor =
ArgumentCaptor.forClass(AbstractSimpleCondition.class);
- ArgumentCaptor<Collection<Operation>> successCaptor =
ArgumentCaptor.forClass(Collection.class);
+ ArgumentCaptor<List<Operation>> successCaptor =
ArgumentCaptor.forClass(List.class);
- ArgumentCaptor<Collection<Operation>> failureCaptor =
ArgumentCaptor.forClass(Collection.class);
+ ArgumentCaptor<List<Operation>> failureCaptor =
ArgumentCaptor.forClass(List.class);
verify(node.mockStorage).invoke(conditionCaptor.capture(),
successCaptor.capture(), failureCaptor.capture(), any(), any());
assertArrayEquals(expKey.bytes(), conditionCaptor.getValue().key());
- assertArrayEquals(expKey.bytes(),
toByteArray(successCaptor.getValue().iterator().next().key()));
- assertArrayEquals(expVal,
toByteArray(successCaptor.getValue().iterator().next().value()));
+ assertArrayEquals(expKey.bytes(),
toByteArray(successCaptor.getValue().get(0).key()));
+ assertArrayEquals(expVal,
toByteArray(successCaptor.getValue().get(0).value()));
- assertEquals(OperationType.NO_OP,
failureCaptor.getValue().iterator().next().type());
+ assertEquals(OperationType.NO_OP,
failureCaptor.getValue().get(0).type());
}
// TODO: IGNITE-14693 Add tests for exception handling logic: onError,
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java
index 0fdea026cc..cbbb9be3c0 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.metastorage.command;
-import java.util.Collection;
+import java.util.List;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.network.annotations.Transferable;
@@ -29,22 +29,16 @@ import
org.apache.ignite.internal.network.annotations.Transferable;
public interface InvokeCommand extends IdempotentCommand {
/**
* Returns condition.
- *
- * @return Condition.
*/
Condition condition();
/**
* Returns success operations.
- *
- * @return Success operations.
*/
- Collection<Operation> success();
+ List<Operation> success();
/**
* Returns failure operations.
- *
- * @return Failure operations.
*/
- Collection<Operation> failure();
+ List<Operation> failure();
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 668ffe73d8..a22eeded55 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -26,7 +26,6 @@ import static
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -821,7 +820,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
}
@Override
- public CompletableFuture<Boolean> invoke(Condition cond,
Collection<Operation> success, Collection<Operation> failure) {
+ public CompletableFuture<Boolean> invoke(Condition cond, List<Operation>
success, List<Operation> failure) {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
index 563a025b7e..c2c5d41e92 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.metastorage.impl;
-import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -164,7 +164,7 @@ public interface MetaStorageService extends
ManuallyCloseable {
* @see Condition
* @see Operation
*/
- CompletableFuture<Boolean> invoke(Condition condition,
Collection<Operation> success, Collection<Operation> failure);
+ CompletableFuture<Boolean> invoke(Condition condition, List<Operation>
success, List<Operation> failure);
/**
* Invoke, which supports nested conditional statements. For detailed docs
about construction of new if statement, look at {@link Iif}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index 1ec59b36cf..90703ee62d 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -21,7 +21,6 @@ import static
org.apache.ignite.internal.metastorage.command.GetAllCommand.getAl
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -170,8 +169,8 @@ public class MetaStorageServiceImpl implements
MetaStorageService {
@Override
public CompletableFuture<Boolean> invoke(
Condition condition,
- Collection<Operation> success,
- Collection<Operation> failure
+ List<Operation> success,
+ List<Operation> failure
) {
InvokeCommand invokeCommand = context.commandsFactory().invokeCommand()
.condition(condition)
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/If.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/If.java
index 62014f5da0..bed39b01ff 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/If.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/If.java
@@ -21,7 +21,7 @@ import org.apache.ignite.internal.metastorage.dsl.Update;
/**
* Root building block for the compound meta storage invoke command.
- * Contains of boolean condition and 2 branches of execution, like usual
programming language's if.
+ * Consists of boolean condition and 2 branches of execution, like usual
programming language's if.
* Every branch can be either a new {@link If} statement (non-terminal) or a
result terminal statement {@link Update}.
*/
public class If {
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 104c2d8bb4..15d9357bbd 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -196,8 +196,8 @@ public interface KeyValueStorage extends ManuallyCloseable {
*/
boolean invoke(
Condition condition,
- Collection<Operation> success,
- Collection<Operation> failure,
+ List<Operation> success,
+ List<Operation> failure,
HybridTimestamp opTs,
CommandId commandId
);
@@ -424,4 +424,12 @@ public interface KeyValueStorage extends ManuallyCloseable
{
* @see #saveCompactionRevision(long)
*/
long getCompactionRevision();
+
+ /**
+ * Returns checksum corresponding to the revision.
+ *
+ * @param revision Revision.
+ * @throws CompactedException If the requested revision has been compacted.
+ */
+ long checksum(long revision);
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/MetastorageChecksum.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/MetastorageChecksum.java
new file mode 100644
index 0000000000..1d743c46ab
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/MetastorageChecksum.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.List;
+import java.util.zip.Checksum;
+import org.apache.ignite.raft.jraft.util.CRC64;
+
+/**
+ * Checksum calculation logic for the Metastorage.
+ *
+ * <p>An object of this class has the following state:
+ *
+ * <ul>
+ * <li>Last revision checksum (0 if the Metastorage has no revisions
yet)</li>
+ * <li>Current round checksum state (accumulated during the round)</li>
+ * </ul>
+ *
+ * <p>A round per revision is performed. For simple (non-invoke) commands, the
whole round is performed in one call (see
+ * {@link #wholePut(byte[], byte[])} and so on). For compound commands
(invokes), a round is started
+ * ({@link #prepareForInvoke(boolean, int, byte[])}) and finished ({@link
#roundValue()}) explicitly, and the checksum is updated in it
+ * per each operation that actually gets executed ({@link
#appendPutAsPart(byte[], byte[])} and so on).
+ *
+ * <p>During a round (including its finish), only the current round checksum
state is updated. To update the last revision checksum, call
+ * {@link #commitRound(long)}.
+ */
+public class MetastorageChecksum {
+ private long lastChecksum;
+
+ private final Checksum checksum = new CRC64();
+
+ /** Constructor. */
+ public MetastorageChecksum(long lastChecksum) {
+ this.lastChecksum = lastChecksum;
+ }
+
+ /**
+ * Calculates a checksum for a PUT command.
+ *
+ * @param key Key.
+ * @param value Value.
+ */
+ public long wholePut(byte[] key, byte[] value) {
+ return checksumWholeOperation(Op.PUT, () -> updateForPut(key, value));
+ }
+
+ private void updateForPut(byte[] key, byte[] value) {
+ updateWithBytes(key);
+ updateWithBytes(value);
+ }
+
+ private long checksumWholeOperation(Op operation, Updater updater) {
+ prepareRound(operation);
+
+ updater.update();
+
+ return roundValue();
+ }
+
+ private void prepareRound(Op operation) {
+ checksum.reset();
+ updateWithLong(lastChecksum);
+ checksum.update(operation.code);
+ }
+
+ private void updateWithBytes(byte[] bytes) {
+ updateWithInt(bytes.length);
+ checksum.update(bytes);
+ }
+
+ private void updateWithInt(int value) {
+ for (int i = 0; i < Integer.BYTES; i++) {
+ checksum.update((byte) (value >> (Integer.SIZE - 8)));
+ value <<= 8;
+ }
+ }
+
+ private void updateWithLong(long value) {
+ for (int i = 0; i < Long.BYTES; i++) {
+ checksum.update((byte) (value >> (Long.SIZE - 8)));
+ value <<= 8;
+ }
+ }
+
+ /**
+ * Calculates a checksum for a PUT_ALL command.
+ *
+ * @param keys Keys.
+ * @param values Values.
+ */
+ public long wholePutAll(List<byte[]> keys, List<byte[]> values) {
+ return checksumWholeOperation(Op.PUT_ALL, () -> updateForPutAll(keys,
values));
+ }
+
+ private void updateForPutAll(List<byte[]> keys, List<byte[]> values) {
+ updateWithInt(keys.size());
+ for (int i = 0; i < keys.size(); i++) {
+ updateForPut(keys.get(i), values.get(i));
+ }
+ }
+
+ /**
+ * Calculates a checksum for a REMOVE command.
+ *
+ * @param key Key.
+ */
+ public long wholeRemove(byte[] key) {
+ return checksumWholeOperation(Op.REMOVE, () -> updateForRemove(key));
+ }
+
+ private void updateForRemove(byte[] key) {
+ updateWithBytes(key);
+ }
+
+ /**
+ * Calculates a checksum for a REMOVE command.
+ *
+ * @param keys Keys.
+ */
+ public long wholeRemoveAll(List<byte[]> keys) {
+ return checksumWholeOperation(Op.REMOVE_ALL, () ->
updateForRemoveAll(keys));
+ }
+
+ private void updateForRemoveAll(List<byte[]> keys) {
+ updateWithInt(keys.size());
+ for (byte[] key : keys) {
+ updateForRemove(key);
+ }
+ }
+
+ /**
+ * Initiates a round for an invocation command.
+ *
+ * @param multiInvoke Whether this is a multi-invoke.
+ * @param opCount Number of operations that get executed.
+ * @param updateResult Update result.
+ */
+ public void prepareForInvoke(boolean multiInvoke, int opCount, byte[]
updateResult) {
+ prepareRound(multiInvoke ? Op.MULTI_INVOKE : Op.SINGLE_INVOKE);
+ updateWithBytes(updateResult);
+ updateWithInt(opCount);
+ }
+
+ /**
+ * Appends a PUT command as a part of an invocation command.
+ *
+ * @param key Key.
+ * @param value Value.
+ */
+ public void appendPutAsPart(byte[] key, byte[] value) {
+ appendPartOfCompound(Op.PUT, () -> updateForPut(key, value));
+ }
+
+ /**
+ * Appends a REMOVE command as a part of an invocation command.
+ *
+ * @param key Key.
+ */
+ public void appendRemoveAsPart(byte[] key) {
+ appendPartOfCompound(Op.REMOVE, () -> updateForRemove(key));
+ }
+
+ private void appendPartOfCompound(Op operation, Updater updater) {
+ checksum.update(operation.code);
+ updater.update();
+ }
+
+ /**
+ * Saves the new checksum as the last checksum.
+ *
+ * @param newChecksum New checksum.
+ */
+ public void commitRound(long newChecksum) {
+ lastChecksum = newChecksum;
+ }
+
+ /**
+ * Returns current round checksum value.
+ */
+ public long roundValue() {
+ return checksum.getValue();
+ }
+
+ private enum Op {
+ PUT(1),
+ PUT_ALL(2),
+ REMOVE(3),
+ REMOVE_ALL(4),
+ SINGLE_INVOKE(5),
+ MULTI_INVOKE(6);
+
+ private final int code;
+
+ Op(int code) {
+ this.code = code;
+ }
+ }
+
+ @FunctionalInterface
+ private interface Updater {
+ void update();
+ }
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index dbc0c08e97..39e96deda2 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -39,6 +39,7 @@ import static
org.apache.ignite.internal.metastorage.server.persistence.RocksSto
import static
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
+import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_CHECKSUM;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION;
import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
@@ -46,6 +47,7 @@ import static
org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.ErrorGroups.MetaStorage.COMPACTION_ERR;
import static org.apache.ignite.lang.ErrorGroups.MetaStorage.OP_EXECUTION_ERR;
import static
org.apache.ignite.lang.ErrorGroups.MetaStorage.RESTORING_STORAGE_ERR;
@@ -53,6 +55,7 @@ import static
org.apache.ignite.lang.ErrorGroups.MetaStorage.STARTING_STORAGE_ER
import static org.rocksdb.util.SizeUnit.MB;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -84,6 +87,7 @@ import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.OperationType;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.dsl.Update;
@@ -95,6 +99,7 @@ import
org.apache.ignite.internal.metastorage.server.Condition;
import org.apache.ignite.internal.metastorage.server.If;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils;
+import org.apache.ignite.internal.metastorage.server.MetastorageChecksum;
import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
import org.apache.ignite.internal.metastorage.server.Statement;
import org.apache.ignite.internal.metastorage.server.Value;
@@ -193,6 +198,9 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
/** Revision to timestamp mapping column family. */
private volatile ColumnFamily revisionToTs;
+ /** Revision to checksum mapping column family. */
+ private volatile ColumnFamily revisionToChecksum;
+
/** Snapshot manager. */
private volatile RocksSnapshotManager snapshotManager;
@@ -209,6 +217,13 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
*/
private long rev;
+ /**
+ * Facility to work with checksums.
+ *
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+ */
+ private MetastorageChecksum checksum;
+
/**
* Last compaction revision that was set or restored from a snapshot.
*
@@ -320,11 +335,15 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
ColumnFamilyOptions revToTsFamilyOptions = new
ColumnFamilyOptions(baseOptions);
this.rocksResources.add(revToTsFamilyOptions);
+ ColumnFamilyOptions revToChecksumFamilyOptions = new
ColumnFamilyOptions(baseOptions);
+ this.rocksResources.add(revToChecksumFamilyOptions);
+
return List.of(
new ColumnFamilyDescriptor(DATA.nameAsBytes(),
dataFamilyOptions),
new ColumnFamilyDescriptor(INDEX.nameAsBytes(),
indexFamilyOptions),
new ColumnFamilyDescriptor(TS_TO_REVISION.nameAsBytes(),
tsToRevFamilyOptions),
- new ColumnFamilyDescriptor(REVISION_TO_TS.nameAsBytes(),
revToTsFamilyOptions)
+ new ColumnFamilyDescriptor(REVISION_TO_TS.nameAsBytes(),
revToTsFamilyOptions),
+ new ColumnFamilyDescriptor(REVISION_TO_CHECKSUM.nameAsBytes(),
revToChecksumFamilyOptions)
);
}
@@ -341,7 +360,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
private void createDb() throws RocksDBException {
List<ColumnFamilyDescriptor> descriptors = cfDescriptors();
- assert descriptors.size() == 4;
+ assert descriptors.size() == 5 : descriptors.size();
var handles = new ArrayList<ColumnFamilyHandle>(descriptors.size());
@@ -359,8 +378,10 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
revisionToTs = ColumnFamily.wrap(db, handles.get(3));
+ revisionToChecksum = ColumnFamily.wrap(db, handles.get(4));
+
snapshotManager = new RocksSnapshotManager(db,
- List.of(fullRange(data), fullRange(index),
fullRange(tsToRevision), fullRange(revisionToTs)),
+ List.of(fullRange(data), fullRange(index),
fullRange(tsToRevision), fullRange(revisionToTs),
fullRange(revisionToChecksum)),
snapshotExecutor
);
@@ -370,6 +391,8 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
rev = ByteUtils.bytesToLong(revision);
}
+ checksum = new MetastorageChecksum(revision == null ? 0 :
checksumByRevision(rev));
+
byte[] compactionRevisionBytes = data.get(COMPACTION_REVISION_KEY);
if (compactionRevisionBytes != null) {
@@ -377,6 +400,16 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
+ private long checksumByRevision(long revision) throws RocksDBException {
+ byte[] bytes = revisionToChecksum.get(longToBytes(revision));
+
+ if (bytes == null) {
+ throw new CompactedException(revision, compactionRevision);
+ }
+
+ return bytesToLong(bytes);
+ }
+
/**
* Notifies of revision update.
* Must be called under the {@link #rwLock}.
@@ -473,13 +506,15 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
rwLock.writeLock().lock();
try (WriteBatch batch = new WriteBatch()) {
+ long newChecksum = checksum.wholePut(key, value);
+
long curRev = rev + 1;
addDataToBatch(batch, key, value, curRev, opTs);
updateKeysIndex(batch, key, curRev);
- fillAndWriteBatch(batch, curRev, opTs);
+ completeAndWriteBatch(batch, curRev, opTs, newChecksum);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
} finally {
@@ -512,24 +547,26 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
* @param batch Write batch.
* @param newRev New revision.
* @param ts Operation's timestamp.
+ * @param newChecksum Checksum corresponding to the revision.
* @throws RocksDBException If failed.
*/
- private void fillAndWriteBatch(WriteBatch batch, long newRev, @Nullable
HybridTimestamp ts) throws RocksDBException {
+ private void completeAndWriteBatch(WriteBatch batch, long newRev,
HybridTimestamp ts, long newChecksum) throws RocksDBException {
byte[] revisionBytes = longToBytes(newRev);
data.put(batch, REVISION_KEY, revisionBytes);
- if (ts != null) {
- byte[] tsBytes = hybridTsToArray(ts);
+ byte[] tsBytes = hybridTsToArray(ts);
- tsToRevision.put(batch, tsBytes, revisionBytes);
- revisionToTs.put(batch, revisionBytes, tsBytes);
- }
+ tsToRevision.put(batch, tsBytes, revisionBytes);
+ revisionToTs.put(batch, revisionBytes, tsBytes);
+
+ validateNoChecksumConflict(newRev, newChecksum);
+ revisionToChecksum.put(batch, revisionBytes, longToBytes(newChecksum));
db.write(defaultWriteOptions, batch);
rev = newRev;
-
+ checksum.commitRound(newChecksum);
updatedEntries.ts = ts;
queueWatchEvent();
@@ -537,6 +574,25 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
notifyRevisionUpdate();
}
+ private void validateNoChecksumConflict(long newRev, long newChecksum)
throws RocksDBException {
+ byte[] existingChecksumBytes =
revisionToChecksum.get(longToBytes(newRev));
+
+ if (existingChecksumBytes != null) {
+ long existingChecksum = bytesToLong(existingChecksumBytes);
+ if (existingChecksum != newChecksum) {
+ throw new MetaStorageException(
+ INTERNAL_ERR,
+ String.format(
+ "Metastorage revision checksum differs from a
checksum for the same revision saved earlier. "
+ + "This probably means that the
Metastorage has diverged. [revision=%d, existingChecksum=%d, "
+ + "newChecksum=%d]",
+ newRev, existingChecksum, newChecksum
+ )
+ );
+ }
+ }
+ }
+
private static byte[] hybridTsToArray(HybridTimestamp ts) {
return longToBytes(ts.longValue());
}
@@ -552,6 +608,8 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
rwLock.writeLock().lock();
try (WriteBatch batch = new WriteBatch()) {
+ long newChecksum = checksum.wholePutAll(keys, values);
+
long curRev = rev + 1;
addAllToBatch(batch, keys, values, curRev, opTs);
@@ -560,7 +618,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
updateKeysIndex(batch, key, curRev);
}
- fillAndWriteBatch(batch, curRev, opTs);
+ completeAndWriteBatch(batch, curRev, opTs, newChecksum);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
} finally {
@@ -628,13 +686,15 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
rwLock.writeLock().lock();
try (WriteBatch batch = new WriteBatch()) {
+ long newChecksum = checksum.wholeRemove(key);
+
long curRev = rev + 1;
if (addToBatchForRemoval(batch, key, curRev, opTs)) {
updateKeysIndex(batch, key, curRev);
}
- fillAndWriteBatch(batch, curRev, opTs);
+ completeAndWriteBatch(batch, curRev, opTs, newChecksum);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
} finally {
@@ -647,6 +707,8 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
rwLock.writeLock().lock();
try (WriteBatch batch = new WriteBatch()) {
+ long newChecksum = checksum.wholeRemoveAll(keys);
+
long curRev = rev + 1;
List<byte[]> existingKeys = new ArrayList<>(keys.size());
@@ -661,7 +723,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
updateKeysIndex(batch, key, curRev);
}
- fillAndWriteBatch(batch, curRev, opTs);
+ completeAndWriteBatch(batch, curRev, opTs, newChecksum);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
} finally {
@@ -672,8 +734,8 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
@Override
public boolean invoke(
Condition condition,
- Collection<Operation> success,
- Collection<Operation> failure,
+ List<Operation> success,
+ List<Operation> failure,
HybridTimestamp opTs,
CommandId commandId
) {
@@ -683,15 +745,16 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
Entry[] entries =
getAll(Arrays.asList(condition.keys())).toArray(new Entry[]{});
boolean branch = condition.test(entries);
+ ByteBuffer updateResult = ByteBuffer.wrap(branch ?
INVOKE_RESULT_TRUE_BYTES : INVOKE_RESULT_FALSE_BYTES);
- Collection<Operation> ops = branch ? new ArrayList<>(success) :
new ArrayList<>(failure);
+ List<Operation> ops = new ArrayList<>(branch ? success : failure);
ops.add(Operations.put(
new ByteArray(IDEMPOTENT_COMMAND_PREFIX +
commandId.toMgKeyAsString()),
- branch ? INVOKE_RESULT_TRUE_BYTES :
INVOKE_RESULT_FALSE_BYTES)
- );
+ updateResult
+ ));
- applyOperations(ops, opTs);
+ applyOperations(ops, opTs, false, updateResult);
return branch;
} catch (RocksDBException e) {
@@ -723,15 +786,16 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
if (branch.isTerminal()) {
Update update = branch.update();
+ ByteBuffer updateResult = update.result().result();
- Collection<Operation> ops = new
ArrayList<>(update.operations());
+ List<Operation> ops = new ArrayList<>(update.operations());
ops.add(Operations.put(
new ByteArray(IDEMPOTENT_COMMAND_PREFIX +
commandId.toMgKeyAsString()),
- update.result().result())
- );
+ updateResult
+ ));
- applyOperations(ops, opTs);
+ applyOperations(ops, opTs, true, updateResult);
return update.result();
} else {
@@ -745,21 +809,30 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
- private void applyOperations(Collection<Operation> ops, HybridTimestamp
opTs) throws RocksDBException {
+ private void applyOperations(List<Operation> ops, HybridTimestamp opTs,
boolean multiInvoke, ByteBuffer updateResult)
+ throws RocksDBException {
long curRev = rev + 1;
List<byte[]> updatedKeys = new ArrayList<>();
+ int nonDummyOps = (int) ops.stream()
+ .filter(op -> op.type() != OperationType.NO_OP)
+ .count();
+ checksum.prepareForInvoke(multiInvoke, nonDummyOps,
toByteArray(updateResult));
+
try (WriteBatch batch = new WriteBatch()) {
for (Operation op : ops) {
byte @Nullable [] key = op.key() == null ? null :
toByteArray(op.key());
switch (op.type()) {
case PUT:
- addDataToBatch(batch, key, toByteArray(op.value()),
curRev, opTs);
+ byte[] value = toByteArray(op.value());
+ addDataToBatch(batch, key, value, curRev, opTs);
updatedKeys.add(key);
+ checksum.appendPutAsPart(key, value);
+
break;
case REMOVE:
@@ -767,6 +840,8 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
updatedKeys.add(key);
}
+ checksum.appendRemoveAsPart(key);
+
break;
case NO_OP:
@@ -781,7 +856,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
updateKeysIndex(batch, key, curRev);
}
- fillAndWriteBatch(batch, curRev, opTs);
+ completeAndWriteBatch(batch, curRev, opTs, checksum.roundValue());
}
}
@@ -964,7 +1039,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
try {
compactKeys(revision);
- compactRevisionToTimestampAndViceVersa(revision);
+ compactAuxiliaryMappings(revision);
} catch (Throwable t) {
throw new MetaStorageException(COMPACTION_ERR, "Error during
compaction: " + revision, t);
}
@@ -1605,6 +1680,21 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
+ @Override
+ public long checksum(long revision) {
+ rwLock.readLock().lock();
+
+ try {
+ assertRequestedRevisionLessThanOrEqualToCurrent(revision, rev);
+
+ return checksumByRevision(revision);
+ } catch (RocksDBException e) {
+ throw new MetaStorageException(INTERNAL_ERR, "Cannot get checksum
by revision: " + revision, e);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
private void compactKeys(long compactionRevision) throws RocksDBException {
compactInBatches(index, (it, batch) -> {
compactForKey(batch, it.key(), getAsLongs(it.value()),
compactionRevision);
@@ -1613,7 +1703,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
});
}
- private void compactRevisionToTimestampAndViceVersa(long
compactionRevision) throws RocksDBException {
+ private void compactAuxiliaryMappings(long compactionRevision) throws
RocksDBException {
compactInBatches(revisionToTs, (it, batch) -> {
long revision = bytesToLong(it.key());
@@ -1624,6 +1714,8 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
revisionToTs.delete(batch, it.key());
tsToRevision.delete(batch, it.value());
+ revisionToChecksum.delete(batch, it.key());
+
return true;
});
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
index ef2f0aa685..efe92fd3af 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
@@ -34,7 +34,10 @@ enum StorageColumnFamilyType {
TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)),
/** Column family for the revision to timestamp mapping. */
- REVISION_TO_TS("REVTOTS".getBytes(StandardCharsets.UTF_8));
+ REVISION_TO_TS("REVTOTS".getBytes(StandardCharsets.UTF_8)),
+
+ /** Column family for the revision to checksum mapping. */
+ REVISION_TO_CHECKSUM("REVTOCHSUM".getBytes(StandardCharsets.UTF_8));
/** Byte representation of the column family's name. */
private final byte[] nameAsBytes;
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index bba9fafb5e..6ddb1ddb6b 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -352,9 +352,7 @@ public class MetaStorageWriteHandler {
byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
- Cursor<Entry> cursor = storage.range(keyFrom, keyTo);
-
- try (cursor) {
+ try (Cursor<Entry> cursor = storage.range(keyFrom, keyTo)) {
for (Entry entry : cursor) {
if (!entry.tombstone()) {
CommandId commandId = CommandId.fromString(
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 88f3b15937..e5f2ff6f08 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -31,6 +31,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -85,6 +86,9 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
@WorkDirectory
Path workDir;
+ /** Whether the implemention under test supports checksums. */
+ protected abstract boolean supportsChecksums();
+
@Test
public void testPut() {
byte[] key = key(1);
@@ -2141,6 +2145,11 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
storage.put(key, value, hybridTimestamp(10));
+ long checksum1 = 0;
+ if (supportsChecksums()) {
+ checksum1 = storage.checksum(1);
+ }
+
Path snapshotDir = workDir.resolve("snapshotDir");
assertThat(storage.snapshot(snapshotDir), willCompleteSuccessfully());
@@ -2153,6 +2162,10 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
assertEquals(1L, storage.revision());
assertFalse(storage.get(key).empty());
+
+ if (supportsChecksums()) {
+ assertThat(storage.checksum(1), is(checksum1));
+ }
}
@Test
@@ -2245,33 +2258,41 @@ public abstract class
BasicOperationsKeyValueStorageTest extends AbstractKeyValu
storage.put(key, value, MIN_VALUE);
}
- private void putAllToMs(List<byte[]> keys, List<byte[]> values) {
+ void putAllToMs(List<byte[]> keys, List<byte[]> values) {
storage.putAll(keys, values, MIN_VALUE);
}
- private void removeFromMs(byte[] key) {
+ void removeFromMs(byte[] key) {
storage.remove(key, MIN_VALUE);
}
- private void removeAllFromMs(List<byte[]> keys) {
+ void removeAllFromMs(List<byte[]> keys) {
storage.removeAll(keys, MIN_VALUE);
}
- private boolean invokeOnMs(Condition condition, Collection<Operation>
success, Collection<Operation> failure) {
+ private boolean invokeOnMs(Condition condition, List<Operation> success,
List<Operation> failure) {
+ return invokeOnMs(condition, success, failure, createCommandId());
+ }
+
+ boolean invokeOnMs(Condition condition, List<Operation> success,
List<Operation> failure, CommandId commandId) {
return storage.invoke(
condition,
success,
failure,
MIN_VALUE,
- createCommandId()
+ commandId
);
}
private StatementResult invokeOnMs(If iif) {
+ return invokeOnMs(iif, createCommandId());
+ }
+
+ StatementResult invokeOnMs(If iif, CommandId commandId) {
return storage.invoke(
iif,
MIN_VALUE,
- createCommandId()
+ commandId
);
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
index c224a25654..84352b7dc8 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
@@ -17,8 +17,13 @@
package org.apache.ignite.internal.metastorage.server;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
import org.apache.ignite.internal.failure.NoOpFailureManager;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.junit.jupiter.api.Test;
/** Compaction test for the RocksDB implementation of {@link KeyValueStorage}.
*/
public class RocksDbCompactionKeyValueStorageTest extends
AbstractCompactionKeyValueStorageTest {
@@ -26,4 +31,16 @@ public class RocksDbCompactionKeyValueStorageTest extends
AbstractCompactionKeyV
public KeyValueStorage createStorage() {
return new RocksDbKeyValueStorage("test", workDir.resolve("storage"),
new NoOpFailureManager());
}
+
+ @Test
+ void checksumsAreRemovedForCompactedRevisions() {
+ assertDoesNotThrow(() -> storage.checksum(3));
+
+ storage.compact(3);
+
+ assertThrows(CompactedException.class, () -> storage.checksum(1));
+ assertThrows(CompactedException.class, () -> storage.checksum(2));
+ assertThrows(CompactedException.class, () -> storage.checksum(3));
+ assertDoesNotThrow(() -> storage.checksum(4));
+ }
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index aafb437601..7bf9e94b49 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -17,8 +17,27 @@
package org.apache.ignite.internal.metastorage.server;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
+import static
org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type.NOT_EXISTS;
+import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
+import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.zip.Checksum;
import org.apache.ignite.internal.failure.NoOpFailureManager;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.CommandId;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.raft.jraft.util.CRC64;
+import org.junit.jupiter.api.Test;
/**
* Tests for RocksDB key-value storage implementation.
@@ -28,4 +47,250 @@ public class RocksDbKeyValueStorageTest extends
BasicOperationsKeyValueStorageTe
public KeyValueStorage createStorage() {
return new RocksDbKeyValueStorage("test", workDir.resolve("storage"),
new NoOpFailureManager());
}
+
+ @Override
+ protected boolean supportsChecksums() {
+ return true;
+ }
+
+ @Test
+ public void putChecksum() {
+ byte[] key = key(1);
+ byte[] val = keyValue(1, 1);
+
+ putToMs(key, val);
+ long checksum1 = storage.checksum(1);
+
+ assertThat(checksum1, is(checksum(
+ longToBytes(0), // prev checksum
+ bytes(1), // PUT
+ intToBytes(key.length), key,
+ intToBytes(val.length), val
+ )));
+
+ // Repeating the same command, the checksum must be different.
+ putToMs(key, val);
+ assertThat(storage.checksum(2), is(checksum(
+ longToBytes(checksum1),
+ bytes(1),
+ intToBytes(key.length), key,
+ intToBytes(val.length), val
+ )));
+ }
+
+ @Test
+ public void putAllChecksum() {
+ byte[] key1 = key(1);
+ byte[] val1 = keyValue(1, 1);
+ byte[] key2 = key(2);
+ byte[] val2 = keyValue(2, 2);
+
+ putAllToMs(List.of(key1, key2), List.of(val1, val2));
+ long checksum1 = storage.checksum(1);
+
+ assertThat(checksum1, is(checksum(
+ longToBytes(0), // prev checksum
+ bytes(2), // PUT_ALL
+ intToBytes(2), // entry count
+ intToBytes(key1.length), key1,
+ intToBytes(val1.length), val1,
+ intToBytes(key2.length), key2,
+ intToBytes(val2.length), val2
+ )));
+
+ // Repeating the same command, the checksum must be different.
+ putAllToMs(List.of(key1, key2), List.of(val1, val2));
+ assertThat(storage.checksum(2), is(checksum(
+ longToBytes(checksum1),
+ bytes(2), // PUT_ALL
+ intToBytes(2), // entry count
+ intToBytes(key1.length), key1,
+ intToBytes(val1.length), val1,
+ intToBytes(key2.length), key2,
+ intToBytes(val2.length), val2
+ )));
+ }
+
+ @Test
+ public void removeChecksum() {
+ byte[] key = key(1);
+ byte[] val = keyValue(1, 1);
+
+ putToMs(key, val);
+ long checksum1 = storage.checksum(1);
+
+ removeFromMs(key);
+ long checksum2 = storage.checksum(2);
+ assertThat(checksum2, is(checksum(
+ longToBytes(checksum1),
+ bytes(3), // REMOVE
+ intToBytes(key.length), key
+ )));
+
+ // Repeating the same command, the checksum must be different.
+ removeFromMs(key);
+ assertThat(storage.checksum(3), is(checksum(
+ longToBytes(checksum2),
+ bytes(3), // REMOVE
+ intToBytes(key.length), key
+ )));
+ }
+
+ @Test
+ public void removeAllChecksum() {
+ byte[] key1 = key(1);
+ byte[] val1 = keyValue(1, 1);
+ byte[] key2 = key(2);
+ byte[] val2 = keyValue(2, 2);
+
+ putAllToMs(List.of(key1, key2), List.of(val1, val2));
+ long checksum1 = storage.checksum(1);
+
+ removeAllFromMs(List.of(key1, key2));
+ long checksum2 = storage.checksum(2);
+ assertThat(checksum2, is(checksum(
+ longToBytes(checksum1),
+ bytes(4), // REMOVE_ALL
+ intToBytes(2), // key count
+ intToBytes(key1.length), key1,
+ intToBytes(key2.length), key2
+ )));
+
+ // Repeating the same command, the checksum must be different.
+ removeAllFromMs(List.of(key1, key2));
+ assertThat(storage.checksum(3), is(checksum(
+ longToBytes(checksum2),
+ bytes(4), // REMOVE_ALL
+ intToBytes(2), // key count
+ intToBytes(key1.length), key1,
+ intToBytes(key2.length), key2
+ )));
+ }
+
+ @Test
+ public void singleInvokeChecksum() {
+ byte[] key = key(1);
+ byte[] val = keyValue(1, 1);
+ CommandIdGenerator commandIdGenerator = new CommandIdGenerator(() ->
new UUID(1, 2));
+
+ ExistenceCondition condition = new ExistenceCondition(NOT_EXISTS, key);
+ List<Operation> successfulBranch = List.of(put(new ByteArray(key),
val));
+ List<Operation> failureBranch = List.of(remove(new ByteArray(key)));
+
+ CommandId commandId1 = commandIdGenerator.newId();
+ invokeOnMs(condition, successfulBranch, failureBranch, commandId1);
+
+ long checksum1 = storage.checksum(1);
+
+ byte[] idempotentCommandPutKey1 = idempotentCommandPutKey(commandId1);
+ byte[] updateResult1 = KeyValueStorage.INVOKE_RESULT_TRUE_BYTES;
+ assertThat(checksum1, is(checksum(
+ longToBytes(0), // prev checksum
+ bytes(5), // SINGLE_INVOKE
+ intToBytes(updateResult1.length), updateResult1, // successful
branch
+ intToBytes(2), // op count (as there is also a system command)
+ bytes(1), // PUT
+ intToBytes(key.length), key,
+ intToBytes(val.length), val,
+ bytes(1), // PUT
+ intToBytes(idempotentCommandPutKey1.length),
idempotentCommandPutKey1,
+ intToBytes(updateResult1.length), updateResult1
+ )));
+
+ // Repeating the same command, but it now executes another branch.
+ CommandId commandId2 = commandIdGenerator.newId();
+ invokeOnMs(condition, successfulBranch, failureBranch, commandId2);
+
+ long checksum2 = storage.checksum(2);
+
+ byte[] idempotentCommandPutKey2 = idempotentCommandPutKey(commandId2);
+ byte[] updateResult2 = KeyValueStorage.INVOKE_RESULT_FALSE_BYTES;
+ assertThat(checksum2, is(checksum(
+ longToBytes(checksum1),
+ bytes(5), // SINGLE_INVOKE
+ intToBytes(updateResult2.length), updateResult2, // failure
branch
+ intToBytes(2), // op count (as there is also a system command)
+ bytes(3), // REMOVE
+ intToBytes(key.length), key,
+ bytes(1), // PUT
+ intToBytes(idempotentCommandPutKey2.length),
idempotentCommandPutKey2,
+ intToBytes(updateResult2.length), updateResult2
+ )));
+ }
+
+ private static byte[] idempotentCommandPutKey(CommandId commandId) {
+ return new ByteArray(IDEMPOTENT_COMMAND_PREFIX +
commandId.toMgKeyAsString()).bytes();
+ }
+
+ @Test
+ public void multiInvokeChecksum() {
+ byte[] key = key(1);
+ byte[] val = keyValue(1, 1);
+ CommandIdGenerator commandIdGenerator = new CommandIdGenerator(() ->
new UUID(1, 2));
+
+ If iif = new If(
+ new ExistenceCondition(NOT_EXISTS, key),
+ new Statement(ops(put(new ByteArray(key), val)).yield(1)),
+ new Statement(ops(remove(new ByteArray(key))).yield(2))
+ );
+
+ CommandId commandId1 = commandIdGenerator.newId();
+ invokeOnMs(iif, commandId1);
+
+ long checksum1 = storage.checksum(1);
+
+ byte[] idempotentCommandPutKey1 = idempotentCommandPutKey(commandId1);
+ byte[] updateResult1 = intToBytes(1);
+ assertThat(checksum1, is(checksum(
+ longToBytes(0), // prev checksum
+ bytes(6), // MULTI_INVOKE
+ intToBytes(updateResult1.length), updateResult1, // successful
branch
+ intToBytes(2), // op count (as there is also a system command)
+ bytes(1), // PUT
+ intToBytes(key.length), key,
+ intToBytes(val.length), val,
+ bytes(1), // PUT
+ intToBytes(idempotentCommandPutKey1.length),
idempotentCommandPutKey1,
+ intToBytes(updateResult1.length), updateResult1
+ )));
+
+ // Repeating the same command, but it now executes another branch.
+ CommandId commandId2 = commandIdGenerator.newId();
+ invokeOnMs(iif, commandId2);
+
+ long checksum2 = storage.checksum(2);
+
+ byte[] idempotentCommandPutKey2 = idempotentCommandPutKey(commandId2);
+ byte[] updateResult2 = intToBytes(2);
+ assertThat(checksum2, is(checksum(
+ longToBytes(checksum1),
+ bytes(6), // MULTI_INVOKE
+ intToBytes(updateResult2.length), updateResult2, // failure
branch
+ intToBytes(2), // op count (as there is also a system command)
+ bytes(3), // REMOVE
+ intToBytes(key.length), key,
+ bytes(1), // PUT
+ intToBytes(idempotentCommandPutKey2.length),
idempotentCommandPutKey2,
+ intToBytes(updateResult2.length), updateResult2
+ )));
+ }
+
+ private static byte[] bytes(int... ints) {
+ byte[] bytes = new byte[ints.length];
+ for (int i = 0; i < ints.length; i++) {
+ //noinspection NumericCastThatLosesPrecision
+ bytes[i] = (byte) ints[i];
+ }
+ return bytes;
+ }
+
+ private static long checksum(byte[]... arrays) {
+ Checksum checksum = new CRC64();
+
+ for (byte[] array : arrays) {
+ checksum.update(array);
+ }
+
+ return checksum.getValue();
+ }
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 6761656c5c..f27b4358e4 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -25,4 +25,9 @@ class SimpleInMemoryKeyValueStorageTest extends
BasicOperationsKeyValueStorageTe
public KeyValueStorage createStorage() {
return new SimpleInMemoryKeyValueStorage("test");
}
+
+ @Override
+ protected boolean supportsChecksums() {
+ return false;
+ }
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestRocksDbKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestRocksDbKeyValueStorageTest.java
index 49540a170a..1bec5565ab 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestRocksDbKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestRocksDbKeyValueStorageTest.java
@@ -37,6 +37,11 @@ public class TestRocksDbKeyValueStorageTest extends
BasicOperationsKeyValueStora
return testRocksDbKeyValueStorage;
}
+ @Override
+ protected boolean supportsChecksums() {
+ return true;
+ }
+
@Test
void testRestoreAfterRestart() throws Exception {
byte[] key = key(1);
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index eb0df212ce..4068ab8531 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -284,8 +284,8 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
@Override
public boolean invoke(
Condition condition,
- Collection<Operation> success,
- Collection<Operation> failure,
+ List<Operation> success,
+ List<Operation> failure,
HybridTimestamp opTs,
CommandId commandId
) {
@@ -294,7 +294,7 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
boolean branch = condition.test(e.toArray(new Entry[]{}));
- Collection<Operation> ops = branch ? new ArrayList<>(success) :
new ArrayList<>(failure);
+ List<Operation> ops = branch ? new ArrayList<>(success) : new
ArrayList<>(failure);
// In case of in-memory storage, there's no sense in "persisting"
invoke result, however same persistent source operations
// were added in order to have matching revisions count through
all storages.
@@ -343,7 +343,7 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
if (branch.isTerminal()) {
long curRev = rev + 1;
- Collection<Operation> ops = new
ArrayList<>(branch.update().operations());
+ List<Operation> ops = new
ArrayList<>(branch.update().operations());
// In case of in-memory storage, there's no sense in
"persisting" invoke result, however same persistent source
// operations were added in order to have matching
revisions count through all storages.
@@ -974,6 +974,11 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
}
+ @Override
+ public long checksum(long revision) {
+ throw new UnsupportedOperationException();
+ }
+
private static long[] toLongArray(@Nullable List<Long> list) {
if (list == null) {
return LONG_EMPTY_ARRAY;
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 4f2200afd8..4e00dffe24 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -280,7 +280,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
);
/**
- * Interceptor of {@link MetaStorageManager#invoke(Condition, Collection,
Collection)}.
+ * Interceptor of {@link MetaStorageManager#invoke(Condition, List, List)}.
*/
private final Map<Integer, InvokeInterceptor>
metaStorageInvokeInterceptorByNode = new ConcurrentHashMap<>();
@@ -1114,8 +1114,8 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
@Override
public CompletableFuture<Boolean> invoke(
Condition condition,
- Collection<Operation> success,
- Collection<Operation> failure
+ List<Operation> success,
+ List<Operation> failure
) {
InvokeInterceptor invokeInterceptor =
metaStorageInvokeInterceptorByNode.get(idx);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 10490f5f08..b7c7ce04cd 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -298,7 +298,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
private ReplicationConfiguration replicationConfiguration;
/**
- * Interceptor of {@link MetaStorageManager#invoke(Condition, Collection,
Collection)}.
+ * Interceptor of {@link MetaStorageManager#invoke(Condition, List, List)}.
*/
private final Map<Integer, InvokeInterceptor>
metaStorageInvokeInterceptorByNode = new ConcurrentHashMap<>();
@@ -502,7 +502,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
msRaftConfigurer
) {
@Override
- public CompletableFuture<Boolean> invoke(Condition condition,
Collection<Operation> success, Collection<Operation> failure) {
+ public CompletableFuture<Boolean> invoke(Condition condition,
List<Operation> success, List<Operation> failure) {
if (metaStorageInvokeInterceptor != null) {
var res = metaStorageInvokeInterceptor.invoke(condition,
success, failure);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index c2f18f6271..bdee06027f 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -23,11 +23,11 @@ import static
org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -242,7 +242,7 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
return falseCompletedFuture();
}
- Set<Operation> operations = new HashSet<>();
+ var operations = new ArrayList<Operation>();
for (Map.Entry<String, ? extends Serializable> entry :
newValues.entrySet()) {
ByteArray key = new ByteArray(DISTRIBUTED_PREFIX + entry.getKey());
@@ -263,7 +263,7 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
? notExists(MASTER_KEY)
: revision(MASTER_KEY).eq(curChangeId);
- return metaStorageMgr.invoke(condition, operations,
Set.of(Operations.noop()));
+ return metaStorageMgr.invoke(condition, operations,
List.of(Operations.noop()));
}
@Override
diff --git
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
index a1463c5ed7..b093fa2db9 100644
---
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
+++
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
@@ -23,11 +23,11 @@ import static
org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.util.Collection;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -83,10 +83,10 @@ public class DistributedConfigurationStorageTest extends
ConfigurationStorageTes
private MetaStorageManager mockMetaStorageManager() {
var mock = mock(MetaStorageManager.class);
- when(mock.invoke(any(), anyCollection(),
anyCollection())).thenAnswer(invocation -> {
+ when(mock.invoke(any(), anyList(), anyList())).thenAnswer(invocation
-> {
SimpleCondition condition = invocation.getArgument(0);
- Collection<Operation> success = invocation.getArgument(1);
- Collection<Operation> failure = invocation.getArgument(2);
+ List<Operation> success = invocation.getArgument(1);
+ List<Operation> failure = invocation.getArgument(2);
boolean invokeResult = metaStorage.invoke(
toServerCondition(condition),
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index c66bafcea2..f73f488c24 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -51,7 +51,6 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -448,7 +447,7 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
if (andThen instanceof UpdateStatement) {
UpdateStatement updateStatement = (UpdateStatement)
andThen;
- Collection<Operation> operations =
updateStatement.update().operations();
+ List<Operation> operations =
updateStatement.update().operations();
ByteArray stablePartAssignmentsKey =
stablePartAssignmentsKey(new TablePartitionId(tableId, partId));
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index cf07489a3b..58c3e53e18 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -46,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doReturn;
@@ -407,7 +408,7 @@ public class TableManagerTest extends IgniteAbstractTest {
CompletableFuture<Boolean> invokeTimeoutFuture = new
CompletableFuture<>();
var innerExceptionMsg = "Inner future is interrupted";
invokeTimeoutFuture.completeExceptionally(new
TimeoutException(innerExceptionMsg));
- when(msm.invoke(any(), any(List.class),
any(List.class))).thenReturn(invokeTimeoutFuture);
+ when(msm.invoke(any(), anyList(),
anyList())).thenReturn(invokeTimeoutFuture);
writtenAssignmentsFuture =
tableManager.writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
assertTrue(writtenAssignmentsFuture.isCompletedExceptionally());
assertThrowsWithCause(writtenAssignmentsFuture::get,
TimeoutException.class, innerExceptionMsg);
@@ -717,7 +718,7 @@ public class TableManagerTest extends IgniteAbstractTest {
});
when(msm.invoke(any(), any(Operation.class),
any(Operation.class))).thenReturn(trueCompletedFuture());
- when(msm.invoke(any(), any(List.class),
any(List.class))).thenReturn(trueCompletedFuture());
+ when(msm.invoke(any(), anyList(),
anyList())).thenReturn(trueCompletedFuture());
when(msm.get(any())).thenReturn(nullCompletedFuture());
when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(2L));