This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f9abf076a4 IGNITE-23300 Add local tracking of completion of readings 
from metastorage to the required revision (#4572)
f9abf076a4 is described below

commit f9abf076a442f47d45606f1eabd82344d6d4ad43
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Oct 17 11:17:45 2024 +0300

    IGNITE-23300 Add local tracking of completion of readings from metastorage 
to the required revision (#4572)
---
 .../metastorage/MetaStorageCompactionManager.java  |  13 ++
 .../impl/ItMetaStorageManagerImplTest.java         | 167 +++++++++++++++++++--
 .../metastorage/impl/MetaStorageManagerImpl.java   | 135 ++++++++++++++---
 .../server/AbstractKeyValueStorage.java            |  27 ++--
 .../metastorage/server/KeyValueStorage.java        |  11 ++
 .../server/ReadOperationForCompactionTracker.java  | 147 ++++++++++++++++++
 .../server/persistence/RocksDbKeyValueStorage.java |   5 +
 .../AbstractCompactionKeyValueStorageTest.java     |  81 ++++++++++
 .../ReadOperationForCompactionTrackerTest.java     |  99 ++++++++++++
 .../server/SimpleInMemoryKeyValueStorage.java      |  29 +++-
 10 files changed, 660 insertions(+), 54 deletions(-)

diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
index 192f7eebdc..cd32f41346 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.metastorage;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.IgniteComponent;
@@ -80,4 +81,16 @@ public interface MetaStorageCompactionManager extends 
IgniteComponent {
      * @see #setCompactionRevisionLocally(long)
      */
     long getCompactionRevisionLocally();
+
+    /**
+     * Returns a future that will complete when all read operations (from 
leader and locally) that were started before
+     * {@code compactionRevisionExcluded} will be completed.
+     *
+     * <p>Should be invoked after {@link #setCompactionRevisionLocally} on the 
same revision.</p>
+     *
+     * <p>Future may complete with {@link NodeStoppingException} if the node 
is in the process of stopping.</p>
+     *
+     * @param compactionRevisionExcluded Compaction revision of interest.
+     */
+    CompletableFuture<Void> readOperationsFuture(long 
compactionRevisionExcluded);
 }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index a99e3b74d5..a122230d7a 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
@@ -35,6 +36,8 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.stopAsync;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
@@ -47,6 +50,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.stream.Stream;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
@@ -65,6 +69,7 @@ import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
@@ -76,6 +81,7 @@ import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue
 import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.DefaultMessagingService;
 import org.apache.ignite.internal.network.StaticNodeFinder;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
@@ -85,13 +91,19 @@ import 
org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.rpc.ReadActionRequest;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Named;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 
 /**
@@ -99,6 +111,10 @@ import org.mockito.ArgumentCaptor;
  */
 @ExtendWith(ConfigurationExtension.class)
 public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
+    private static final ByteArray FOO_KEY = new ByteArray("foo");
+
+    private static final byte[] VALUE = "value".getBytes(UTF_8);
+
     private ClusterService clusterService;
 
     private Loza raftManager;
@@ -201,8 +217,6 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
      */
     @Test
     void testPrefixOverflow() {
-        byte[] value = "value".getBytes(UTF_8);
-
         var key1 = new ByteArray(new byte[]{1, (byte) 0xFF, 0});
         var key2 = new ByteArray(new byte[]{1, (byte) 0xFF, 1});
         var key3 = new ByteArray(new byte[]{1, (byte) 0xFF, (byte) 0xFF});
@@ -212,13 +226,13 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
         var key5 = new ByteArray(new byte[]{1, (byte) 0xFE, 1});
 
         CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
-                Conditions.notExists(new ByteArray("foo")),
+                Conditions.notExists(FOO_KEY),
                 List.of(
-                        Operations.put(key1, value),
-                        Operations.put(key2, value),
-                        Operations.put(key3, value),
-                        Operations.put(key4, value),
-                        Operations.put(key5, value)
+                        Operations.put(key1, VALUE),
+                        Operations.put(key2, VALUE),
+                        Operations.put(key3, VALUE),
+                        Operations.put(key4, VALUE),
+                        Operations.put(key5, VALUE)
                 ),
                 List.of(Operations.noop())
         );
@@ -238,7 +252,7 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
 
         assertThat(metaStorageManager.stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
 
-        CompletableFuture<Entry> fut = 
svc.get(ByteArray.fromString("ignored"));
+        CompletableFuture<Entry> fut = svc.get(FOO_KEY);
 
         assertThat(fut, willThrowFast(CancellationException.class));
     }
@@ -290,7 +304,7 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
 
         metaStorageManager.registerRevisionUpdateListener(listener);
 
-        assertThat(metaStorageManager.put(ByteArray.fromString("test"), 
"test".getBytes(UTF_8)), willSucceedFast());
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), willSucceedFast());
 
         // Watches are processed asynchronously.
         // Timeout is big just in case there's a GC pause. Test's duration 
