This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d8db84df26 IGNITE-23293 Implement the algorithm of Metastorage 
Compaction Trigger (#4632)
d8db84df26 is described below

commit d8db84df260948b6dcc5f54fed34ebdc043a2904
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Oct 25 19:57:10 2024 +0300

    IGNITE-23293 Implement the algorithm of Metastorage Compaction Trigger 
(#4632)
---
 modules/metastorage/build.gradle                   |   1 +
 .../impl/ItMetaStorageCompactionTriggerTest.java   | 162 +++++++++++
 ...MetaStorageSafeTimePropagationAbstractTest.java |   9 +-
 .../CompactionCommand.java}                        |  34 +--
 .../command/MetastorageCommandsMessageGroup.java   |   3 +
 .../impl/MetaStorageCompactionTrigger.java         | 298 +++++++++++++++++++++
 .../metastorage/impl/MetaStorageManagerImpl.java   |  22 +-
 .../metastorage/impl/MetaStorageServiceImpl.java   |  16 ++
 .../server/AbstractKeyValueStorage.java            | 105 +++-----
 .../metastorage/server/KeyValueStorage.java        |  53 ++--
 .../metastorage/server/KeyValueUpdateContext.java  |  10 +
 ...llback.java => WatchEventHandlingCallback.java} |  24 +-
 .../metastorage/server/WatchProcessor.java         |  40 +--
 .../server/persistence/RocksDbKeyValueStorage.java |  27 +-
 .../server/raft/MetaStorageListener.java           |  12 +-
 .../server/raft/MetaStorageWriteHandler.java       |  11 +-
 .../server/raft/MetastorageGroupId.java            |   3 +-
 .../AbstractCompactionKeyValueStorageTest.java     |  83 ++----
 .../server/BasicOperationsKeyValueStorageTest.java |  16 +-
 .../metastorage/server/WatchProcessorTest.java     |  12 +-
 .../server/SimpleInMemoryKeyValueStorage.java      |  31 +--
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 22 files changed, 683 insertions(+), 292 deletions(-)

diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle
index 7fa001c281..2badd58d49 100644
--- a/modules/metastorage/build.gradle
+++ b/modules/metastorage/build.gradle
@@ -75,6 +75,7 @@ dependencies {
     integrationTestImplementation 
testFixtures(project(':ignite-cluster-management'))
     integrationTestImplementation 
testFixtures(project(':ignite-failure-handler'))
     integrationTestImplementation testFixtures(project(':ignite-metrics:'))
+    integrationTestImplementation testFixtures(project(':ignite-runner:'))
 
     testFixturesImplementation project(':ignite-cluster-management')
     testFixturesImplementation project(':ignite-core')
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
new file mode 100644
index 0000000000..4bfdf58a38
--- /dev/null
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTrigger.COMPACTION_DATA_AVAILABILITY_TIME_PROPERTY;
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTrigger.COMPACTION_INTERVAL_PROPERTY;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** Integration test for {@link MetaStorageCompactionTrigger}. */
+@WithSystemProperty(key = COMPACTION_INTERVAL_PROPERTY, value = "10")
+@WithSystemProperty(key = COMPACTION_DATA_AVAILABILITY_TIME_PROPERTY, value = 
"10")
+public class ItMetaStorageCompactionTriggerTest extends 
ClusterPerClassIntegrationTest {
+    private static final ByteArray FOO_KEY = ByteArray.fromString("foo_key");
+
+    private static final byte[] VALUE = ByteArray.fromString("value").bytes();
+
+    @Override
+    protected int initialNodes() {
+        return 2;
+    }
+
+    @Override
+    protected int[] cmgMetastoreNodes() {
+        assertEquals(2, initialNodes());
+
+        return new int[] {0, 1};
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testMetastorageCompactionOccursOnAllNodes(boolean 
changeMetastorageLeader) throws Exception {
+        MetaStorageManager metaStorageManager = 
aliveNode().metaStorageManager();
+
+        var updateFooKeyLatch = new CountDownLatch(2);
+        watchExact(metaStorageManager, FOO_KEY, updateFooKeyLatch);
+
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+
+        if (changeMetastorageLeader) {
+            transferMetastorageLeadershipToAnotherNode();
+        }
+
+        assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+
+        assertTrue(updateFooKeyLatch.await(1, SECONDS));
+
+        long latestFooEntryRevision = latestKeyRevision(metaStorageManager, 
FOO_KEY);
+
+        assertTrue(waitForCondition(() -> 
allNodesContainsSingleRevisionForKeyLocally(FOO_KEY, latestFooEntryRevision), 
10, 1_000));
+    }
+
+    private static IgniteImpl aliveNode() {
+        return unwrapIgniteImpl(CLUSTER.aliveNode());
+    }
+
+    private static boolean 
allNodesContainsSingleRevisionForKeyLocally(ByteArray key, long revision) {
+        return CLUSTER.runningNodes()
+                .map(TestWrappers::unwrapIgniteImpl)
+                .map(IgniteImpl::metaStorageManager)
+                .map(metaStorageManager -> 
collectRevisionsLocally(metaStorageManager, key))
+                .allMatch(keyRevisions -> keyRevisions.size() == 1 && 
keyRevisions.contains(revision));
+    }
+
+    private static void watchExact(MetaStorageManager metaStorageManager, 
ByteArray key, CountDownLatch latch) {
+        metaStorageManager.registerExactWatch(key, new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                latch.countDown();
+
+                return nullCompletedFuture();
+            }
+
+            @Override
+            public void onError(Throwable e) {
+            }
+        });
+    }
+
+    private static long latestKeyRevision(MetaStorageManager 
metaStorageManager, ByteArray key) {
+        CompletableFuture<Entry> latestEntryFuture = 
metaStorageManager.get(key);
+        assertThat(latestEntryFuture.thenApply(Entry::empty), willBe(false));
+
+        return latestEntryFuture.join().revision();
+    }
+
+    private static Set<Long> collectRevisionsLocally(MetaStorageManager 
metaStorageManager, ByteArray key) {
+        var res = new HashSet<Long>();
+
+        for (int i = 0; i <= metaStorageManager.appliedRevision(); i++) {
+            try {
+                Entry entry = metaStorageManager.getLocally(key, i);
+
+                if (!entry.empty()) {
+                    res.add(entry.revision());
+                }
+            } catch (CompactedException ignore) {
+                // Do nothing.
+            }
+        }
+
+        return res;
+    }
+
+    private void transferMetastorageLeadershipToAnotherNode() throws Exception 
{
+        RaftGroupService raftGroupService = 
CLUSTER.leaderServiceFor(MetastorageGroupId.INSTANCE);
+
+        String leaderConsistentId = 
raftGroupService.getRaftNode().getLeaderId().getConsistentId();
+
+        for (int i = 0; i < initialNodes(); i++) {
+            if (!CLUSTER.node(i).name().equals(leaderConsistentId)) {
+                CLUSTER.transferLeadershipTo(i, MetastorageGroupId.INSTANCE);
+
+                return;
+            }
+        }
+
+        fail("Failed to change metastorage leader: " + leaderConsistentId);
+    }
+}
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
index 2cf7549f56..fea3a10818 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import 
org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorageTest;
-import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
+import 
org.apache.ignite.internal.metastorage.server.WatchEventHandlingCallback;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.junit.jupiter.api.AfterEach;
@@ -47,16 +47,11 @@ public abstract class 
ItMetaStorageSafeTimePropagationAbstractTest extends Abstr
 
     @BeforeEach
     public void startWatches() {
-        storage.startWatches(1, new OnRevisionAppliedCallback() {
+        storage.startWatches(1, new WatchEventHandlingCallback() {
             @Override
             public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
                 time.updateSafeTime(newSafeTime);
             }
-
-            @Override
-            public void onRevisionApplied(long revision) {
-                // No-op.
-            }
         });
     }
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/CompactionCommand.java
similarity index 56%
copy from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
copy to 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/CompactionCommand.java
index 0739f719f5..f35c3820df 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/CompactionCommand.java
@@ -15,32 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.server.raft;
+package org.apache.ignite.internal.metastorage.command;
 
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.network.annotations.Transferable;
 
 /**
- * Meta storage replication group id.
+ * Command to start the metastore compaction locally.
+ *
+ * <p>Invokes {@link KeyValueStorage#updateCompactionRevision} on command 
processing.</p>
  */
-public enum MetastorageGroupId implements ReplicationGroupId {
-    /** Meta storage group id. */
-    INSTANCE("metastorage_group");
-
-    /** Group id string representation. */
-    private String name;
-
-    /**
-     * The constructor.
-     *
-     * @param name The string representation of the enum.
-     */
-    MetastorageGroupId(String name) {
-        this.name = name;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public String toString() {
-        return name;
-    }
+@Transferable(MetastorageCommandsMessageGroup.COMPACTION)
+public interface CompactionCommand extends MetaStorageWriteCommand {
+    /** New metastorage compaction revision. */
+    long compactionRevision();
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
index 819d02717d..040db1118d 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
@@ -64,4 +64,7 @@ public interface MetastorageCommandsMessageGroup {
 
     /** Message type for {@link EvictIdempotentCommandsCacheCommand}. */
     short EVICT_IDEMPOTENT_COMMAND_CACHE = 71;
+
+    /** Message type for {@link CompactionCommand}. */
+    short COMPACTION = 72;
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
new file mode 100644
index 0000000000..58011d242b
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.command.CompactionCommand;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Metastorage compaction trigger.
+ *
+ * <p>Algorithm:</p>
+ * <ol>
+ *     <li>Metastorage leader waits locally for the start of scheduled 
compaction.</li>
+ *     <li>Metastorage leader locally calculates revision for compaction: it 
takes the current safe time and subtracts the data
+ *     availability time and uses that timestamp to get the revision.</li>
+ *     <li>If the revision is less than or equal to the last compacted 
revision, then go to point 6.</li>
+ *     <li>Metastorage leader creates and sends a {@link CompactionCommand} 
(see the command description what each node will do) with a new
+ *     revision for compaction.</li>
+ *     <li>Metastorage leader locally gets notification of the completion of 
the local compaction for the new revision.</li>
+ *     <li>Metastorage leader locally schedules a new start of compaction.</li>
+ * </ol>
+ */
+// TODO: IGNITE-23280 Turn on compaction
+class MetaStorageCompactionTrigger implements ManuallyCloseable {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MetaStorageCompactionTrigger.class);
+
+    /** System property that defines compaction start interval (in 
milliseconds). Default value is {@link Long#MAX_VALUE}. */
+    public static final String COMPACTION_INTERVAL_PROPERTY = 
"IGNITE_COMPACTION_INTERVAL";
+
+    /** System property that defines data availability time (in milliseconds). 
Default value is {@link Long#MAX_VALUE}. */
+    public static final String COMPACTION_DATA_AVAILABILITY_TIME_PROPERTY = 
"IGNITE_COMPACTION_DATA_AVAILABILITY_TIME";
+
+    private final String localNodeName;
+
+    private final KeyValueStorage storage;
+
+    private final CompletableFuture<MetaStorageServiceImpl> 
metastorageServiceFuture;
+
+    private final ClusterTime clusterTime;
+
+    private final ReadOperationForCompactionTracker 
readOperationForCompactionTracker;
+
+    private final ScheduledExecutorService compactionExecutor;
+
+    /** Guarded by {@link #lock}. */
+    private @Nullable ScheduledFuture<?> lastScheduledFuture;
+
+    /** Guarded by {@link #lock}. */
+    private boolean isLocalNodeLeader;
+
+    private final Lock lock = new ReentrantLock();
+
+    /** Compaction start interval (in milliseconds). */
+    // TODO: IGNITE-23279 Change configuration
+    private final long startInterval = 
IgniteSystemProperties.getLong(COMPACTION_INTERVAL_PROPERTY, Long.MAX_VALUE);
+
+    /** Data availability time (in milliseconds). */
+    // TODO: IGNITE-23279 Change configuration
+    private final long dataAvailabilityTime = 
IgniteSystemProperties.getLong(COMPACTION_DATA_AVAILABILITY_TIME_PROPERTY, 
Long.MAX_VALUE);
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /**
+     * Constructor.
+     *
+     * @param localNodeName Local node name.
+     * @param storage Storage.
+     * @param metastorageServiceFuture Metastorage service future.
+     * @param clusterTime Cluster time.
+     * @param readOperationForCompactionTracker Tracker of read operations, 
both local and from the leader.
+     */
+    MetaStorageCompactionTrigger(
+            String localNodeName,
+            KeyValueStorage storage,
+            CompletableFuture<MetaStorageServiceImpl> metastorageServiceFuture,
+            ClusterTime clusterTime,
+            ReadOperationForCompactionTracker readOperationForCompactionTracker
+    ) {
+        this.localNodeName = localNodeName;
+        this.storage = storage;
+        this.metastorageServiceFuture = metastorageServiceFuture;
+        this.clusterTime = clusterTime;
+        this.readOperationForCompactionTracker = 
readOperationForCompactionTracker;
+
+        compactionExecutor = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(localNodeName, 
"metastorage-compaction-executor", LOG)
+        );
+    }
+
+    @Override
+    public void close() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        cancelLastScheduledFutureBusy();
+
+        IgniteUtils.shutdownAndAwaitTermination(compactionExecutor, 10, 
TimeUnit.SECONDS);
+    }
+
+    private void doCompactionBusy() {
+        lock.lock();
+
+        try {
+            if (!isLocalNodeLeader) {
+                return;
+            }
+
+            HybridTimestamp candidateCompactionRevisionTimestamp = 
createCandidateCompactionRevisionTimestampBusy();
+
+            Long newCompactionRevision = 
calculateCandidateCompactionRevisionBusy(candidateCompactionRevisionTimestamp);
+
+            if (newCompactionRevision == null) {
+                scheduleNextCompactionBusy();
+            } else {
+                metastorageServiceFuture.thenCompose(svc -> 
svc.sendCompactionCommand(newCompactionRevision))
+                        .whenComplete((unused, throwable) -> {
+                            if (throwable != null) {
+                                Throwable cause = unwrapCause(throwable);
+
+                                if (!(cause instanceof NodeStoppingException)) 
{
+                                    LOG.error(
+                                            "Unknown error occurred while 
sending the metastorage compaction command: "
+                                                    + 
"[newCompactionRevision={}]",
+                                            cause,
+                                            newCompactionRevision
+                                    );
+
+                                    inBusyLockSafe(busyLock, 
this::scheduleNextCompactionBusy);
+                                }
+                            }
+                        });
+            }
+        } catch (Throwable t) {
+            LOG.error("Unknown error on new metastorage compaction revision 
scheduling", t);
+
+            inBusyLockSafe(busyLock, this::scheduleNextCompactionBusy);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private HybridTimestamp createCandidateCompactionRevisionTimestampBusy() {
+        HybridTimestamp safeTime = clusterTime.currentSafeTime();
+
+        return safeTime.getPhysical() <= dataAvailabilityTime
+                ? HybridTimestamp.MIN_VALUE
+                : safeTime.subtractPhysicalTime(dataAvailabilityTime);
+    }
+
+    /** Returns {@code null} if there is no need to compact yet. */
+    private @Nullable Long 
calculateCandidateCompactionRevisionBusy(HybridTimestamp candidateTimestamp) {
+        try {
+            long candidateCompactionRevision = 
storage.revisionByTimestamp(candidateTimestamp);
+            long currentStorageRevision = storage.revision();
+
+            if (candidateCompactionRevision >= currentStorageRevision) {
+                candidateCompactionRevision = currentStorageRevision - 1;
+            }
+
+            return candidateCompactionRevision <= 
storage.getCompactionRevision() ? null : candidateCompactionRevision;
+        } catch (CompactedException exception) {
+            // Revision has already been compacted, we need to plan the next 
compaction.
+            return null;
+        }
+    }
+
+    private void scheduleNextCompactionBusy() {
+        lock.lock();
+
+        try {
+            if (isLocalNodeLeader) {
+                lastScheduledFuture = compactionExecutor.schedule(
+                        () -> inBusyLock(busyLock, this::doCompactionBusy),
+                        startInterval,
+                        MILLISECONDS
+                );
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Invokes when the metastorage compaction revision is updated. */
+    void onCompactionRevisionUpdate(long compactionRevision) {
+        inBusyLockSafe(busyLock, () -> 
onCompactionRevisionUpdateBusy(compactionRevision));
+    }
+
+    private void onCompactionRevisionUpdateBusy(long compactionRevision) {
+        supplyAsync(() -> 
readOperationForCompactionTracker.collect(compactionRevision), 
compactionExecutor)
+                .thenComposeAsync(Function.identity(), compactionExecutor)
+                .thenRunAsync(() -> storage.compact(compactionRevision), 
compactionExecutor)
+                .whenComplete((unused, throwable) -> {
+                    if (throwable == null) {
+                        LOG.info("Metastore compaction completed successfully: 
[compactionRevision={}]", compactionRevision);
+                    } else {
+                        Throwable cause = unwrapCause(throwable);
+
+                        if (!(cause instanceof NodeStoppingException)) {
+                            LOG.error(
+                                    "Unknown error on new metastorage 
compaction revision: {}",
+                                    cause,
+                                    compactionRevision
+                            );
+                        }
+                    }
+
+                    inBusyLockSafe(busyLock, this::scheduleNextCompactionBusy);
+                });
+    }
+
+    /** Invokes when a new leader is elected. */
+    void onLeaderElected(ClusterNode newLeader) {
+        inBusyLockSafe(busyLock, () -> onLeaderElectedBusy(newLeader));
+    }
+
+    private void onLeaderElectedBusy(ClusterNode newLeader) {
+        lock.lock();
+
+        try {
+            if (localNodeName.equals(newLeader.name())) {
+                isLocalNodeLeader = true;
+
+                scheduleNextCompactionBusy();
+            } else {
+                isLocalNodeLeader = false;
+
+                cancelLastScheduledFutureBusy();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void cancelLastScheduledFutureBusy() {
+        lock.lock();
+
+        try {
+            ScheduledFuture<?> lastScheduledFuture = this.lastScheduledFuture;
+
+            if (lastScheduledFuture != null) {
+                lastScheduledFuture.cancel(true);
+            }
+
+            this.lastScheduledFuture = null;
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 000e9bdea0..42006ec065 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -71,8 +71,8 @@ import 
org.apache.ignite.internal.metastorage.dsl.StatementResult;
 import 
org.apache.ignite.internal.metastorage.impl.raft.MetaStorageSnapshotStorageFactory;
 import org.apache.ignite.internal.metastorage.metrics.MetaStorageMetricSource;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
-import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
+import 
org.apache.ignite.internal.metastorage.server.WatchEventHandlingCallback;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
@@ -192,6 +192,8 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
     /** Tracks only reads from the leader, local reads are tracked by the 
storage itself. */
     private final ReadOperationForCompactionTracker 
readOperationFromLeaderForCompactionTracker;
 
+    private final MetaStorageCompactionTrigger metaStorageCompactionTrigger;
+
     /**
      * The constructor.
      *
@@ -234,6 +236,16 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         this.readOperationFromLeaderForCompactionTracker = 
readOperationForCompactionTracker;
 
         learnerManager = new MetaStorageLearnerManager(busyLock, 
logicalTopologyService, metaStorageSvcFut);
+
+        metaStorageCompactionTrigger = new MetaStorageCompactionTrigger(
+                clusterService.nodeName(),
+                storage,
+                metaStorageSvcFut,
+                clusterTime,
+                readOperationForCompactionTracker
+        );
+
+        electionListeners.add(metaStorageCompactionTrigger::onLeaderElected);
     }
 
     /**
@@ -707,6 +719,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
         try {
             IgniteUtils.closeAllManually(
+                    metaStorageCompactionTrigger,
                     () -> 
metricManager.unregisterSource(metaStorageMetricSource),
                     clusterTime,
                     () -> cancelOrConsume(metaStorageSvcFut, 
MetaStorageServiceImpl::close),
@@ -754,7 +767,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         try {
             return recoveryFinishedFuture
                     .thenAccept(revision -> inBusyLock(busyLock, () -> {
-                        storage.startWatches(revision + 1, new 
OnRevisionAppliedCallback() {
+                        storage.startWatches(revision + 1, new 
WatchEventHandlingCallback() {
                             @Override
                             public void onSafeTimeAdvanced(HybridTimestamp 
newSafeTime) {
                                 
MetaStorageManagerImpl.this.onSafeTimeAdvanced(newSafeTime);
@@ -764,6 +777,11 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
                             public void onRevisionApplied(long revision) {
                                 
MetaStorageManagerImpl.this.onRevisionApplied(revision);
                             }
+
+                            @Override
+                            public void onCompactionRevisionUpdated(long 
compactionRevision) {
+                                
metaStorageCompactionTrigger.onCompactionRevisionUpdate(compactionRevision);
+                            }
                         });
                     }))
                     .whenComplete((v, e) -> {
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index 90703ee62d..dbd4e3f1a7 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.CompactionCommand;
 import 
org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCacheCommand;
 import org.apache.ignite.internal.metastorage.command.GetAllCommand;
 import org.apache.ignite.internal.metastorage.command.GetCommand;
@@ -358,4 +359,19 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
     ) {
         return 
commandsFactory.evictIdempotentCommandsCacheCommand().evictionTimestamp(evictionTimestamp).initiatorTime(ts).build();
     }
+
+    /**
+     * Sends command {@link CompactionCommand} to the leader.
+     *
+     * @param compactionRevision New metastorage compaction revision.
+     * @return Operation future.
+     */
+    CompletableFuture<Void> sendCompactionCommand(long compactionRevision) {
+        CompactionCommand command = 
context.commandsFactory().compactionCommand()
+                .compactionRevision(compactionRevision)
+                .initiatorTime(clusterTime.now())
+                .build();
+
+        return context.raftService().run(command);
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
index f24850613e..c2865b1e30 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -17,14 +17,12 @@
 
 package org.apache.ignite.internal.metastorage.server;
 
-import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOT_FOUND;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.isLastIndex;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.maxRevisionIndex;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.minRevisionIndex;
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
-import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,16 +31,13 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.LongConsumer;
 import java.util.function.Predicate;
-import org.apache.ignite.internal.failure.FailureContext;
 import org.apache.ignite.internal.failure.FailureManager;
-import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -93,29 +88,22 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     /** Tracks only cursors, since reading a single entry or a batch is done 
entirely under {@link #rwLock}. */
     protected final ReadOperationForCompactionTracker 
readOperationForCompactionTracker;
 
-    protected final ExecutorService compactionExecutor;
-
-    private final List<CompactionListener> compactionListeners = new 
CopyOnWriteArrayList<>();
-
     /**
      * Constructor.
      *
      * @param nodeName Node name.
      * @param failureManager Failure processor that is used to handle critical 
errors.
      * @param readOperationForCompactionTracker Read operation tracker for 
metastorage compaction.
-     * @param compactionExecutor Metastorage compaction executor.
      */
     protected AbstractKeyValueStorage(
             String nodeName,
             FailureManager failureManager,
-            ReadOperationForCompactionTracker 
readOperationForCompactionTracker,
-            ExecutorService compactionExecutor
+            ReadOperationForCompactionTracker readOperationForCompactionTracker
     ) {
         this.failureManager = failureManager;
         this.readOperationForCompactionTracker = 
readOperationForCompactionTracker;
-        this.compactionExecutor = compactionExecutor;
 
-        this.watchProcessor = new WatchProcessor(nodeName, this::get, 
failureManager);
+        watchProcessor = new WatchProcessor(nodeName, this::get, 
failureManager);
     }
 
     /** Returns the key revisions for operation, an empty array if not found. 
*/
@@ -197,6 +185,23 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
+    @Override
+    public void saveCompactionRevision(long revision, KeyValueUpdateContext 
context) {
+        assert revision >= 0 : revision;
+
+        rwLock.writeLock().lock();
+
+        try {
+            assertCompactionRevisionLessThanCurrent(revision, rev);
+
+            saveCompactionRevision(revision, context, true);
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    protected abstract void saveCompactionRevision(long compactionRevision, 
KeyValueUpdateContext context, boolean advanceSafeTime);
+
     @Override
     public void setCompactionRevision(long revision) {
         assert revision >= 0 : revision;
@@ -224,7 +229,7 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
     }
 
     @Override
-    public void startCompaction(long compactionRevision) {
+    public void updateCompactionRevision(long compactionRevision, 
KeyValueUpdateContext context) {
         assert compactionRevision >= 0 : compactionRevision;
 
         rwLock.writeLock().lock();
@@ -232,26 +237,12 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
         try {
             assertCompactionRevisionLessThanCurrent(compactionRevision, rev);
 
+            saveCompactionRevision(compactionRevision, context, false);
+
             if (isInRecoveryState()) {
-                this.compactionRevision = compactionRevision;
+                setCompactionRevision(compactionRevision);
             } else {
-                watchProcessor
-                        .addTaskToWatchEventQueue(() -> 
setCompactionRevision(compactionRevision))
-                        .thenComposeAsync(unused -> 
readOperationsFuture(compactionRevision), compactionExecutor)
-                        .thenRunAsync(() -> compact(compactionRevision), 
compactionExecutor)
-                        .whenCompleteAsync((unused, throwable) -> {
-                            if (throwable == null) {
-                                log.info("Metastore compaction completed 
successfully: [compactionRevision={}]", compactionRevision);
-                            } else {
-                                log.error(
-                                        "Metastore compaction failed: 
[compactionRevision={}]",
-                                        unwrapCause(throwable),
-                                        compactionRevision
-                                );
-                            }
-
-                            
notifyCompleteCompactionLocally(this.compactionRevision, throwable);
-                        }, compactionExecutor);
+                watchProcessor.updateCompactionRevision(compactionRevision, 
context.timestamp);
             }
         } finally {
             rwLock.writeLock().unlock();
@@ -333,21 +324,6 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
         watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
     }
 
-    @Override
-    public CompletableFuture<Void> readOperationsFuture(long 
compactionRevisionExcluded) {
-        return 
readOperationForCompactionTracker.collect(compactionRevisionExcluded);
-    }
-
-    @Override
-    public void registerCompactionListener(CompactionListener listener) {
-        compactionListeners.add(listener);
-    }
-
-    @Override
-    public void unregisterCompactionListener(CompactionListener listener) {
-        compactionListeners.remove(listener);
-    }
-
     /** Notifies of revision update. Must be called under the {@link #rwLock}. 
*/
     protected void notifyRevisionUpdate() {
         if (recoveryRevisionListener != null) {
@@ -441,25 +417,24 @@ public abstract class AbstractKeyValueStorage implements 
KeyValueStorage {
         return res;
     }
 
-    private void notifyCompleteCompactionLocally(long compactionRevision, 
@Nullable Throwable throwable) {
-        if (throwable != null) {
-            doCriticalErrorIfNotNodeStoppingException(throwable);
-        }
+    protected WatchEventHandlingCallback 
createWrapper(WatchEventHandlingCallback callback) {
+        return new WatchEventHandlingCallback() {
+            @Override
+            public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+                callback.onSafeTimeAdvanced(newSafeTime);
+            }
 
-        for (CompactionListener listener : compactionListeners) {
-            try {
-                listener.onCompactionCompleteLocally(compactionRevision);
-            } catch (Throwable t) {
-                doCriticalErrorIfNotNodeStoppingException(t);
+            @Override
+            public void onRevisionApplied(long revision) {
+                callback.onRevisionApplied(revision);
             }
-        }
-    }
 
-    private void doCriticalErrorIfNotNodeStoppingException(Throwable 
throwable) {
-        throwable = unwrapCause(throwable);
+            @Override
+            public void onCompactionRevisionUpdated(long compactionRevision) {
+                setCompactionRevision(compactionRevision);
 
-        if (!(throwable instanceof NodeStoppingException)) {
-            failureManager.process(new FailureContext(CRITICAL_ERROR, 
throwable));
-        }
+                callback.onCompactionRevisionUpdated(compactionRevision);
+            }
+        };
     }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 268cf73028..e278e74850 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -378,10 +378,9 @@ public interface KeyValueStorage extends ManuallyCloseable 
{
      * <p>Before calling this method, watches will not receive any updates.</p>
      *
      * @param startRevision Revision to start processing updates from.
-     * @param revisionCallback Callback that will be invoked after all watches 
of a particular revision are processed, with the
-     *         revision and modified entries (processed by at least one watch) 
as its argument.
+     * @param callback Watch event handling callback.
      */
-    void startWatches(long startRevision, OnRevisionAppliedCallback 
revisionCallback);
+    void startWatches(long startRevision, WatchEventHandlingCallback callback);
 
     /**
      * Unregisters a watch listener.
@@ -424,28 +423,6 @@ public interface KeyValueStorage extends ManuallyCloseable 
{
      */
     void compact(long revision);
 
-    /**
-     * Starts local compaction of metastorage.
-     *
-     * <p>Algorithm:</p>
-     * <ul>
-     *     <li>If the storage is in a recovery state ({@link #startWatches all 
registered watches not started}), then
-     *     {@link #setCompactionRevision} is invoked and the current method is 
completed.</li>
-     *     <li>Otherwise, a new task (A) is added to the WatchEvent queue and 
the current method is completed.</li>
-     *     <li>Task (A) invokes {@link #setCompactionRevision} and adds a new 
task (B) to the compaction thread pool and completes.</li>
-     *     <li>Task (B) collects all read operations from metastorage (local 
and from the leader {@link #readOperationsFuture}) and starts
-     *     asynchronously waiting for their completion.</li>
-     *     <li>Then {@link #compact} is invoked at the compaction thread 
pool.</li>
-     *     <li>Upon completion there will be a notification via {@link 
CompactionListener#onCompactionCompleteLocally} for
-     *     {@link #registerCompactionListener registered} listeners.</li>
-     * </ul>
-     *
-     * <p>Compaction revision is expected to be less than the {@link #revision 
current storage revision}.</p>
-     *
-     * @param compactionRevision Compaction revision.
-     */
-    void startCompaction(long compactionRevision);
-
     /**
      * Signals the need to stop local metastorage compaction as soon as 
possible. For example, due to a node stopping.
      *
@@ -561,21 +538,25 @@ public interface KeyValueStorage extends 
ManuallyCloseable {
     long getCompactionRevision();
 
     /**
-     * Returns a future that will complete when all read operations that were 
started before {@code compactionRevisionExcluded}.
+     * Updates the metastorage compaction revision.
      *
-     * <p>Current method is expected to be invoked after {@link 
#setCompactionRevision} on the same revision.</p>
+     * <p>Algorithm:</p>
+     * <ul>
+     *     <li>Invokes {@link #saveCompactionRevision}.</li>
+     *     <li>If the storage is in a recovery state ({@link #startWatches all 
registered watches not started}), then
+     *     {@link #setCompactionRevision} is invoked and the current method is 
completed.</li>
+     *     <li>Otherwise, a new task (A) is added to the WatchEvent queue and 
the current method is completed.</li>
+     *     <li>Task (A) invokes {@link #setCompactionRevision} and invokes
+     *     {@link WatchEventHandlingCallback#onCompactionRevisionUpdated}.</li>
+     * </ul>
      *
-     * <p>Future completes without exception.</p>
+     * <p>Compaction revision is expected to be less than the {@link #revision 
current storage revision}.</p>
      *
-     * @param compactionRevisionExcluded Compaction revision of interest.
+     * @param revision Compaction revision to update.
+     * @param context Operation's context.
+     * @throws MetaStorageException If there is an error while saving a 
compaction revision.
      */
-    CompletableFuture<Void> readOperationsFuture(long 
compactionRevisionExcluded);
-
-    /** Adds a metastore compaction listener. */
-    void registerCompactionListener(CompactionListener listener);
-
-    /** Removes a metastore compaction listener. */
-    void unregisterCompactionListener(CompactionListener listener);
+    void updateCompactionRevision(long revision, KeyValueUpdateContext 
context);
 
     /**
      * Returns checksum corresponding to the revision.
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueUpdateContext.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueUpdateContext.java
index 9e49da4c8b..2dd7dfe3e7 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueUpdateContext.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueUpdateContext.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.metastorage.server;
 
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -26,7 +28,10 @@ import org.jetbrains.annotations.TestOnly;
  */
 public class KeyValueUpdateContext {
     public final long index;
+
     public final long term;
+
+    @IgniteToStringInclude
     public final HybridTimestamp timestamp;
 
     /**
@@ -44,6 +49,11 @@ public class KeyValueUpdateContext {
         this.timestamp = timestamp;
     }
 
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+
     /**
      * Returns a context instance with {@code 0} index and term values.
      */
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEventHandlingCallback.java
similarity index 71%
rename from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
rename to 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEventHandlingCallback.java
index 48a07367bf..8c46d9b87c 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEventHandlingCallback.java
@@ -19,23 +19,33 @@ package org.apache.ignite.internal.metastorage.server;
 
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 
-/**
- * Interface for declaring callbacks that get called after all Meta Storage 
watches have been notified of a particular revision
- * and/or when SafeTime gets advanced.
- */
-public interface OnRevisionAppliedCallback {
+/** Watch event handling callback. */
+public interface WatchEventHandlingCallback {
     /**
      * Invoked whenever MetaStorage Safe Time gets advanced (either because a 
write command is applied,
      * together with all watches that process it, or because idle safe time 
mechanism advanced Safe Time).
      *
      * @param newSafeTime New safe time value.
      */
-    void onSafeTimeAdvanced(HybridTimestamp newSafeTime);
+    default void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+        // No-op.
+    }
 
     /**
      * Notifies of completion of processing of Meta Storage watches for a 
particular revision.
      *
      * @param revision Latest applied meta-storage revision.
      */
-    void onRevisionApplied(long revision);
+    default void onRevisionApplied(long revision) {
+        // No-op.
+    }
+
+    /**
+     * Invokes when the metastorage compaction revision has been updated in 
the WatchEvent queue.
+     *
+     * @param compactionRevision New metastorage compaction revision.
+     */
+    default void onCompactionRevisionUpdated(long compactionRevision) {
+        // No-op.
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 68e6e57d5d..c717b9420b 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -94,8 +94,7 @@ public class WatchProcessor implements ManuallyCloseable {
 
     private final EntryReader entryReader;
 
-    /** Callback that gets notified after a {@link WatchEvent} has been 
processed by a registered watch. */
-    private volatile OnRevisionAppliedCallback revisionCallback;
+    private volatile WatchEventHandlingCallback watchEventHandlingCallback;
 
     /** Executor for processing watch events. */
     private final ExecutorService watchExecutor;
@@ -141,13 +140,11 @@ public class WatchProcessor implements ManuallyCloseable {
                 .min();
     }
 
-    /**
-     * Sets the callback that will be executed every time after watches have 
been notified of a particular revision.
-     */
-    public void setRevisionCallback(OnRevisionAppliedCallback 
revisionCallback) {
-        assert this.revisionCallback == null;
+    /** Sets the watch event handling callback. */
+    public void setWatchEventHandlingCallback(WatchEventHandlingCallback 
callback) {
+        assert this.watchEventHandlingCallback == null;
 
-        this.revisionCallback = revisionCallback;
+        this.watchEventHandlingCallback = callback;
     }
 
     /**
@@ -327,9 +324,9 @@ public class WatchProcessor implements ManuallyCloseable {
     }
 
     private void invokeOnRevisionCallback(long revision, HybridTimestamp time) 
{
-        revisionCallback.onSafeTimeAdvanced(time);
+        watchEventHandlingCallback.onSafeTimeAdvanced(time);
 
-        revisionCallback.onRevisionApplied(revision);
+        watchEventHandlingCallback.onRevisionApplied(revision);
     }
 
     /**
@@ -341,7 +338,7 @@ public class WatchProcessor implements ManuallyCloseable {
         assert time != null;
 
         notificationFuture = notificationFuture
-                .thenRunAsync(() -> revisionCallback.onSafeTimeAdvanced(time), 
watchExecutor)
+                .thenRunAsync(() -> 
watchEventHandlingCallback.onSafeTimeAdvanced(time), watchExecutor)
                 .whenComplete((ignored, e) -> {
                     if (e != null) {
                         failureManager.process(new 
FailureContext(CRITICAL_ERROR, e));
@@ -383,15 +380,24 @@ public class WatchProcessor implements ManuallyCloseable {
     }
 
     /**
-     * Returns a future that will complete when the task in the WatchEvent 
queue is complete.
+     * Updates the metastorage compaction revision in the WatchEvent queue.
      *
      * <p>This method is not thread-safe and must be performed under an 
exclusive lock in concurrent scenarios.</p>
+     *
+     * @param compactionRevision New metastorage compaction revision.
+     * @param time Metastorage compaction revision update timestamp.
      */
-    public CompletableFuture<Void> addTaskToWatchEventQueue(Runnable task) {
-        CompletableFuture<Void> future = notificationFuture.thenRunAsync(task, 
watchExecutor);
-
-        notificationFuture = future;
+    public void updateCompactionRevision(long compactionRevision, 
HybridTimestamp time) {
+        notificationFuture = notificationFuture
+                .thenRunAsync(() -> {
+                    
watchEventHandlingCallback.onCompactionRevisionUpdated(compactionRevision);
 
-        return future;
+                    watchEventHandlingCallback.onSafeTimeAdvanced(time);
+                }, watchExecutor)
+                .whenComplete((ignored, e) -> {
+                    if (e != null) {
+                        failureManager.process(new 
FailureContext(CRITICAL_ERROR, e));
+                    }
+                });
     }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index dcee785092..6e42e4ba7b 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -71,7 +71,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -88,10 +87,10 @@ import org.apache.ignite.internal.metastorage.server.If;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext;
 import org.apache.ignite.internal.metastorage.server.MetastorageChecksum;
-import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
 import org.apache.ignite.internal.metastorage.server.Statement;
 import org.apache.ignite.internal.metastorage.server.Value;
+import 
org.apache.ignite.internal.metastorage.server.WatchEventHandlingCallback;
 import org.apache.ignite.internal.raft.IndexWithTerm;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
@@ -266,12 +265,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         super(
                 nodeName,
                 failureManager,
-                readOperationForCompactionTracker,
-                Executors.newSingleThreadExecutor(NamedThreadFactory.create(
-                        nodeName,
-                        "metastorage-compaction-executor",
-                        Loggers.forClass(RocksDbKeyValueStorage.class)
-                ))
+                readOperationForCompactionTracker
         );
 
         this.dbPath = dbPath;
@@ -421,7 +415,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         watchProcessor.close();
 
         IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, 
TimeUnit.SECONDS);
-        IgniteUtils.shutdownAndAwaitTermination(compactionExecutor, 10, 
TimeUnit.SECONDS);
 
         rwLock.writeLock().lock();
         try {
@@ -867,7 +860,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     }
 
     @Override
-    public void startWatches(long startRevision, OnRevisionAppliedCallback 
revisionCallback) {
+    public void startWatches(long startRevision, WatchEventHandlingCallback 
callback) {
         assert startRevision > 0 : startRevision;
 
         long currentRevision;
@@ -875,7 +868,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         rwLock.readLock().lock();
 
         try {
-            watchProcessor.setRevisionCallback(revisionCallback);
+            
watchProcessor.setWatchEventHandlingCallback(createWrapper(callback));
 
             currentRevision = rev;
 
@@ -1278,27 +1271,19 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     }
 
     @Override
-    public void saveCompactionRevision(long revision, KeyValueUpdateContext 
context) {
-        assert revision >= 0 : revision;
-
-        rwLock.writeLock().lock();
-
+    protected void saveCompactionRevision(long revision, KeyValueUpdateContext 
context, boolean advanceSafeTime) {
         try (WriteBatch batch = new WriteBatch()) {
-            assertCompactionRevisionLessThanCurrent(revision, rev);
-
             data.put(batch, COMPACTION_REVISION_KEY, longToBytes(revision));
 
             addIndexAndTermToWriteBatch(batch, context);
 
             db.write(defaultWriteOptions, batch);
 
-            if (!isInRecoveryState()) {
+            if (advanceSafeTime && !isInRecoveryState()) {
                 watchProcessor.advanceSafeTime(context.timestamp);
             }
         } catch (Throwable t) {
             throw new MetaStorageException(COMPACTION_ERR, "Error saving 
compaction revision: " + revision, t);
-        } finally {
-            rwLock.writeLock().unlock();
         }
     }
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index cbc49a8581..a2947cf298 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -68,21 +68,13 @@ public class MetaStorageListener implements 
RaftGroupListener, BeforeApplyHandle
 
     private final RaftGroupConfigurationConverter configurationConverter = new 
RaftGroupConfigurationConverter();
 
-    /**
-     * Constructor.
-     *
-     * @param storage Storage.
-     */
+    /** Constructor. */
     @TestOnly
     public MetaStorageListener(KeyValueStorage storage, ClusterTimeImpl 
clusterTime) {
         this(storage, clusterTime, newConfig -> {});
     }
 
-    /**
-     * Constructor.
-     *
-     * @param storage Storage.
-     */
+    /** Constructor. */
     public MetaStorageListener(
             KeyValueStorage storage,
             ClusterTimeImpl clusterTime,
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 5075111d7d..a97a9a60de 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.command.CompactionCommand;
 import 
org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCacheCommand;
 import org.apache.ignite.internal.metastorage.command.IdempotentCommand;
 import org.apache.ignite.internal.metastorage.command.InvokeCommand;
@@ -184,7 +185,7 @@ public class MetaStorageWriteHandler {
     private void handleWriteWithTime(CommandClosure<WriteCommand> clo, 
MetaStorageWriteCommand command, long index, long term) {
         HybridTimestamp opTime = command.safeTime();
 
-        KeyValueUpdateContext context = new KeyValueUpdateContext(index, term, 
opTime);
+        var context = new KeyValueUpdateContext(index, term, opTime);
 
         if (command instanceof PutCommand) {
             PutCommand putCmd = (PutCommand) command;
@@ -227,6 +228,14 @@ public class MetaStorageWriteHandler {
             evictIdempotentCommandsCache(cmd.evictionTimestamp(), context);
 
             clo.result(null);
+        } else if (command instanceof CompactionCommand) {
+            CompactionCommand cmd = (CompactionCommand) command;
+
+            storage.updateCompactionRevision(cmd.compactionRevision(), 
context);
+
+            clo.result(null);
+        } else {
+            throw new AssertionError(String.format("Unsupported command: 
[context=%s, command=%s]", context, command));
         }
     }
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
index 0739f719f5..e2f7535ac5 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
@@ -27,7 +27,7 @@ public enum MetastorageGroupId implements ReplicationGroupId {
     INSTANCE("metastorage_group");
 
     /** Group id string representation. */
-    private String name;
+    private final String name;
 
     /**
      * The constructor.
@@ -38,7 +38,6 @@ public enum MetastorageGroupId implements ReplicationGroupId {
         this.name = name;
     }
 
-    /** {@inheritDoc} */
     @Override
     public String toString() {
         return name;
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index d298ebef39..bc8416d84d 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -26,7 +26,6 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext.kvContext;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
-import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -52,6 +51,7 @@ import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.IndexWithTerm;
@@ -60,6 +60,7 @@ import 
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -85,6 +86,9 @@ public abstract class AbstractCompactionKeyValueStorageTest 
extends AbstractKeyV
 
     private final ClusterTimeImpl clusterTime = new ClusterTimeImpl(NODE_NAME, 
new IgniteSpinBusyLock(), clock);
 
+    private final PendingComparableValuesTracker<Long, Void> 
updateCompactionRevisionInWatchEvenQueue
+            = new PendingComparableValuesTracker<>(Long.MIN_VALUE);
+
     ReadOperationForCompactionTracker readOperationForCompactionTracker = new 
ReadOperationForCompactionTracker();
 
     @Override
@@ -282,7 +286,7 @@ public abstract class AbstractCompactionKeyValueStorageTest 
extends AbstractKeyV
         HybridTimestamp now0 = clock.now();
         HybridTimestamp now1 = clock.now();
 
-        startListenUpdateSafeTime();
+        startWatches();
 
         storage.saveCompactionRevision(0, new KeyValueUpdateContext(1, 1, 
now0));
         assertEquals(-1, storage.getCompactionRevision());
@@ -936,12 +940,12 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
 
     @Test
     void testReadOperationsFutureWithoutReadOperations() {
-        assertTrue(storage.readOperationsFuture(0).isDone());
-        assertTrue(storage.readOperationsFuture(1).isDone());
+        assertTrue(readOperationForCompactionTracker.collect(0).isDone());
+        assertTrue(readOperationForCompactionTracker.collect(1).isDone());
     }
 
     /**
-     * Tests {@link KeyValueStorage#readOperationsFuture} as expected in use.
+     * Tests tracking only read operations from the storage, reading from the 
leader tracks {@link MetaStorageManagerImpl} itself.
      * <ul>
      *     <li>Create read operations, we only need cursors since reading one 
entry or a batch is synchronized with
      *     {@link KeyValueStorage#setCompactionRevision}.</li>
@@ -959,7 +963,7 @@ public abstract class AbstractCompactionKeyValueStorageTest 
extends AbstractKeyV
         try {
             storage.setCompactionRevision(3);
 
-            CompletableFuture<Void> readOperationsFuture = 
storage.readOperationsFuture(3);
+            CompletableFuture<Void> readOperationsFuture = 
readOperationForCompactionTracker.collect(3);
             assertFalse(readOperationsFuture.isDone());
 
             range0.stream().forEach(entry -> {});
@@ -980,7 +984,7 @@ public abstract class AbstractCompactionKeyValueStorageTest 
extends AbstractKeyV
 
     /**
      * Tests that cursors created after {@link 
KeyValueStorage#setCompactionRevision} will not affect future from
-     * {@link KeyValueStorage#readOperationsFuture} on a new compaction 
revision.
+     * {@link ReadOperationForCompactionTracker#collect} on a new compaction 
revision.
      */
     @Test
     void testReadOperationsFutureForReadOperationAfterSetCompactionRevision() 
throws Exception {
@@ -994,7 +998,7 @@ public abstract class AbstractCompactionKeyValueStorageTest 
extends AbstractKeyV
             rangeAfterSetCompactionRevision0 = storage.range(FOO_KEY, FOO_KEY);
             rangeAfterSetCompactionRevision1 = storage.range(FOO_KEY, FOO_KEY, 
5);
 
-            CompletableFuture<Void> readOperationsFuture = 
storage.readOperationsFuture(3);
+            CompletableFuture<Void> readOperationsFuture = 
readOperationForCompactionTracker.collect(3);
             assertFalse(readOperationsFuture.isDone());
 
             rangeBeforeSetCompactionRevision.close();
@@ -1011,56 +1015,24 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         }
     }
 
-    /** Tests {@link KeyValueStorage#startCompaction} case from method 
description when storage is in recovery state. */
+    /** Tests {@link KeyValueStorage#updateCompactionRevision} case from 
method description when storage is in recovery state. */
     @Test
-    void testStartCompactionWithoutStartWatches() {
-        var completeCompactionFuture = new CompletableFuture<Long>();
-        storage.registerCompactionListener(completeCompactionFuture::complete);
-
-        storage.startCompaction(3);
-        assertEquals(3, storage.getCompactionRevision());
-
-        assertThat(completeCompactionFuture, willTimeoutFast());
+    void testUpdateCompactionRevisionWithoutStartWatches() {
+        storage.updateCompactionRevision(1, kvContext(clock.now()));
+        assertEquals(1, storage.getCompactionRevision());
 
-        // Let's make sure that nothing happens after the watch starts.
-        startWatches();
-        assertEquals(3, storage.getCompactionRevision());
-        assertThat(completeCompactionFuture, willTimeoutFast());
+        assertThat(updateCompactionRevisionInWatchEvenQueue.waitFor(1L), 
willTimeoutFast());
     }
 
-    /** Tests {@link KeyValueStorage#startCompaction} case from method 
description when storage is <b>not</b> in recovery state. */
+    /** Tests {@link KeyValueStorage#updateCompactionRevision} case from 
method description when storage is <b>not</b> in recovery state. */
     @Test
-    void testStartCompaction() {
-        var completeCompactionFuture = new CompletableFuture<Long>();
-        storage.registerCompactionListener(completeCompactionFuture::complete);
-
-        var startHandleWatchEventFuture = new CompletableFuture<Void>();
-        var finishHandleWatchEventFuture = new CompletableFuture<Void>();
-
-        watchExact(FOO_KEY, startHandleWatchEventFuture, 
finishHandleWatchEventFuture);
+    void testUpdateCompactionRevision() {
         startWatches();
 
-        storage.put(FOO_KEY, SOME_VALUE, kvContext(clock.now()));
-        assertThat(startHandleWatchEventFuture, willCompleteSuccessfully());
-
-        storage.startCompaction(3);
-
-        Cursor<Entry> rangeCursor = storage.range(FOO_KEY, FOO_KEY);
+        storage.updateCompactionRevision(1, kvContext(clock.now()));
 
-        // Since we blocked the WatchEvent queue, we can't complete the 
compaction.
-        assertThat(completeCompactionFuture, willTimeoutFast());
-        assertEquals(-1, storage.getCompactionRevision());
-
-        finishHandleWatchEventFuture.complete(null);
-
-        // We have unlocked the WatchEvent queue, but the cursor has not yet 
been closed.
-        assertThat(completeCompactionFuture, willTimeoutFast());
-        assertEquals(3, storage.getCompactionRevision());
-
-        rangeCursor.close();
-
-        assertThat(completeCompactionFuture, willBe(3L));
-        assertEquals(3, storage.getCompactionRevision());
+        assertThat(updateCompactionRevisionInWatchEvenQueue.waitFor(1L), 
willCompleteSuccessfully());
+        assertEquals(1, storage.getCompactionRevision());
     }
 
     private List<Integer> collectRevisions(byte[] key) {
@@ -1146,23 +1118,20 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         );
     }
 
-    private void startListenUpdateSafeTime() {
-        storage.startWatches(storage.revision() + 1, new 
OnRevisionAppliedCallback() {
+    private void startWatches() {
+        storage.startWatches(storage.revision() + 1, new 
WatchEventHandlingCallback() {
             @Override
             public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
                 clusterTime.updateSafeTime(newSafeTime);
             }
 
             @Override
-            public void onRevisionApplied(long revision) {
+            public void onCompactionRevisionUpdated(long compactionRevision) {
+                
updateCompactionRevisionInWatchEvenQueue.update(compactionRevision, null);
             }
         });
     }
 
-    private void startWatches() {
-        startListenUpdateSafeTime();
-    }
-
     private void watchExact(
             byte[] key,
             CompletableFuture<Void> startHandleWatchEventFuture,
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 9810f5d3c4..898027b10e 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -1611,17 +1611,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
 
         long appliedRevision = storage.revision();
 
-        storage.startWatches(1, new OnRevisionAppliedCallback() {
-            @Override
-            public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
-                // No-op.
-            }
-
-            @Override
-            public void onRevisionApplied(long revision) {
-                // No-op.
-            }
-        });
+        storage.startWatches(1, new WatchEventHandlingCallback() {});
 
         CompletableFuture<byte[]> fut = new CompletableFuture<>();
 
@@ -1938,7 +1928,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         storage.watchExact(key, 1, mockListener2);
         storage.watchExact(key, 1, mockListener3);
 
-        OnRevisionAppliedCallback mockCallback = 
mock(OnRevisionAppliedCallback.class);
+        WatchEventHandlingCallback mockCallback = 
mock(WatchEventHandlingCallback.class);
 
         storage.startWatches(1, mockCallback);
 
@@ -2256,7 +2246,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
             }
         });
 
-        storage.startWatches(1, new OnRevisionAppliedCallback() {
+        storage.startWatches(1, new WatchEventHandlingCallback() {
             @Override
             public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
                 // No-op.
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
index d7e1f4c90c..8b04f25eed 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
@@ -58,11 +58,11 @@ public class WatchProcessorTest extends 
BaseIgniteAbstractTest {
             WatchProcessorTest::oldEntry,
             mock(FailureManager.class));
 
-    private final OnRevisionAppliedCallback revisionCallback = 
mock(OnRevisionAppliedCallback.class);
+    private final WatchEventHandlingCallback watchEventHandlingCallback = 
mock(WatchEventHandlingCallback.class);
 
     @BeforeEach
     void setUp() {
-        watchProcessor.setRevisionCallback(revisionCallback);
+        
watchProcessor.setWatchEventHandlingCallback(watchEventHandlingCallback);
     }
 
     @AfterEach
@@ -94,7 +94,7 @@ public class WatchProcessorTest extends 
BaseIgniteAbstractTest {
         verify(listener1).onUpdate(new WatchEvent(entryEvent1));
         verify(listener2).onUpdate(new WatchEvent(entryEvent2));
 
-        verify(revisionCallback).onRevisionApplied(1L);
+        verify(watchEventHandlingCallback).onRevisionApplied(1L);
     }
 
     /**
@@ -121,7 +121,7 @@ public class WatchProcessorTest extends 
BaseIgniteAbstractTest {
 
         verify(listener1).onUpdate(event);
 
-        verify(revisionCallback).onRevisionApplied(1L);
+        verify(watchEventHandlingCallback).onRevisionApplied(1L);
 
         ts = new HybridTimestamp(2, 3);
 
@@ -133,7 +133,7 @@ public class WatchProcessorTest extends 
BaseIgniteAbstractTest {
 
         verify(listener2).onUpdate(event);
 
-        verify(revisionCallback).onRevisionApplied(2L);
+        verify(watchEventHandlingCallback).onRevisionApplied(2L);
     }
 
     /**
@@ -161,7 +161,7 @@ public class WatchProcessorTest extends 
BaseIgniteAbstractTest {
         verify(listener2).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry2), entry2)));
         verify(listener2).onError(any(IllegalStateException.class));
 
-        verify(revisionCallback, never()).onRevisionApplied(anyLong());
+        verify(watchEventHandlingCallback, 
never()).onRevisionApplied(anyLong());
     }
 
     /**
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 749d5f569a..2d2de27071 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -50,12 +50,9 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ForkJoinPool;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -74,8 +71,6 @@ import org.jetbrains.annotations.Nullable;
  * Simple in-memory key/value storage for tests.
  */
 public class SimpleInMemoryKeyValueStorage extends AbstractKeyValueStorage {
-    private static final IgniteLogger LOG = 
Loggers.forClass(SimpleInMemoryKeyValueStorage.class);
-
     /**
      * Keys index. Value is the list of all revisions under which entry 
corresponding to the key was modified.
      *
@@ -147,7 +142,7 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
 
     /** Constructor. */
     public SimpleInMemoryKeyValueStorage(String nodeName, 
ReadOperationForCompactionTracker readOperationForCompactionTracker) {
-        super(nodeName, new NoOpFailureManager(), 
readOperationForCompactionTracker, ForkJoinPool.commonPool());
+        super(nodeName, new NoOpFailureManager(), 
readOperationForCompactionTracker);
     }
 
     @Override
@@ -465,7 +460,7 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
     }
 
     @Override
-    public void startWatches(long startRevision, OnRevisionAppliedCallback 
revisionCallback) {
+    public void startWatches(long startRevision, WatchEventHandlingCallback 
callback) {
         assert startRevision > 0 : startRevision;
 
         rwLock.readLock().lock();
@@ -473,7 +468,7 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
         try {
             areWatchesEnabled = true;
 
-            watchProcessor.setRevisionCallback(revisionCallback);
+            
watchProcessor.setWatchEventHandlingCallback(createWrapper(callback));
 
             replayUpdates(startRevision);
         } finally {
@@ -770,23 +765,13 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
     }
 
     @Override
-    public void saveCompactionRevision(long revision, KeyValueUpdateContext 
context) {
-        assert revision >= 0 : revision;
-
-        rwLock.writeLock().lock();
-
-        try {
-            assertCompactionRevisionLessThanCurrent(revision, rev);
-
-            savedCompactionRevision = revision;
+    public void saveCompactionRevision(long revision, KeyValueUpdateContext 
context, boolean advanceSafeTime) {
+        savedCompactionRevision = revision;
 
-            setIndexAndTerm(context.index, context.term);
+        setIndexAndTerm(context.index, context.term);
 
-            if (!isInRecoveryState()) {
-                watchProcessor.advanceSafeTime(context.timestamp);
-            }
-        } finally {
-            rwLock.writeLock().unlock();
+        if (advanceSafeTime && !isInRecoveryState()) {
+            watchProcessor.advanceSafeTime(context.timestamp);
         }
     }
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index db1d2e1ccc..c5e20183e6 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -701,7 +701,7 @@ public class IgniteImpl implements Ignite {
                 readOperationForCompactionTracker
         );
 
-        this.cfgStorage = new DistributedConfigurationStorage(name, 
metaStorageMgr);
+        cfgStorage = new DistributedConfigurationStorage(name, metaStorageMgr);
 
         clusterCfgMgr = new ConfigurationManager(
                 modules.distributed().rootKeys(),
@@ -1715,6 +1715,7 @@ public class IgniteImpl implements Ignite {
         return CompletableFuture.allOf(startupConfigurationUpdate, 
startupRevisionUpdate, startFuture)
                 .thenComposeAsync(t -> {
                     // Deploy all registered watches because all components 
are ready and have registered their listeners.
+                    // TODO: IGNITE-23292 Run local metastore compaction after 
start watches for the latest compacted revision
                     return metaStorageMgr.deployWatches();
                 }, startupExecutor);
     }

Reply via email to