This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7523612b38 IGNITE-19532 Happens-before for safe time propagation (#2098)
7523612b38 is described below

commit 7523612b3883e4cad4fe8fb38c602edeb4dbc05c
Author: Semyon Danilov <[email protected]>
AuthorDate: Thu May 25 12:47:05 2023 +0400

    IGNITE-19532 Happens-before for safe time propagation (#2098)
---
 .../ignite/internal/hlc/HybridTimestamp.java       |   5 +
 ...=> ItMetaStorageMultipleNodesAbstractTest.java} |  64 ++++++++----
 .../ItMetaStorageMultipleNodesRocksDbTest.java}    |  18 ++--
 ...MetaStorageMultipleNodesSimpleStorageTest.java} |  14 ++-
 ...MetaStorageSafeTimePropagationAbstractTest.java | 100 +++++++++++++++++++
 ...MetaStorageSafeTimePropagationRocksDbTest.java} |  11 ++-
 ...orageSafeTimePropagationSimpleStorageTest.java} |  11 ++-
 .../metastorage/impl/MetaStorageManagerImpl.java   |  11 ++-
 .../server/OnRevisionAppliedCallback.java          |   4 +-
 .../metastorage/server/WatchProcessor.java         |  16 ++-
 .../server/persistence/RocksDbKeyValueStorage.java | 109 ++++++++++++++++++---
 .../persistence/StorageColumnFamilyType.java       |   5 +-
 .../server/raft/MetaStorageWriteHandler.java       |   8 +-
 .../server/BasicOperationsKeyValueStorageTest.java |   8 +-
 .../RocksDbCompactionKeyValueStorageTest.java      |   2 +-
 .../server/RocksDbKeyValueStorageTest.java         |   4 +-
 ...impleInMemoryCompactionKeyValueStorageTest.java |   2 +-
 .../server/SimpleInMemoryKeyValueStorageTest.java  |   2 +-
 .../metastorage/server/WatchProcessorTest.java     |  27 ++---
 .../server/AbstractKeyValueStorageTest.java        |   2 +-
 .../server/SimpleInMemoryKeyValueStorage.java      |  18 ++--
 21 files changed, 339 insertions(+), 102 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index 5cdcebdcd5..047c536cc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -82,6 +82,11 @@ public final class HybridTimestamp implements Comparable<HybridTimestamp>, Seria
         }
     }
 
