This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d36d92b795 IGNITE-23409 Merge implementations of KeyValueStorage 
(#4546)
d36d92b795 is described below

commit d36d92b795bc429edfc67f7d6969c3f5651eba83
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Oct 15 09:46:11 2024 +0300

    IGNITE-23409 Merge implementations of KeyValueStorage (#4546)
---
 .../impl/ItMetaStorageMaintenanceTest.java         |   3 +-
 .../server/AbstractKeyValueStorage.java            | 368 ++++++++++++++
 .../server/persistence/RocksDbKeyValueStorage.java | 373 ++-------------
 .../AbstractCompactionKeyValueStorageTest.java     | 106 +++++
 .../server/SimpleInMemoryKeyValueStorage.java      | 527 +++++++--------------
 .../internal/test/WatchListenerInhibitor.java      |   9 +-
 6 files changed, 679 insertions(+), 707 deletions(-)

diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
index 6705f543e1..b4e45c0ec1 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.WatchProcessor;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
@@ -185,7 +186,7 @@ class ItMetaStorageMaintenanceTest extends 
ItMetaStorageMultipleNodesAbstractTes
 
         // TODO: IGNITE-15723 After a component factory is implemented, need 
to get rid of reflection here.
         var storage = (SimpleInMemoryKeyValueStorage) 
getFieldValue(node.metaStorageManager, MetaStorageManagerImpl.class, "storage");
-        var watchProcessor = (WatchProcessor) getFieldValue(storage, 
SimpleInMemoryKeyValueStorage.class, "watchProcessor");
+        var watchProcessor = (WatchProcessor) getFieldValue(storage, 
AbstractKeyValueStorage.class, "watchProcessor");
 
         CompletableFuture<Void> notificationFuture = 
getFieldValue(watchProcessor, WatchProcessor.class, "notificationFuture");
         if (notificationFuture != null) {
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
new file mode 100644
index 0000000000..e694a9249a
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOT_FOUND;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.isLastIndex;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.maxRevisionIndex;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.minRevisionIndex;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.LongConsumer;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/** Abstract implementation of {@link KeyValueStorage}. */
+public abstract class AbstractKeyValueStorage implements KeyValueStorage {
+    protected static final Comparator<byte[]> KEY_COMPARATOR = 
Arrays::compareUnsigned;
+
+    protected final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    protected final WatchProcessor watchProcessor;
+
+    /**
+     * Revision listener for recovery only. Notifies {@link 
MetaStorageManagerImpl} of revision update.
+     *
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+     */
+    private @Nullable LongConsumer recoveryRevisionListener;
+
+    /**
+     * Revision. Will be incremented for each single-entry or multi-entry 
update operation.
+     *
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+     */
+    protected long rev;
+
+    /**
+     * Last compaction revision that was set or restored from a snapshot.
+     *
+     * <p>This field is used by metastorage read methods to determine whether 
{@link CompactedException} should be thrown.</p>
+     *
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+     */
+    protected long compactionRevision = -1;
+
+    protected final AtomicBoolean stopCompaction = new AtomicBoolean();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param failureManager Failure processor that is used to handle critical 
errors.
+     */
+    protected AbstractKeyValueStorage(String nodeName, FailureManager 
failureManager) {
+        this.watchProcessor = new WatchProcessor(nodeName, this::get, 
failureManager);
+    }
+
+    /** Returns the key revisions for operation, an empty array if not found. 
*/
+    protected abstract long[] keyRevisionsForOperation(byte[] key);
+
+    /** Returns key values by revision for operation. */
+    protected abstract Value valueForOperation(byte[] key, long revision);
+
+    @Override
+    public Entry get(byte[] key) {
+        rwLock.readLock().lock();
+
+        try {
+            return doGet(key, rev);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public Entry get(byte[] key, long revUpperBound) {
+        rwLock.readLock().lock();
+
+        try {
+            return doGet(key, revUpperBound);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public List<Entry> get(byte[] key, long revLowerBound, long revUpperBound) 
{
+        rwLock.readLock().lock();
+
+        try {
+            return doGet(key, revLowerBound, revUpperBound);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public List<Entry> getAll(List<byte[]> keys) {
+        rwLock.readLock().lock();
+
+        try {
+            return doGetAll(keys, rev);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public List<Entry> getAll(List<byte[]> keys, long revUpperBound) {
+        rwLock.readLock().lock();
+
+        try {
+            return doGetAll(keys, revUpperBound);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
+        rwLock.readLock().lock();
+
+        try {
+            return range(keyFrom, keyTo, rev);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public long revision() {
+        rwLock.readLock().lock();
+
+        try {
+            return rev;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void setCompactionRevision(long revision) {
+        assert revision >= 0 : revision;
+
+        rwLock.writeLock().lock();
+
+        try {
+            assertCompactionRevisionLessThanCurrent(revision, rev);
+
+            compactionRevision = revision;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public long getCompactionRevision() {
+        rwLock.readLock().lock();
+
+        try {
+            return compactionRevision;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void stopCompaction() {
+        stopCompaction.set(true);
+    }
+
+    @Override
+    public byte @Nullable [] nextKey(byte[] key) {
+        return incrementPrefix(key);
+    }
+
+    @Override
+    public void registerRevisionUpdateListener(RevisionUpdateListener 
listener) {
+        watchProcessor.registerRevisionUpdateListener(listener);
+    }
+
+    @Override
+    public void unregisterRevisionUpdateListener(RevisionUpdateListener 
listener) {
+        watchProcessor.unregisterRevisionUpdateListener(listener);
+    }
+
+    @Override
+    public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long 
newRevision) {
+        return watchProcessor.notifyUpdateRevisionListeners(newRevision);
+    }
+
+    @Override
+    public void setRecoveryRevisionListener(@Nullable LongConsumer listener) {
+        rwLock.writeLock().lock();
+
+        try {
+            this.recoveryRevisionListener = listener;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void removeWatch(WatchListener listener) {
+        watchProcessor.removeWatch(listener);
+    }
+
+    @Override
+    public void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev, 
WatchListener listener) {
+        assert rev > 0 : rev;
+
+        Predicate<byte[]> rangePredicate = keyTo == null
+                ? k -> KEY_COMPARATOR.compare(keyFrom, k) <= 0
+                : k -> KEY_COMPARATOR.compare(keyFrom, k) <= 0 && 
KEY_COMPARATOR.compare(keyTo, k) > 0;
+
+        watchProcessor.addWatch(new Watch(rev, listener, rangePredicate));
+    }
+
+    @Override
+    public void watchExact(Collection<byte[]> keys, long rev, WatchListener 
listener) {
+        assert rev > 0 : rev;
+        assert !keys.isEmpty();
+
+        TreeSet<byte[]> keySet = new TreeSet<>(KEY_COMPARATOR);
+
+        keySet.addAll(keys);
+
+        Predicate<byte[]> inPredicate = keySet::contains;
+
+        watchProcessor.addWatch(new Watch(rev, listener, inPredicate));
+    }
+
+    @Override
+    public void watchExact(byte[] key, long rev, WatchListener listener) {
+        assert rev > 0 : rev;
+
+        Predicate<byte[]> exactPredicate = k -> KEY_COMPARATOR.compare(k, key) 
== 0;
+
+        watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
+    }
+
+    /** Notifies of revision update. Must be called under the {@link #rwLock}. 
*/
+    protected void notifyRevisionUpdate() {
+        if (recoveryRevisionListener != null) {
+            // Listener must be invoked only on recovery, after recovery 
listener must be null.
+            recoveryRevisionListener.accept(rev);
+        }
+    }
+
+    protected Entry doGet(byte[] key, long revUpperBound) {
+        assert revUpperBound >= 0 : revUpperBound;
+
+        long[] keyRevisions = keyRevisionsForOperation(key);
+        int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
+
+        if (maxRevisionIndex == NOT_FOUND) {
+            
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound,
 compactionRevision);
+
+            return EntryImpl.empty(key);
+        }
+
+        long revision = keyRevisions[maxRevisionIndex];
+
+        Value value = valueForOperation(key, revision);
+
+        if (revUpperBound <= compactionRevision && (!isLastIndex(keyRevisions, 
maxRevisionIndex) || value.tombstone())) {
+            throw new CompactedException(revUpperBound, compactionRevision);
+        }
+
+        return EntryImpl.toEntry(key, revision, value);
+    }
+
+    private List<Entry> doGet(byte[] key, long revLowerBound, long 
revUpperBound) {
+        assert revLowerBound >= 0 : revLowerBound;
+        assert revUpperBound >= 0 : revUpperBound;
+        assert revUpperBound >= revLowerBound : "revLowerBound=" + 
revLowerBound + ", revUpperBound=" + revUpperBound;
+
+        long[] keyRevisions = keyRevisionsForOperation(key);
+
+        int minRevisionIndex = minRevisionIndex(keyRevisions, revLowerBound);
+        int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
+
+        if (minRevisionIndex == NOT_FOUND || maxRevisionIndex == NOT_FOUND) {
+            
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revLowerBound,
 compactionRevision);
+
+            return List.of();
+        }
+
+        var entries = new ArrayList<Entry>();
+
+        for (int i = minRevisionIndex; i <= maxRevisionIndex; i++) {
+            long revision = keyRevisions[i];
+
+            Value value;
+
+            // More complex check to read less from disk.
+            // Optimization for persistent storage.
+            if (revision <= compactionRevision) {
+                if (!isLastIndex(keyRevisions, i)) {
+                    continue;
+                }
+
+                value = valueForOperation(key, revision);
+
+                if (value.tombstone()) {
+                    continue;
+                }
+            } else {
+                value = valueForOperation(key, revision);
+            }
+
+            entries.add(EntryImpl.toEntry(key, revision, value));
+        }
+
+        if (entries.isEmpty()) {
+            
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revLowerBound,
 compactionRevision);
+        }
+
+        return entries;
+    }
+
+    private List<Entry> doGetAll(List<byte[]> keys, long revUpperBound) {
+        assert !keys.isEmpty();
+        assert revUpperBound >= 0 : revUpperBound;
+
+        var res = new ArrayList<Entry>(keys.size());
+
+        for (byte[] key : keys) {
+            res.add(doGet(key, revUpperBound));
+        }
+
+        return res;
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index bf743489de..b1c60c9f50 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -23,9 +23,7 @@ import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertRequestedRevisionLessThanOrEqualToCurrent;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
-import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.isLastIndex;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.maxRevisionIndex;
-import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.minRevisionIndex;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
 import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.appendLong;
@@ -45,7 +43,6 @@ import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageC
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION;
 import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
-import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
 import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
 import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
@@ -62,23 +59,15 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
-import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.LongConsumer;
-import java.util.function.Predicate;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
@@ -86,8 +75,6 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.Entry;
-import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
-import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.OperationType;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
@@ -96,7 +83,7 @@ import org.apache.ignite.internal.metastorage.dsl.Update;
 import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
 import org.apache.ignite.internal.metastorage.impl.EntryImpl;
-import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.Condition;
 import org.apache.ignite.internal.metastorage.server.If;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
@@ -104,8 +91,6 @@ import 
org.apache.ignite.internal.metastorage.server.MetastorageChecksum;
 import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
 import org.apache.ignite.internal.metastorage.server.Statement;
 import org.apache.ignite.internal.metastorage.server.Value;
-import org.apache.ignite.internal.metastorage.server.Watch;
-import org.apache.ignite.internal.metastorage.server.WatchProcessor;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -145,7 +130,7 @@ import org.rocksdb.WriteOptions;
  * The mapping from the key to the set of the storage's revisions is stored in 
the "index" column family. A key represents the key of an
  * entry and the value is a {@code byte[]} that represents a {@code long[]} 
where every item is a revision of the storage.
  */
-public class RocksDbKeyValueStorage implements KeyValueStorage {
+public class RocksDbKeyValueStorage extends AbstractKeyValueStorage {
     private static final IgniteLogger LOG = 
Loggers.forClass(RocksDbKeyValueStorage.class);
 
     /** A revision to store with system entries. */
@@ -163,9 +148,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
             "SYSTEM_COMPACTION_REVISION_KEY".getBytes(UTF_8)
     );
 
-    /** Lexicographic order comparator. */
-    private static final Comparator<byte[]> CMP = Arrays::compareUnsigned;
-
     /** Batch size (number of keys) for storage compaction. The value is 
arbitrary. */
     private static final int COMPACT_BATCH_SIZE = 10;
 
@@ -173,9 +155,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         RocksDB.loadLibrary();
     }
 
-    /** RW lock. */
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
     /** Thread-pool for snapshot operations execution. */
     private final ExecutorService snapshotExecutor;
 
@@ -206,19 +185,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     /** Snapshot manager. */
     private volatile RocksSnapshotManager snapshotManager;
 
-    /**
-     * Revision listener for recovery only. Notifies {@link 
MetaStorageManagerImpl} of revision update.
-     * Guarded by {@link #rwLock}.
-     */
-    private @Nullable LongConsumer recoveryRevisionListener;
-
-    /**
-     * Revision. Will be incremented for each single-entry or multi-entry 
update operation.
-     *
-     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
-     */
-    private long rev;
-
     /**
      * Facility to work with checksums.
      *
@@ -226,18 +192,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
      */
     private MetastorageChecksum checksum;
 
-    /**
-     * Last compaction revision that was set or restored from a snapshot.
-     *
-     * <p>This field is used by metastorage read methods to determine whether 
{@link CompactedException} should be thrown.</p>
-     *
-     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
-     */
-    private long compactionRevision = -1;
-
-    /** Watch processor. */
-    private final WatchProcessor watchProcessor;
-
     /** Status of the watch recovery process. */
     private enum RecoveryStatus {
         INITIAL,
@@ -255,7 +209,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
      * Buffer used to cache new events while an event replay is in progress. 
After replay finishes, the cache gets drained and is never
      * used again.
      *
-     * <p>Multi-threaded access is guarded by {@link #rwLock}.
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
      */
     @Nullable
     private List<UpdatedEntries> eventCache;
@@ -263,7 +217,9 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     /**
      * Current list of updated entries.
      *
-     * <p>Since this list gets read and updated only on writes (under a write 
lock), no extra synchronisation is needed.
+     * <p>Since this list gets read and updated only on writes (under a write 
lock), no extra synchronisation is needed.</p>
+     *
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
      */
     private final UpdatedEntries updatedEntries = new UpdatedEntries();
 
@@ -273,18 +229,17 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     /** Metastorage recovery is based on the snapshot & external log. WAL is 
never used for recovery, and can be safely disabled. */
     private final WriteOptions defaultWriteOptions = new 
WriteOptions().setDisableWAL(true);
 
-    private final AtomicBoolean stopCompaction = new AtomicBoolean();
-
     /**
      * Constructor.
      *
      * @param nodeName Node name.
      * @param dbPath RocksDB path.
+     * @param failureManager Failure processor that is used to handle critical 
errors.
      */
     public RocksDbKeyValueStorage(String nodeName, Path dbPath, FailureManager 
failureManager) {
-        this.dbPath = dbPath;
+        super(nodeName, failureManager);
 
-        this.watchProcessor = new WatchProcessor(nodeName, this::get, 
failureManager);
+        this.dbPath = dbPath;
 
         this.snapshotExecutor = Executors.newFixedThreadPool(2, 
NamedThreadFactory.create(nodeName, "metastorage-snapshot-executor", LOG));
     }
@@ -412,17 +367,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         return bytesToLong(bytes);
     }
 
-    /**
-     * Notifies of revision update.
-     * Must be called under the {@link #rwLock}.
-     */
-    private void notifyRevisionUpdate() {
-        if (recoveryRevisionListener != null) {
-            // Listener must be invoked only on recovery, after recovery 
listener must be null.
-            recoveryRevisionListener.accept(rev);
-        }
-    }
-
     /**
      * Clear the RocksDB instance.
      *
@@ -459,7 +403,13 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
     @Override
     public CompletableFuture<Void> snapshot(Path snapshotPath) {
-        return snapshotManager.createSnapshot(snapshotPath);
+        rwLock.writeLock().lock();
+
+        try {
+            return snapshotManager.createSnapshot(snapshotPath);
+        } finally {
+            rwLock.writeLock().unlock();
+        }
     }
 
     @Override
@@ -492,17 +442,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
-    @Override
-    public long revision() {
-        rwLock.readLock().lock();
-
-        try {
-            return rev;
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
     @Override
     public void put(byte[] key, byte[] value, HybridTimestamp opTs) {
         rwLock.writeLock().lock();
@@ -599,12 +538,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         return longToBytes(ts.longValue());
     }
 
-    private static Entry entry(byte[] key, long revision, Value value) {
-        return value.tombstone()
-                ? EntryImpl.tombstone(key, revision, 
value.operationTimestamp())
-                : new EntryImpl(key, value.bytes(), revision, 
value.operationTimestamp());
-    }
-
     @Override
     public void putAll(List<byte[]> keys, List<byte[]> values, HybridTimestamp 
opTs) {
         rwLock.writeLock().lock();
@@ -628,61 +561,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
-    @Override
-    public Entry get(byte[] key) {
-        rwLock.readLock().lock();
-
-        try {
-            return doGet(key, rev);
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    @Override
-    public Entry get(byte[] key, long revUpperBound) {
-        rwLock.readLock().lock();
-
-        try {
-            return doGet(key, revUpperBound);
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    @Override
-    public List<Entry> get(byte[] key, long revLowerBound, long revUpperBound) 
{
-        rwLock.readLock().lock();
-
-        try {
-            return doGet(key, revLowerBound, revUpperBound);
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    @Override
-    public List<Entry> getAll(List<byte[]> keys) {
-        rwLock.readLock().lock();
-
-        try {
-            return doGetAll(keys, rev);
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    @Override
-    public List<Entry> getAll(List<byte[]> keys, long revUpperBound) {
-        rwLock.readLock().lock();
-
-        try {
-            return doGetAll(keys, revUpperBound);
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
     @Override
     public void remove(byte[] key, HybridTimestamp opTs) {
         rwLock.writeLock().lock();
@@ -884,45 +762,9 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
-    @Override
-    public void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev, 
WatchListener listener) {
-        assert keyFrom != null : "keyFrom couldn't be null.";
-        assert rev > 0 : "rev must be positive.";
-
-        Predicate<byte[]> rangePredicate = keyTo == null
-                ? k -> CMP.compare(keyFrom, k) <= 0
-                : k -> CMP.compare(keyFrom, k) <= 0 && CMP.compare(keyTo, k) > 
0;
-
-        watchProcessor.addWatch(new Watch(rev, listener, rangePredicate));
-    }
-
-    @Override
-    public void watchExact(byte[] key, long rev, WatchListener listener) {
-        assert key != null : "key couldn't be null.";
-        assert rev > 0 : "rev must be positive.";
-
-        Predicate<byte[]> exactPredicate = k -> CMP.compare(k, key) == 0;
-
-        watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
-    }
-
-    @Override
-    public void watchExact(Collection<byte[]> keys, long rev, WatchListener 
listener) {
-        assert keys != null && !keys.isEmpty() : "keys couldn't be null or 
empty: " + keys;
-        assert rev > 0 : "rev must be positive.";
-
-        TreeSet<byte[]> keySet = new TreeSet<>(CMP);
-
-        keySet.addAll(keys);
-
-        Predicate<byte[]> inPredicate = keySet::contains;
-
-        watchProcessor.addWatch(new Watch(rev, listener, inPredicate));
-    }
-
     @Override
     public void startWatches(long startRevision, OnRevisionAppliedCallback 
revisionCallback) {
-        assert startRevision != 0 : "First meaningful revision is 1";
+        assert startRevision > 0 : startRevision;
 
         long currentRevision;
 
@@ -950,11 +792,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
-    @Override
-    public void removeWatch(WatchListener listener) {
-        watchProcessor.removeWatch(listener);
-    }
-
     @Override
     public void compact(long revision) {
         assert revision >= 0 : revision;
@@ -968,16 +805,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
-    @Override
-    public void stopCompaction() {
-        stopCompaction.set(true);
-    }
-
-    @Override
-    public byte @Nullable [] nextKey(byte[] key) {
-        return incrementPrefix(key);
-    }
-
     /**
      * Adds a key to a batch marking the value as a tombstone.
      *
@@ -1044,90 +871,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
-    private List<Entry> doGetAll(Collection<byte[]> keys, long revUpperBound) {
-        assert !keys.isEmpty();
-        assert revUpperBound >= 0 : revUpperBound;
-
-        var res = new ArrayList<Entry>(keys.size());
-
-        for (byte[] key : keys) {
-            res.add(doGet(key, revUpperBound));
-        }
-
-        return res;
-    }
-
-    private Entry doGet(byte[] key, long revUpperBound) {
-        assert revUpperBound >= 0 : revUpperBound;
-
-        long[] keyRevisions = getRevisionsForOperation(key);
-        int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
-
-        if (maxRevisionIndex == NOT_FOUND) {
-            
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound,
 compactionRevision);
-
-            return EntryImpl.empty(key);
-        }
-
-        long revision = keyRevisions[maxRevisionIndex];
-
-        Value value = getValueForOperation(key, revision);
-
-        if (revUpperBound <= compactionRevision && (!isLastIndex(keyRevisions, 
maxRevisionIndex) || value.tombstone())) {
-            throw new CompactedException(revUpperBound, compactionRevision);
-        }
-
-        return EntryImpl.toEntry(key, revision, value);
-    }
-
-    private List<Entry> doGet(byte[] key, long revLowerBound, long 
revUpperBound) {
-        assert revLowerBound >= 0 : revLowerBound;
-        assert revUpperBound >= 0 : revUpperBound;
-        assert revUpperBound >= revLowerBound : "revLowerBound=" + 
revLowerBound + ", revUpperBound=" + revUpperBound;
-
-        long[] keyRevisions = getRevisionsForOperation(key);
-
-        int minRevisionIndex = minRevisionIndex(keyRevisions, revLowerBound);
-        int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
-
-        if (minRevisionIndex == NOT_FOUND || maxRevisionIndex == NOT_FOUND) {
-            
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revLowerBound,
 compactionRevision);
-
-            return List.of();
-        }
-
-        var entries = new ArrayList<Entry>();
-
-        for (int i = minRevisionIndex; i <= maxRevisionIndex; i++) {
-            long revision = keyRevisions[i];
-
-            Value value;
-
-            // More complex check to read less from disk.
-            if (revision <= compactionRevision) {
-                if (!isLastIndex(keyRevisions, i)) {
-                    continue;
-                }
-
-                value = getValueForOperation(key, revision);
-
-                if (value.tombstone()) {
-                    continue;
-                }
-            } else {
-                value = getValueForOperation(key, revision);
-            }
-
-            entries.add(EntryImpl.toEntry(key, revision, value));
-        }
-
-        if (entries.isEmpty()) {
-            
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revLowerBound,
 compactionRevision);
-        }
-
-        return entries;
-    }
-
     /**
      * Returns array of revisions of the entry corresponding to the key.
      *
@@ -1144,20 +887,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         return getAsLongs(revisions);
     }
 
-    /**
-     * Returns array of revisions of the entry corresponding to the key.
-     *
-     * @param key Key.
-     * @throws MetaStorageException If there was an error while getting the 
revisions for the key.
-     */
-    private long[] getRevisionsForOperation(byte[] key) {
-        try {
-            return getRevisions(key);
-        } catch (RocksDBException e) {
-            throw new MetaStorageException(OP_EXECUTION_ERR, "Failed to get 
revisions for the key: " + toUtf8String(key), e);
-        }
-    }
-
     /**
      * Adds an entry to the batch.
      *
@@ -1181,7 +910,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
         data.put(batch, rocksKey, rocksValue);
 
-        updatedEntries.add(entry(key, curRev, new Value(value, opTs)));
+        updatedEntries.add(EntryImpl.toEntry(key, curRev, new Value(value, 
opTs)));
     }
 
     /**
@@ -1297,7 +1026,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
                     ts = hybridTimestamp(timestampFromRocksValue(rocksValue));
                 }
 
-                updatedEntries.add(entry(rocksKeyToBytes(rocksKey), revision, 
bytesToValue(rocksValue)));
+                
updatedEntries.add(EntryImpl.toEntry(rocksKeyToBytes(rocksKey), revision, 
bytesToValue(rocksValue)));
             }
 
             try {
@@ -1384,17 +1113,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
-    @Override
-    public void setRecoveryRevisionListener(@Nullable LongConsumer listener) {
-        rwLock.writeLock().lock();
-
-        try {
-            this.recoveryRevisionListener = listener;
-        } finally {
-            rwLock.writeLock().unlock();
-        }
-    }
-
     @TestOnly
     public Path getDbPath() {
         return dbPath;
@@ -1440,21 +1158,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
-    @Override
-    public void registerRevisionUpdateListener(RevisionUpdateListener 
listener) {
-        watchProcessor.registerRevisionUpdateListener(listener);
-    }
-
-    @Override
-    public void unregisterRevisionUpdateListener(RevisionUpdateListener 
listener) {
-        watchProcessor.unregisterRevisionUpdateListener(listener);
-    }
-
-    @Override
-    public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long 
newRevision) {
-        return watchProcessor.notifyUpdateRevisionListeners(newRevision);
-    }
-
     @Override
     public void advanceSafeTime(HybridTimestamp newSafeTime) {
         rwLock.writeLock().lock();
@@ -1487,32 +1190,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
-    @Override
-    public void setCompactionRevision(long revision) {
-        assert revision >= 0 : revision;
-
-        rwLock.writeLock().lock();
-
-        try {
-            assertCompactionRevisionLessThanCurrent(revision, rev);
-
-            compactionRevision = revision;
-        } finally {
-            rwLock.writeLock().unlock();
-        }
-    }
-
-    @Override
-    public long getCompactionRevision() {
-        rwLock.readLock().lock();
-
-        try {
-            return compactionRevision;
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
     @Override
     public long checksum(long revision) {
         rwLock.readLock().lock();
@@ -1621,7 +1298,17 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
-    private Value getValueForOperation(byte[] key, long revision) {
+    @Override
+    protected long[] keyRevisionsForOperation(byte[] key) {
+        try {
+            return getRevisions(key);
+        } catch (RocksDBException e) {
+            throw new MetaStorageException(OP_EXECUTION_ERR, "Failed to get 
revisions for the key: " + toUtf8String(key), e);
+        }
+    }
+
+    @Override
+    protected Value valueForOperation(byte[] key, long revision) {
         Value value = getValueForOperationNullable(key, revision);
 
         assert value != null : "key=" + toUtf8String(key) + ", revision=" + 
revision;
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index 956382ca5b..805e0bab04 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -108,6 +108,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertEquals(List.of(4, 6/* Tombstone */), collectRevisions(SOME_KEY));
     }
 
+    /**
+     * Tests {@link KeyValueStorage#compact(long)} for a specific single 
revision, to simplify testing, see examples in the method
+     * description. Keys with their revisions are added in {@link #setUp()}.
+     */
     @Test
     void testCompactRevision1() {
         storage.compact(1);
@@ -117,6 +121,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
     }
 
+    /**
+     * Tests {@link KeyValueStorage#compact(long)} for a specific single 
revision, to simplify testing, see examples in the method
+     * description. Keys with their revisions are added in {@link #setUp()}.
+     */
     @Test
     void testCompactRevision2() {
         storage.compact(2);
@@ -126,6 +134,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
     }
 
+    /**
+     * Tests {@link KeyValueStorage#compact(long)} for a specific single 
revision, to simplify testing, see examples in the method
+     * description. Keys with their revisions are added in {@link #setUp()}.
+     */
     @Test
     void testCompactRevision3() {
         storage.compact(3);
@@ -135,6 +147,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
     }
 
+    /**
+     * Tests {@link KeyValueStorage#compact(long)} for a specific single 
revision, to simplify testing, see examples in the method
+     * description. Keys with their revisions are added in {@link #setUp()}.
+     */
     @Test
     void testCompactRevision4() {
         storage.compact(4);
@@ -144,6 +160,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertEquals(List.of(6), collectRevisions(SOME_KEY));
     }
 
+    /**
+     * Tests {@link KeyValueStorage#compact(long)} for a specific single 
revision, to simplify testing, see examples in the method
+     * description. Keys with their revisions are added in {@link #setUp()}.
+     */
     @Test
     void testCompactRevision5() {
         storage.compact(5);
@@ -153,6 +173,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertEquals(List.of(6), collectRevisions(SOME_KEY));
     }
 
+    /**
+     * Tests {@link KeyValueStorage#compact(long)} for a specific single 
revision, to simplify testing, see examples in the method
+     * description. Keys with their revisions are added in {@link #setUp()}.
+     */
     @Test
     void testCompactRevision6() {
         storage.compact(6);
@@ -162,6 +186,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertEquals(List.of(), collectRevisions(SOME_KEY));
     }
 
+    /**
+     * Tests {@link KeyValueStorage#compact(long)} as if it were called for 
each revision sequentially, see examples in the method
+     * description. Keys with their revisions are added in {@link #setUp()}.
+     */
     @Test
     void testCompactRevisionSequentially() {
         testCompactRevision1();
@@ -172,6 +200,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         testCompactRevision6();
     }
 
+    /**
+     * Tests that after the storage is recovered, compacted keys will not be 
returned. Keys with their revisions are added in
+     * {@link #setUp()}.
+     */
     @Test
     void testRevisionsAfterRestart() {
         storage.compact(6);
@@ -187,6 +219,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertEquals(List.of(), collectRevisions(SOME_KEY));
     }
 
+    /**
+     * Tests stopping the compaction. Since it is impossible to predict what 
the result will be if you stop somewhere in the middle of the
+     * compaction, it is easiest to stop before the compaction starts. Keys 
with their revisions are added in {@link #setUp()}.
+     */
     @Test
     void testCompactBeforeStopIt() {
         storage.stopCompaction();
@@ -281,6 +317,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertEquals(-1, storage.getCompactionRevision());
     }
 
+    /**
+     * Tests {@link Entry#timestamp()} for a key that will be fully removed 
from storage after compaction. This case would be suitable for
+     * the {@link #BAR_KEY}, since its last revision is a tombstone. Keys with 
their revisions are added in {@link #setUp()}.
+     */
     @Test
     void testEntryOperationTimestampAfterCompaction() {
         storage.compact(6);
@@ -340,6 +380,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertThrows(CompactedException.class, () -> 
storage.revisionByTimestamp(timestamp3.subtractPhysicalTime(1)));
     }
 
+    /**
+     * Tests that {@link KeyValueStorage#get(byte[])} will not throw the 
{@link CompactedException} for all keys after compacting to the
+     * penultimate repository revision. Keys with their revisions are added in 
{@link #setUp()}.
+     */
     @Test
     void testGetSingleEntryLatestAndCompaction() {
         storage.setCompactionRevision(6);
@@ -349,6 +393,11 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertDoesNotThrow(() -> storage.get(NOT_EXISTS_KEY));
     }
 
+    /**
+     * Tests {@link KeyValueStorage#get(byte[], long)} using examples from the 
description only for the {@link #FOO_KEY} for which the last
+     * revision is <b>not</b> tombstone. Only one key is considered so that 
the tests are not too long. Keys with their revisions are
+     * added in {@link #setUp()}.
+     */
     @Test
     void testGetSingleEntryAndCompactionForFooKey() {
         // FOO_KEY has revisions: [1, 3, 5].
@@ -377,6 +426,11 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertDoesNotThrowCompactedExceptionForGetSingleValue(FOO_KEY, 5);
     }
 
+    /**
+     * Tests {@link KeyValueStorage#get(byte[], long)} using examples from the 
description only for the {@link #BAR_KEY} for which the last
+     * revision is tombstone. Only one key is considered so that the tests are 
not too long. Keys with their revisions are added in
+     * {@link #setUp()}.
+     */
     @Test
     void testGetSingleEntryAndCompactionForBarKey() {
         // BAR_KEY has revisions: [1, 2, 5 (tombstone)].
@@ -405,6 +459,11 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertDoesNotThrowCompactedExceptionForGetSingleValue(BAR_KEY, 7);
     }
 
+    /**
+     * Tests {@link KeyValueStorage#get(byte[], long)} using examples from the 
description only for the {@link #NOT_EXISTS_KEY} for which
+     * was never present in the storage. Only one key is considered so that 
the tests are not too long. Keys with their revisions are added
+     * in {@link #setUp()}.
+     */
     @Test
     void testGetSingleEntryAndCompactionForNotExistsKey() {
         storage.setCompactionRevision(1);
@@ -432,6 +491,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertDoesNotThrowCompactedExceptionForGetSingleValue(NOT_EXISTS_KEY, 
7);
     }
 
+    /**
+     * Tests that {@link KeyValueStorage#getAll(List)} will not throw the 
{@link CompactedException} for all keys after compacting to the
+     * penultimate repository revision. Keys with their revisions are added in 
{@link #setUp()}.
+     */
     @Test
     void testGetAllLatestAndCompaction() {
         storage.setCompactionRevision(6);
@@ -439,6 +502,11 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertDoesNotThrow(() -> storage.getAll(List.of(FOO_KEY, BAR_KEY, 
NOT_EXISTS_KEY)));
     }
 
+    /**
+     * Tests {@link KeyValueStorage#getAll(List, long)} using examples from 
the description only for the {@link #FOO_KEY} for which the
+     * last revision is <b>not</b> tombstone. Only one key is considered so 
that the tests are not too long. Keys with their revisions are
+     * added in {@link #setUp()}.
+     */
     @Test
     void testGetAllAndCompactionForFooKey() {
         // FOO_KEY has revisions: [1, 3, 5].
@@ -467,6 +535,11 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertDoesNotThrowCompactedExceptionForGetAll(5, FOO_KEY);
     }
 
+    /**
+     * Tests {@link KeyValueStorage#getAll(List, long)} using examples from 
the description only for the {@link #BAR_KEY} for which the
+     * last revision is tombstone. Only one key is considered so that the 
tests are not too long. Keys with their revisions are added in
+     * {@link #setUp()}.
+     */
     @Test
     void testGetAllAndCompactionForBarKey() {
         // BAR_KEY has revisions: [1, 2, 5 (tombstone)].
@@ -495,6 +568,11 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertDoesNotThrowCompactedExceptionForGetAll(7, BAR_KEY);
     }
 
+    /**
+     * Tests {@link KeyValueStorage#getAll(List, long)} using examples from 
the description only for the {@link #NOT_EXISTS_KEY} for which
+     * was never present in the storage. Only one key is considered so that 
the tests are not too long. Keys with their revisions are added
+     * in {@link #setUp()}.
+     */
     @Test
     void testGetAllAndCompactionForNotExistsKey() {
         storage.setCompactionRevision(1);
@@ -522,6 +600,11 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertDoesNotThrowCompactedExceptionForGetAll(7, NOT_EXISTS_KEY);
     }
 
+    /**
+     * Tests {@link KeyValueStorage#getAll(List, long)} using examples from 
the description for several keys at once; it is enough to
+     * consider only two cases of compaction. Only one key is considered so 
that the tests are not too long. Keys with their revisions are
+     * added in {@link #setUp()}.
+     */
     @Test
     void testGetAllAndCompactionForMultipleKeys() {
         storage.setCompactionRevision(1);
@@ -536,6 +619,11 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertDoesNotThrowCompactedExceptionForGetAll(6, FOO_KEY, BAR_KEY, 
NOT_EXISTS_KEY);
     }
 
+    /**
+     * Tests {@link KeyValueStorage#get(byte[], long, long)} using examples 
from the description only for the {@link #FOO_KEY} for which
+     * the last revision is <b>not</b> tombstone. Only one key is considered 
so that the tests are not too long. Keys with their revisions
+     * are added in {@link #setUp()}.
+     */
     @Test
     void testGetListAndCompactionForFooKey() {
         // FOO_KEY has revisions: [1, 3, 5].
@@ -593,6 +681,11 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertThrowsCompactedExceptionForGetList(FOO_KEY, 6, 7);
     }
 
+    /**
+     * Tests {@link KeyValueStorage#get(byte[], long, long)} using examples 
from the description only for the {@link #BAR_KEY} for which
+     * the last revision is tombstone. Only one key is considered so that the 
tests are not too long. Keys with their revisions are added
+     * in {@link #setUp()}.
+     */
     @Test
     void testGetListAndCompactionForBarKey() {
         // BAR_KEY has revisions: [1, 2, 5 (tombstone)].
@@ -652,6 +745,11 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertDoesNotThrowsCompactedExceptionForGetList(BAR_KEY, 7, 7);
     }
 
+    /**
+     * Tests {@link KeyValueStorage#get(byte[], long, long)} using examples 
from the description only for the {@link #NOT_EXISTS_KEY} for
+     * which was never present in the storage. Only one key is considered so 
that the tests are not too long. Keys with their revisions are
+     * added in {@link #setUp()}.
+     */
     @Test
     void testGetListAndCompactionForNotExistsKey() {
         storage.setCompactionRevision(1);
@@ -690,6 +788,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertDoesNotThrowsCompactedExceptionForGetList(NOT_EXISTS_KEY, 7, 7);
     }
 
+    /**
+     * Tests that {@link KeyValueStorage#range(byte[], byte[])} and cursor 
methods will not throw {@link CompactedException} after
+     * compacting to the penultimate revision. The key is chosen randomly. 
Keys with their revisions are added in {@link #setUp()}.
+     */
     @Test
     void testRangeLatestAndCompaction() {
         storage.setCompactionRevision(6);
@@ -702,6 +804,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         });
     }
 
+    /**
+     * Tests {@link KeyValueStorage#range(byte[], byte[], long)} and cursor 
methods as described in the method. The key is chosen randomly.
+     * Keys with their revisions are added in {@link #setUp()}.
+     */
     @Test
     void testRangeAndCompaction() {
         try (Cursor<Entry> cursorBeforeSetCompactionRevision = 
storage.range(FOO_KEY, null, 5)) {
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index fad0672cb5..251ae61005 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -26,13 +26,10 @@ import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertRequestedRevisionLessThanOrEqualToCurrent;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
-import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.isLastIndex;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.maxRevisionIndex;
-import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.minRevisionIndex;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
 import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
 import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
-import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
 import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -45,7 +42,6 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -53,26 +49,19 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.LongConsumer;
-import java.util.function.Predicate;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.Entry;
-import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
-import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
 import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
 import org.apache.ignite.internal.metastorage.impl.EntryImpl;
-import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -81,26 +70,27 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Simple in-memory key/value storage for tests.
  */
-public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
-    /** Lexicographical comparator. */
-    private static final Comparator<byte[]> CMP = Arrays::compareUnsigned;
-
+public class SimpleInMemoryKeyValueStorage extends AbstractKeyValueStorage {
     /**
      * Keys index. Value is the list of all revisions under which entry 
corresponding to the key was modified.
      *
      * <p>Concurrent map to avoid {@link 
java.util.ConcurrentModificationException} on compaction.</p>
      *
-     * <p>Guarded by {@link #mux}.</p>
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
      */
-    private final NavigableMap<byte[], List<Long>> keysIdx = new 
ConcurrentSkipListMap<>(CMP);
+    private final NavigableMap<byte[], List<Long>> keysIdx = new 
ConcurrentSkipListMap<>(KEY_COMPARATOR);
 
-    /** Timestamp to revision mapping. */
+    /**
+     * Timestamp to revision mapping.
+     *
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+     */
     private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();
 
     /**
      * Revision to timestamp mapping.
      *
-     * <p>Guarded by {@link #mux}.</p>
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
      */
     private final Map<Long, HybridTimestamp> revToTsMap = new HashMap<>();
 
@@ -109,52 +99,28 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
      *
      * <p>Concurrent map to avoid {@link 
java.util.ConcurrentModificationException} on compaction.</p>
      *
-     * <p>Guarded by {@link #mux}.</p>
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
      */
     private final NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = 
new ConcurrentSkipListMap<>();
 
-    /**
-     * Revision. Will be incremented for each single-entry or multi-entry 
update operation.
-     *
-     * <p>Multi-threaded access is guarded by {@link #mux}.</p>
-     */
-    private long rev;
-
-    /**
-     * Last compaction revision that was set or restored from a snapshot.
-     *
-     * <p>This field is used by metastorage read methods to determine whether 
{@link CompactedException} should be thrown.</p>
-     *
-     * <p>Multi-threaded access is guarded by {@link #mux}.</p>
-     */
-    private long compactionRevision = -1;
-
     /**
      * Last {@link #saveCompactionRevision saved} compaction revision.
      *
      * <p>Used only when working with snapshots.</p>
+     *
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
      */
     private long savedCompactionRevision = -1;
 
-    /** All operations are queued on this lock. */
-    private final Object mux = new Object();
-
+    /** Multi-threaded access is guarded by {@link #rwLock}. */
     private boolean areWatchesEnabled = false;
 
-    private final WatchProcessor watchProcessor;
-
+    /** Multi-threaded access is guarded by {@link #rwLock}. */
     private final List<Entry> updatedEntries = new ArrayList<>();
 
-    /**
-     * Revision listener for recovery only. Notifies {@link 
MetaStorageManagerImpl} of revision update.
-     * Guarded by {@link #mux}.
-     */
-    private @Nullable LongConsumer recoveryRevisionListener;
-
-    private final AtomicBoolean stopCompaction = new AtomicBoolean();
-
+    /** Constructor. */
     public SimpleInMemoryKeyValueStorage(String nodeName) {
-        this.watchProcessor = new WatchProcessor(nodeName, this::get, new 
NoOpFailureManager());
+        super(nodeName, new NoOpFailureManager());
     }
 
     @Override
@@ -162,21 +128,18 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
         // no-op
     }
 
-    @Override
-    public long revision() {
-        synchronized (mux) {
-            return rev;
-        }
-    }
-
     @Override
     public void put(byte[] key, byte[] value, HybridTimestamp opTs) {
-        synchronized (mux) {
+        rwLock.writeLock().lock();
+
+        try {
             long curRev = rev + 1;
 
             doPut(key, value, curRev, opTs);
 
             updateRevision(curRev, opTs);
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
@@ -191,75 +154,38 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
         notifyRevisionUpdate();
     }
 
-    /**
-     * Notifies of revision update.
-     * Must be called under the {@link #mux} lock.
-     */
-    private void notifyRevisionUpdate() {
-        if (recoveryRevisionListener != null) {
-            // Listener must be invoked only on recovery, after recovery 
listener must be null.
-            recoveryRevisionListener.accept(rev);
-        }
-    }
-
     @Override
     public void putAll(List<byte[]> keys, List<byte[]> values, HybridTimestamp 
opTs) {
-        synchronized (mux) {
+        rwLock.writeLock().lock();
+
+        try {
             long curRev = rev + 1;
 
             doPutAll(curRev, keys, values, opTs);
-        }
-    }
-
-    @Override
-    public Entry get(byte[] key) {
-        synchronized (mux) {
-            return doGet(key, rev);
-        }
-    }
-
-    @Override
-    public Entry get(byte[] key, long revUpperBound) {
-        synchronized (mux) {
-            return doGet(key, revUpperBound);
-        }
-    }
-
-
-    @Override
-    public List<Entry> get(byte[] key, long revLowerBound, long revUpperBound) 
{
-        synchronized (mux) {
-            return doGet(key, revLowerBound, revUpperBound);
-        }
-    }
-
-    @Override
-    public List<Entry> getAll(List<byte[]> keys) {
-        synchronized (mux) {
-            return doGetAll(keys, rev);
-        }
-    }
-
-    @Override
-    public List<Entry> getAll(List<byte[]> keys, long revUpperBound) {
-        synchronized (mux) {
-            return doGetAll(keys, revUpperBound);
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
     @Override
     public void remove(byte[] key, HybridTimestamp opTs) {
-        synchronized (mux) {
+        rwLock.writeLock().lock();
+
+        try {
             long curRev = rev + 1;
 
             doRemove(key, curRev, opTs);
             updateRevision(curRev, opTs);
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
     @Override
     public void removeAll(List<byte[]> keys, HybridTimestamp opTs) {
-        synchronized (mux) {
+        rwLock.writeLock().lock();
+
+        try {
             long curRev = rev + 1;
 
             List<byte[]> existingKeys = new ArrayList<>(keys.size());
@@ -279,6 +205,8 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
             }
 
             doPutAll(curRev, existingKeys, vals, opTs);
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
@@ -290,7 +218,9 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
             HybridTimestamp opTs,
             CommandId commandId
     ) {
-        synchronized (mux) {
+        rwLock.writeLock().lock();
+
+        try {
             Collection<Entry> e = getAll(Arrays.asList(condition.keys()));
 
             boolean branch = condition.test(e.toArray(new Entry[]{}));
@@ -329,12 +259,16 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
             updateRevision(curRev, opTs);
 
             return branch;
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
     @Override
     public StatementResult invoke(If iif, HybridTimestamp opTs, CommandId 
commandId) {
-        synchronized (mux) {
+        rwLock.writeLock().lock();
+
+        try {
             If currIf = iif;
             while (true) {
                 Collection<Entry> e = 
getAll(Arrays.asList(currIf.cond().keys()));
@@ -380,33 +314,40 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
                     currIf = branch.iif();
                 }
             }
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
     @Override
     public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
-        synchronized (mux) {
+        rwLock.readLock().lock();
+
+        try {
             return doRange(keyFrom, keyTo, rev);
+        } finally {
+            rwLock.readLock().unlock();
         }
     }
 
     @Override
     public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long 
revUpperBound) {
-        synchronized (mux) {
+        rwLock.readLock().lock();
+
+        try {
             return doRange(keyFrom, keyTo, revUpperBound);
+        } finally {
+            rwLock.readLock().unlock();
         }
     }
 
-    @Override
-    public byte @Nullable [] nextKey(byte[] key) {
-        return incrementPrefix(key);
-    }
-
     @Override
     public HybridTimestamp timestampByRevision(long revision) {
         assert revision >= 0 : revision;
 
-        synchronized (mux) {
+        rwLock.readLock().lock();
+
+        try {
             assertRequestedRevisionLessThanOrEqualToCurrent(revision, rev);
 
             HybridTimestamp timestamp = revToTsMap.get(revision);
@@ -416,12 +357,16 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
             }
 
             return timestamp;
+        } finally {
+            rwLock.readLock().unlock();
         }
     }
 
     @Override
     public long revisionByTimestamp(HybridTimestamp timestamp) {
-        synchronized (mux) {
+        rwLock.readLock().lock();
+
+        try {
             Map.Entry<Long, Long> revisionEntry = 
tsToRevMap.floorEntry(timestamp.longValue());
 
             if (revisionEntry == null) {
@@ -429,62 +374,25 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
             }
 
             return revisionEntry.getValue();
+        } finally {
+            rwLock.readLock().unlock();
         }
     }
 
-    @Override
-    public void setRecoveryRevisionListener(@Nullable LongConsumer listener) {
-        synchronized (mux) {
-            this.recoveryRevisionListener = listener;
-        }
-    }
-
-    @Override
-    public void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev, 
WatchListener listener) {
-        assert keyFrom != null : "keyFrom couldn't be null.";
-        assert rev > 0 : "rev must be positive.";
-
-        Predicate<byte[]> rangePredicate = keyTo == null
-                ? k -> CMP.compare(keyFrom, k) <= 0
-                : k -> CMP.compare(keyFrom, k) <= 0 && CMP.compare(keyTo, k) > 
0;
-
-        watchProcessor.addWatch(new Watch(rev, listener, rangePredicate));
-    }
-
-    @Override
-    public void watchExact(byte[] key, long rev, WatchListener listener) {
-        assert key != null : "key couldn't be null.";
-        assert rev > 0 : "rev must be positive.";
-
-        Predicate<byte[]> exactPredicate = k -> CMP.compare(k, key) == 0;
-
-        watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
-    }
-
-    @Override
-    public void watchExact(Collection<byte[]> keys, long rev, WatchListener 
listener) {
-        assert keys != null && !keys.isEmpty() : "keys couldn't be null or 
empty: " + keys;
-        assert rev > 0 : "rev must be positive.";
-
-        TreeSet<byte[]> keySet = new TreeSet<>(CMP);
-
-        keySet.addAll(keys);
-
-        Predicate<byte[]> inPredicate = keySet::contains;
-
-        watchProcessor.addWatch(new Watch(rev, listener, inPredicate));
-    }
-
     @Override
     public void startWatches(long startRevision, OnRevisionAppliedCallback 
revisionCallback) {
-        assert startRevision != 0 : "First meaningful revision is 1";
+        assert startRevision > 0 : startRevision;
 
-        synchronized (mux) {
+        rwLock.readLock().lock();
+
+        try {
             areWatchesEnabled = true;
 
             watchProcessor.setRevisionCallback(revisionCallback);
 
             replayUpdates(startRevision);
+        } finally {
+            rwLock.readLock().unlock();
         }
     }
 
@@ -524,17 +432,14 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
         updatedEntries.clear();
     }
 
-    @Override
-    public void removeWatch(WatchListener listener) {
-        watchProcessor.removeWatch(listener);
-    }
-
     @Override
     public void compact(long revision) {
         assert revision >= 0 : revision;
 
         for (Map.Entry<byte[], List<Long>> entry : keysIdx.entrySet()) {
-            synchronized (mux) {
+            rwLock.writeLock().lock();
+
+            try {
                 assertCompactionRevisionLessThanCurrent(revision, rev);
 
                 if (stopCompaction.get()) {
@@ -542,10 +447,14 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
                 }
 
                 compactForKey(entry.getKey(), toLongArray(entry.getValue()), 
revision);
+            } finally {
+                rwLock.writeLock().unlock();
             }
         }
 
-        synchronized (mux) {
+        rwLock.writeLock().lock();
+
+        try {
             for (Iterator<Map.Entry<Long, HybridTimestamp>> it = 
revToTsMap.entrySet().iterator(); it.hasNext(); ) {
                 Map.Entry<Long, HybridTimestamp> e = it.next();
 
@@ -557,14 +466,11 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
                     break;
                 }
             }
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
-    @Override
-    public void stopCompaction() {
-        stopCompaction.set(true);
-    }
-
     @Override
     public void close() {
         stopCompaction();
@@ -574,78 +480,82 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
 
     @Override
     public CompletableFuture<Void> snapshot(Path snapshotPath) {
-        synchronized (mux) {
-            try {
-                Files.createDirectories(snapshotPath);
+        rwLock.writeLock().lock();
 
-                Path snapshotFile = 
snapshotPath.resolve(SimpleInMemoryKeyValueStorageSnapshot.FILE_NAME);
+        try {
+            Files.createDirectories(snapshotPath);
 
-                assertTrue(IgniteUtils.deleteIfExists(snapshotFile), 
snapshotFile.toString());
+            Path snapshotFile = 
snapshotPath.resolve(SimpleInMemoryKeyValueStorageSnapshot.FILE_NAME);
 
-                Files.createFile(snapshotFile);
+            assertTrue(IgniteUtils.deleteIfExists(snapshotFile), 
snapshotFile.toString());
 
-                Map<Long, Map<byte[], ValueSnapshot>> revsIdxCopy = 
revsIdx.entrySet().stream()
-                        .collect(toMap(
-                                Map.Entry::getKey,
-                                revIdxEntry -> revIdxEntry.getValue()
-                                        .entrySet()
-                                        .stream()
-                                        .collect(toMap(Map.Entry::getKey, e -> 
new ValueSnapshot(e.getValue())))
-                        ));
+            Files.createFile(snapshotFile);
 
-                var snapshot = new SimpleInMemoryKeyValueStorageSnapshot(
-                        Map.copyOf(keysIdx),
-                        Map.copyOf(tsToRevMap),
-                        Map.copyOf(revToTsMap),
-                        revsIdxCopy,
-                        rev,
-                        savedCompactionRevision
-                );
+            Map<Long, Map<byte[], ValueSnapshot>> revsIdxCopy = 
revsIdx.entrySet().stream()
+                    .collect(toMap(
+                            Map.Entry::getKey,
+                            revIdxEntry -> revIdxEntry.getValue()
+                                    .entrySet()
+                                    .stream()
+                                    .collect(toMap(Map.Entry::getKey, e -> new 
ValueSnapshot(e.getValue())))
+                    ));
 
-                byte[] snapshotBytes = ByteUtils.toBytes(snapshot);
+            var snapshot = new SimpleInMemoryKeyValueStorageSnapshot(
+                    Map.copyOf(keysIdx),
+                    Map.copyOf(tsToRevMap),
+                    Map.copyOf(revToTsMap),
+                    revsIdxCopy,
+                    rev,
+                    savedCompactionRevision
+            );
 
-                Files.write(snapshotFile, snapshotBytes, WRITE);
+            byte[] snapshotBytes = ByteUtils.toBytes(snapshot);
 
-                return nullCompletedFuture();
-            } catch (Throwable t) {
-                return failedFuture(t);
-            }
+            Files.write(snapshotFile, snapshotBytes, WRITE);
+
+            return nullCompletedFuture();
+        } catch (Throwable t) {
+            return failedFuture(t);
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
     @Override
     public void restoreSnapshot(Path snapshotPath) {
-        synchronized (mux) {
-            try {
-                keysIdx.clear();
-                tsToRevMap.clear();
-                revToTsMap.clear();
-                revsIdx.clear();
+        rwLock.writeLock().lock();
 
-                Path snapshotFile = 
snapshotPath.resolve(SimpleInMemoryKeyValueStorageSnapshot.FILE_NAME);
+        try {
+            keysIdx.clear();
+            tsToRevMap.clear();
+            revToTsMap.clear();
+            revsIdx.clear();
 
-                assertTrue(Files.exists(snapshotPath), 
snapshotFile.toString());
+            Path snapshotFile = 
snapshotPath.resolve(SimpleInMemoryKeyValueStorageSnapshot.FILE_NAME);
 
-                byte[] snapshotBytes = Files.readAllBytes(snapshotFile);
+            assertTrue(Files.exists(snapshotPath), snapshotFile.toString());
 
-                var snapshot = (SimpleInMemoryKeyValueStorageSnapshot) 
ByteUtils.fromBytes(snapshotBytes);
+            byte[] snapshotBytes = Files.readAllBytes(snapshotFile);
 
-                keysIdx.putAll(snapshot.keysIdx);
-                tsToRevMap.putAll(snapshot.tsToRevMap);
-                revToTsMap.putAll(snapshot.revToTsMap);
-                snapshot.revsIdx.forEach((revision, entries) -> {
-                    TreeMap<byte[], Value> entries0 = new TreeMap<>(CMP);
-                    entries.forEach((keyBytes, valueSnapshot) -> 
entries0.put(keyBytes, valueSnapshot.toValue()));
+            var snapshot = (SimpleInMemoryKeyValueStorageSnapshot) 
ByteUtils.fromBytes(snapshotBytes);
 
-                    revsIdx.put(revision, entries0);
-                });
+            keysIdx.putAll(snapshot.keysIdx);
+            tsToRevMap.putAll(snapshot.tsToRevMap);
+            revToTsMap.putAll(snapshot.revToTsMap);
+            snapshot.revsIdx.forEach((revision, entries) -> {
+                TreeMap<byte[], Value> entries0 = new 
TreeMap<>(KEY_COMPARATOR);
+                entries.forEach((keyBytes, valueSnapshot) -> 
entries0.put(keyBytes, valueSnapshot.toValue()));
 
-                rev = snapshot.rev;
-                compactionRevision = snapshot.savedCompactionRevision;
-                savedCompactionRevision = snapshot.savedCompactionRevision;
-            } catch (Throwable t) {
-                throw new MetaStorageException(RESTORING_STORAGE_ERR, t);
-            }
+                revsIdx.put(revision, entries0);
+            });
+
+            rev = snapshot.rev;
+            compactionRevision = snapshot.savedCompactionRevision;
+            savedCompactionRevision = snapshot.savedCompactionRevision;
+        } catch (Throwable t) {
+            throw new MetaStorageException(RESTORING_STORAGE_ERR, t);
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
@@ -699,79 +609,6 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
-    private List<Entry> doGetAll(List<byte[]> keys, long revUpperBound) {
-        assert !keys.isEmpty();
-        assert revUpperBound >= 0 : revUpperBound;
-
-        var res = new ArrayList<Entry>(keys.size());
-
-        for (byte[] key : keys) {
-            res.add(doGet(key, revUpperBound));
-        }
-
-        return res;
-    }
-
-    private Entry doGet(byte[] key, long revUpperBound) {
-        assert revUpperBound >= 0 : revUpperBound;
-
-        long[] keyRevisions = toLongArray(keysIdx.get(key));
-        int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
-
-        if (maxRevisionIndex == NOT_FOUND) {
-            
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound,
 compactionRevision);
-
-            return EntryImpl.empty(key);
-        }
-
-        long revision = keyRevisions[maxRevisionIndex];
-
-        Value value = getValue(key, revision);
-
-        if (revUpperBound <= compactionRevision && (!isLastIndex(keyRevisions, 
maxRevisionIndex) || value.tombstone())) {
-            throw new CompactedException(revUpperBound, compactionRevision);
-        }
-
-        return EntryImpl.toEntry(key, revision, value);
-    }
-
-    private List<Entry> doGet(byte[] key, long revLowerBound, long 
revUpperBound) {
-        assert revLowerBound >= 0 : revLowerBound;
-        assert revUpperBound >= 0 : revUpperBound;
-        assert revUpperBound >= revLowerBound : "revLowerBound=" + 
revLowerBound + ", revUpperBound=" + revUpperBound;
-
-        long[] keyRevisions = toLongArray(keysIdx.get(key));
-
-        int minRevisionIndex = minRevisionIndex(keyRevisions, revLowerBound);
-        int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
-
-        if (minRevisionIndex == NOT_FOUND || maxRevisionIndex == NOT_FOUND) {
-            
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revLowerBound,
 compactionRevision);
-
-            return List.of();
-        }
-
-        var entries = new ArrayList<Entry>();
-
-        for (int i = minRevisionIndex; i <= maxRevisionIndex; i++) {
-            long revision = keyRevisions[i];
-
-            Value value = getValue(key, revision);
-
-            if (revision <= compactionRevision && (!isLastIndex(keyRevisions, 
i) || value.tombstone())) {
-                continue;
-            }
-
-            entries.add(EntryImpl.toEntry(key, revision, value));
-        }
-
-        if (entries.isEmpty()) {
-            
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revLowerBound,
 compactionRevision);
-        }
-
-        return entries;
-    }
-
     private void doPut(byte[] key, byte[] bytes, long curRev, HybridTimestamp 
opTs) {
         // Update keysIdx.
         List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
@@ -785,7 +622,7 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
                 curRev,
                 (rev, entries) -> {
                     if (entries == null) {
-                        entries = new TreeMap<>(CMP);
+                        entries = new TreeMap<>(KEY_COMPARATOR);
                     }
 
                     entries.put(key, val);
@@ -801,61 +638,43 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     }
 
     private void doPutAll(long curRev, List<byte[]> keys, List<byte[]> 
bytesList, HybridTimestamp opTs) {
-        synchronized (mux) {
-            // Update revsIdx.
-            NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
-
-            for (int i = 0; i < keys.size(); i++) {
-                byte[] key = keys.get(i);
-
-                byte[] bytes = bytesList.get(i);
+        // Update revsIdx.
+        NavigableMap<byte[], Value> entries = new TreeMap<>(KEY_COMPARATOR);
 
-                // Update keysIdx.
-                List<Long> revs = keysIdx.computeIfAbsent(key, k -> new 
ArrayList<>());
+        for (int i = 0; i < keys.size(); i++) {
+            byte[] key = keys.get(i);
 
-                revs.add(curRev);
+            byte[] bytes = bytesList.get(i);
 
-                Value val = new Value(bytes, opTs);
+            // Update keysIdx.
+            List<Long> revs = keysIdx.computeIfAbsent(key, k -> new 
ArrayList<>());
 
-                entries.put(key, val);
+            revs.add(curRev);
 
-                updatedEntries.add(new EntryImpl(key, bytes, curRev, opTs));
+            Value val = new Value(bytes, opTs);
 
-                revsIdx.put(curRev, entries);
-            }
+            entries.put(key, val);
 
-            updateRevision(curRev, opTs);
+            updatedEntries.add(new EntryImpl(key, bytes, curRev, opTs));
 
+            revsIdx.put(curRev, entries);
         }
-    }
 
-    private static long lastRevision(List<Long> revs) {
-        return revs.get(revs.size() - 1);
-    }
-
-    @Override
-    public void registerRevisionUpdateListener(RevisionUpdateListener 
listener) {
-        watchProcessor.registerRevisionUpdateListener(listener);
-    }
-
-    @Override
-    public void unregisterRevisionUpdateListener(RevisionUpdateListener 
listener) {
-        watchProcessor.unregisterRevisionUpdateListener(listener);
-    }
-
-    @Override
-    public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long 
newRevision) {
-        return watchProcessor.notifyUpdateRevisionListeners(newRevision);
+        updateRevision(curRev, opTs);
     }
 
     @Override
     public void advanceSafeTime(HybridTimestamp newSafeTime) {
-        synchronized (mux) {
+        rwLock.writeLock().lock();
+
+        try {
             if (!areWatchesEnabled) {
                 return;
             }
 
             watchProcessor.advanceSafeTime(newSafeTime);
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
@@ -863,28 +682,14 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     public void saveCompactionRevision(long revision) {
         assert revision >= 0 : revision;
 
-        synchronized (mux) {
-            assertCompactionRevisionLessThanCurrent(revision, rev);
+        rwLock.writeLock().lock();
 
-            savedCompactionRevision = revision;
-        }
-    }
-
-    @Override
-    public void setCompactionRevision(long revision) {
-        assert revision >= 0 : revision;
-
-        synchronized (mux) {
+        try {
             assertCompactionRevisionLessThanCurrent(revision, rev);
 
-            compactionRevision = revision;
-        }
-    }
-
-    @Override
-    public long getCompactionRevision() {
-        synchronized (mux) {
-            return compactionRevision;
+            savedCompactionRevision = revision;
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
@@ -917,7 +722,13 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
         return value.tombstone();
     }
 
-    private Value getValue(byte[] key, long revision) {
+    @Override
+    protected long[] keyRevisionsForOperation(byte[] key) {
+        return toLongArray(keysIdx.get(key));
+    }
+
+    @Override
+    protected Value valueForOperation(byte[] key, long revision) {
         Value value = getValueNullable(key, revision);
 
         assert value != null : "key=" + toUtf8String(key) + ", revision=" + 
revision;
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
index f7131c30ed..9c964b7665 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
@@ -29,6 +29,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.WatchProcessor;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 
@@ -66,12 +67,10 @@ public class WatchListenerInhibitor {
      * @return Listener inhibitor.
      */
     public static WatchListenerInhibitor 
metastorageEventsInhibitor(MetaStorageManager metaStorageManager) {
-        var metaStorageManager0 = metaStorageManager;
-
         // TODO: IGNITE-15723 After a component factory is implemented, need 
to got rid of reflection here.
-        var storage = (RocksDbKeyValueStorage) 
getFieldValue(metaStorageManager0, MetaStorageManagerImpl.class, "storage");
+        var storage = (RocksDbKeyValueStorage) 
getFieldValue(metaStorageManager, MetaStorageManagerImpl.class, "storage");
 
-        var watchProcessor = (WatchProcessor) getFieldValue(storage, 
RocksDbKeyValueStorage.class, "watchProcessor");
+        var watchProcessor = (WatchProcessor) getFieldValue(storage, 
AbstractKeyValueStorage.class, "watchProcessor");
 
         return new WatchListenerInhibitor(watchProcessor, storage);
     }
@@ -81,7 +80,7 @@ public class WatchListenerInhibitor {
         this.storage = storage;
 
         processorNotificationFutureField = getField(watchProcessor, 
WatchProcessor.class, "notificationFuture");
-        storageRwLockField = getField(storage, RocksDbKeyValueStorage.class, 
"rwLock");
+        storageRwLockField = getField(storage, AbstractKeyValueStorage.class, 
"rwLock");
     }
 
     /**

Reply via email to