doesn't really depend on it.
@@ -304,13 +318,10 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
      */
     @Test
     void 
testIdleSafeTimePropagationAndNormalSafeTimePropagationInteraction(TestInfo 
testInfo) {
-        var key = new ByteArray("foo");
-        byte[] value = "bar".getBytes(UTF_8);
-
         AtomicBoolean watchCompleted = new AtomicBoolean(false);
         CompletableFuture<HybridTimestamp> watchEventTsFuture = new 
CompletableFuture<>();
 
-        metaStorageManager.registerExactWatch(key, new WatchListener() {
+        metaStorageManager.registerExactWatch(FOO_KEY, new WatchListener() {
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent event) {
                 watchEventTsFuture.complete(event.timestamp());
@@ -326,7 +337,7 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
             }
         });
 
-        metaStorageManager.put(key, value);
+        metaStorageManager.put(FOO_KEY, VALUE);
 
         ClusterTime clusterTime = metaStorageManager.clusterTime();
 
@@ -338,6 +349,132 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
         assertThat("Safe time is advanced too early", watchCompleted.get(), 
is(true));
     }
 
+    @Test
+    void testReadOperationsFutureWithoutReadOperations() {
+        assertTrue(metaStorageManager.readOperationsFuture(0).isDone());
+        assertTrue(metaStorageManager.readOperationsFuture(1).isDone());
+    }
+
+    /**
+     * Tests {@link MetaStorageManagerImpl#readOperationsFuture} as it is 
expected to be used.
+     * <ul>
+     *     <li>Creates read operations from the leader and local ones.</li>
+     *     <li>Sets a new compaction revision via {@link 
MetaStorageManagerImpl#setCompactionRevisionLocally}.</li>
+     *     <li>Waits for the completion of read operations on the new 
compaction revision.</li>
+     * </ul>
+     *
+     * <p>Due to the difficulty of testing all reading from leader methods at 
once, we test each of them separately.</p>
+     */
+    @ParameterizedTest
+    @MethodSource("readFromLeaderOperations")
+    public void testReadOperationsFuture(ReadFromLeaderAction 
readFromLeaderAction) {
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+
+        var startSendReadActionRequestFuture = new CompletableFuture<Void>();
+        var continueSendReadActionRequestFuture = new 
CompletableFuture<Void>();
+
+        listenReadActionRequest(startSendReadActionRequestFuture, 
continueSendReadActionRequestFuture);
+
+        CompletableFuture<?> readFromLeaderOperationFuture = 
readFromLeaderAction.read(metaStorageManager, FOO_KEY);
+        Cursor<Entry> getLocallyCursor = 
metaStorageManager.getLocally(FOO_KEY, FOO_KEY, 5);
+
+        assertThat(startSendReadActionRequestFuture, 
willCompleteSuccessfully());
+
+        metaStorageManager.setCompactionRevisionLocally(1);
+
+        CompletableFuture<Void> readOperationsFuture = 
metaStorageManager.readOperationsFuture(1);
+        assertFalse(readOperationsFuture.isDone());
+
+        getLocallyCursor.close();
+        assertFalse(readOperationsFuture.isDone());
+
+        continueSendReadActionRequestFuture.complete(null);
+        assertThat(readOperationsFuture, willCompleteSuccessfully());
+        assertThat(readFromLeaderOperationFuture, willCompleteSuccessfully());
+    }
+
+    /**
+     * Tests that read operations from the leader and local ones created after 
{@link MetaStorageManagerImpl#setCompactionRevisionLocally}
+     * will not affect future from {@link 
MetaStorageManagerImpl#readOperationsFuture} on a new compaction revision.
+     *
+     * <p>Due to the difficulty of testing all reading from leader methods at 
once, we test each of them separately.</p>
+     */
+    @ParameterizedTest
+    @MethodSource("readFromLeaderOperations")
+    void 
testReadOperationsFutureForReadOperationAfterSetCompactionRevision(ReadFromLeaderAction
 readFromLeaderAction) throws Exception {
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+
+        metaStorageManager.setCompactionRevisionLocally(1);
+
+        var startSendReadActionRequestFuture = new CompletableFuture<Void>();
+        var continueSendReadActionRequestFuture = new 
CompletableFuture<Void>();
+
+        listenReadActionRequest(startSendReadActionRequestFuture, 
continueSendReadActionRequestFuture);
+
+        CompletableFuture<?> readFromLeaderOperationFuture = 
readFromLeaderAction.read(metaStorageManager, FOO_KEY);
+        Cursor<Entry> getLocallyCursor = 
metaStorageManager.getLocally(FOO_KEY, FOO_KEY, 5);
+
+        assertThat(startSendReadActionRequestFuture, 
willCompleteSuccessfully());
+
+        assertTrue(metaStorageManager.readOperationsFuture(1).isDone());
+
+        getLocallyCursor.close();
+
+        continueSendReadActionRequestFuture.complete(null);
+        assertThat(readFromLeaderOperationFuture, willCompleteSuccessfully());
+    }
+
+    @FunctionalInterface
+    private interface ReadFromLeaderAction {
+        CompletableFuture<?> read(MetaStorageManager metastore, ByteArray key);
+
+        static ReadFromLeaderAction readAsync(ReadFromLeaderAction 
readFromLeaderAction) {
+            return (metastore, key) -> runAsync(() -> 
readFromLeaderAction.read(metastore, key)).thenCompose(Function.identity());
+        }
+    }
+
+    private static List<Arguments> readFromLeaderOperations() {
+        return List.of(
+                Arguments.of(Named.named(
+                        "getSingleLatest",
+                        ReadFromLeaderAction.readAsync(MetaStorageManager::get)
+                )),
+                Arguments.of(Named.named(
+                        "getSingleBounded",
+                        ReadFromLeaderAction.readAsync((metastore, key) -> 
metastore.get(key, 2))
+                )),
+                Arguments.of(Named.named(
+                        "getAllLatest",
+                        ReadFromLeaderAction.readAsync((metastore, key) -> 
metastore.getAll(Set.of(key)))
+                )),
+                Arguments.of(Named.named(
+                        "prefixLatest",
+                        ReadFromLeaderAction.readAsync((metastore, key) -> 
subscribeToList(metastore.prefix(key)))
+                )),
+                Arguments.of(Named.named(
+                        "prefixBounded",
+                        ReadFromLeaderAction.readAsync((metastore, key) -> 
subscribeToList(metastore.prefix(key, 3)))
+                ))
+        );
+    }
+
+    private void listenReadActionRequest(
+            CompletableFuture<Void> startSendReadActionRequestFuture,
+            CompletableFuture<Void> continueSendReadActionRequestFuture
+    ) {
+        ((DefaultMessagingService) 
clusterService.messagingService()).dropMessages((recipientConsistentId, 
message) -> {
+            if (message instanceof ReadActionRequest) {
+                startSendReadActionRequestFuture.complete(null);
+
+                assertThat(continueSendReadActionRequestFuture, 
willCompleteSuccessfully());
+            }
+
+            return false;
+        });
+    }
+
     private static CompletableFuture<Void> waitFor(int timeout, TimeUnit unit) 
{
         return new CompletableFuture<Void>()
                 .orTimeout(timeout, unit)
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 580dcb1f44..4ea86aac0c 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.Collections.emptySet;
 import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -37,10 +38,13 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.LongConsumer;
+import java.util.function.Supplier;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.MetaStorageInfo;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -71,6 +75,7 @@ import 
org.apache.ignite.internal.metastorage.impl.raft.MetaStorageSnapshotStora
 import org.apache.ignite.internal.metastorage.metrics.MetaStorageMetricSource;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
+import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
@@ -187,6 +192,12 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
      */
     private final AtomicReference<IndexWithTerm> lastHandledIndexWithTerm = 
new AtomicReference<>(new IndexWithTerm(0, 0));
 
+    /** Tracks only reads from the leader, local reads are tracked by the 
storage itself. */
+    private final ReadOperationForCompactionTracker 
readOperationFromLeaderForCompactionTracker = new 
ReadOperationForCompactionTracker();
+
+    /** Read operation ID generator for {@link 
#readOperationFromLeaderForCompactionTracker}. */
+    private final AtomicLong readOperationIdGeneratorForTracker = new 
AtomicLong();
+
     /**
      * The constructor.
      *
@@ -739,12 +750,18 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
     @Override
     public CompletableFuture<Entry> get(ByteArray key) {
-        return inBusyLockAsync(busyLock, () -> 
metaStorageSvcFut.thenCompose(svc -> svc.get(key)));
+        return inBusyLockAsync(
+                busyLock,
+                () -> withTrackReadOperationFromLeaderFuture(() -> 
metaStorageSvcFut.thenCompose(svc -> svc.get(key)))
+        );
     }
 
     @Override
     public CompletableFuture<Entry> get(ByteArray key, long revUpperBound) {
-        return inBusyLockAsync(busyLock, () -> 
metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound)));
+        return inBusyLockAsync(
+                busyLock,
+                () -> withTrackReadOperationFromLeaderFuture(() -> 
metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound)))
+        );
     }
 
     @Override
@@ -779,7 +796,10 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
     @Override
     public CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> 
keys) {
-        return inBusyLock(busyLock, () -> metaStorageSvcFut.thenCompose(svc -> 
svc.getAll(keys)));
+        return inBusyLock(
+                busyLock,
+                () -> withTrackReadOperationFromLeaderFuture(() -> 
metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys)))
+        );
     }
 
     /**
@@ -893,24 +913,6 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         }
     }
 
-    /**
-     * Retrieves entries for the given key range in lexicographic order. 
Entries will be filtered out by upper bound of given revision
-     * number.
-     *
-     * @see MetaStorageService#range(ByteArray, ByteArray, long)
-     */
-    public Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray 
keyTo, long revUpperBound) {
-        if (!busyLock.enterBusy()) {
-            return new NodeStoppingPublisher<>();
-        }
-
-        try {
-            return new 
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc -> 
svc.range(keyFrom, keyTo, revUpperBound)));
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
     @Override
     public Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray 
keyTo) {
         if (!busyLock.enterBusy()) {
@@ -918,7 +920,9 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         }
 
         try {
-            return new 
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc -> 
svc.range(keyFrom, keyTo, false)));
+            return withTrackReadOperationFromLeaderPublisher(
+                    () -> new 
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc -> 
svc.range(keyFrom, keyTo, false)))
+            );
         } finally {
             busyLock.leaveBusy();
         }
