This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f9abf076a4 IGNITE-23300 Add local tracking of completion of readings
from metastorage to the required revision (#4572)
f9abf076a4 is described below
commit f9abf076a442f47d45606f1eabd82344d6d4ad43
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Oct 17 11:17:45 2024 +0300
IGNITE-23300 Add local tracking of completion of readings from metastorage
to the required revision (#4572)
---
.../metastorage/MetaStorageCompactionManager.java | 13 ++
.../impl/ItMetaStorageManagerImplTest.java | 167 +++++++++++++++++++--
.../metastorage/impl/MetaStorageManagerImpl.java | 135 ++++++++++++++---
.../server/AbstractKeyValueStorage.java | 27 ++--
.../metastorage/server/KeyValueStorage.java | 11 ++
.../server/ReadOperationForCompactionTracker.java | 147 ++++++++++++++++++
.../server/persistence/RocksDbKeyValueStorage.java | 5 +
.../AbstractCompactionKeyValueStorageTest.java | 81 ++++++++++
.../ReadOperationForCompactionTrackerTest.java | 99 ++++++++++++
.../server/SimpleInMemoryKeyValueStorage.java | 29 +++-
10 files changed, 660 insertions(+), 54 deletions(-)
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
index 192f7eebdc..cd32f41346 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.metastorage;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -80,4 +81,16 @@ public interface MetaStorageCompactionManager extends
IgniteComponent {
* @see #setCompactionRevisionLocally(long)
*/
long getCompactionRevisionLocally();
+
+ /**
+ * Returns a future that will complete when all read operations (from
leader and locally) that were started before
+ * {@code compactionRevisionExcluded} have completed.
+ *
+ * <p>Should be invoked after {@link #setCompactionRevisionLocally} on the
same revision.</p>
+ *
+ * <p>Future may complete with {@link NodeStoppingException} if the node
is in the process of stopping.</p>
+ *
+ * @param compactionRevisionExcluded Compaction revision of interest.
+ */
+ CompletableFuture<Void> readOperationsFuture(long
compactionRevisionExcluded);
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index a99e3b74d5..a122230d7a 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
@@ -35,6 +36,8 @@ import static
org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
@@ -47,6 +50,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import java.util.stream.Stream;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
@@ -65,6 +69,7 @@ import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
@@ -76,6 +81,7 @@ import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.DefaultMessagingService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
@@ -85,13 +91,19 @@ import
org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.rpc.ReadActionRequest;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
/**
@@ -99,6 +111,10 @@ import org.mockito.ArgumentCaptor;
*/
@ExtendWith(ConfigurationExtension.class)
public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
+ private static final ByteArray FOO_KEY = new ByteArray("foo");
+
+ private static final byte[] VALUE = "value".getBytes(UTF_8);
+
private ClusterService clusterService;
private Loza raftManager;
@@ -201,8 +217,6 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
*/
@Test
void testPrefixOverflow() {
- byte[] value = "value".getBytes(UTF_8);
-
var key1 = new ByteArray(new byte[]{1, (byte) 0xFF, 0});
var key2 = new ByteArray(new byte[]{1, (byte) 0xFF, 1});
var key3 = new ByteArray(new byte[]{1, (byte) 0xFF, (byte) 0xFF});
@@ -212,13 +226,13 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
var key5 = new ByteArray(new byte[]{1, (byte) 0xFE, 1});
CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
- Conditions.notExists(new ByteArray("foo")),
+ Conditions.notExists(FOO_KEY),
List.of(
- Operations.put(key1, value),
- Operations.put(key2, value),
- Operations.put(key3, value),
- Operations.put(key4, value),
- Operations.put(key5, value)
+ Operations.put(key1, VALUE),
+ Operations.put(key2, VALUE),
+ Operations.put(key3, VALUE),
+ Operations.put(key4, VALUE),
+ Operations.put(key5, VALUE)
),
List.of(Operations.noop())
);
@@ -238,7 +252,7 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
assertThat(metaStorageManager.stopAsync(new ComponentContext()),
willCompleteSuccessfully());
- CompletableFuture<Entry> fut =
svc.get(ByteArray.fromString("ignored"));
+ CompletableFuture<Entry> fut = svc.get(FOO_KEY);
assertThat(fut, willThrowFast(CancellationException.class));
}
@@ -290,7 +304,7 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
metaStorageManager.registerRevisionUpdateListener(listener);
- assertThat(metaStorageManager.put(ByteArray.fromString("test"),
"test".getBytes(UTF_8)), willSucceedFast());
+ assertThat(metaStorageManager.put(FOO_KEY, VALUE), willSucceedFast());
// Watches are processed asynchronously.
// Timeout is big just in case there's a GC pause. Test's duration
doesn't really depend on it.
@@ -304,13 +318,10 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
*/
@Test
void
testIdleSafeTimePropagationAndNormalSafeTimePropagationInteraction(TestInfo
testInfo) {
- var key = new ByteArray("foo");
- byte[] value = "bar".getBytes(UTF_8);
-
AtomicBoolean watchCompleted = new AtomicBoolean(false);
CompletableFuture<HybridTimestamp> watchEventTsFuture = new
CompletableFuture<>();
- metaStorageManager.registerExactWatch(key, new WatchListener() {
+ metaStorageManager.registerExactWatch(FOO_KEY, new WatchListener() {
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
watchEventTsFuture.complete(event.timestamp());
@@ -326,7 +337,7 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
}
});
- metaStorageManager.put(key, value);
+ metaStorageManager.put(FOO_KEY, VALUE);
ClusterTime clusterTime = metaStorageManager.clusterTime();
@@ -338,6 +349,132 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
assertThat("Safe time is advanced too early", watchCompleted.get(),
is(true));
}
+ @Test
+ void testReadOperationsFutureWithoutReadOperations() {
+ assertTrue(metaStorageManager.readOperationsFuture(0).isDone());
+ assertTrue(metaStorageManager.readOperationsFuture(1).isDone());
+ }
+
+ /**
+ * Tests the expected usage of {@link
MetaStorageManagerImpl#readOperationsFuture}.
+ * <ul>
+ * <li>Creates read operations from the leader and local ones.</li>
+ * <li>Sets a new compaction revision via {@link
MetaStorageManagerImpl#setCompactionRevisionLocally}.</li>
+ * <li>Waits for the completion of read operations on the new
compaction revision.</li>
+ * </ul>
+ *
+ * <p>Due to the difficulty of testing all reading from leader methods at
once, we test each of them separately.</p>
+ */
+ @ParameterizedTest
+ @MethodSource("readFromLeaderOperations")
+ public void testReadOperationsFuture(ReadFromLeaderAction
readFromLeaderAction) {
+ assertThat(metaStorageManager.put(FOO_KEY, VALUE),
willCompleteSuccessfully());
+ assertThat(metaStorageManager.put(FOO_KEY, VALUE),
willCompleteSuccessfully());
+
+ var startSendReadActionRequestFuture = new CompletableFuture<Void>();
+ var continueSendReadActionRequestFuture = new
CompletableFuture<Void>();
+
+ listenReadActionRequest(startSendReadActionRequestFuture,
continueSendReadActionRequestFuture);
+
+ CompletableFuture<?> readFromLeaderOperationFuture =
readFromLeaderAction.read(metaStorageManager, FOO_KEY);
+ Cursor<Entry> getLocallyCursor =
metaStorageManager.getLocally(FOO_KEY, FOO_KEY, 5);
+
+ assertThat(startSendReadActionRequestFuture,
willCompleteSuccessfully());
+
+ metaStorageManager.setCompactionRevisionLocally(1);
+
+ CompletableFuture<Void> readOperationsFuture =
metaStorageManager.readOperationsFuture(1);
+ assertFalse(readOperationsFuture.isDone());
+
+ getLocallyCursor.close();
+ assertFalse(readOperationsFuture.isDone());
+
+ continueSendReadActionRequestFuture.complete(null);
+ assertThat(readOperationsFuture, willCompleteSuccessfully());
+ assertThat(readFromLeaderOperationFuture, willCompleteSuccessfully());
+ }
+
+ /**
+ * Tests that read operations from the leader and local ones created after
{@link MetaStorageManagerImpl#setCompactionRevisionLocally}
+ * will not affect future from {@link
MetaStorageManagerImpl#readOperationsFuture} on a new compaction revision.
+ *
+ * <p>Due to the difficulty of testing all reading from leader methods at
once, we test each of them separately.</p>
+ */
+ @ParameterizedTest
+ @MethodSource("readFromLeaderOperations")
+ void
testReadOperationsFutureForReadOperationAfterSetCompactionRevision(ReadFromLeaderAction
readFromLeaderAction) throws Exception {
+ assertThat(metaStorageManager.put(FOO_KEY, VALUE),
willCompleteSuccessfully());
+ assertThat(metaStorageManager.put(FOO_KEY, VALUE),
willCompleteSuccessfully());
+
+ metaStorageManager.setCompactionRevisionLocally(1);
+
+ var startSendReadActionRequestFuture = new CompletableFuture<Void>();
+ var continueSendReadActionRequestFuture = new
CompletableFuture<Void>();
+
+ listenReadActionRequest(startSendReadActionRequestFuture,
continueSendReadActionRequestFuture);
+
+ CompletableFuture<?> readFromLeaderOperationFuture =
readFromLeaderAction.read(metaStorageManager, FOO_KEY);
+ Cursor<Entry> getLocallyCursor =
metaStorageManager.getLocally(FOO_KEY, FOO_KEY, 5);
+
+ assertThat(startSendReadActionRequestFuture,
willCompleteSuccessfully());
+
+ assertTrue(metaStorageManager.readOperationsFuture(1).isDone());
+
+ getLocallyCursor.close();
+
+ continueSendReadActionRequestFuture.complete(null);
+ assertThat(readFromLeaderOperationFuture, willCompleteSuccessfully());
+ }
+
+ @FunctionalInterface
+ private interface ReadFromLeaderAction {
+ CompletableFuture<?> read(MetaStorageManager metastore, ByteArray key);
+
+ static ReadFromLeaderAction readAsync(ReadFromLeaderAction
readFromLeaderAction) {
+ return (metastore, key) -> runAsync(() ->
readFromLeaderAction.read(metastore, key)).thenCompose(Function.identity());
+ }
+ }
+
+ private static List<Arguments> readFromLeaderOperations() {
+ return List.of(
+ Arguments.of(Named.named(
+ "getSingleLatest",
+ ReadFromLeaderAction.readAsync(MetaStorageManager::get)
+ )),
+ Arguments.of(Named.named(
+ "getSingleBounded",
+ ReadFromLeaderAction.readAsync((metastore, key) ->
metastore.get(key, 2))
+ )),
+ Arguments.of(Named.named(
+ "getAllLatest",
+ ReadFromLeaderAction.readAsync((metastore, key) ->
metastore.getAll(Set.of(key)))
+ )),
+ Arguments.of(Named.named(
+ "prefixLatest",
+ ReadFromLeaderAction.readAsync((metastore, key) ->
subscribeToList(metastore.prefix(key)))
+ )),
+ Arguments.of(Named.named(
+ "prefixBounded",
+ ReadFromLeaderAction.readAsync((metastore, key) ->
subscribeToList(metastore.prefix(key, 3)))
+ ))
+ );
+ }
+
+ private void listenReadActionRequest(
+ CompletableFuture<Void> startSendReadActionRequestFuture,
+ CompletableFuture<Void> continueSendReadActionRequestFuture
+ ) {
+ ((DefaultMessagingService)
clusterService.messagingService()).dropMessages((recipientConsistentId,
message) -> {
+ if (message instanceof ReadActionRequest) {
+ startSendReadActionRequestFuture.complete(null);
+
+ assertThat(continueSendReadActionRequestFuture,
willCompleteSuccessfully());
+ }
+
+ return false;
+ });
+ }
+
private static CompletableFuture<Void> waitFor(int timeout, TimeUnit unit)
{
return new CompletableFuture<Void>()
.orTimeout(timeout, unit)
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 580dcb1f44..4ea86aac0c 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl;
import static java.util.Collections.emptySet;
import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -37,10 +38,13 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongConsumer;
+import java.util.function.Supplier;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.MetaStorageInfo;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -71,6 +75,7 @@ import
org.apache.ignite.internal.metastorage.impl.raft.MetaStorageSnapshotStora
import org.apache.ignite.internal.metastorage.metrics.MetaStorageMetricSource;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
+import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
@@ -187,6 +192,12 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
*/
private final AtomicReference<IndexWithTerm> lastHandledIndexWithTerm =
new AtomicReference<>(new IndexWithTerm(0, 0));
+ /** Tracks only reads from the leader, local reads are tracked by the
storage itself. */
+ private final ReadOperationForCompactionTracker
readOperationFromLeaderForCompactionTracker = new
ReadOperationForCompactionTracker();
+
+ /** Read operation ID generator for {@link
#readOperationFromLeaderForCompactionTracker}. */
+ private final AtomicLong readOperationIdGeneratorForTracker = new
AtomicLong();
+
/**
* The constructor.
*
@@ -739,12 +750,18 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
@Override
public CompletableFuture<Entry> get(ByteArray key) {
- return inBusyLockAsync(busyLock, () ->
metaStorageSvcFut.thenCompose(svc -> svc.get(key)));
+ return inBusyLockAsync(
+ busyLock,
+ () -> withTrackReadOperationFromLeaderFuture(() ->
metaStorageSvcFut.thenCompose(svc -> svc.get(key)))
+ );
}
@Override
public CompletableFuture<Entry> get(ByteArray key, long revUpperBound) {
- return inBusyLockAsync(busyLock, () ->
metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound)));
+ return inBusyLockAsync(
+ busyLock,
+ () -> withTrackReadOperationFromLeaderFuture(() ->
metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound)))
+ );
}
@Override
@@ -779,7 +796,10 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
@Override
public CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray>
keys) {
- return inBusyLock(busyLock, () -> metaStorageSvcFut.thenCompose(svc ->
svc.getAll(keys)));
+ return inBusyLock(
+ busyLock,
+ () -> withTrackReadOperationFromLeaderFuture(() ->
metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys)))
+ );
}
/**
@@ -893,24 +913,6 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
}
}
- /**
- * Retrieves entries for the given key range in lexicographic order.
Entries will be filtered out by upper bound of given revision
- * number.
- *
- * @see MetaStorageService#range(ByteArray, ByteArray, long)
- */
- public Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray
keyTo, long revUpperBound) {
- if (!busyLock.enterBusy()) {
- return new NodeStoppingPublisher<>();
- }
-
- try {
- return new
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc ->
svc.range(keyFrom, keyTo, revUpperBound)));
- } finally {
- busyLock.leaveBusy();
- }
- }
-
@Override
public Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray
keyTo) {
if (!busyLock.enterBusy()) {
@@ -918,7 +920,9 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
}
try {
- return new
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc ->
svc.range(keyFrom, keyTo, false)));
+ return withTrackReadOperationFromLeaderPublisher(
+ () -> new
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc ->
svc.range(keyFrom, keyTo, false)))
+ );
} finally {
busyLock.leaveBusy();
}
@@ -936,7 +940,9 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
}
try {
- return new
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc ->
svc.prefix(keyPrefix, revUpperBound)));
+ return withTrackReadOperationFromLeaderPublisher(
+ () -> new
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc ->
svc.prefix(keyPrefix, revUpperBound)))
+ );
} finally {
busyLock.leaveBusy();
}
@@ -987,7 +993,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
throw new CompletionException(e);
}
- assert indexWithTerm != null : "Attempt to get index and term when
Raft node is not started yet or already stopped)";
+ assert indexWithTerm != null : "Attempt to get index and term when
Raft node is not started yet or already stopped): " + nodeId;
return indexWithTerm;
}));
@@ -1135,4 +1141,85 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
public long getCompactionRevisionLocally() {
return inBusyLock(busyLock, storage::getCompactionRevision);
}
+
+ @Override
+ public CompletableFuture<Void> readOperationsFuture(long
compactionRevisionExcluded) {
+ return inBusyLock(
+ busyLock,
+ () -> allOf(
+
readOperationFromLeaderForCompactionTracker.collect(compactionRevisionExcluded),
+
storage.readOperationsFuture(compactionRevisionExcluded)
+ )
+ );
+ }
+
+ private <T> CompletableFuture<T>
withTrackReadOperationFromLeaderFuture(Supplier<CompletableFuture<T>>
readFromLeader) {
+ long readOperationId =
readOperationIdGeneratorForTracker.getAndIncrement();
+ long compactionRevision = storage.getCompactionRevision();
+
+ readOperationFromLeaderForCompactionTracker.track(readOperationId,
compactionRevision);
+
+ try {
+ return readFromLeader.get().whenComplete(
+ (t, throwable) ->
readOperationFromLeaderForCompactionTracker.untrack(readOperationId,
compactionRevision)
+ );
+ } catch (Throwable t) {
+
readOperationFromLeaderForCompactionTracker.untrack(readOperationId,
compactionRevision);
+
+ throw t;
+ }
+ }
+
+ private Publisher<Entry>
withTrackReadOperationFromLeaderPublisher(Supplier<Publisher<Entry>>
readFromLeader) {
+ long readOperationId =
readOperationIdGeneratorForTracker.getAndIncrement();
+ long compactionRevision = storage.getCompactionRevision();
+
+ readOperationFromLeaderForCompactionTracker.track(readOperationId,
compactionRevision);
+
+ try {
+ Publisher<Entry> publisherFromLeader = readFromLeader.get();
+
+ return subscriber -> publisherFromLeader.subscribe(new
Subscriber<>() {
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ subscriber.onSubscribe(new Subscription() {
+ @Override
+ public void request(long n) {
+ subscription.request(n);
+ }
+
+ @Override
+ public void cancel() {
+
readOperationFromLeaderForCompactionTracker.untrack(readOperationId,
compactionRevision);
+
+ subscription.cancel();
+ }
+ });
+ }
+
+ @Override
+ public void onNext(Entry item) {
+ subscriber.onNext(item);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+
readOperationFromLeaderForCompactionTracker.untrack(readOperationId,
compactionRevision);
+
+ subscriber.onError(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+
readOperationFromLeaderForCompactionTracker.untrack(readOperationId,
compactionRevision);
+
+ subscriber.onComplete();
+ }
+ });
+ } catch (Throwable t) {
+
readOperationFromLeaderForCompactionTracker.untrack(readOperationId,
compactionRevision);
+
+ throw t;
+ }
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
index e694a9249a..70f66bd871 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -43,7 +43,6 @@ import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metastorage.impl.EntryImpl;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
-import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
/** Abstract implementation of {@link KeyValueStorage}. */
@@ -79,6 +78,16 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
protected final AtomicBoolean stopCompaction = new AtomicBoolean();
+ /** Tracks only cursors, since reading a single entry or a batch is done
entirely under {@link #rwLock}. */
+ protected final ReadOperationForCompactionTracker
readOperationForCompactionTracker = new ReadOperationForCompactionTracker();
+
+ /**
+ * Used to generate read operation ID for {@link
#readOperationForCompactionTracker}.
+ *
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+ */
+ protected long readOperationIdGeneratorForTracker;
+
/**
* Constructor.
*
@@ -150,17 +159,6 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
}
}
- @Override
- public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
- rwLock.readLock().lock();
-
- try {
- return range(keyFrom, keyTo, rev);
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
@Override
public long revision() {
rwLock.readLock().lock();
@@ -273,6 +271,11 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
}
+ @Override
+ public CompletableFuture<Void> readOperationsFuture(long
compactionRevisionExcluded) {
+ return
readOperationForCompactionTracker.collect(compactionRevisionExcluded);
+ }
+
/** Notifies of revision update. Must be called under the {@link #rwLock}.
*/
protected void notifyRevisionUpdate() {
if (recoveryRevisionListener != null) {
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 3ca0f9f5fa..51ec2e44cd 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -538,6 +538,17 @@ public interface KeyValueStorage extends ManuallyCloseable
{
*/
long getCompactionRevision();
+ /**
+ * Returns a future that will complete when all read operations that were
started before {@code compactionRevisionExcluded} have completed.
+ *
+ * <p>Current method is expected to be invoked after {@link
#setCompactionRevision} on the same revision.</p>
+ *
+ * <p>Future completes without exception.</p>
+ *
+ * @param compactionRevisionExcluded Compaction revision of interest.
+ */
+ CompletableFuture<Void> readOperationsFuture(long
compactionRevisionExcluded);
+
/**
* Returns checksum corresponding to the revision.
*
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
new file mode 100644
index 0000000000..f68c4efbb2
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toList;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.metastorage.MetaStorageCompactionManager;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.CompletableFutures;
+
+/**
+ * Tracker of read operations from metastorage or its storage. Used to track
the completion of read operations before start local
+ * compaction of metastorage.
+ *
+ * <p>Expected usage:</p>
+ * <ul>
+ * <li>Before starting execution, the reading command invokes {@link
#track} with its ID and the compaction revision that is currently
+ * set ({@link
MetaStorageCompactionManager#setCompactionRevisionLocally}/{@link
KeyValueStorage#setCompactionRevision}).</li>
+ * <li>After completion, the reading command will invoke {@link #untrack}
with the same arguments as when calling {@link #track},
+ * regardless of whether the operation was successful or not.</li>
+ * <li>{@link #collect} will be invoked only after a new compaction
revision has been set
+ * ({@link
MetaStorageCompactionManager#setCompactionRevisionLocally}/{@link
KeyValueStorage#setCompactionRevision}) for a new
+ * compaction revision.</li>
+ * </ul>
+ */
+public class ReadOperationForCompactionTracker {
+ private final Map<ReadOperationKey, CompletableFuture<Void>>
readOperationFutureByKey = new ConcurrentHashMap<>();
+
+ /**
+ * Starts tracking the completion of a read operation on the current
compaction revision.
+ *
+ * <p>Method is expected not to be called more than once for the same
arguments.</p>
+ *
+ * <p>Expected usage pattern:</p>
+ * <pre><code>
+ * Object readOperationId = ...;
+ * int compactionRevision = ...;
+ *
+ * tracker.track(readOperationId, compactionRevision);
+ *
+ * try {
+ * doReadOperation(...);
+ * } finally {
+ * tracker.untrack(readOperationId, compactionRevision);
+ * }
+ * </code></pre>
+ *
+ * @see #untrack(Object, long)
+ */
+ public void track(Object readOperationId, long compactionRevision) {
+ var key = new ReadOperationKey(readOperationId, compactionRevision);
+
+ CompletableFuture<Void> previous =
readOperationFutureByKey.putIfAbsent(key, new CompletableFuture<>());
+
+ assert previous == null : key;
+ }
+
+ /**
+ * Stops tracking the read operation on the compaction revision on which
tracking start.
+ *
+ * <p>Method is expected not to be called more than once for the same
arguments, and {@link #track} was previously called for same
+ * arguments.</p>
+ *
+ * @see #track(Object, long)
+ */
+ public void untrack(Object readOperationId, long compactionRevision) {
+ var key = new ReadOperationKey(readOperationId, compactionRevision);
+
+ CompletableFuture<Void> removed = readOperationFutureByKey.remove(key);
+
+ assert removed != null : key;
+
+ removed.complete(null);
+ }
+
+ /**
+ * Collects all read operations that were started before {@code
compactionRevisionExcluded} and returns a future that will complete
+ * when all collected operations complete.
+ *
+ * <p>Future completes without exception.</p>
+ */
+ public CompletableFuture<Void> collect(long compactionRevisionExcluded) {
+ return readOperationFutureByKey.entrySet().stream()
+ .filter(entry -> entry.getKey().compactionRevision <
compactionRevisionExcluded)
+ .map(Entry::getValue)
+ .collect(collectingAndThen(toList(),
CompletableFutures::allOf));
+ }
+
+ private static class ReadOperationKey {
+ @IgniteToStringInclude
+ private final Object readOperationId;
+
+ private final long compactionRevision;
+
+ private ReadOperationKey(Object readOperationId, long
compactionRevision) {
+ this.readOperationId = readOperationId;
+ this.compactionRevision = compactionRevision;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ReadOperationKey that = (ReadOperationKey) o;
+
+ return compactionRevision == that.compactionRevision &&
readOperationId.equals(that.readOperationId);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = readOperationId.hashCode();
+ result = 31 * result + (int) (compactionRevision ^
(compactionRevision >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+ }
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index cb49b034e1..eef6e16fdb 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -1464,8 +1464,11 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
iterator.seek(keyFrom);
+ long readOperationId = readOperationIdGeneratorForTracker++;
long compactionRevisionBeforeCreateCursor = compactionRevision;
+ readOperationForCompactionTracker.track(readOperationId,
compactionRevision);
+
return new RocksIteratorAdapter<>(iterator) {
/** Cached entry used to filter "empty" values. */
private @Nullable Entry next;
@@ -1529,6 +1532,8 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
@Override
public void close() {
+ readOperationForCompactionTracker.untrack(readOperationId,
compactionRevisionBeforeCreateCursor);
+
super.close();
RocksUtils.closeAll(readOpts, upperBound);
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index c549271e4b..749bda65b7 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -30,14 +30,17 @@ import static
org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -52,6 +55,7 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -924,6 +928,83 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
}
}
+    /**
+     * Checks that {@link KeyValueStorage#readOperationsFuture} returns an already completed future for revisions 0 and 1 when the test
+     * itself has not opened any cursor.
+     */
+    @Test
+    void testReadOperationsFutureWithoutReadOperations() {
+        for (int compactionRevision = 0; compactionRevision <= 1; compactionRevision++) {
+            assertTrue(storage.readOperationsFuture(compactionRevision).isDone());
+        }
+    }
+
+    /**
+     * Tests {@link KeyValueStorage#readOperationsFuture} the way it is expected to be used.
+     * <ul>
+     *     <li>Create read operations; only cursors are needed since reading one entry or a batch is synchronized with
+     *     {@link KeyValueStorage#setCompactionRevision}.</li>
+     *     <li>Set a new compaction revision via {@link KeyValueStorage#setCompactionRevision}.</li>
+     *     <li>Wait for the completion of read operations on the new compaction revision.</li>
+     * </ul>
+     *
+     * <p>The keys are chosen randomly. Keys with their revisions are added in {@link #setUp()}.</p>
+     */
+    @Test
+    void testReadOperationsFuture() throws Exception {
+        Cursor<Entry> range0 = storage.range(FOO_KEY, FOO_KEY);
+        Cursor<Entry> range1 = storage.range(BAR_KEY, BAR_KEY, 5);
+
+        try {
+            storage.setCompactionRevision(3);
+
+            CompletableFuture<Void> readOperationsFuture = storage.readOperationsFuture(3);
+            assertFalse(readOperationsFuture.isDone());
+
+            // Iterating over a cursor to its end is not enough, the future must stay incomplete until the cursors are closed.
+            range0.stream().forEach(entry -> {});
+            assertFalse(readOperationsFuture.isDone());
+
+            range1.stream().forEach(entry -> {});
+            assertFalse(readOperationsFuture.isDone());
+
+            range0.close();
+            assertFalse(readOperationsFuture.isDone());
+
+            range1.close();
+            assertTrue(readOperationsFuture.isDone());
+        } catch (Throwable t) {
+            // Release the cursors, but never hide the original failure: assertion failures are Throwables as well, so a catch block
+            // that does not rethrow would make this test pass regardless of what the assertions above have found.
+            try {
+                IgniteUtils.closeAll(range0, range1);
+            } catch (Throwable closeFailure) {
+                t.addSuppressed(closeFailure);
+            }
+
+            throw t;
+        }
+    }
+
+    /**
+     * Tests that cursors created after {@link KeyValueStorage#setCompactionRevision} do not affect the future returned by
+     * {@link KeyValueStorage#readOperationsFuture} for the new compaction revision.
+     */
+    @Test
+    void testReadOperationsFutureForReadOperationAfterSetCompactionRevision() throws Exception {
+        Cursor<Entry> rangeBeforeSetCompactionRevision = storage.range(FOO_KEY, FOO_KEY);
+        Cursor<Entry> rangeAfterSetCompactionRevision0 = null;
+        Cursor<Entry> rangeAfterSetCompactionRevision1 = null;
+
+        try {
+            storage.setCompactionRevision(3);
+
+            rangeAfterSetCompactionRevision0 = storage.range(FOO_KEY, FOO_KEY);
+            rangeAfterSetCompactionRevision1 = storage.range(FOO_KEY, FOO_KEY, 5);
+
+            CompletableFuture<Void> readOperationsFuture = storage.readOperationsFuture(3);
+            assertFalse(readOperationsFuture.isDone());
+
+            // Only the cursor opened before the new compaction revision was set is expected to hold the future back.
+            rangeBeforeSetCompactionRevision.close();
+            assertTrue(readOperationsFuture.isDone());
+
+            rangeAfterSetCompactionRevision0.close();
+            rangeAfterSetCompactionRevision1.close();
+        } catch (Throwable t) {
+            // Release the cursors, but never hide the original failure: assertion failures are Throwables as well, so a catch block
+            // that does not rethrow would make this test pass regardless of what the assertions above have found.
+            try {
+                IgniteUtils.closeAll(
+                        rangeBeforeSetCompactionRevision,
+                        rangeAfterSetCompactionRevision0,
+                        rangeAfterSetCompactionRevision1
+                );
+            } catch (Throwable closeFailure) {
+                t.addSuppressed(closeFailure);
+            }
+
+            throw t;
+        }
+    }
+
private List<Integer> collectRevisions(byte[] key) {
var revisions = new ArrayList<Integer>();
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
new file mode 100644
index 0000000000..6c23982c9c
--- /dev/null
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link ReadOperationForCompactionTracker}. */
+public class ReadOperationForCompactionTrackerTest {
+    private final ReadOperationForCompactionTracker tracker = new ReadOperationForCompactionTracker();
+
+    /** Checks that collecting from a tracker without tracked operations yields completed futures. */
+    @Test
+    void testCollectEmpty() {
+        for (long compactionRevision = 0; compactionRevision < 3; compactionRevision++) {
+            assertTrue(tracker.collect(compactionRevision).isDone());
+        }
+    }
+
+    /** Checks that every combination of two operation IDs and two revisions can be tracked, untracked and then tracked again. */
+    @Test
+    void testTrackAndUntrack() {
+        UUID[] readOperationIds = {UUID.randomUUID(), UUID.randomUUID()};
+        long[] compactionRevisions = {0, 1};
+
+        for (UUID readOperationId : readOperationIds) {
+            for (long compactionRevision : compactionRevisions) {
+                assertDoesNotThrow(() -> tracker.track(readOperationId, compactionRevision));
+            }
+        }
+
+        for (UUID readOperationId : readOperationIds) {
+            for (long compactionRevision : compactionRevisions) {
+                assertDoesNotThrow(() -> tracker.untrack(readOperationId, compactionRevision));
+            }
+        }
+
+        // Let's check that after untrack we can do track again for the previous arguments.
+        assertDoesNotThrow(() -> tracker.track(readOperationIds[0], compactionRevisions[0]));
+        assertDoesNotThrow(() -> tracker.untrack(readOperationIds[0], compactionRevisions[0]));
+    }
+
+    /** Checks that a collected future completes only after every operation tracked on an earlier revision has been untracked. */
+    @Test
+    void testTrackUntrackAndCollect() {
+        UUID firstRead = UUID.randomUUID();
+        UUID secondRead = UUID.randomUUID();
+
+        long olderRevision = 0;
+        long newerRevision = 1;
+
+        tracker.track(firstRead, olderRevision);
+        tracker.track(secondRead, olderRevision);
+
+        // The bound is exclusive: operations tracked on revision 0 are not collected for revision 0.
+        assertTrue(tracker.collect(0).isDone());
+
+        CompletableFuture<Void> readsBeforeRevision1 = tracker.collect(1);
+        assertFalse(readsBeforeRevision1.isDone());
+
+        tracker.untrack(firstRead, olderRevision);
+        assertFalse(readsBeforeRevision1.isDone());
+
+        tracker.untrack(secondRead, olderRevision);
+        assertTrue(readsBeforeRevision1.isDone());
+
+        tracker.track(firstRead, newerRevision);
+        tracker.track(secondRead, newerRevision);
+
+        assertTrue(tracker.collect(0).isDone());
+        assertTrue(tracker.collect(1).isDone());
+
+        CompletableFuture<Void> readsBeforeRevision2 = tracker.collect(2);
+        assertFalse(readsBeforeRevision2.isDone());
+
+        tracker.untrack(secondRead, newerRevision);
+        assertFalse(readsBeforeRevision2.isDone());
+
+        tracker.untrack(firstRead, newerRevision);
+        assertTrue(readsBeforeRevision2.isDone());
+    }
+}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 4ec9d8c091..110ff4973c 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.metastorage.server;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOT_FOUND;
@@ -870,7 +869,7 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
? keysIdx.tailMap(keyFrom)
: keysIdx.subMap(keyFrom, keyTo);
- return subMap.entrySet().stream()
+ List<Entry> entries = subMap.entrySet().stream()
.map(e -> {
byte[] key = e.getKey();
long[] keyRevisions = toLongArray(e.getValue());
@@ -892,6 +891,30 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
return EntryImpl.toEntry(key, revision, value);
})
.filter(e -> !e.empty())
- .collect(collectingAndThen(toList(), Cursor::fromIterable));
+ .collect(toList());
+
+ Iterator<Entry> iterator = entries.iterator();
+
+ long readOperationId = readOperationIdGeneratorForTracker++;
+ long compactionRevisionOnCreateIterator = compactionRevision;
+
+ readOperationForCompactionTracker.track(readOperationId,
compactionRevisionOnCreateIterator);
+
+ return new Cursor<>() {
+ @Override
+ public void close() {
+ readOperationForCompactionTracker.untrack(readOperationId,
compactionRevisionOnCreateIterator);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Entry next() {
+ return iterator.next();
+ }
+ };
}
}