This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 79f9e3f1214 IGNITE-25030 Remove rwLock from rocksdb-based meta-storage 
(#5670)
79f9e3f1214 is described below

commit 79f9e3f1214416ba8d0d66f6f22375d1626fa045
Author: Ivan Bessonov <[email protected]>
AuthorDate: Wed Apr 23 12:12:31 2025 +0300

    IGNITE-25030 Remove rwLock from rocksdb-based meta-storage (#5670)
---
 .../impl/ItMetaStorageServicePersistenceTest.java  |   8 +-
 .../metastorage/impl/MetaStorageManagerImpl.java   |  35 +-
 .../server/AbstractKeyValueStorage.java            | 177 +++------
 .../server/ReadOperationForCompactionTracker.java  | 106 ++++--
 .../server/persistence/RocksDbKeyValueStorage.java | 400 +++++++++++----------
 .../server/persistence/WriteBatchProtector.java    |  87 +++++
 .../AbstractCompactionKeyValueStorageTest.java     |  41 ++-
 .../ReadOperationForCompactionTrackerTest.java     | 143 +++++---
 .../persistence/WriteBatchProtectorTest.java       |  82 +++++
 .../server/SimpleInMemoryKeyValueStorage.java      |  99 ++++-
 10 files changed, 765 insertions(+), 413 deletions(-)

diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index 82201e1b2ee..5c33f2c55ad 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
-import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.ANY_TIMESTAMP;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -138,14 +137,9 @@ public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnaps
 
         KeyValueStorage storage = storageByName.get(node.name());
 
-        byte[] lastKey = interactedAfterSnapshot ? SECOND_KEY.bytes() : 
FIRST_KEY.bytes();
-        byte[] lastValue = interactedAfterSnapshot ? SECOND_VALUE : 
FIRST_VALUE;
-
         int expectedRevision = interactedAfterSnapshot ? 4 : 3;
 
-        Entry expectedLastEntry = new EntryImpl(lastKey, lastValue, 
expectedRevision, ANY_TIMESTAMP);
-
-        return () -> TestMetasStorageUtils.equals(storage.get(lastKey), 
expectedLastEntry);
+        return () -> storage.revision() == expectedRevision;
     }
 
     @Override
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index e5930391d20..434bdaaf7d5 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -84,6 +84,7 @@ import 
org.apache.ignite.internal.metastorage.impl.raft.MetaStorageSnapshotStora
 import org.apache.ignite.internal.metastorage.metrics.MetaStorageMetricSource;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
+import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker.TrackingToken;
 import 
org.apache.ignite.internal.metastorage.server.WatchEventHandlingCallback;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
@@ -1303,29 +1304,27 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
             long operationRevision,
             Supplier<CompletableFuture<T>> readFromLeader
     ) {
-        long trackingRevision = operationRevision == LATEST_REVISION ? 
storage.revision() : operationRevision;
-
-        long readOperationId = 
readOperationFromLeaderForCompactionTracker.generateReadOperationId();
-
-        readOperationFromLeaderForCompactionTracker.track(readOperationId, 
trackingRevision);
+        TrackingToken token = 
readOperationFromLeaderForCompactionTracker.track(
+                operationRevision,
+                storage::revision,
+                storage::getCompactionRevision
+        );
 
         try {
-            return readFromLeader.get().whenComplete(
-                    (t, throwable) -> 
readOperationFromLeaderForCompactionTracker.untrack(readOperationId, 
trackingRevision)
-            );
+            return readFromLeader.get().whenComplete((t, throwable) -> 
token.close());
         } catch (Throwable t) {
-            
readOperationFromLeaderForCompactionTracker.untrack(readOperationId, 
trackingRevision);
+            token.close();
 
             throw t;
         }
     }
 
     private Publisher<Entry> withTrackReadOperationFromLeaderPublisher(long 
operationRevision, Supplier<Publisher<Entry>> readFromLeader) {
-        long trackingRevision = operationRevision == LATEST_REVISION ? 
storage.revision() : operationRevision;
-
-        long readOperationId = 
readOperationFromLeaderForCompactionTracker.generateReadOperationId();
-
-        readOperationFromLeaderForCompactionTracker.track(readOperationId, 
trackingRevision);
+        TrackingToken token = 
readOperationFromLeaderForCompactionTracker.track(
+                operationRevision,
+                storage::revision,
+                storage::getCompactionRevision
+        );
 
         try {
             Publisher<Entry> publisherFromLeader = readFromLeader.get();
@@ -1341,7 +1340,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
                         @Override
                         public void cancel() {
-                            
readOperationFromLeaderForCompactionTracker.untrack(readOperationId, 
trackingRevision);
+                            token.close();
 
                             subscription.cancel();
                         }
@@ -1355,20 +1354,20 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
                 @Override
                 public void onError(Throwable throwable) {
-                    
readOperationFromLeaderForCompactionTracker.untrack(readOperationId, 
trackingRevision);
+                    token.close();
 
                     subscriber.onError(throwable);
                 }
 
                 @Override
                 public void onComplete() {
-                    
readOperationFromLeaderForCompactionTracker.untrack(readOperationId, 
trackingRevision);
+                    token.close();
 
                     subscriber.onComplete();
                 }
             });
         } catch (Throwable t) {
-            
readOperationFromLeaderForCompactionTracker.untrack(readOperationId, 
trackingRevision);
+            token.close();
 
             throw t;
         }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
index 0a3e9805098..980e72d41b6 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -32,8 +32,6 @@ import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -54,19 +52,17 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
 
     protected final IgniteLogger log = Loggers.forClass(getClass());
 
-    protected final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
     protected final FailureProcessor failureProcessor;
 
     protected final WatchProcessor watchProcessor;
 
+    protected final Object watchProcessorMutex = new Object();
+
     /**
      * Revision listener for recovery only. Notifies {@link 
MetaStorageManagerImpl} of current revisions update, {@code null} if recovery
      * is complete.
-     *
-     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
      */
-    private @Nullable RecoveryRevisionsListener recoveryRevisionListener;
+    private volatile @Nullable RecoveryRevisionsListener 
recoveryRevisionListener;
 
     /**
      * Revision. Will be incremented for each single-entry or multi-entry 
update operation.
@@ -78,8 +74,6 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
      *
      * <p>This field is used by metastorage read methods to determine whether 
{@link CompactedException} should be thrown.</p>
      *
-     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
-     *
      * <p>Field with a volatile so as not to be blocked when reading the 
compaction revision by other components, for example due to long
      * write operations, including the compaction itself.</p>
      */
@@ -93,22 +87,18 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
      * we can get into a gap when commands came from different leaders to the 
same compaction revision, but we simply did not have time to
      * process the update of the compaction revision from the previous leader. 
This is necessary to cover corner cases with a sufficiently
      * small compaction revision update interval.</p>
-     *
-     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
      */
-    private volatile long planedUpdateCompactionRevision = -1;
+    private volatile long plannedUpdateCompactionRevision = -1;
 
     protected final AtomicBoolean stopCompaction = new AtomicBoolean();
 
-    /** Tracks only cursors, since reading a single entry or a batch is done 
entirely under {@link #rwLock}. */
+    /** Tracks only cursors. */
     protected final ReadOperationForCompactionTracker 
readOperationForCompactionTracker;
 
     /**
      * Events for notification of the {@link WatchProcessor} that were created 
before the {@link #startWatches start of watches}, after the
      * start of watches there will be {@code null}. Events are sorted by 
{@link NotifyWatchProcessorEvent#timestamp} and are expected to
      * have no duplicates.
-     *
-     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
      */
     protected @Nullable TreeSet<NotifyWatchProcessorEvent> 
notifyWatchProcessorEventsBeforeStartingWatches = new TreeSet<>();
 
@@ -136,13 +126,15 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     /** Returns the key revisions for operation, an empty array if not found. 
*/
     protected abstract long[] keyRevisionsForOperation(byte[] key);
 