@@ -936,7 +940,9 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         }
 
         try {
-            return new 
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc -> 
svc.prefix(keyPrefix, revUpperBound)));
+            return withTrackReadOperationFromLeaderPublisher(
+                    () -> new 
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc -> 
svc.prefix(keyPrefix, revUpperBound)))
+            );
         } finally {
             busyLock.leaveBusy();
         }
@@ -987,7 +993,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
                 throw new CompletionException(e);
             }
 
-            assert indexWithTerm != null : "Attempt to get index and term when 
Raft node is not started yet or already stopped)";
+            assert indexWithTerm != null : "Attempt to get index and term when 
Raft node is not started yet or already stopped): " + nodeId;
 
             return indexWithTerm;
         }));
@@ -1135,4 +1141,85 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
     public long getCompactionRevisionLocally() {
         return inBusyLock(busyLock, storage::getCompactionRevision);
     }
+
+    @Override
+    public CompletableFuture<Void> readOperationsFuture(long 
compactionRevisionExcluded) {
+        return inBusyLock(
+                busyLock,
+                () -> allOf(
+                        
readOperationFromLeaderForCompactionTracker.collect(compactionRevisionExcluded),
+                        
storage.readOperationsFuture(compactionRevisionExcluded)
+                )
+        );
+    }
+
+    private <T> CompletableFuture<T> 
withTrackReadOperationFromLeaderFuture(Supplier<CompletableFuture<T>> 
readFromLeader) {
+        long readOperationId = 
readOperationIdGeneratorForTracker.getAndIncrement();
+        long compactionRevision = storage.getCompactionRevision();
+
+        readOperationFromLeaderForCompactionTracker.track(readOperationId, 
compactionRevision);
+
+        try {
+            return readFromLeader.get().whenComplete(
+                    (t, throwable) -> 
readOperationFromLeaderForCompactionTracker.untrack(readOperationId, 
compactionRevision)
+            );
+        } catch (Throwable t) {
+            
readOperationFromLeaderForCompactionTracker.untrack(readOperationId, 
compactionRevision);
+
+            throw t;
+        }
+    }
+
+    private Publisher<Entry> 
withTrackReadOperationFromLeaderPublisher(Supplier<Publisher<Entry>> 
readFromLeader) {
+        long readOperationId = 
readOperationIdGeneratorForTracker.getAndIncrement();
+        long compactionRevision = storage.getCompactionRevision();
+
+        readOperationFromLeaderForCompactionTracker.track(readOperationId, 
compactionRevision);
+
+        try {
+            Publisher<Entry> publisherFromLeader = readFromLeader.get();
+
+            return subscriber -> publisherFromLeader.subscribe(new 
Subscriber<>() {
+                @Override
+                public void onSubscribe(Subscription subscription) {
+                    subscriber.onSubscribe(new Subscription() {
+                        @Override
+                        public void request(long n) {
+                            subscription.request(n);
+                        }
+
+                        @Override
+                        public void cancel() {
+                            
readOperationFromLeaderForCompactionTracker.untrack(readOperationId, 
compactionRevision);
+
+                            subscription.cancel();
+                        }
+                    });
+                }
+
+                @Override
+                public void onNext(Entry item) {
+                    subscriber.onNext(item);
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    
readOperationFromLeaderForCompactionTracker.untrack(readOperationId, 
compactionRevision);
+
+                    subscriber.onError(throwable);
+                }
+
+                @Override
+                public void onComplete() {
+                    
readOperationFromLeaderForCompactionTracker.untrack(readOperationId, 
compactionRevision);
+
+                    subscriber.onComplete();
+                }
+            });
+        } catch (Throwable t) {
+            
readOperationFromLeaderForCompactionTracker.untrack(readOperationId, 
compactionRevision);
+
+            throw t;
+        }
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
index e694a9249a..70f66bd871 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -43,7 +43,6 @@ import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.metastorage.impl.EntryImpl;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
-import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
 /** Abstract implementation of {@link KeyValueStorage}. */
@@ -79,6 +78,16 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
 
     protected final AtomicBoolean stopCompaction = new AtomicBoolean();
 
+    /** Tracks only cursors, since reading a single entry or a batch is done 
entirely under {@link #rwLock}. */
+    protected final ReadOperationForCompactionTracker 
readOperationForCompactionTracker = new ReadOperationForCompactionTracker();
+
+    /**
+     * Used to generate read operation ID for {@link 
#readOperationForCompactionTracker}.
+     *
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+     */
+    protected long readOperationIdGeneratorForTracker;
+
     /**
      * Constructor.
      *
@@ -150,17 +159,6 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
-    @Override
-    public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
-        rwLock.readLock().lock();
-
-        try {
-            return range(keyFrom, keyTo, rev);
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
     @Override
     public long revision() {
         rwLock.readLock().lock();
@@ -273,6 +271,11 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
         watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
     }
 
+    @Override
+    public CompletableFuture<Void> readOperationsFuture(long 
compactionRevisionExcluded) {
+        return 
readOperationForCompactionTracker.collect(compactionRevisionExcluded);
+    }
+
     /** Notifies of revision update. Must be called under the {@link #rwLock}. 
*/
     protected void notifyRevisionUpdate() {
         if (recoveryRevisionListener != null) {
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 3ca0f9f5fa..51ec2e44cd 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -538,6 +538,17 @@ public interface KeyValueStorage extends ManuallyCloseable 
{
      */
     long getCompactionRevision();
 
+    /**
+     * Returns a future that will complete when all read operations that were 
started before {@code compactionRevisionExcluded} have completed.
+     *
+     * <p>Current method is expected to be invoked after {@link 
#setCompactionRevision} on the same revision.</p>
+     *
+     * <p>Future completes without exception.</p>
+     *
+     * @param compactionRevisionExcluded Compaction revision of interest.
+     */
+    CompletableFuture<Void> readOperationsFuture(long 
compactionRevisionExcluded);
+
     /**
      * Returns checksum corresponding to the revision.
      *
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
new file mode 100644
index 0000000000..f68c4efbb2
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toList;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.metastorage.MetaStorageCompactionManager;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.CompletableFutures;
+
+/**
+ * Tracker of read operations from metastorage or its storage. Used to track the completion of read operations before starting
+ * local compaction of metastorage.
+ *
+ * <p>Expected usage:</p>
+ * <ul>
+ *     <li>Before starting execution, the reading command invokes {@link #track} with its ID and the compaction revision that is
+ *     currently set ({@link MetaStorageCompactionManager#setCompactionRevisionLocally}/
+ *     {@link KeyValueStorage#setCompactionRevision}).</li>
+ *     <li>After completion, the reading command will invoke {@link #untrack} with the same arguments as when calling {@link #track},
+ *     regardless of whether the operation was successful or not.</li>
+ *     <li>{@link #collect} will be invoked only after a new compaction revision has been set
+ *     ({@link MetaStorageCompactionManager#setCompactionRevisionLocally}/{@link KeyValueStorage#setCompactionRevision}).</li>
+ * </ul>
+ */
+public class ReadOperationForCompactionTracker {
+    /** Futures of the currently tracked read operations, keyed by operation ID and the compaction revision passed to {@link #track}. */
+    private final Map<ReadOperationKey, CompletableFuture<Void>> 
readOperationFutureByKey = new ConcurrentHashMap<>();
+
+    /**
+     * Starts tracking the completion of a read operation on the current compaction revision.
+     *
+     * <p>Method is expected not to be called more than once for the same arguments.</p>
+     *
+     * <p>Expected usage pattern:</p>
+     * <pre><code>
+     *     Object readOperationId = ...;
+     *     long compactionRevision = ...;
+     *
+     *     tracker.track(readOperationId, compactionRevision);
+     *
+     *     try {
+     *         doReadOperation(...);
+     *     } finally {
+     *         tracker.untrack(readOperationId, compactionRevision);
+     *     }
+     * </code></pre>
+     *
+     * @param readOperationId ID of the read operation.
+     * @param compactionRevision Compaction revision that is set at the moment the read operation starts.
+     * @see #untrack(Object, long)
+     */
+    public void track(Object readOperationId, long compactionRevision) {
+        var key = new ReadOperationKey(readOperationId, compactionRevision);
+
+        CompletableFuture<Void> previous = 
readOperationFutureByKey.putIfAbsent(key, new CompletableFuture<>());
+
+        assert previous == null : key;
+    }
+
+    /**
+     * Stops tracking the read operation on the compaction revision on which tracking started and completes its future.
+     *
+     * <p>Method is expected not to be called more than once for the same arguments, and {@link #track} must have been previously
+     * called for the same arguments.</p>
+     *
+     * @param readOperationId ID of the read operation, same as was passed to {@link #track}.
+     * @param compactionRevision Compaction revision, same as was passed to {@link #track}.
+     * @see #track(Object, long)
+     */
+    public void untrack(Object readOperationId, long compactionRevision) {
+        var key = new ReadOperationKey(readOperationId, compactionRevision);
+
+        CompletableFuture<Void> removed = readOperationFutureByKey.remove(key);
+
+        assert removed != null : key;
+
+        removed.complete(null);
+    }
+
+    /**
+     * Collects all read operations that were started before {@code compactionRevisionExcluded} and returns a future that will
+     * complete when all collected operations complete.
+     *
+     * <p>Only operations tracked with a compaction revision strictly less than {@code compactionRevisionExcluded} are collected.</p>
+     *
+     * <p>Future completes without exception.</p>
+     *
+     * @param compactionRevisionExcluded Upper bound (exclusive) of the compaction revisions of interest.
+     */
+    public CompletableFuture<Void> collect(long compactionRevisionExcluded) {
+        return readOperationFutureByKey.entrySet().stream()
+                .filter(entry -> entry.getKey().compactionRevision < 
compactionRevisionExcluded)
+                .map(Entry::getValue)
+                .collect(collectingAndThen(toList(), 
CompletableFutures::allOf));
+    }
+
+    /** Map key: read operation ID together with the compaction revision the operation was tracked on. */
+    private static class ReadOperationKey {
+        @IgniteToStringInclude
+        private final Object readOperationId;
+
+        private final long compactionRevision;
+
+        private ReadOperationKey(Object readOperationId, long 
compactionRevision) {
+            this.readOperationId = readOperationId;
+            this.compactionRevision = compactionRevision;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            ReadOperationKey that = (ReadOperationKey) o;
+
+            return compactionRevision == that.compactionRevision && 
readOperationId.equals(that.readOperationId);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = readOperationId.hashCode();
+            // Folds the 64-bit revision into 32 bits (upper half XOR lower half).
+            result = 31 * result + (int) (compactionRevision ^ 
(compactionRevision >>> 32));
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return S.toString(this);
+        }
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index cb49b034e1..eef6e16fdb 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -1464,8 +1464,11 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
         iterator.seek(keyFrom);
 
+        long readOperationId = readOperationIdGeneratorForTracker++;
         long compactionRevisionBeforeCreateCursor = compactionRevision;
 
+        readOperationForCompactionTracker.track(readOperationId, 
compactionRevision);
+
         return new RocksIteratorAdapter<>(iterator) {
             /** Cached entry used to filter "empty" values. */
             private @Nullable Entry next;
@@ -1529,6 +1532,8 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
             @Override
             public void close() {
+                readOperationForCompactionTracker.untrack(readOperationId, 
compactionRevisionBeforeCreateCursor);
+
                 super.close();
 
                 RocksUtils.closeAll(readOpts, upperBound);
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index c549271e4b..749bda65b7 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -30,14 +30,17 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -52,6 +55,7 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -924,6 +928,83 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         }
     }
 
+    /** Checks that the future is already completed when this test has not opened any cursor. */
+    @Test
+    void testReadOperationsFutureWithoutReadOperations() {
+        assertTrue(storage.readOperationsFuture(0).isDone());
+        assertTrue(storage.readOperationsFuture(1).isDone());
+    }
+
+    /**
+     * Tests {@link KeyValueStorage#readOperationsFuture} as expected in use.
+     * <ul>
+     *     <li>Create read operations, we only need cursors since reading one entry or a batch is synchronized with
+     *     {@link KeyValueStorage#setCompactionRevision}.</li>
+     *     <li>Set a new compaction revision via {@link KeyValueStorage#setCompactionRevision}.</li>
+     *     <li>Wait for the completion of read operations on the new compaction revision.</li>
+     * </ul>
+     *
+     * <p>The keys are chosen randomly. Keys with their revisions are added in {@link #setUp()}.</p>
+     */
+    @Test
+    void testReadOperationsFuture() throws Exception {
+        Cursor<Entry> range0 = storage.range(FOO_KEY, FOO_KEY);
+        Cursor<Entry> range1 = storage.range(BAR_KEY, BAR_KEY, 5);
+
+        try {
+            storage.setCompactionRevision(3);
+
+            CompletableFuture<Void> readOperationsFuture = storage.readOperationsFuture(3);
+            assertFalse(readOperationsFuture.isDone());
+
+            // Draining a cursor does not finish the read operation, only closing it does.
+            range0.stream().forEach(entry -> {});
+            assertFalse(readOperationsFuture.isDone());
+
+            range1.stream().forEach(entry -> {});
+            assertFalse(readOperationsFuture.isDone());
+
+            range0.close();
+            assertFalse(readOperationsFuture.isDone());
+
+            range1.close();
+            assertTrue(readOperationsFuture.isDone());
+        } catch (Throwable t) {
+            // Best-effort cleanup. A cursor may already have been closed above, so a cleanup failure must not mask
+            // the original error.
+            try {
+                IgniteUtils.closeAll(range0, range1);
+            } catch (Throwable closeError) {
+                t.addSuppressed(closeError);
+            }
+
+            // Rethrow, otherwise a failed assertion would be swallowed and the test would pass.
+            throw t;
+        }
+    }
+
+    /**
+     * Tests that cursors created after {@link KeyValueStorage#setCompactionRevision} will not affect future from
+     * {@link KeyValueStorage#readOperationsFuture} on a new compaction revision.
+     */
+    @Test
+    void testReadOperationsFutureForReadOperationAfterSetCompactionRevision() throws Exception {
+        Cursor<Entry> rangeBeforeSetCompactionRevision = storage.range(FOO_KEY, FOO_KEY);
+        Cursor<Entry> rangeAfterSetCompactionRevision0 = null;
+        Cursor<Entry> rangeAfterSetCompactionRevision1 = null;
+
+        try {
+            storage.setCompactionRevision(3);
+
+            rangeAfterSetCompactionRevision0 = storage.range(FOO_KEY, FOO_KEY);
+            rangeAfterSetCompactionRevision1 = storage.range(FOO_KEY, FOO_KEY, 5);
+
+            CompletableFuture<Void> readOperationsFuture = storage.readOperationsFuture(3);
+            assertFalse(readOperationsFuture.isDone());
+
+            // Only the cursor opened before the new compaction revision holds the future back.
+            rangeBeforeSetCompactionRevision.close();
+            assertTrue(readOperationsFuture.isDone());
+
+            rangeAfterSetCompactionRevision0.close();
+            rangeAfterSetCompactionRevision1.close();
+        } catch (Throwable t) {
+            // Best-effort cleanup. A cursor may already have been closed above, so a cleanup failure must not mask
+            // the original error.
+            try {
+                IgniteUtils.closeAll(
+                        rangeBeforeSetCompactionRevision,
+                        rangeAfterSetCompactionRevision0,
+                        rangeAfterSetCompactionRevision1
+                );
+            } catch (Throwable closeError) {
+                t.addSuppressed(closeError);
+            }
+
+            // Rethrow, otherwise a failed assertion would be swallowed and the test would pass.
+            throw t;
+        }
+    }
+
     private List<Integer> collectRevisions(byte[] key) {
         var revisions = new ArrayList<Integer>();
 
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
new file mode 100644
index 0000000000..6c23982c9c
--- /dev/null
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.junit.jupiter.api.Test;
+
+/** For {@link ReadOperationForCompactionTracker} testing. */
+public class ReadOperationForCompactionTrackerTest {
+    private final ReadOperationForCompactionTracker tracker = new 
ReadOperationForCompactionTracker();
+
+    /** Checks that {@code collect} returns an already completed future when nothing is tracked. */
+    @Test
+    void testCollectEmpty() {
+        assertTrue(tracker.collect(0).isDone());
+        assertTrue(tracker.collect(1).isDone());
+        assertTrue(tracker.collect(2).isDone());
+    }
+
+    /** Checks that {@code track}/{@code untrack} accept every combination of two operation IDs and two compaction revisions. */
+    @Test
+    void testTrackAndUntrack() {
+        UUID readOperation0 = UUID.randomUUID();
+        UUID readOperation1 = UUID.randomUUID();
+
+        long compactionRevision0 = 0;
+        long compactionRevision1 = 1;
+
+        assertDoesNotThrow(() -> tracker.track(readOperation0, 
compactionRevision0));
+        assertDoesNotThrow(() -> tracker.track(readOperation0, 
compactionRevision1));
+        assertDoesNotThrow(() -> tracker.track(readOperation1, 
compactionRevision0));
+        assertDoesNotThrow(() -> tracker.track(readOperation1, 
compactionRevision1));
+
+        assertDoesNotThrow(() -> tracker.untrack(readOperation0, 
compactionRevision0));
+        assertDoesNotThrow(() -> tracker.untrack(readOperation0, 
compactionRevision1));
+        assertDoesNotThrow(() -> tracker.untrack(readOperation1, 
compactionRevision0));
+        assertDoesNotThrow(() -> tracker.untrack(readOperation1, 
compactionRevision1));
+
+        // Let's check that after untrack we can do track again for the previous arguments.
+        assertDoesNotThrow(() -> tracker.track(readOperation0, 
compactionRevision0));
+        assertDoesNotThrow(() -> tracker.untrack(readOperation0, 
compactionRevision0));
+    }
+
+    /** Checks that the future from {@code collect} completes only after every collected operation has been untracked. */
+    @Test
+    void testTrackUntrackAndCollect() {
+        UUID readOperation0 = UUID.randomUUID();
+        UUID readOperation1 = UUID.randomUUID();
+
+        long compactionRevision0 = 0;
+        long compactionRevision1 = 1;
+
+        tracker.track(readOperation0, compactionRevision0);
+        tracker.track(readOperation1, compactionRevision0);
+
+        // The bound is exclusive: operations tracked on revision 0 are not collected for revision 0.
+        assertTrue(tracker.collect(0).isDone());
+
+        CompletableFuture<Void> collectFuture1 = tracker.collect(1);
+        assertFalse(collectFuture1.isDone());
+
+        tracker.untrack(readOperation0, compactionRevision0);
+        assertFalse(collectFuture1.isDone());
+
+        tracker.untrack(readOperation1, compactionRevision0);
+        assertTrue(collectFuture1.isDone());
+
+        // Same scenario one revision higher: operations tracked on revision 1 are only collected for revision 2.
+        tracker.track(readOperation0, compactionRevision1);
+        tracker.track(readOperation1, compactionRevision1);
+
+        assertTrue(tracker.collect(0).isDone());
+        assertTrue(tracker.collect(1).isDone());
+
+        CompletableFuture<Void> collectFuture2 = tracker.collect(2);
+        assertFalse(collectFuture2.isDone());
+
+        tracker.untrack(readOperation1, compactionRevision1);
+        assertFalse(collectFuture2.isDone());
+
+        tracker.untrack(readOperation0, compactionRevision1);
+        assertTrue(collectFuture2.isDone());
+    }
+}
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 4ec9d8c091..110ff4973c 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.metastorage.server;
 
 import static java.nio.file.StandardOpenOption.WRITE;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.stream.Collectors.collectingAndThen;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOT_FOUND;
@@ -870,7 +869,7 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
                 ? keysIdx.tailMap(keyFrom)
                 : keysIdx.subMap(keyFrom, keyTo);
 
-        return subMap.entrySet().stream()
+        List<Entry> entries = subMap.entrySet().stream()
                 .map(e -> {
                     byte[] key = e.getKey();
                     long[] keyRevisions = toLongArray(e.getValue());
@@ -892,6 +891,30 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
                     return EntryImpl.toEntry(key, revision, value);
                 })
                 .filter(e -> !e.empty())
-                .collect(collectingAndThen(toList(), Cursor::fromIterable));
+                .collect(toList());
+
+        Iterator<Entry> iterator = entries.iterator();
+
+        long readOperationId = readOperationIdGeneratorForTracker++;
+        long compactionRevisionOnCreateIterator = compactionRevision;
+
+        readOperationForCompactionTracker.track(readOperationId, 
compactionRevisionOnCreateIterator);
+
+        return new Cursor<>() {
+            @Override
+            public void close() {
+                readOperationForCompactionTracker.untrack(readOperationId, 
compactionRevisionOnCreateIterator);
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public Entry next() {
+                return iterator.next();
+            }
+        };
     }
 }

Reply via email to