+    /**
+     * The constructor.
+     *
+     * @param time Long time value.
+     */
     private HybridTimestamp(long time) {
         this.time = time;
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
similarity index 89%
rename from 
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
rename to 
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 2fe20b45ee..53ef8f6d2a 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -28,6 +28,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -37,6 +38,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -57,7 +59,7 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
-import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Loza;
@@ -73,9 +75,7 @@ import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.DefaultMessagingService;
 import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.StaticNodeFinder;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
@@ -90,7 +90,7 @@ import org.junit.jupiter.params.provider.ValueSource;
  * Tests for scenarios when Meta Storage nodes join and leave a cluster.
  */
 @ExtendWith(ConfigurationExtension.class)
-public class ItMetaStorageMultipleNodesTest extends IgniteAbstractTest {
+public abstract class ItMetaStorageMultipleNodesAbstractTest extends 
IgniteAbstractTest {
     @InjectConfiguration
     private static RaftConfiguration raftConfiguration;
 
@@ -100,7 +100,9 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
     @InjectConfiguration
     private static NodeAttributesConfiguration nodeAttributes;
 
-    private static class Node {
+    public abstract KeyValueStorage createStorage(String nodeName, Path path);
+
+    private class Node {
         private final VaultManager vaultManager;
 
         private final ClusterService clusterService;
@@ -146,7 +148,7 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
                     cmgManager,
                     new LogicalTopologyServiceImpl(logicalTopology, 
cmgManager),
                     raftManager,
-                    new SimpleInMemoryKeyValueStorage(name()),
+                    createStorage(name(), basePath),
                     clock
             );
         }
@@ -194,16 +196,6 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
                     .thenCompose(service -> 
service.refreshMembers(false).thenApply(v -> service.learners()))
                     .thenApply(learners -> 
learners.stream().map(Peer::consistentId).collect(toSet()));
         }
-
-        void startDroppingMessagesTo(Node recipient, Class<? extends 
NetworkMessage> msgType) {
-            ((DefaultMessagingService) clusterService.messagingService())
-                    .dropMessages((recipientConsistentId, message) ->
-                            recipient.name().equals(recipientConsistentId) && 
msgType.isInstance(message));
-        }
-
-        void stopDroppingMessages() {
-            ((DefaultMessagingService) 
clusterService.messagingService()).stopDroppingMessages();
-        }
     }
 
     private final List<Node> nodes = new ArrayList<>();
@@ -365,12 +357,50 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
 
         assertThat(allOf(firstNode.cmgManager.onJoinReady(), 
secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
 
+        CompletableFuture<Void> watchCompletedFuture = new 
CompletableFuture<>();
+        CountDownLatch watchCalledLatch = new CountDownLatch(1);
+
+        ByteArray testKey = ByteArray.fromString("test-key");
+
+        // Register watch listener, so that we can control safe time 
propagation.
+        // Safe time can only be propagated when all of the listeners 
completed their futures successfully.
+        secondNode.metaStorageManager.registerExactWatch(testKey, new 
WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                watchCalledLatch.countDown();
+
+                return watchCompletedFuture;
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                // No-op.
+            }
+        });
+
+        HybridTimestamp timeBeforeOp = firstNodeTime.currentSafeTime();
+
         // Try putting data from both nodes, because any of them can be a 
leader.
         assertThat(
-                
firstNode.metaStorageManager.put(ByteArray.fromString("test-key"), new 
byte[]{0, 1, 2, 3}),
+                firstNode.metaStorageManager.put(testKey, new byte[]{0, 1, 2, 
3}),
                 willCompleteSuccessfully()
         );
 
+        // Ensure watch listener is called.
+        assertTrue(watchCalledLatch.await(1, TimeUnit.SECONDS));
+
+        // Wait until leader's safe time is propagated.
+        assertTrue(waitForCondition(() -> {
+            return firstNodeTime.currentSafeTime().compareTo(timeBeforeOp) > 0;
+        }, TimeUnit.SECONDS.toMillis(1)));
+
+        // Safe time must not be propagated to the second node at this moment.
+        assertThat(firstNodeTime.currentSafeTime(), 
greaterThan(secondNodeTime.currentSafeTime()));
+
+        // Finish watch listener notification process.
+        watchCompletedFuture.complete(null);
+
+        // After that in the nearest future safe time must be propagated.
         assertTrue(waitForCondition(() -> {
             HybridTimestamp sf1 = firstNodeTime.currentSafeTime();
             HybridTimestamp sf2 = secondNodeTime.currentSafeTime();
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
similarity index 60%
copy from 
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
copy to 
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
index 6f52b4130c..b90a88c9ac 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
@@ -15,22 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.server;
+package org.apache.ignite.internal.metastorage.impl;
 
 import java.nio.file.Path;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-/** Compaction test for the RocksDB implementation of {@link KeyValueStorage}. 
*/
-@ExtendWith(WorkDirectoryExtension.class)
-public class RocksDbCompactionKeyValueStorageTest extends 
AbstractCompactionKeyValueStorageTest {
-    @WorkDirectory
-    private Path workDir;
 
+/** {@link ItMetaStorageMultipleNodesAbstractTest} with {@link 
RocksDbKeyValueStorage} implementation. */
+public class ItMetaStorageMultipleNodesRocksDbTest extends 
ItMetaStorageMultipleNodesAbstractTest {
     @Override
-    KeyValueStorage createStorage() {
-        return new RocksDbKeyValueStorage("test", workDir.resolve("storage"));
+    public KeyValueStorage createStorage(String nodeName, Path path) {
+        return new RocksDbKeyValueStorage(nodeName, path.resolve("ms"));
     }
 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesSimpleStorageTest.java
similarity index 58%
copy from 
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
copy to 
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesSimpleStorageTest.java
index 54ba8cfc14..b1eefc14d9 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesSimpleStorageTest.java
@@ -15,12 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.server;
+package org.apache.ignite.internal.metastorage.impl;
 
-/** Compaction test for the simple in-memory implementation of {@link 
KeyValueStorage}. */
-public class SimpleInMemoryCompactionKeyValueStorageTest extends 
AbstractCompactionKeyValueStorageTest {
+import java.nio.file.Path;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+
+/** {@link ItMetaStorageMultipleNodesAbstractTest} with {@link 
SimpleInMemoryKeyValueStorage} implementation. */
+public class ItMetaStorageMultipleNodesSimpleStorageTest extends 
ItMetaStorageMultipleNodesAbstractTest {
     @Override
-    KeyValueStorage createStorage() {
-        return new SimpleInMemoryKeyValueStorage("test");
+    public KeyValueStorage createStorage(String nodeName, Path path) {
+        return new SimpleInMemoryKeyValueStorage(nodeName);
     }
 }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
new file mode 100644
index 0000000000..7192723bb6
--- /dev/null
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorageTest;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Safe time propagation tests. */
+public abstract class ItMetaStorageSafeTimePropagationAbstractTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    private final ClusterTimeImpl time = new ClusterTimeImpl(new IgniteSpinBusyLock(), clock);
+
+    @BeforeEach
+    @Override
+    public void setUp() {
+        super.setUp();
+
+        storage.startWatches((e, t) -> {
+            time.updateSafeTime(t);
+
+            return CompletableFuture.completedFuture(null);
+        });
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        storage.close();
+    }
+
+    @Test
+    public void testTimePropagated() throws Exception {
+        CompletableFuture<Void> watchCompletedFuture = new CompletableFuture<>();
+
+        CountDownLatch watchCalledLatch = new CountDownLatch(1);
+
+        // Register watch listener, so that we can control safe time propagation.
+        // Safe time can only be propagated when all of the listeners completed their futures successfully.
+        storage.watchExact(key(0), 1, new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                watchCalledLatch.countDown();
+                return watchCompletedFuture;
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                // No-op.
+            }
+        });
+
+        HybridTimestamp opTs = clock.now();
+
+        storage.put(key(0), keyValue(0, 1), opTs);
+
+        // Ensure watch listener is called.
+        assertTrue(watchCalledLatch.await(1, TimeUnit.SECONDS));
+
+        // Safe time must not be propagated before watch notifies all listeners.
+        assertThat(time.currentSafeTime(), lessThan(opTs));
+
+        // Finish listener notification.
+        watchCompletedFuture.complete(null);
+
+        // Safe time must be propagated.
+        assertThat(time.waitFor(opTs), willCompleteSuccessfully());
+    }
+}
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
similarity index 72%
copy from 
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
copy to 
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
index 6f52b4130c..22cd42442c 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
@@ -15,22 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.server;
+package org.apache.ignite.internal.metastorage.impl;
 
 import java.nio.file.Path;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-/** Compaction test for the RocksDB implementation of {@link KeyValueStorage}. 
*/
+/** {@link ItMetaStorageSafeTimePropagationAbstractTest} with {@link 
RocksDbKeyValueStorage} implementation. */
 @ExtendWith(WorkDirectoryExtension.class)
-public class RocksDbCompactionKeyValueStorageTest extends 
AbstractCompactionKeyValueStorageTest {
+public class ItMetaStorageSafeTimePropagationRocksDbTest extends 
ItMetaStorageSafeTimePropagationAbstractTest {
     @WorkDirectory
     private Path workDir;
 
     @Override
-    KeyValueStorage createStorage() {
-        return new RocksDbKeyValueStorage("test", workDir.resolve("storage"));
+    public KeyValueStorage createStorage() {
+        return new RocksDbKeyValueStorage("test", workDir);
     }
 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
similarity index 64%
copy from 
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
copy to 
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
index 54ba8cfc14..d412c6565e 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
@@ -15,12 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.server;
+package org.apache.ignite.internal.metastorage.impl;
 
-/** Compaction test for the simple in-memory implementation of {@link 
KeyValueStorage}. */
-public class SimpleInMemoryCompactionKeyValueStorageTest extends 
AbstractCompactionKeyValueStorageTest {
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+
+/** {@link ItMetaStorageSafeTimePropagationAbstractTest} with {@link 
SimpleInMemoryKeyValueStorage} implementation. */
+public class ItMetaStorageSafeTimePropagationSimpleStorageTest extends 
ItMetaStorageSafeTimePropagationAbstractTest {
     @Override
-    KeyValueStorage createStorage() {
+    public KeyValueStorage createStorage() {
         return new SimpleInMemoryKeyValueStorage("test");
     }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 7bc432a27c..5bf665416d 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -150,7 +151,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            RaftNodeDisruptorConfiguration ownFsmCallerExecutorDisruptorConfig 
= new RaftNodeDisruptorConfiguration("metastorage", 1);
+            var ownFsmCallerExecutorDisruptorConfig = new 
RaftNodeDisruptorConfiguration("metastorage", 1);
 
             // We need to configure the replication protocol differently 
whether this node is a synchronous or asynchronous replica.
             if (metaStorageNodes.contains(thisNodeName)) {
@@ -283,7 +284,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
         try {
             // Meta Storage contract states that all updated entries under a 
particular revision must be stored in the Vault.
-            storage.startWatches(this::saveUpdatedEntriesToVault);
+            storage.startWatches(this::onRevisionApplied);
         } finally {
             busyLock.leaveBusy();
         }
@@ -599,13 +600,17 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     /**
      * Saves processed Meta Storage revision and corresponding entries to the 
Vault.
      */
-    private CompletableFuture<Void> saveUpdatedEntriesToVault(WatchEvent 
watchEvent) {
+    private CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, 
HybridTimestamp time) {
+        assert time != null;
+
         if (!busyLock.enterBusy()) {
             LOG.info("Skipping applying MetaStorage revision because the node 
is stopping");
 
             return completedFuture(null);
         }
 
+        clusterTime.updateSafeTime(time);
+
         try {
             CompletableFuture<Void> saveToVaultFuture;
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
index 0ce5b39072..e02534db44 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage.server;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 
 /**
@@ -29,7 +30,8 @@ public interface OnRevisionAppliedCallback {
      * Notifies of completion of processing of Meta Storage watches for a 
particular revision.
      *
      * @param watchEvent Event with modified Meta Storage entries processed at 
least one Watch.
+     * @param newSafeTime Safe time of the applied revision.
      * @return Future that represents the state of the execution of the 
callback.
      */
-    CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent);
+    CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, 
HybridTimestamp newSafeTime);
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index aac4330e29..234a14dd08 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -118,7 +119,10 @@ public class WatchProcessor implements ManuallyCloseable {
     /**
      * Notifies registered watch about an update event.
      */
-    public void notifyWatches(List<Entry> updatedEntries) {
+    @SuppressWarnings("unchecked")
+    public void notifyWatches(List<Entry> updatedEntries, HybridTimestamp 
time) {
+        assert time != null;
+
         notificationFuture = notificationFuture
                 .thenComposeAsync(v -> {
                     // Revision must be the same for all entries.
@@ -130,7 +134,7 @@ public class WatchProcessor implements ManuallyCloseable {
                             .toArray(CompletableFuture[]::new);
 
                     return allOf(notificationFutures)
-                            .thenComposeAsync(ignored -> 
invokeOnRevisionCallback(notificationFutures, newRevision), watchExecutor);
+                            .thenComposeAsync(ignored -> 
invokeOnRevisionCallback(notificationFutures, newRevision, time), 
watchExecutor);
                 }, watchExecutor);
     }
 
@@ -179,7 +183,11 @@ public class WatchProcessor implements ManuallyCloseable {
                 });
     }
 
-    private CompletableFuture<Void> 
invokeOnRevisionCallback(CompletableFuture<List<EntryEvent>>[] 
notificationFutures, long revision) {
+    private CompletableFuture<Void> invokeOnRevisionCallback(
+            CompletableFuture<List<EntryEvent>>[] notificationFutures,
+            long revision,
+            HybridTimestamp time
+    ) {
         try {
             // Only notify about entries that have been accepted by at least 
one Watch.
             var acceptedEntries = new HashSet<EntryEvent>();
@@ -193,7 +201,7 @@ public class WatchProcessor implements ManuallyCloseable {
 
             var event = new WatchEvent(acceptedEntries, revision);
 
-            return revisionCallback.onRevisionApplied(event)
+            return revisionCallback.onRevisionApplied(event, time)
                     .whenComplete((ignored, e) -> {
                         if (e != null) {
                             LOG.error("Error occurred when notifying watches", 
e);
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 0d439a0b24..78ff64ef7a 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -30,6 +30,7 @@ import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksSto
 import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
+import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION;
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
 import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
@@ -47,6 +48,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -156,6 +158,9 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     /** Timestamp to revision mapping column family. */
     private volatile ColumnFamily tsToRevision;
 
+    /** Revision to timestamp mapping column family. */
+    private volatile ColumnFamily revisionToTs;
+
     /** Snapshot manager. */
     private volatile RocksSnapshotManager snapshotManager;
 
@@ -197,14 +202,14 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
      * <p>Multi-threaded access is guarded by {@link #rwLock}.
      */
     @Nullable
-    private List<List<Entry>> eventCache;
+    private List<UpdatedEntries> eventCache;
 
     /**
      * Current list of updated entries.
      *
      * <p>Since this list gets read and updated only on writes (under a write 
lock), no extra synchronisation is needed.
      */
-    private final List<Entry> updatedEntries = new ArrayList<>();
+    private final UpdatedEntries updatedEntries = new UpdatedEntries();
 
     /**
      * Constructor.
@@ -242,10 +247,14 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         Options tsToRevOptions = new Options().setCreateIfMissing(true);
         ColumnFamilyOptions tsToRevFamilyOptions = new 
ColumnFamilyOptions(tsToRevOptions);
 
+        Options revToTsOptions = new Options().setCreateIfMissing(true);
+        ColumnFamilyOptions revToTsFamilyOptions = new 
ColumnFamilyOptions(revToTsOptions);
+
         return List.of(
                 new ColumnFamilyDescriptor(DATA.nameAsBytes(), 
dataFamilyOptions),
                 new ColumnFamilyDescriptor(INDEX.nameAsBytes(), 
indexFamilyOptions),
-                new ColumnFamilyDescriptor(TS_TO_REVISION.nameAsBytes(), 
tsToRevFamilyOptions)
+                new ColumnFamilyDescriptor(TS_TO_REVISION.nameAsBytes(), 
tsToRevFamilyOptions),
+                new ColumnFamilyDescriptor(REVISION_TO_TS.nameAsBytes(), 
revToTsFamilyOptions)
         );
     }
 
@@ -254,7 +263,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
         List<ColumnFamilyDescriptor> descriptors = cfDescriptors();
 
-        assert descriptors.size() == 3;
+        assert descriptors.size() == 4;
 
         var handles = new ArrayList<ColumnFamilyHandle>(descriptors.size());
 
@@ -270,8 +279,10 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
         tsToRevision = ColumnFamily.wrap(db, handles.get(2));
 
+        revisionToTs = ColumnFamily.wrap(db, handles.get(3));
+
         snapshotManager = new RocksSnapshotManager(db,
-                List.of(fullRange(data), fullRange(index), 
fullRange(tsToRevision)),
+                List.of(fullRange(data), fullRange(index), 
fullRange(tsToRevision), fullRange(revisionToTs)),
                 snapshotExecutor
         );
     }
@@ -413,7 +424,10 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
             data.put(batch, REVISION_KEY, revisionBytes);
 
             if (ts != null) {
-                tsToRevision.put(batch, hybridTsToArray(ts), revisionBytes);
+                byte[] tsBytes = hybridTsToArray(ts);
+
+                tsToRevision.put(batch, tsBytes, revisionBytes);
+                revisionToTs.put(batch, revisionBytes, tsBytes);
             }
 
             db.write(opts, batch);
@@ -422,6 +436,8 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
             updCntr = newCntr;
         }
 
+        updatedEntries.ts = ts;
+
         queueWatchEvent();
     }
 
@@ -1329,9 +1345,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
                     eventCache = new ArrayList<>();
                 }
 
-                eventCache.add(List.copyOf(updatedEntries));
-
-                updatedEntries.clear();
+                eventCache.add(updatedEntries.transfer());
 
                 break;
 
@@ -1343,9 +1357,11 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     }
 
     private void notifyWatches() {
-        watchProcessor.notifyWatches(List.copyOf(updatedEntries));
+        UpdatedEntries copy = updatedEntries.transfer();
+
+        assert copy.ts != null;
 
-        updatedEntries.clear();
+        watchProcessor.notifyWatches(copy.updatedEntries, copy.ts);
     }
 
     private void replayUpdates(long upperRevision) {
@@ -1359,6 +1375,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
 
         var updatedEntries = new ArrayList<Entry>();
+        HybridTimestamp ts = null;
 
         try (
                 var upperBound = new Slice(longToBytes(upperRevision + 1));
@@ -1379,7 +1396,9 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
                     if (!updatedEntries.isEmpty()) {
                         var updatedEntriesCopy = List.copyOf(updatedEntries);
 
-                        watchProcessor.notifyWatches(updatedEntriesCopy);
+                        assert ts != null;
+
+                        watchProcessor.notifyWatches(updatedEntriesCopy, ts);
 
                         updatedEntries.clear();
                     }
@@ -1387,6 +1406,10 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
                     lastSeenRevision = revision;
                 }
 
+                if (ts == null) {
+                    ts = timestampByRevision(revision);
+                }
+
                 updatedEntries.add(entry(rocksKeyToBytes(rocksKey), revision, 
bytesToValue(rocksValue)));
             }
 
@@ -1394,13 +1417,27 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
             // Notify about the events left after finishing the cycle above.
             if (!updatedEntries.isEmpty()) {
-                watchProcessor.notifyWatches(updatedEntries);
+                assert ts != null;
+
+                watchProcessor.notifyWatches(updatedEntries, ts);
             }
         }
 
         finishReplay();
     }
 
+    /**
+     * Reads the timestamp recorded for the given revision from the {@code revisionToTs} column family.
+     *
+     * <p>The presence of a mapping for the revision is only checked with an {@code assert}.
+     *
+     * @param revision Revision whose timestamp is requested.
+     * @return Timestamp decoded from the stored bytes.
+     * @throws MetaStorageException With the {@code OP_EXECUTION_ERR} code if the RocksDB read fails.
+     */
+    private HybridTimestamp timestampByRevision(long revision) {
+        try {
+            byte[] revisionKey = longToBytes(revision);
+
+            byte[] storedTs = revisionToTs.get(revisionKey);
+
+            assert storedTs != null;
+
+            long encodedTs = bytesToLong(storedTs);
+
+            return HybridTimestamp.hybridTimestamp(encodedTs);
+        } catch (RocksDBException e) {
+            throw new MetaStorageException(OP_EXECUTION_ERR, e);
+        }
+    }
+
     private void finishReplay() {
         // Take the lock to drain the event cache and prevent new events from 
being cached. Since event notification is asynchronous,
         // this lock shouldn't be held for long.
@@ -1408,7 +1445,11 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
         try {
             if (eventCache != null) {
-                eventCache.forEach(watchProcessor::notifyWatches);
+                eventCache.forEach(entries -> {
+                    assert entries.ts != null;
+
+                    watchProcessor.notifyWatches(entries.updatedEntries, 
entries.ts);
+                });
 
                 eventCache = null;
             }
@@ -1423,4 +1464,44 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     public Path getDbPath() {
         return dbPath;
     }
+
+    /**
+     * Holder for a batch of updated entries together with the timestamp associated with that batch.
+     *
+     * <p>An instance created with the no-arg constructor is a mutable accumulator: entries are appended with
+     * {@link #add(Entry)} and the timestamp is assigned through the {@link #ts} field. {@link #transfer()} turns the
+     * accumulated state into a separate snapshot instance and resets the accumulator.
+     *
+     * <p>This class performs no synchronisation of its own.
+     */
+    private static class UpdatedEntries {
+        /** Accumulated entries. In instances returned by {@link #transfer()} this list is unmodifiable. */
+        private final List<Entry> updatedEntries;
+
+        /** Timestamp of the batch, {@code null} until one has been assigned or after {@link #clear()}. */
+        @Nullable
+        private HybridTimestamp ts;
+
+        /** Creates an empty accumulator without a timestamp. */
+        UpdatedEntries() {
+            this.updatedEntries = new ArrayList<>();
+        }
+
+        /**
+         * Creates a snapshot holder.
+         *
+         * @param updatedEntries Entries of the snapshot; stored as is, not copied.
+         * @param ts Timestamp of the snapshot, must not be {@code null}.
+         */
+        private UpdatedEntries(List<Entry> updatedEntries, HybridTimestamp ts) {
+            this.updatedEntries = updatedEntries;
+            this.ts = Objects.requireNonNull(ts);
+        }
+
+        /** Returns {@code true} if no entries have been accumulated. */
+        boolean isEmpty() {
+            return updatedEntries.isEmpty();
+        }
+
+        /** Appends an entry to the accumulated batch. */
+        void add(Entry entry) {
+            updatedEntries.add(entry);
+        }
+
+        /** Drops all accumulated entries and forgets the timestamp. */
+        void clear() {
+            updatedEntries.clear();
+
+            ts = null;
+        }
+
+        /**
+         * Moves the accumulated state into a new holder and resets this accumulator.
+         *
+         * <p>The timestamp must have been assigned before this call.
+         *
+         * @return Holder with an unmodifiable copy of the accumulated entries and the current timestamp.
+         */
+        UpdatedEntries transfer() {
+            assert ts != null;
+
+            // The snapshot is cached or handed to watch notification, so it is made unmodifiable,
+            // the same way the replay path passes List.copyOf(...) to the watch processor.
+            UpdatedEntries transferredValue = new UpdatedEntries(List.copyOf(updatedEntries), ts);
+
+            clear();
+
+            return transferredValue;
+        }
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
index 958b73fb88..ef2f0aa685 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
@@ -31,7 +31,10 @@ enum StorageColumnFamilyType {
     INDEX("INDEX".getBytes(StandardCharsets.UTF_8)),
 
     /** Column family for the timestamp to revision mapping. */
-    TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8));
+    TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)),
+
+    /** Column family for the revision to timestamp mapping. */
+    REVISION_TO_TS("REVTOTS".getBytes(StandardCharsets.UTF_8));
 
     /** Byte representation of the column family's name. */
     private final byte[] nameAsBytes;
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index ca80ffc46a..6354ece930 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -88,13 +88,9 @@ class MetaStorageWriteHandler {
                 safeTime = cmdWithTime.safeTime();
 
                 handleWriteWithTime(clo, cmdWithTime, safeTime);
-
-                // Every MetaStorageWriteCommand holds safe time that we 
should set as the cluster time.
-                clusterTime.updateSafeTime(safeTime);
             } else if (command instanceof SyncTimeCommand) {
-                clusterTime.updateSafeTime(((SyncTimeCommand) 
command).safeTime());
-
-                clo.result(null);
+                // TODO: IGNITE-19199 WatchProcessor must be notified of the 
new safe time.
+                throw new 
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-19199";);
             } else {
                 assert false : "Command was not found [cmd=" + command + ']';
             }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 27033f9685..fd2318d8ff 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -1887,7 +1887,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
 
         long appliedRevision = storage.revision();
 
-        storage.startWatches(event -> completedFuture(null));
+        storage.startWatches((event, ts) -> completedFuture(null));
 
         CompletableFuture<byte[]> fut = new CompletableFuture<>();
 
@@ -2226,7 +2226,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
 
         OnRevisionAppliedCallback mockCallback = 
mock(OnRevisionAppliedCallback.class);
 
-        
when(mockCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
+        when(mockCallback.onRevisionApplied(any(), 
any())).thenReturn(completedFuture(null));
 
         storage.startWatches(mockCallback);
 
@@ -2238,7 +2238,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
 
         verify(mockListener3, timeout(10_000)).onUpdate(any());
 
-        verify(mockCallback, never()).onRevisionApplied(any());
+        verify(mockCallback, never()).onRevisionApplied(any(), any());
     }
 
     @Test
@@ -2423,7 +2423,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
             }
         });
 
-        storage.startWatches(event -> completedFuture(null));
+        storage.startWatches((event, ts) -> completedFuture(null));
 
         return resultFuture;
     }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
index 6f52b4130c..01198c199d 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
@@ -30,7 +30,7 @@ public class RocksDbCompactionKeyValueStorageTest extends 
AbstractCompactionKeyV
     private Path workDir;
 
     @Override
-    KeyValueStorage createStorage() {
+    public KeyValueStorage createStorage() {
         return new RocksDbKeyValueStorage("test", workDir.resolve("storage"));
     }
 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index 31baeb0f31..d60e46416b 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -47,7 +47,7 @@ public class RocksDbKeyValueStorageTest extends 
BasicOperationsKeyValueStorageTe
 
     /** {@inheritDoc} */
     @Override
-    KeyValueStorage createStorage() {
+    public KeyValueStorage createStorage() {
         return new RocksDbKeyValueStorage("test", workDir.resolve("storage"));
     }
 
@@ -102,7 +102,7 @@ public class RocksDbKeyValueStorageTest extends 
BasicOperationsKeyValueStorageTe
             }
         });
 
-        storage.startWatches(event -> CompletableFuture.completedFuture(null));
+        storage.startWatches((event, ts) -> 
CompletableFuture.completedFuture(null));
 
         storage.restoreSnapshot(snapshotPath);
 
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
index 54ba8cfc14..d7adf966d6 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.metastorage.server;
 /** Compaction test for the simple in-memory implementation of {@link 
KeyValueStorage}. */
 public class SimpleInMemoryCompactionKeyValueStorageTest extends 
AbstractCompactionKeyValueStorageTest {
     @Override
-    KeyValueStorage createStorage() {
+    public KeyValueStorage createStorage() {
         return new SimpleInMemoryKeyValueStorage("test");
     }
 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 4e4bca8904..d9c5f9256c 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -23,7 +23,7 @@ package org.apache.ignite.internal.metastorage.server;
 class SimpleInMemoryKeyValueStorageTest extends 
BasicOperationsKeyValueStorageTest {
     /** {@inheritDoc} */
     @Override
-    KeyValueStorage createStorage() {
+    public KeyValueStorage createStorage() {
         return new SimpleInMemoryKeyValueStorage("test");
     }
 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
index 1e64ecd803..210a2bcc3d 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.WatchEvent;
@@ -55,7 +56,7 @@ public class WatchProcessorTest {
 
     @BeforeEach
     void setUp() {
-        
when(revisionCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
+        when(revisionCallback.onRevisionApplied(any(), 
any())).thenReturn(completedFuture(null));
 
         watchProcessor.setRevisionCallback(revisionCallback);
     }
@@ -79,7 +80,7 @@ public class WatchProcessorTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1, entry2));
+        watchProcessor.notifyWatches(List.of(entry1, entry2), 
HybridTimestamp.MAX_VALUE);
 
         var entryEvent1 = new EntryEvent(oldEntry(entry1), entry1);
         var entryEvent2 = new EntryEvent(oldEntry(entry2), entry2);
@@ -92,7 +93,7 @@ public class WatchProcessorTest {
 
         var watchEventCaptor = ArgumentCaptor.forClass(WatchEvent.class);
 
-        verify(revisionCallback, 
timeout(1_000)).onRevisionApplied(watchEventCaptor.capture());
+        verify(revisionCallback, 
timeout(1_000)).onRevisionApplied(watchEventCaptor.capture(), any());
 
         WatchEvent event = watchEventCaptor.getValue();
 
@@ -114,23 +115,27 @@ public class WatchProcessorTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1));
+        HybridTimestamp ts = new HybridTimestamp(1, 2);
+
+        watchProcessor.notifyWatches(List.of(entry1), ts);
 
         var event = new WatchEvent(new EntryEvent(oldEntry(entry1), entry1));
 
         verify(listener1, timeout(1_000)).onUpdate(event);
         verify(listener2, timeout(1_000)).onRevisionUpdated(1);
 
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
+        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, ts);
+
+        ts = new HybridTimestamp(2, 3);
 
-        watchProcessor.notifyWatches(List.of(entry2));
+        watchProcessor.notifyWatches(List.of(entry2), ts);
 
         event = new WatchEvent(new EntryEvent(oldEntry(entry2), entry2));
 
         verify(listener1, timeout(1_000)).onRevisionUpdated(2);
         verify(listener2, timeout(1_000)).onUpdate(event);
 
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
+        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, ts);
     }
 
     /**
@@ -150,13 +155,13 @@ public class WatchProcessorTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1, entry2));
+        watchProcessor.notifyWatches(List.of(entry1, entry2), 
HybridTimestamp.MAX_VALUE);
 
         verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry1), entry1)));
         verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry2), entry2)));
         verify(listener2, 
timeout(1_000)).onError(any(IllegalStateException.class));
 
-        verify(revisionCallback, never()).onRevisionApplied(any());
+        verify(revisionCallback, never()).onRevisionApplied(any(), any());
     }
 
     /**
@@ -182,7 +187,7 @@ public class WatchProcessorTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1, entry2));
+        watchProcessor.notifyWatches(List.of(entry1, entry2), 
HybridTimestamp.MAX_VALUE);
 
         verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry1), entry1)));
         verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry2), entry2)));
@@ -190,7 +195,7 @@ public class WatchProcessorTest {
         var entry3 = new EntryImpl("foo".getBytes(UTF_8), null, 2, 0);
         var entry4 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0);
 
-        watchProcessor.notifyWatches(List.of(entry3, entry4));
+        watchProcessor.notifyWatches(List.of(entry3, entry4), 
HybridTimestamp.MAX_VALUE);
 
         verify(listener1, never()).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry3), entry3)));
         verify(listener2, never()).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry4), entry4)));
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
similarity index 96%
rename from 
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
rename to 
modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
index acc9241f06..ce7a4a5b53 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
@@ -43,7 +43,7 @@ public abstract class AbstractKeyValueStorageTest {
     /**
      * Returns key value storage for this test.
      */
-    abstract KeyValueStorage createStorage();
+    protected abstract KeyValueStorage createStorage();
 
     protected static byte[] key(int k) {
         return ("key" + k).getBytes(UTF_8);
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 2096e9ab4e..07e9a913d7 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -64,6 +65,9 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     /** Timestamp to revision mapping. */
     private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();
 
+    /** Revision to timestamp mapping. */
+    private final Map<Long, HybridTimestamp> revToTsMap = new HashMap<>();
+
     /** Revisions index. Value contains all entries which were modified under 
particular revision. */
     private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new 
TreeMap<>();
 
@@ -120,6 +124,7 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
         rev = newRevision;
 
         tsToRevMap.put(ts.longValue(), rev);
+        revToTsMap.put(rev, ts);
 
         notifyWatches();
     }
@@ -468,7 +473,10 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
             return;
         }
 
-        watchProcessor.notifyWatches(List.copyOf(updatedEntries));
+        HybridTimestamp ts = revToTsMap.get(updatedEntries.get(0).revision());
+        assert ts != null;
+
+        watchProcessor.notifyWatches(List.copyOf(updatedEntries), ts);
 
         updatedEntries.clear();
     }
@@ -748,12 +756,4 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     private static long lastRevision(List<Long> revs) {
         return revs.get(revs.size() - 1);
     }
-
-    private static List<Long> listOf(long val) {
-        List<Long> res = new ArrayList<>();
-
-        res.add(val);
-
-        return res;
-    }
 }

Reply via email to