-    /** Returns key values by revision for operation. */
-    protected abstract Value valueForOperation(byte[] key, long revision);
+    /**
+     * Returns key values by revision for operation.
+     *
+     * @throws CompactedException If value is compacted for this particular 
revision.
+     */
+    protected abstract Value valueForOperation(byte[] key, long revision) 
throws CompactedException;
 
     /**
      * Returns {@code true} if the metastorage is in the recovery state.
-     *
-     * <p>Method is expected to be invoked under {@link #rwLock}.</p>
      */
     private boolean isInRecoveryState() {
         return recoveryRevisionListener != null;
@@ -150,79 +142,40 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
 
     /**
      * Returns {@code true} if the watches have {@link #startWatches started}.
-     *
-     * <p>Method is expected to be invoked under {@link #rwLock}.</p>
      */
     protected abstract boolean areWatchesStarted();
 
     @Override
     public Entry get(byte[] key) {
-        rwLock.readLock().lock();
-
-        try {
-            return doGet(key, rev);
-        } finally {
-            rwLock.readLock().unlock();
-        }
+        return doGet(key, rev);
     }
 
     @Override
     public Entry get(byte[] key, long revUpperBound) {
-        rwLock.readLock().lock();
-
-        try {
-            return doGet(key, revUpperBound);
-        } finally {
-            rwLock.readLock().unlock();
-        }
+        return doGet(key, revUpperBound);
     }
 
     @Override
     public List<Entry> getAll(List<byte[]> keys) {
-        rwLock.readLock().lock();
-
-        try {
-            return doGetAll(keys, rev);
-        } finally {
-            rwLock.readLock().unlock();
-        }
+        return doGetAll(keys, rev);
     }
 
     @Override
     public List<Entry> getAll(List<byte[]> keys, long revUpperBound) {
-        rwLock.readLock().lock();
-
-        try {
-            return doGetAll(keys, revUpperBound);
-        } finally {
-            rwLock.readLock().unlock();
-        }
+        return doGetAll(keys, revUpperBound);
     }
 
     @Override
     public long revision() {
-        rwLock.readLock().lock();
-
-        try {
-            return rev;
-        } finally {
-            rwLock.readLock().unlock();
-        }
+        return rev;
     }
 
     @Override
     public void saveCompactionRevision(long revision, KeyValueUpdateContext 
context) {
         assert revision >= 0 : revision;
+        assertCompactionRevisionLessThanCurrent(revision, rev);
 
-        rwLock.writeLock().lock();
-
-        try {
-            assertCompactionRevisionLessThanCurrent(revision, rev);
-
-            saveCompactionRevision(revision, context, true);
-        } finally {
-            rwLock.writeLock().unlock();
-        }
+        saveCompactionRevision(revision, context, true);
     }
 
     protected abstract void saveCompactionRevision(long compactionRevision, 
KeyValueUpdateContext context, boolean advanceSafeTime);
@@ -231,17 +184,11 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     public void setCompactionRevision(long revision) {
         assert revision >= 0 : revision;
 
-        rwLock.writeLock().lock();
+        assertCompactionRevisionLessThanCurrent(revision, rev);
 
-        try {
-            assertCompactionRevisionLessThanCurrent(revision, rev);
+        compactionRevision = revision;
 
-            compactionRevision = revision;
-
-            notifyRevisionsUpdate();
-        } finally {
-            rwLock.writeLock().unlock();
-        }
+        notifyRevisionsUpdate();
     }
 
     @Override
@@ -253,28 +200,22 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     public void updateCompactionRevision(long compactionRevision, 
KeyValueUpdateContext context) {
         assert compactionRevision >= 0 : compactionRevision;
 
-        rwLock.writeLock().lock();
+        assertCompactionRevisionLessThanCurrent(compactionRevision, rev);
 
-        try {
-            assertCompactionRevisionLessThanCurrent(compactionRevision, rev);
+        saveCompactionRevision(compactionRevision, context, false);
 
-            saveCompactionRevision(compactionRevision, context, false);
+        if (isInRecoveryState()) {
+            setCompactionRevision(compactionRevision);
+        } else if (compactionRevision > plannedUpdateCompactionRevision) {
+            plannedUpdateCompactionRevision = compactionRevision;
 
-            if (isInRecoveryState()) {
+            notifyWatchProcessor(new AdvanceSafeTimeEvent(() -> {
                 setCompactionRevision(compactionRevision);
-            } else if (compactionRevision > planedUpdateCompactionRevision) {
-                planedUpdateCompactionRevision = compactionRevision;
 
-                notifyWatchProcessor(new AdvanceSafeTimeEvent(() -> {
-                    setCompactionRevision(compactionRevision);
-
-                    compactionRevisionUpdateListeners.forEach(listener -> 
listener.onUpdate(compactionRevision));
-                }, context.timestamp));
-            } else if (areWatchesStarted()) {
-                watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
-            }
-        } finally {
-            rwLock.writeLock().unlock();
+                compactionRevisionUpdateListeners.forEach(listener -> 
listener.onUpdate(compactionRevision));
+            }, context.timestamp));
+        } else if (areWatchesStarted()) {
+            watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
         }
     }
 
@@ -317,13 +258,7 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
 
     @Override
     public void setRecoveryRevisionsListener(@Nullable 
RecoveryRevisionsListener listener) {
-        rwLock.writeLock().lock();
-
-        try {
-            this.recoveryRevisionListener = listener;
-        } finally {
-            rwLock.writeLock().unlock();
-        }
+        this.recoveryRevisionListener = listener;
     }
 
     @Override
@@ -365,7 +300,7 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
         watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
     }
 
-    /** Notifies of revision update. Must be called under the {@link #rwLock}. 
*/
+    /** Notifies of revision update. */
     protected void notifyRevisionsUpdate() {
         if (recoveryRevisionListener != null) {
             // Listener must be invoked only on recovery, after recovery 
listener must be null.
@@ -389,8 +324,8 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
 
         Value value = valueForOperation(key, revision);
 
-        if (revUpperBound <= compactionRevision && (!isLastIndex(keyRevisions, 
maxRevisionIndex) || value.tombstone())) {
-            throw new CompactedException(revUpperBound, compactionRevision);
+        if (!isLastIndex(keyRevisions, maxRevisionIndex) || value.tombstone()) 
{
+            
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound,
 compactionRevision);
         }
 
         return EntryImpl.toEntry(key, revision, value);
@@ -411,28 +346,16 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
 
     @Override
     public void advanceSafeTime(KeyValueUpdateContext context) {
-        rwLock.writeLock().lock();
+        setIndexAndTerm(context.index, context.term);
 
-        try {
-            setIndexAndTerm(context.index, context.term);
-
-            if (areWatchesStarted()) {
-                watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
-            }
-        } finally {
-            rwLock.writeLock().unlock();
+        if (areWatchesStarted()) {
+            watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
         }
     }
 
     @Override
     public Revisions revisions() {
-        rwLock.readLock().lock();
-
-        try {
-            return createCurrentRevisions();
-        } finally {
-            rwLock.readLock().unlock();
-        }
+        return createCurrentRevisions();
     }
 
     private Revisions createCurrentRevisions() {
@@ -440,20 +363,24 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     }
 
     protected void notifyWatchProcessor(NotifyWatchProcessorEvent event) {
-        if (areWatchesStarted()) {
-            event.notify(watchProcessor);
-        } else {
-            boolean added = 
notifyWatchProcessorEventsBeforeStartingWatches.add(event);
+        synchronized (watchProcessorMutex) {
+            if (areWatchesStarted()) {
+                event.notify(watchProcessor);
+            } else {
+                boolean added = 
notifyWatchProcessorEventsBeforeStartingWatches.add(event);
 
-            assert added : event;
+                assert added : event;
+            }
         }
     }
 
     protected void drainNotifyWatchProcessorEventsBeforeStartingWatches() {
-        assert !areWatchesStarted();
+        synchronized (watchProcessorMutex) {
+            assert !areWatchesStarted();
 
-        notifyWatchProcessorEventsBeforeStartingWatches.forEach(event -> 
event.notify(watchProcessor));
+            notifyWatchProcessorEventsBeforeStartingWatches.forEach(event -> 
event.notify(watchProcessor));
 
-        notifyWatchProcessorEventsBeforeStartingWatches = null;
+            notifyWatchProcessorEventsBeforeStartingWatches = null;
+        }
     }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
index 2760099e359..2baae964773 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
@@ -25,6 +25,9 @@ import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.CompletableFutures;
@@ -35,9 +38,8 @@ import org.apache.ignite.internal.util.CompletableFutures;
  *
  * <p>Expected usage:</p>
  * <ul>
- *     <li>Before starting execution, the reading command invoke {@link 
#track} with its ID and the lowest estimation for revision upper
- *     bound.
- *     <li>After completion, the reading command will invoke {@link #untrack} 
with the same arguments as when calling {@link #track},
+ *     <li>Before starting execution, the reading command invokes {@link 
#track} with the lowest estimation for revision upper bound.
+ *     <li>After completion, the reading command will invoke {@link 
TrackingToken#close()} on an instance returned by {@link #track},
  *     regardless of whether the operation was successful or not.</li>
  *     <li>{@link #collect} will be invoked only after a new compaction 
revision has been set
  *     ({@link KeyValueStorage#setCompactionRevision}) for a new compaction 
revision.</li>
@@ -48,56 +50,86 @@ public class ReadOperationForCompactionTracker {
 
     private final AtomicLong longOperationIdGenerator = new AtomicLong();
 
-    /** Generates the next read operation ID. Thread-safe. */
-    public long generateReadOperationId() {
-        return longOperationIdGenerator.getAndIncrement();
+    /**
+     * Token to stop tracking the read operation.
+     *
+     * @see #track(long, LongSupplier, LongSupplier)
+     */
+    @FunctionalInterface
+    public interface TrackingToken extends AutoCloseable {
+        @Override
+        void close();
     }
 
     /**
      * Starts tracking the completion of a read operation on its lowest 
estimation for revision upper bound.
      *
-     * <p>Method is expected not to be called more than once for the same 
arguments.</p>
-     *
      * <p>Expected usage pattern:</p>
      * <pre><code>
-     *     Object readOperationId = ...;
      *     int operationRevision = ...;
      *
-     *     tracker.track(readOperationId, operationRevision);
-     *
-     *     try {
+     *     try (var token = tracker.track(operationRevision, 
storage::revision, storage::getCompactionRevision)) {
      *         doReadOperation(...);
-     *     } finally {
-     *         tracker.untrack(readOperationId, operationRevision);
      *     }
      * </code></pre>
      *
-     * @see #untrack(Object, long)
+     * <p>Or you can use an explicit {@link TrackingToken#close()} call if 
execution context requires that, for example if your operation is
+     * asynchronous.
+     *
+     * @see TrackingToken
      */
-    public void track(Object readOperationId, long operationRevision) {
-        var key = new ReadOperationKey(readOperationId, operationRevision);
+    public TrackingToken track(
+            long operationRevision, LongSupplier latestRevision, LongSupplier 
compactedRevision
+    ) throws CompactedException {
+        long operationId = longOperationIdGenerator.getAndIncrement();
+
+        while (true) {
+            // "operationRevision" parameter is immutable, because we might 
need it on a next iteration.
+            // If it is asked to track the latest revision, we receive it 
right here from the corresponding supplier.
+            long currentOperationRevision = operationRevision == 
MetaStorageManager.LATEST_REVISION
+                    ? latestRevision.getAsLong()
+                    : operationRevision;
+
+            // Value from compacted revision supplier can only grow. We only 
use it for upper bound checks, so it's safe to read it every
+            // time instead of caching it. It applies to all usages of the 
supplier.
+            if (currentOperationRevision <= compactedRevision.getAsLong()) {
+                // Latest revision can never be compacted. If for some reason 
latest revision is concurrently updated and this revision is
+                // already compacted, we should retry until we succeed. That's 
quite a lag, but it is possible in theory and in tests.
+                if (operationRevision == MetaStorageManager.LATEST_REVISION) {
+                    continue;
+                }
+
+                throw new CompactedException(currentOperationRevision, 
compactedRevision.getAsLong());
+            }
 
-        CompletableFuture<Void> previous = 
readOperationFutureByKey.putIfAbsent(key, new CompletableFuture<>());
+            var key = new ReadOperationKey(operationId, 
currentOperationRevision);
 
-        assert previous == null : key;
-    }
+            TrackingToken trackingToken = () -> {
+                CompletableFuture<Void> removed = 
readOperationFutureByKey.remove(key);
 
-    /**
-     * Stops tracking the read operation. {@code operationRevision} must match 
the corresponding value from {@link #track}.
-     *
-     * <p>Method is expected not to be called more than once for the same 
arguments, and {@link #track} was previously called for same
-     * arguments.</p>
-     *
-     * @see #track(Object, long)
-     */
-    public void untrack(Object readOperationId, long operationRevision) {
-        var key = new ReadOperationKey(readOperationId, operationRevision);
+                // Might be null, that's fine, "close" can be called multiple 
times.
+                if (removed != null) {
+                    removed.complete(null);
+                }
+            };
 
-        CompletableFuture<Void> removed = readOperationFutureByKey.remove(key);
+            CompletableFuture<Void> operationFuture = new 
CompletableFuture<>();
+            readOperationFutureByKey.put(key, operationFuture);
 
-        assert removed != null : key;
+            // If this condition passes, it is possible that a future returned 
by "collect" in a compaction thread is already completed.
+            // It is not safe to proceed with tracking, we have to invalidate 
current token and either retry, or throw an exception.
+            if (currentOperationRevision <= compactedRevision.getAsLong()) {
+                trackingToken.close();
 
-        removed.complete(null);
+                if (operationRevision == MetaStorageManager.LATEST_REVISION) {
+                    continue;
+                }
+
+                throw new CompactedException(currentOperationRevision, 
compactedRevision.getAsLong());
+            }
+
+            return trackingToken;
+        }
     }
 
     /**
@@ -115,11 +147,11 @@ public class ReadOperationForCompactionTracker {
 
     private static class ReadOperationKey {
         @IgniteToStringInclude
-        private final Object readOperationId;
+        private final long readOperationId;
 
         private final long operationRevision;
 
-        private ReadOperationKey(Object readOperationId, long 
operationRevision) {
+        private ReadOperationKey(long readOperationId, long operationRevision) 
{
             assert operationRevision >= 0 : operationRevision;
 
             this.readOperationId = readOperationId;
@@ -137,12 +169,12 @@ public class ReadOperationForCompactionTracker {
 
             ReadOperationKey that = (ReadOperationKey) o;
 
-            return operationRevision == that.operationRevision && 
readOperationId.equals(that.readOperationId);
+            return operationRevision == that.operationRevision && 
readOperationId == that.readOperationId;
         }
 
         @Override
         public int hashCode() {
-            int result = readOperationId.hashCode();
+            int result = Long.hashCode(readOperationId);
             result = 31 * result + Long.hashCode(operationRevision);
             return result;
         }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 7bd4add855a..566f028fb3b 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.metastorage.server.persistence;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.metastorage.MetaStorageManager.LATEST_REVISION;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOT_FOUND;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertRequestedRevisionLessThanOrEqualToCurrent;
@@ -74,7 +75,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
+import java.util.function.LongSupplier;
 import org.apache.ignite.internal.components.NoOpLogSyncer;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -97,6 +98,7 @@ import 
org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext;
 import org.apache.ignite.internal.metastorage.server.MetastorageChecksum;
 import org.apache.ignite.internal.metastorage.server.NotifyWatchProcessorEvent;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
+import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker.TrackingToken;
 import org.apache.ignite.internal.metastorage.server.Statement;
 import org.apache.ignite.internal.metastorage.server.UpdateEntriesEvent;
 import org.apache.ignite.internal.metastorage.server.UpdateOnlyRevisionEvent;
@@ -109,7 +111,6 @@ import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
 import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -190,6 +191,18 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         RocksDB.loadLibrary();
     }
 
+    private final LongSupplier revSupplier = () -> rev;
+
+    private final LongSupplier compactedRevSupplier = () -> compactionRevision;
+
+    /**
+     * Only read or modified while synchronized on itself.
+     *
+     * @see #writeBatch(WriteBatch)
+     * @see #writeCompactedBatch(List, WriteBatch)
+     */
+    private final WriteBatchProtector writeBatchProtector = new 
WriteBatchProtector();
+
     /** Executor for storage operations. */
     private final ExecutorService executor;
 
@@ -229,10 +242,8 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
     /**
      * Facility to work with checksums.
-     *
-     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
      */
-    private MetastorageChecksum checksum;
+    private volatile MetastorageChecksum checksum;
 
     /** Status of the watch recovery process. */
     private enum RecoveryStatus {
@@ -251,8 +262,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
      * Current list of updated entries.
      *
      * <p>Since this list gets read and updated only on writes (under a write 
lock), no extra synchronisation is needed.</p>
-     *
-     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
      */
     private final UpdatedEntries updatedEntries = new UpdatedEntries();
 
@@ -261,13 +270,10 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
     /**
      * Write options used to write to RocksDB.
-     *
-     * <p>Access is guarded by {@link #rwLock}.
      */
-    private WriteOptions writeOptions;
+    private volatile WriteOptions writeOptions;
 
-    /** Multi-threaded access is guarded by {@link #rwLock}. */
-    private RocksDbFlusher flusher;
+    private volatile RocksDbFlusher flusher;
 
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -311,8 +317,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     }
 
     private void startBusy() {
-        rwLock.writeLock().lock();
-
         try {
             Files.createDirectories(dbPath);
 
@@ -321,8 +325,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             closeRocksResources();
 
             throw new MetaStorageException(STARTING_STORAGE_ERR, "Failed to 
start the storage", e);
-        } finally {
-            rwLock.writeLock().unlock();
         }
     }
 
@@ -489,12 +491,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
         IgniteUtils.shutdownAndAwaitTermination(executor, 10, 
TimeUnit.SECONDS);
 
-        rwLock.writeLock().lock();
-        try {
-            closeRocksResources();
-        } finally {
-            rwLock.writeLock().unlock();
-        }
+        closeRocksResources();
     }
 
     private void closeRocksResources() {
@@ -505,22 +502,15 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
     @Override
     public CompletableFuture<Void> snapshot(Path snapshotPath) {
-        rwLock.writeLock().lock();
-
-        try {
-            return snapshotManager
-                    .createSnapshot(snapshotPath)
-                    .thenCompose(unused -> flush());
-        } finally {
-            rwLock.writeLock().unlock();
-        }
+        return snapshotManager
+                .createSnapshot(snapshotPath)
+                .thenCompose(unused -> flush());
     }
 
     @Override
     public void restoreSnapshot(Path path) {
-        rwLock.writeLock().lock();
-
         try {
+            // TODO IGNITE-25213 Ensure that this doesn't happen on a running 
node.
             clear();
 
             snapshotManager.restoreSnapshot(path);
@@ -538,15 +528,25 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             throw e;
         } catch (Exception e) {
             throw new MetaStorageException(RESTORING_STORAGE_ERR, "Failed to 
restore snapshot", e);
-        } finally {
-            rwLock.writeLock().unlock();
         }
     }
 
     @Override
-    public void put(byte[] key, byte[] value, KeyValueUpdateContext context) {
-        rwLock.writeLock().lock();
+    public Entry get(byte[] key) {
+        try (TrackingToken token = 
readOperationForCompactionTracker.track(LATEST_REVISION, revSupplier, 
compactedRevSupplier)) {
+            return super.get(key);
+        }
+    }
+
+    @Override
+    public List<Entry> getAll(List<byte[]> keys) {
+        try (TrackingToken token = 
readOperationForCompactionTracker.track(LATEST_REVISION, revSupplier, 
compactedRevSupplier)) {
+            return super.getAll(keys);
+        }
+    }
 
+    @Override
+    public void put(byte[] key, byte[] value, KeyValueUpdateContext context) {
         try (WriteBatch batch = new WriteBatch()) {
             long newChecksum = checksum.wholePut(key, value);
 
@@ -559,8 +559,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             completeAndWriteBatch(batch, curRev, context, newChecksum);
         } catch (RocksDBException e) {
             throw new MetaStorageException(OP_EXECUTION_ERR, e);
-        } finally {
-            rwLock.writeLock().unlock();
         }
     }
 
@@ -590,30 +588,22 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
     @Override
     public void saveConfiguration(byte[] configuration, long lastAppliedIndex, 
long lastAppliedTerm) {
-        rwLock.writeLock().lock();
-
         try (WriteBatch batch = new WriteBatch()) {
             data.put(batch, INDEX_AND_TERM_KEY, longsToBytes(0, 
lastAppliedIndex, lastAppliedTerm));
             data.put(batch, CONFIGURATION_KEY, configuration);
 
-            db.write(writeOptions, batch);
+            writeBatch(batch);
         } catch (RocksDBException e) {
             throw new MetaStorageException(OP_EXECUTION_ERR, e);
-        } finally {
-            rwLock.writeLock().unlock();
         }
     }
 
     @Override
     public byte @Nullable [] getConfiguration() {
-        rwLock.readLock().lock();
-
         try {
             return data.get(CONFIGURATION_KEY);
         } catch (RocksDBException e) {
             throw new MetaStorageException(OP_EXECUTION_ERR, e);
-        } finally {
-            rwLock.readLock().unlock();
         }
     }
 
@@ -666,9 +656,9 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
         addIndexAndTermToWriteBatch(batch, context);
 
-        db.write(writeOptions, batch);
+        writeBatch(batch);
+        rev = newRev; // Done.
 
-        rev = newRev;
         checksum.commitRound(newChecksum);
         updatedEntries.ts = ts;
 
@@ -704,8 +694,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
     @Override
     public void putAll(List<byte[]> keys, List<byte[]> values, 
KeyValueUpdateContext context) {
-        rwLock.writeLock().lock();
-
         try (WriteBatch batch = new WriteBatch()) {
             long newChecksum = checksum.wholePutAll(keys, values);
 
@@ -720,15 +708,11 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             completeAndWriteBatch(batch, curRev, context, newChecksum);
         } catch (RocksDBException e) {
             throw new MetaStorageException(OP_EXECUTION_ERR, e);
-        } finally {
-            rwLock.writeLock().unlock();
         }
     }
 
     @Override
     public void remove(byte[] key, KeyValueUpdateContext context) {
-        rwLock.writeLock().lock();
-
         try (WriteBatch batch = new WriteBatch()) {
             long newChecksum = checksum.wholeRemove(key);
 
@@ -741,15 +725,11 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             completeAndWriteBatch(batch, curRev, context, newChecksum);
         } catch (RocksDBException e) {
             throw new MetaStorageException(OP_EXECUTION_ERR, e);
-        } finally {
-            rwLock.writeLock().unlock();
         }
     }
 
     @Override
     public void removeAll(List<byte[]> keys, KeyValueUpdateContext context) {
-        rwLock.writeLock().lock();
-
         try (WriteBatch batch = new WriteBatch()) {
             long newChecksum = checksum.wholeRemoveAll(keys);
 
@@ -770,15 +750,11 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             completeAndWriteBatch(batch, curRev, context, newChecksum);
         } catch (RocksDBException e) {
             throw new MetaStorageException(OP_EXECUTION_ERR, e);
-        } finally {
-            rwLock.writeLock().unlock();
         }
     }
 
     @Override
     public void removeByPrefix(byte[] prefix, KeyValueUpdateContext context) {
-        rwLock.writeLock().lock();
-
         try (
                 WriteBatch batch = new WriteBatch();
                 Cursor<Entry> entryCursor = range(prefix, nextKey(prefix))
@@ -796,8 +772,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             completeAndWriteBatch(batch, curRev, context, 
checksum.wholeRemoveByPrefix(prefix));
         } catch (RocksDBException e) {
             throw new MetaStorageException(OP_EXECUTION_ERR, e);
-        } finally {
-            rwLock.writeLock().unlock();
         }
     }
 
@@ -809,8 +783,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             KeyValueUpdateContext context,
             CommandId commandId
     ) {
-        rwLock.writeLock().lock();
-
         try {
             Entry[] entries = 
getAll(Arrays.asList(condition.keys())).toArray(new Entry[]{});
 
@@ -826,15 +798,11 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             return branch;
         } catch (RocksDBException e) {
             throw new MetaStorageException(OP_EXECUTION_ERR, e);
-        } finally {
-            rwLock.writeLock().unlock();
         }
     }
 
     @Override
     public StatementResult invoke(If iif, KeyValueUpdateContext context, 
CommandId commandId) {
-        rwLock.writeLock().lock();
-
         try {
             If currIf = iif;
 
@@ -868,8 +836,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             }
         } catch (RocksDBException e) {
             throw new MetaStorageException(OP_EXECUTION_ERR, e);
-        } finally {
-            rwLock.writeLock().unlock();
         }
     }
 
@@ -928,24 +894,12 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
     @Override
     public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
-        rwLock.readLock().lock();
-
-        try {
-            return doRange(keyFrom, keyTo, rev);
-        } finally {
-            rwLock.readLock().unlock();
-        }
+        return doRange(keyFrom, keyTo, rev);
     }
 
     @Override
     public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long 
revUpperBound) {
-        rwLock.readLock().lock();
-
-        try {
-            return doRange(keyFrom, keyTo, revUpperBound);
-        } finally {
-            rwLock.readLock().unlock();
-        }
+        return doRange(keyFrom, keyTo, revUpperBound);
     }
 
     @Override
@@ -954,9 +908,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
         long currentRevision;
 
-        rwLock.readLock().lock();
-
-        try {
+        synchronized (watchProcessorMutex) {
             watchProcessor.setWatchEventHandlingCallback(callback);
 
             currentRevision = rev;
@@ -969,17 +921,13 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
                 // If revision is not 0, we need to replay updates that match 
the existing data.
                 recoveryStatus.set(RecoveryStatus.IN_PROGRESS);
             }
-        } finally {
-            rwLock.readLock().unlock();
         }
 
         if (currentRevision != 0) {
             Set<UpdateEntriesEvent> updateEntriesEvents = 
collectUpdateEntriesEventsFromStorage(startRevision, currentRevision);
             Set<UpdateOnlyRevisionEvent> updateOnlyRevisionEvents = 
collectUpdateRevisionEventsFromStorage(startRevision, currentRevision);
 
-            rwLock.writeLock().lock();
-
-            try {
+            synchronized (watchProcessorMutex) {
                 
notifyWatchProcessorEventsBeforeStartingWatches.addAll(updateEntriesEvents);
                 // Adds events for which there were no entries updates but the 
revision was updated.
                 
notifyWatchProcessorEventsBeforeStartingWatches.addAll(updateOnlyRevisionEvents);
@@ -987,8 +935,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
                 drainNotifyWatchProcessorEventsBeforeStartingWatches();
 
                 recoveryStatus.set(RecoveryStatus.DONE);
-            } finally {
-                rwLock.writeLock().unlock();
             }
         }
     }
@@ -1001,10 +947,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             compactKeys(revision);
 
             compactAuxiliaryMappings(revision);
-
-            // Compaction might have created a lot of tombstones in column 
families, which affect scan speed. Removing them makes next
-            // compaction faster, as well as other scans in general.
-            db.compactRange();
         } catch (Throwable t) {
             throw new MetaStorageException(COMPACTION_ERR, "Error during 
compaction: " + revision, t);
         }
@@ -1041,12 +983,13 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
      * Compacts the key, see the documentation of {@link 
KeyValueStorage#compact} for examples.
      *
      * @param batch Write batch.
+     * @param batchKeys A list of keys, in which this method will add keys 
that it compacted.
      * @param key Target key.
      * @param revs Key revisions.
      * @param compactionRevision Revision up to which (inclusively) the key 
will be compacted.
      * @throws MetaStorageException If failed.
      */
-    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long 
compactionRevision) {
+    private void compactForKey(WriteBatch batch, List<byte[]> batchKeys, 
byte[] key, long[] revs, long compactionRevision) {
         try {
             int indexToCompact = indexToCompact(revs, compactionRevision, 
revision -> isTombstoneForCompaction(key, revision));
 
@@ -1054,6 +997,8 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
                 return;
             }
 
+            batchKeys.add(key);
+
             for (int revisionIndex = 0; revisionIndex <= indexToCompact; 
revisionIndex++) {
                 // This revision is not needed anymore, remove data.
                 data.delete(batch, keyToRocksKey(revs[revisionIndex], key));
@@ -1290,13 +1235,12 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
             rocksIterator.status();
 
-            byte[] tsValue = rocksIterator.value();
-
-            if (tsValue.length == 0) {
+            if (!rocksIterator.isValid()) {
+                // No previous value means that it got deleted, or never 
existed in the first place.
                 throw new CompactedException("Revisions less than or equal to 
the requested one are already compacted: " + timestamp);
             }
 
-            return bytesToLong(tsValue);
+            return bytesToLong(rocksIterator.value());
         } catch (RocksDBException e) {
             throw new MetaStorageException(OP_EXECUTION_ERR, e);
         }
@@ -1361,7 +1305,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
             addIndexAndTermToWriteBatch(batch, context);
 
-            db.write(writeOptions, batch);
+            writeBatch(batch);
 
             if (advanceSafeTime && areWatchesStarted()) {
                 watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
@@ -1384,18 +1328,17 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
     @Override
     public ChecksumAndRevisions checksumAndRevisions(long revision) {
-        rwLock.readLock().lock();
-
         try {
+            // "rev" is the last thing that gets updated, so it has to be read 
first.
+            long currentRevision = rev;
+
             return new ChecksumAndRevisions(
                     checksumByRevisionOrZero(revision),
                     minChecksummedRevisionOrZero(),
-                    rev
+                    currentRevision
             );
         } catch (RocksDBException e) {
             throw new MetaStorageException(INTERNAL_ERR, "Cannot get checksum 
by revision: " + revision, e);
-        } finally {
-            rwLock.readLock().unlock();
         }
     }
 
@@ -1418,8 +1361,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
     @Override
     public void clear() {
-        rwLock.readLock().lock();
-
         try {
             // There's no way to easily remove all data from RocksDB, so we 
need to re-create it from scratch.
             closeRocksResources();
@@ -1434,71 +1375,80 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             createDb();
         } catch (Exception e) {
             throw new MetaStorageException(RESTORING_STORAGE_ERR, "Failed to 
restore snapshot", e);
-        } finally {
-            rwLock.readLock().unlock();
         }
     }
 
     private void compactKeys(long compactionRevision) throws RocksDBException {
-        compactInBatches(index, (key, value, batch) -> {
-            compactForKey(batch, key, getAsLongs(value.get()), 
compactionRevision);
+        assertCompactionRevisionLessThanCurrent(this.compactionRevision, rev);
 
-            return true;
-        });
-    }
-
-    private void compactAuxiliaryMappings(long compactionRevision) throws 
RocksDBException {
-        compactInBatches(revisionToTs, (key, value, batch) -> {
-            long revision = bytesToLong(key);
+        // Clear bloom filter before opening iterator, so that we don't have 
collisions right from the start.
+        synchronized (writeBatchProtector) {
+            writeBatchProtector.clear();
+        }
 
-            if (revision > compactionRevision) {
-                return false;
-            }
+        if (!busyLock.enterBusy()) {
+            return;
+        }
 
-            revisionToTs.delete(batch, key);
-            tsToRevision.delete(batch, value.get());
+        try (RocksIterator iterator = index.newIterator()) {
+            byte[] key = null;
+            List<byte[]> batchKeys = new ArrayList<>(COMPACT_BATCH_SIZE);
 
-            revisionToChecksum.delete(batch, key);
+            iterator.seekToFirst();
+            while (iterator.isValid()) {
+                try (WriteBatch batch = new WriteBatch()) {
+                    byte[] retryPositionKey = key;
 
-            return true;
-        });
-    }
+                    batchKeys.clear();
+                    for (int i = 0; i < COMPACT_BATCH_SIZE && 
iterator.isValid(); i++, iterator.next()) {
+                        if (stopCompaction.get()) {
+                            return;
+                        }
 
-    @FunctionalInterface
-    private interface CompactionAction {
-        /**
-         * Performs compaction on the storage at the current iterator pointer. 
Returns {@code true} if it is necessary to continue
-         * iterating, {@link false} if it is necessary to finish with writing 
the last batch.
-         */
-        boolean compact(byte[] key, Supplier<byte[]> value, WriteBatch batch) 
throws RocksDBException;
-    }
+                        // Throw an exception if something went wrong.
+                        iterator.status();
 
-    private void compactInBatches(ColumnFamily columnFamily, CompactionAction 
compactionAction) throws RocksDBException {
-        try (RocksIterator iterator = columnFamily.newIterator()) {
-            boolean continueIterating = true;
+                        key = iterator.key();
 
-            byte[] key = null;
+                        compactForKey(batch, batchKeys, key, 
getAsLongs(iterator.value()), compactionRevision);
+                    }
 
-            while (continueIterating) {
-                rwLock.writeLock().lock();
+                    if (!writeCompactedBatch(batchKeys, batch)) {
+                        key = retryPositionKey;
 
-                try (WriteBatch batch = new WriteBatch()) {
-                    // We must refresh the iterator while holding write lock, 
because iterator state might be outdated due to its snapshot
-                    // isolation.
-                    if (!refreshIterator(iterator, key)) {
-                        break;
+                        // Refreshing the iterator is absolutely crucial. We 
have determined that data has been modified externally,
+                        // current snapshot in the iterator is invalid.
+                        refreshIterator(iterator, key);
                     }
+                }
+            }
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private void compactAuxiliaryMappings(long compactionRevision) throws 
RocksDBException {
+        assertCompactionRevisionLessThanCurrent(compactionRevision, rev);
 
-                    
assertCompactionRevisionLessThanCurrent(compactionRevision, rev);
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try (RocksIterator iterator = revisionToTs.newIterator()) {
+            boolean continueIterating = true;
+            iterator.seekToFirst();
 
+            while (continueIterating && iterator.isValid()) {
+                try (WriteBatch batch = new WriteBatch()) {
                     for (int i = 0; i < COMPACT_BATCH_SIZE && 
iterator.isValid(); i++, iterator.next()) {
                         if (stopCompaction.get()) {
                             return;
                         }
 
-                        key = iterator.key();
+                        // Throw an exception if something went wrong.
+                        iterator.status();
 
-                        if (!compactionAction.compact(key, iterator::value, 
batch)) {
+                        if (!deleteAuxiliaryMapping(compactionRevision, 
iterator, batch)) {
                             continueIterating = false;
 
                             break;
@@ -1506,16 +1456,103 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
                     }
 
                     db.write(writeOptions, batch);
-                } finally {
-                    rwLock.writeLock().unlock();
                 }
             }
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private boolean deleteAuxiliaryMapping(long compactionRevision, 
RocksIterator iterator, WriteBatch batch) throws RocksDBException {
+        byte[] key = iterator.key();
+        long revision = bytesToLong(key);
 
-            iterator.status();
+        if (revision > compactionRevision) {
+            return false;
         }
+
+        revisionToTs.delete(batch, key);
+        tsToRevision.delete(batch, iterator.value());
+
+        revisionToChecksum.delete(batch, key);
+
+        return true;
     }
 
-    private static boolean refreshIterator(RocksIterator iterator, byte 
@Nullable [] key) throws RocksDBException {
+    /**
+     * Writes the batch to the database in the {@code FSM} thread. By the time 
this method is called, {@link #updatedEntries} must be in a
+     * state that corresponds to the current batch. This collection will be 
used to mark the entries as updated in
+     * {@link #writeBatchProtector}.
+     *
+     * <p>No other actions besides calling {@link 
WriteBatchProtector#onUpdate(byte[])} and {@link RocksDB#write(WriteOptions, 
WriteBatch)}
+     * are performed here. They're both executed while holding {@link 
#writeBatchProtector}'s monitor in order to synchronise
+     * {@link #writeBatchProtector} modifications.
+     *
+     * <p>Since there's a gap between reading the data and writing batch into 
the storage, it is possible that compaction thread had
+     * concurrently updated the data that we're modifying. The only real side 
effect of such a race will be a presence of already deleted
+     * revisions in revisions list, associated with any particular key (see 
{@link RocksDbKeyValueStorage#keyRevisionsForOperation(byte[])}).
+     *
+     * <p>We ignore this side effect, because retrying the operation in {@code 
FSM} thread is too expensive, and because there's really no
+     * harm in having some obsolete revisions there. We always keep in mind 
that a particular revision might already be compacted when we
+     * read data.
+     *
+     * @param batch RocksDB's {@link WriteBatch}.
+     * @throws RocksDBException If {@link RocksDB#write(WriteOptions, 
WriteBatch)} threw an exception.
+     * @see #writeCompactedBatch(List, WriteBatch)
+     */
+    private void writeBatch(WriteBatch batch) throws RocksDBException {
+        synchronized (writeBatchProtector) {
+            for (Entry updatedEntry : updatedEntries.updatedEntries) {
+                writeBatchProtector.onUpdate(updatedEntry.key());
+            }
+
+            db.write(writeOptions, batch);
+        }
+    }
+
+    /**
+     * Writes the batch to the database in the compaction thread.
+     *
+     * <p>No other actions besides accessing {@link #writeBatchProtector} and 
calling {@link RocksDB#write(WriteOptions, WriteBatch)}
+     * are performed here. They're both executed while holding {@link 
#writeBatchProtector}'s monitor in order to synchronise potential
+     * {@link #writeBatchProtector} modifications.
+     *
+     * <p>Since there's a gap between reading the data and writing batch into 
the storage, it is possible that {@code FSM} thread had
+     * concurrently updated the data that we're compacting. Such a race would 
mean that there are unaccounted revisions in revisions list,
+     * associated with keys from {@code batchKeys} (see {@link 
RocksDbKeyValueStorage#keyRevisionsForOperation(byte[])}.
+     *
+     * <p>For that reason, unlike {@link #writeBatch(WriteBatch)}, here we 
don't have a privilege of just writing the batch in case of race.
+     * It would lead to data loss. In order to avoid it, we probabilistically 
check if data that we're compacting has been modified
+     * concurrently. If it certainly wasn't, we proceed and return {@code 
true}. If we're not sure, we abort this batch and return
+     * {@code false}.
+     *
+     * <p>If we return {@code false}, we also clear the {@link 
#writeBatchProtector} to avoid blocking the next batch. It is important to
+     * remember that we'll get false-negative results sometimes if we have 
hash collisions. That's fine.
+     *
+     * @param batchKeys Meta-storage keys that have been compacted in this 
batch.
+     * @param batch RocksDB's {@link WriteBatch}.
+     * @return {@code true} if writing succeeded, {@code false} if compaction 
round must be retried due to concurrent storage update.
+     * @throws RocksDBException If {@link RocksDB#write(WriteOptions, 
WriteBatch)} threw an exception.
+     *
+     * @see #writeBatch(WriteBatch)
+     */
+    private boolean writeCompactedBatch(List<byte[]> batchKeys, WriteBatch 
batch) throws RocksDBException {
+        synchronized (writeBatchProtector) {
+            for (byte[] key : batchKeys) {
+                if (writeBatchProtector.maybeUpdated(key)) {
+                    writeBatchProtector.clear();
+
+                    return false;
+                }
+            }
+
+            db.write(writeOptions, batch);
+        }
+
+        return true;
+    }
+
+    private static void refreshIterator(RocksIterator iterator, byte @Nullable 
[] key) throws RocksDBException {
         iterator.refresh();
 
         if (key == null) {
@@ -1535,8 +1572,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
         // Check for errors.
         iterator.status();
-
-        return iterator.isValid();
     }
 
     private boolean isTombstone(byte[] key, long revision) throws 
RocksDBException {
@@ -1575,11 +1610,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
     @Override
     protected Value valueForOperation(byte[] key, long revision) {
-        Value value = getValueForOperationNullable(key, revision);
-
-        assert value != null : "key=" + toUtf8String(key) + ", revision=" + 
revision;
-
-        return value;
+        return getValueForOperation(key, revision);
     }
 
     @Override
@@ -1587,13 +1618,18 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         return recoveryStatus.get() == RecoveryStatus.DONE;
     }
 
-    private @Nullable Value getValueForOperationNullable(byte[] key, long 
revision) {
+    private Value getValueForOperation(byte[] key, long revision) {
         try {
             byte[] valueBytes = data.get(keyToRocksKey(revision, key));
 
-            assert valueBytes != null && valueBytes.length != 0 : "key=" + 
toUtf8String(key) + ", revision=" + revision;
+            if (valueBytes == null) {
+                assert revision <= compactionRevision
+                        : "key=" + toUtf8String(key) + ", revision=" + 
revision + ", compactionRevision=" + compactionRevision;
 
-            return ArrayUtils.nullOrEmpty(valueBytes) ? null : 
bytesToValue(valueBytes);
+                throw new CompactedException(revision, compactionRevision);
+            }
+
+            return bytesToValue(valueBytes);
         } catch (RocksDBException e) {
             throw new MetaStorageException(
                     OP_EXECUTION_ERR,
@@ -1606,11 +1642,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     private Cursor<Entry> doRange(byte[] keyFrom, byte @Nullable [] keyTo, 
long revUpperBound) {
         assert revUpperBound >= 0 : revUpperBound;
 
-        
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound,
 compactionRevision);
-
-        long readOperationId = 
readOperationForCompactionTracker.generateReadOperationId();
-        readOperationForCompactionTracker.track(readOperationId, 
revUpperBound);
-
         var readOpts = new ReadOptions();
 
         Slice upperBound = keyTo == null ? null : new Slice(keyTo);
@@ -1621,6 +1652,16 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
         iterator.seek(keyFrom);
 
+        TrackingToken token;
+        try {
+            token = readOperationForCompactionTracker.track(revUpperBound, 
this::revision, this::getCompactionRevision);
+        } catch (Throwable e) {
+            // Revision might have been compacted already, in which case we 
must free the resources.
+            RocksUtils.closeAll(iterator, upperBound, readOpts);
+
+            throw e;
+        }
+
         return new RocksIteratorAdapter<>(iterator) {
             /** Cached entry used to filter "empty" values. */
             private @Nullable Entry next;
@@ -1672,10 +1713,9 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
                 }
 
                 long revision = keyRevisions[maxRevisionIndex];
-                Value value = getValueForOperationNullable(key, revision);
+                Value value = getValueForOperation(key, revision);
 
-                // Value may be null if the compaction has removed it in 
parallel.
-                if (value == null || value.tombstone()) {
+                if (value.tombstone()) {
                     return EntryImpl.empty(key);
                 }
 
@@ -1684,7 +1724,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
             @Override
             public void close() {
-                readOperationForCompactionTracker.untrack(readOperationId, 
revUpperBound);
+                token.close();
 
                 super.close();
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtector.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtector.java
new file mode 100644
index 00000000000..1def45f50fa
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtector.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.util.Arrays;
+import org.apache.ignite.internal.util.HashUtils;
+
+/**
+ * Simple bloom-filter-based utility class used to coordinate the compaction 
thread with the FSM thread. For a full explanation please
+ * consider reading comments in the {@link 
RocksDbKeyValueStorage#compact(long)} implementation.
+ */
+class WriteBatchProtector {
+    /**
+     * Signifies that there are 2^14 bits in this bloom filter, which is 
roughly equal to 16 thousand. This is a reasonably large value.
+     */
+    private static final int BLOOM_BITS = 14;
+    private static final int BLOOM_MASK = (1 << BLOOM_BITS) - 1;
+
+    // Constants for least significant bits (LSB) of hash values. These bits 
would point to individual bit positions in "long" values.
+    private static final int LSB_BITS = 
Integer.numberOfTrailingZeros(Long.SIZE);
+    private static final int LSB_MASK = Long.SIZE - 1;
+
+    /** Bit-set. */
+    private final long[] bloom = new long[1 << BLOOM_BITS >>> LSB_BITS];
+
+    /**
+     * Called when {@code key} is updated in the storage. Immediate 
consecutive call of {@code maybeUpdated(key)} will return {@code true}.
+     */
+    public void onUpdate(byte[] key) {
+        int h = hash(key);
+
+        int msb = indexInsideArray(h);
+        int lsb = indexInsideLongValue(h);
+        bloom[msb] |= singleBitInLong(lsb);
+    }
+
+    /**
+     * Checks if the {@code key} was probably updated in the storage. Returns 
{@code false} if it was definitely not updated. Returns
+     * {@code true} if the answer is unknown.
+     */
+    public boolean maybeUpdated(byte[] key) {
+        int h = hash(key);
+
+        int msb = indexInsideArray(h);
+        int lsb = indexInsideLongValue(h);
+        return (bloom[msb] & singleBitInLong(lsb)) != 0L;
+    }
+
+    private static int indexInsideArray(int h) {
+        return h >>> LSB_BITS;
+    }
+
+    private static int indexInsideLongValue(int h) {
+        return h & LSB_MASK;
+    }
+
+    private static long singleBitInLong(int lsb) {
+        return 1L << lsb;
+    }
+
+    /**
+     * Clears all the information about previous calls of {@link 
#onUpdate(byte[])}. All calls of {@link #maybeUpdated(byte[])} will return
+     * {@code false} after this call.
+     */
+    public void clear() {
+        Arrays.fill(bloom, 0L);
+    }
+
+    private static int hash(byte[] key) {
+        return HashUtils.hash32(key) & BLOOM_MASK;
+    }
+}
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index dbb6d8f19ed..374b3254085 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -535,6 +535,7 @@ public abstract class AbstractCompactionKeyValueStorageTest 
extends AbstractKeyV
         assertDoesNotThrowCompactedExceptionForGetSingleValue(BAR_KEY, 5);
 
         storage.setCompactionRevision(5);
+        // TODO IGNITE-25212 Consider expecting a tombstone here.
         assertThrowsCompactedExceptionForGetSingleValue(BAR_KEY, 5);
         assertDoesNotThrowCompactedExceptionForGetSingleValue(BAR_KEY, 6);
 
@@ -959,12 +960,50 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         }
     }
 
+    @Test
+    void testConcurrentUpdateAndCompaction() {
+        KeyValueUpdateContext context = kvContext(hybridTimestamp(10L));
+
+        for (int i = 0; i < 500; i++) {
+            byte[] key = key(i);
+            byte[] value = keyValue(i, i);
+
+            storage.put(key, value, context);
+            long revision = storage.revision();
+            storage.remove(key, context);
+
+            runRace(
+                    () -> {
+                        storage.setCompactionRevision(revision);
+                        storage.compact(revision);
+                    },
+                    () -> {
+                        // We update the same value in order to cause a race 
in the "writeBatch" method that would leave already compacted
+                        // revisions in the list of revisions associated with 
this key.
+                        storage.put(key, value, context);
+                    }
+            );
+
+            try {
+                Entry entry = storage.get(key, revision);
+
+                assertFalse(entry.empty());
+                assertEquals(revision, entry.revision());
+                assertArrayEquals(value, entry.value());
+            } catch (CompactedException ignore) {
+                // This is expected.
+            }
+
+            storage.remove(key, context);
+        }
+    }
+
     @Test
     void testConcurrentReadAllAndCompaction() {
         KeyValueUpdateContext context = kvContext(hybridTimestamp(10L));
 
         int numberOfKeys = 15;
-        for (int i = 0; i < 100; i++) {
+        for (int i = 0; i < 800; i++) {
             List<byte[]> keys = new ArrayList<>();
             List<byte[]> values = new ArrayList<>();
             for (int j = 0; j < numberOfKeys; j++) {
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
index 9476d69d02a..62eb6c5d1aa 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
@@ -17,12 +17,19 @@
 
 package org.apache.ignite.internal.metastorage.server;
 
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static 
org.apache.ignite.internal.metastorage.MetaStorageManager.LATEST_REVISION;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker.TrackingToken;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.junit.jupiter.api.Test;
 
 /** For {@link ReadOperationForCompactionTracker} testing. */
@@ -38,62 +45,114 @@ public class ReadOperationForCompactionTrackerTest {
 
     @Test
     void testTrackAndUntrack() {
-        UUID readOperation0 = UUID.randomUUID();
-        UUID readOperation1 = UUID.randomUUID();
+        assertThrows(CompactedException.class, () -> tracker.track(0, () -> 
0L, () -> 0L));
 
-        long operationRevision0 = 1;
-        long operationRevision1 = 2;
+        TrackingToken token1 = tracker.track(1, () -> 0L, () -> 0L);
 
-        assertDoesNotThrow(() -> tracker.track(readOperation0, 
operationRevision0));
-        assertDoesNotThrow(() -> tracker.track(readOperation0, 
operationRevision1));
-        assertDoesNotThrow(() -> tracker.track(readOperation1, 
operationRevision0));
-        assertDoesNotThrow(() -> tracker.track(readOperation1, 
operationRevision1));
-
-        assertDoesNotThrow(() -> tracker.untrack(readOperation0, 
operationRevision0));
-        assertDoesNotThrow(() -> tracker.untrack(readOperation0, 
operationRevision1));
-        assertDoesNotThrow(() -> tracker.untrack(readOperation1, 
operationRevision0));
-        assertDoesNotThrow(() -> tracker.untrack(readOperation1, 
operationRevision1));
-
-        // Let's check that after untrack we can do track again for the 
previous arguments.
-        assertDoesNotThrow(() -> tracker.track(readOperation0, 
operationRevision0));
-        assertDoesNotThrow(() -> tracker.untrack(readOperation0, 
operationRevision0));
+        token1.close();
     }
 
     @Test
     void testTrackUntrackAndCollect() {
-        UUID readOperation0 = UUID.randomUUID();
-        UUID readOperation1 = UUID.randomUUID();
-
-        long operationRevision0 = 1;
-        long operationRevision1 = 2;
-
-        tracker.track(readOperation0, operationRevision0);
-        tracker.track(readOperation1, operationRevision0);
+        TrackingToken token1 = tracker.track(1, () -> 0L, () -> 0L);
+        TrackingToken token2 = tracker.track(2, () -> 0L, () -> 0L);
 
         assertTrue(tracker.collect(0).isDone());
 
         CompletableFuture<Void> collectFuture1 = tracker.collect(1);
-        assertFalse(collectFuture1.isDone());
+        CompletableFuture<Void> collectFuture2 = tracker.collect(2);
 
-        tracker.untrack(readOperation0, operationRevision0);
         assertFalse(collectFuture1.isDone());
-
-        tracker.untrack(readOperation1, operationRevision0);
-        assertTrue(collectFuture1.isDone());
-
-        tracker.track(readOperation0, operationRevision1);
-        tracker.track(readOperation1, operationRevision1);
-
-        assertTrue(tracker.collect(0).isDone());
-        assertTrue(tracker.collect(1).isDone());
-
-        CompletableFuture<Void> collectFuture2 = tracker.collect(2);
         assertFalse(collectFuture2.isDone());
 
-        tracker.untrack(readOperation1, operationRevision1);
+        token1.close();
+        assertTrue(collectFuture1.isDone());
         assertFalse(collectFuture2.isDone());
 
-        tracker.untrack(readOperation0, operationRevision1);
+        token2.close();
+        assertTrue(collectFuture1.isDone());
         assertTrue(collectFuture2.isDone());
     }
+
+    /**
+     * Tests that concurrent update and compaction don't break reading the 
"latest" revision of data.
+     */
+    @Test
+    void testTrackLatestRevision() {
+        for (int i = 0; i < 10_000; i++) {
+            AtomicLong latestRevision = new AtomicLong(2);
+            AtomicLong compactedRevision = new AtomicLong(1);
+
+            AtomicReference<TrackingToken> token = new AtomicReference<>();
+
+            IgniteTestUtils.runRace(
+                    () -> token.set(tracker.track(LATEST_REVISION, 
latestRevision::get, compactedRevision::get)),
+                    () -> {
+                        latestRevision.set(3);
+                        compactedRevision.set(2);
+
+                        // This one is tricky. If we identify that there are 
no tracked futures after the moment of setting compaction
+                        // revision to "2", it should mean that concurrent 
tracking should end up creating the future for revision "3".
+                        // Here we validate that there are no data races for 
such a scenario, by re-checking the list of tracked futures.
+                        // If something has been added there by a concurrent 
thread, it should soon be completed before a retry.
+                        if (tracker.collect(2).isDone()) {
+                            assertThat(tracker.collect(2), willSucceedFast());
+                        }
+                    }
+            );
+
+            assertFalse(tracker.collect(3).isDone());
+
+            token.get().close();
+
+            assertTrue(tracker.collect(3).isDone());
+        }
+    }
+
+    /**
+     * Tests that concurrent compaction either leads to {@link 
CompactedException}, or just works. There should be no leaks or races.
+     */
+    @Test
+    void testConcurrentCompact() {
+        for (int i = 0; i < 10_000; i++) {
+            AtomicLong latestRevision = new AtomicLong(2);
+            AtomicLong compactedRevision = new AtomicLong(1);
+
+            AtomicReference<TrackingToken> token = new AtomicReference<>();
+
+            IgniteTestUtils.runRace(
+                    () -> {
+                        try {
+                            token.set(tracker.track(2, latestRevision::get, 
compactedRevision::get));
+                        } catch (CompactedException ignore) {
+                            // No-op.
+                        }
+                    },
+                    () -> {
+                        latestRevision.set(3);
+                        compactedRevision.set(2);
+
+                        // This one is tricky. If we identify that there are 
no tracked futures after the moment of setting compaction
+                        // revision to "2", it should mean that concurrent 
tracking should end up throwing CompactedException.
+                        // Here we validate that there are no data races for 
such a scenario, by re-checking the list of tracked futures.
+                        // If something has been added there by a concurrent 
thread, it should soon be completed before failing.
+                        if (tracker.collect(2).isDone()) {
+                            assertThat(tracker.collect(2), willSucceedFast());
+                        }
+                    }
+            );
+
+            if (token.get() == null) {
+                // CompactedException case.
+                assertTrue(tracker.collect(2).isDone());
+            } else {
+                // Successful tracking case.
+                assertFalse(tracker.collect(2).isDone());
+
+                token.get().close();
+
+                assertTrue(tracker.collect(2).isDone());
+            }
+        }
+    }
 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtectorTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtectorTest.java
new file mode 100644
index 00000000000..fa41e2ef7b6
--- /dev/null
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtectorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class WriteBatchProtectorTest {
+    private WriteBatchProtector protector;
+
+    @BeforeEach
+    void setUp() {
+        protector = new WriteBatchProtector();
+    }
+
+    @Test
+    void testUpdateKey() {
+        byte[] key = "testKey".getBytes();
+
+        protector.onUpdate(key);
+
+        assertTrue(protector.maybeUpdated(key), "Key should be present in the 
bloom filter");
+    }
+
+    @Test
+    void testNoUpdates() {
+        byte[] key = "nonExistentKey".getBytes();
+
+        assertFalse(protector.maybeUpdated(key), "Key should not be present in 
the bloom filter");
+    }
+
+    @Test
+    void testUpdateAndClear() {
+        byte[] key = "testKey".getBytes();
+
+        protector.onUpdate(key);
+        protector.clear();
+
+        assertFalse(protector.maybeUpdated(key), "Key should not be present 
after clearing the bloom filter");
+    }
+
+    @Test
+    void testUpdateMultipleKeys() {
+        byte[] key1 = "key1".getBytes();
+        byte[] key2 = "key2".getBytes();
+
+        protector.onUpdate(key1);
+        protector.onUpdate(key2);
+
+        assertTrue(protector.maybeUpdated(key1), "Key1 should be present in 
the bloom filter");
+        assertTrue(protector.maybeUpdated(key2), "Key2 should be present in 
the bloom filter");
+    }
+
+    @Test
+    void testNoCollision() {
+        byte[] key1 = "key1".getBytes();
+        byte[] key2 = "key2".getBytes();
+
+        protector.onUpdate(key1);
+
+        // No collision.
+        assertFalse(protector.maybeUpdated(key2), "Key2 should not be present 
unless explicitly added");
+    }
+}
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 00b19ec8648..804b07d3705 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -50,6 +50,8 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.CommandId;
@@ -60,6 +62,7 @@ import 
org.apache.ignite.internal.metastorage.dsl.StatementResult;
 import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
 import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker.TrackingToken;
 import org.apache.ignite.internal.raft.IndexWithTerm;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
@@ -70,6 +73,8 @@ import org.jetbrains.annotations.Nullable;
  * Simple in-memory key/value storage for tests.
  */
 public class SimpleInMemoryKeyValueStorage extends AbstractKeyValueStorage {
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
     /**
      * Keys index. Value is the list of all revisions under which entry 
corresponding to the key was modified.
      *
@@ -200,6 +205,61 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
         }
     }
 
+    @Override
+    public Entry get(byte[] key) {
+        rwLock.readLock().lock();
+
+        try {
+            return super.get(key);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public Entry get(byte[] key, long revUpperBound) {
+        rwLock.readLock().lock();
+
+        try {
+            return super.get(key, revUpperBound);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public List<Entry> getAll(List<byte[]> keys) {
+        rwLock.readLock().lock();
+
+        try {
+            return super.getAll(keys);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public List<Entry> getAll(List<byte[]> keys, long revUpperBound) {
+        rwLock.readLock().lock();
+
+        try {
+            return super.getAll(keys, revUpperBound);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public long revision() {
+        rwLock.readLock().lock();
+
+        try {
+            return rev;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
     @Override
     public void put(byte[] key, byte[] value, KeyValueUpdateContext context) {
         rwLock.writeLock().lock();
@@ -795,6 +855,17 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
         updateRevision(curRev, context);
     }
 
+    @Override
+    public void saveCompactionRevision(long revision, KeyValueUpdateContext 
context) {
+        rwLock.writeLock().lock();
+
+        try {
+            super.saveCompactionRevision(revision, context);
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
     @Override
     public void saveCompactionRevision(long revision, KeyValueUpdateContext 
context, boolean advanceSafeTime) {
         savedCompactionRevision = revision;
@@ -806,6 +877,28 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
         }
     }
 
+    @Override
+    public void setCompactionRevision(long revision) {
+        rwLock.writeLock().lock();
+
+        try {
+            super.setCompactionRevision(revision);
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void updateCompactionRevision(long compactionRevision, 
KeyValueUpdateContext context) {
+        rwLock.writeLock().lock();
+
+        try {
+            super.updateCompactionRevision(compactionRevision, context);
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
     @Override
     public long checksum(long revision) {
         throw new UnsupportedOperationException();
@@ -927,13 +1020,13 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
 
         Iterator<Entry> iterator = entries.iterator();
 
-        long readOperationId = 
readOperationForCompactionTracker.generateReadOperationId();
-        readOperationForCompactionTracker.track(readOperationId, 
revUpperBound);
+        //noinspection resource
+        TrackingToken token = 
readOperationForCompactionTracker.track(revUpperBound, this::revision, 
this::getCompactionRevision);
 
         return new Cursor<>() {
             @Override
             public void close() {
-                readOperationForCompactionTracker.untrack(readOperationId, 
revUpperBound);
+                token.close();
             }
 
             @Override


Reply via email to