This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-19028
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 42a805242eab98ca567451919e182a2634d30089
Author: Semyon Danilov <[email protected]>
AuthorDate: Thu Mar 30 14:47:18 2023 +0400

    ..
---
 .../ignite/internal/hlc/HybridClockImpl.java       |   9 ++
 .../ignite/internal/hlc/HybridTimestamp.java       |   7 +-
 ...butionZoneManagerLogicalTopologyEventsTest.java |   4 +-
 .../DistributionZoneManagerScaleUpTest.java        |   3 +-
 .../DistributionZoneManagerWatchListenerTest.java  |   3 +-
 .../impl/ItMetaStorageManagerImplTest.java         |   7 +-
 .../impl/ItMetaStorageMultipleNodesTest.java       |   5 +-
 .../impl/ItMetaStorageServicePersistenceTest.java  |   6 +-
 .../metastorage/impl/ItMetaStorageServiceTest.java |  16 ++-
 .../metastorage/impl/ItMetaStorageWatchTest.java   |   5 +-
 .../server/raft/ItMetaStorageRaftGroupTest.java    |  14 +-
 .../metastorage/command/GetAndPutAllCommand.java   |  27 +---
 .../metastorage/command/GetAndPutCommand.java      |   3 +-
 .../command/GetAndRemoveAllCommand.java            |  22 +--
 .../metastorage/command/GetAndRemoveCommand.java   |   3 +-
 ...oveCommand.java => HybridTimestampMessage.java} |  22 +--
 .../metastorage/command/InvokeCommand.java         |   3 +-
 ...veCommand.java => MetaStorageWriteCommand.java} |  20 +--
 .../command/MetastorageCommandsMessageGroup.java   |   4 +
 .../metastorage/command/MultiInvokeCommand.java    |   3 +-
 .../metastorage/command/PutAllCommand.java         |  37 +----
 .../internal/metastorage/command/PutCommand.java   |   3 +-
 .../metastorage/command/RemoveAllCommand.java      |  22 +--
 .../metastorage/command/RemoveCommand.java         |   3 +-
 ...ultiInvokeCommand.java => SyncTimeCommand.java} |  12 +-
 .../metastorage/impl/MetaStorageManagerImpl.java   |  22 ++-
 .../impl/MetaStorageRaftGroupEventsListener.java   |  31 ++++-
 .../metastorage/impl/MetaStorageServiceImpl.java   | 153 +++++++++++++++++++--
 .../server/raft/MetaStorageLearnerListener.java    |  12 +-
 .../server/raft/MetaStorageListener.java           |  12 +-
 .../server/raft/MetaStorageWriteHandler.java       |  29 +++-
 .../time/ClusterTime.java}                         |  27 ++--
 .../metastorage/server/time/ClusterTimeImpl.java   | 148 ++++++++++++++++++++
 .../impl/MetaStorageRangeCursorTest.java           |   7 +-
 .../impl/StandaloneMetaStorageManager.java         |   4 +-
 .../processor/messages/MessageImplGenerator.java   |  23 +++-
 .../ignite/network/annotations/WithSetter.java}    |  27 ++--
 .../PlacementDriverManagerTest.java                |   3 +-
 .../internal/raft/service/RaftGroupListener.java   |  12 ++
 .../jraft/rpc/impl/ActionRequestProcessor.java     |   9 +-
 .../ItDistributedConfigurationPropertiesTest.java  |   5 +-
 .../ItDistributedConfigurationStorageTest.java     |   5 +-
 .../storage/ItRebalanceDistributedTest.java        |   4 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   3 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 .../TableManagerDistributionZonesTest.java         |   3 +-
 .../utils/RebalanceUtilUpdateAssignmentsTest.java  |   3 +-
 47 files changed, 577 insertions(+), 231 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
index 91f02c7fec..7965c3e0e7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
@@ -49,6 +49,15 @@ public class HybridClockImpl implements HybridClock {
         this.latestTime = new 
HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), 0);
     }
 
+    /**
+     * The constructor which initializes the latest time to some initial time.
+     *
+     * @param initialTime Initial time.
+     */
+    public HybridClockImpl(HybridTimestamp initialTime) {
+        this.latestTime = initialTime;
+    }
+
     /** {@inheritDoc} */
     @Override
     public HybridTimestamp now() {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index e06a975ae4..ed2301c35e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.hlc;
 
 import java.io.Serializable;
 import org.apache.ignite.internal.tostring.S;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * A hybrid timestamp that combines physical clock and logical clock.
@@ -68,10 +67,8 @@ public final class HybridTimestamp implements 
Comparable<HybridTimestamp>, Seria
      * @param times Times for comparing.
      * @return The highest hybrid timestamp.
      */
-    public static @Nullable HybridTimestamp max(HybridTimestamp... times) {
-        if (times.length == 0) {
-            return null;
-        }
+    public static HybridTimestamp max(HybridTimestamp... times) {
+        assert times.length > 0;
 
         HybridTimestamp maxTime = times[0];
 
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index 82185aa6bd..d56068dbb7 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -59,6 +59,8 @@ import 
org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
@@ -149,7 +151,7 @@ public class 
DistributionZoneManagerLogicalTopologyEventsTest {
 
         keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
 
-        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage);
+        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage, mock(ClusterTimeImpl.class));
 
         RaftGroupService metaStorageService = mock(RaftGroupService.class);
 
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index 0bd9fa3878..aba45505f0 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.impl.EntryImpl;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.schema.configuration.TableChange;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
@@ -190,7 +191,7 @@ public class DistributionZoneManagerScaleUpTest {
 
         keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
 
-        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage);
+        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage, mock(ClusterTime.class));
 
         metaStorageService = mock(RaftGroupService.class);
 
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
index 84f1a807e6..5ebc2f466e 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.impl.EntryImpl;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.schema.configuration.TableChange;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
@@ -164,7 +165,7 @@ public class DistributionZoneManagerWatchListenerTest 
extends IgniteAbstractTest
 
         keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
 
-        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage);
+        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage, mock(ClusterTime.class));
 
         metaStorageService = mock(RaftGroupService.class);
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index c37073a221..7eeaf10ed3 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -42,6 +42,7 @@ import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -107,7 +108,8 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
                 cmgManager,
                 mock(LogicalTopologyService.class),
                 raftManager,
-                storage
+                storage,
+                mock(HybridClock.class)
         );
 
         vaultManager.start();
@@ -282,7 +284,8 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
                 cmgManager,
                 mock(LogicalTopologyService.class),
                 raftManager,
-                storage
+                storage,
+                mock(HybridClock.class)
         );
 
         metaStorageManager.stop();
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
index 9a90c3d8cf..4c01301f53 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
@@ -30,6 +30,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
@@ -48,6 +49,7 @@ import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologySer
 import org.apache.ignite.internal.configuration.SecurityConfiguration;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -142,7 +144,8 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
                     cmgManager,
                     new LogicalTopologyServiceImpl(logicalTopology, 
cmgManager),
                     raftManager,
-                    new SimpleInMemoryKeyValueStorage(name())
+                    new SimpleInMemoryKeyValueStorage(name()),
+                    mock(HybridClock.class)
             );
         }
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index a629716cf6..a483f3979a 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
@@ -29,6 +30,7 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.service.ItAbstractListenerSnapshotTest;
@@ -69,7 +71,7 @@ public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnaps
     public void beforeFollowerStop(RaftGroupService service, RaftServer 
server) throws Exception {
         ClusterNode followerNode = getNode(server);
 
-        metaStorage = new MetaStorageServiceImpl(service, new 
IgniteSpinBusyLock(), followerNode);
+        metaStorage = new MetaStorageServiceImpl(service, new 
IgniteSpinBusyLock(), followerNode, mock(ClusterTime.class));
 
         // Put some data in the metastorage
         metaStorage.put(FIRST_KEY, FIRST_VALUE).get();
@@ -145,7 +147,7 @@ public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnaps
             return s;
         });
 
-        return new MetaStorageListener(storage);
+        return new MetaStorageListener(storage, mock(ClusterTime.class));
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index 4b27d01123..8483aa744b 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -90,6 +90,8 @@ import 
org.apache.ignite.internal.metastorage.server.ValueCondition.Type;
 import 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageLearnerListener;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -177,6 +179,8 @@ public class ItMetaStorageServiceTest {
 
         private final KeyValueStorage mockStorage;
 
+        private final ClusterTime clusterTime;
+
         private RaftGroupService metaStorageRaftService;
 
         private MetaStorageService metaStorageService;
@@ -184,13 +188,17 @@ public class ItMetaStorageServiceTest {
         Node(ClusterService clusterService, RaftConfiguration 
raftConfiguration, Path dataPath) {
             this.clusterService = clusterService;
 
+            HybridClockImpl clock = new HybridClockImpl();
+
             this.raftManager = new Loza(
                     clusterService,
                     raftConfiguration,
                     dataPath.resolve(name()),
-                    new HybridClockImpl()
+                    clock
             );
 
+            this.clusterTime = new ClusterTimeImpl(new IgniteSpinBusyLock(), 
clock);
+
             this.mockStorage = mock(KeyValueStorage.class);
         }
 
@@ -206,7 +214,7 @@ public class ItMetaStorageServiceTest {
 
             ClusterNode node = clusterService.topologyService().localMember();
 
-            metaStorageService = new 
MetaStorageServiceImpl(metaStorageRaftService, new IgniteSpinBusyLock(), node);
+            metaStorageService = new 
MetaStorageServiceImpl(metaStorageRaftService, new IgniteSpinBusyLock(), node, 
clusterTime);
         }
 
         String name() {
@@ -222,7 +230,9 @@ public class ItMetaStorageServiceTest {
 
             assert peer != null;
 
-            RaftGroupListener listener = isLearner ? new 
MetaStorageLearnerListener(mockStorage) : new MetaStorageListener(mockStorage);
+            RaftGroupListener listener = isLearner
+                    ? new MetaStorageLearnerListener(mockStorage, clusterTime)
+                    : new MetaStorageListener(mockStorage, clusterTime);
 
             var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer);
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index e5e758f862..b1bd82b840 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
@@ -46,6 +47,7 @@ import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologySer
 import org.apache.ignite.internal.configuration.SecurityConfiguration;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -141,7 +143,8 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
                     cmgManager,
                     new LogicalTopologyServiceImpl(logicalTopology, 
cmgManager),
                     raftManager,
-                    new RocksDbKeyValueStorage(name(), 
basePath.resolve("storage"))
+                    new RocksDbKeyValueStorage(name(), 
basePath.resolve("storage")),
+                    mock(HybridClock.class)
             );
 
             components.add(metaStorageManager);
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
index f83f2a09ed..e791c93adf 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -51,6 +52,7 @@ import org.apache.ignite.internal.metastorage.impl.EntryImpl;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageService;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
 import org.apache.ignite.internal.raft.RaftNodeId;
@@ -249,7 +251,8 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
                 .value;
 
         MetaStorageService metaStorageSvc = new MetaStorageServiceImpl(
-                raftGroupServiceOfLiveServer, new IgniteSpinBusyLock(), 
liveServer.clusterService().topologyService().localMember());
+                raftGroupServiceOfLiveServer, new IgniteSpinBusyLock(), 
liveServer.clusterService().topologyService().localMember(),
+                mock(ClusterTime.class));
 
         var resultFuture = new CompletableFuture<Void>();
 
@@ -367,15 +370,18 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
 
         var raftNodeId1 = new RaftNodeId(MetastorageGroupId.INSTANCE, 
membersConfiguration.peer(localMemberName(cluster.get(0))));
 
-        metaStorageRaftSrv1.startRaftNode(raftNodeId1, membersConfiguration, 
new MetaStorageListener(mockStorage), defaults());
+        metaStorageRaftSrv1.startRaftNode(raftNodeId1, membersConfiguration,
+                new MetaStorageListener(mockStorage, mock(ClusterTime.class)), 
defaults());
 
         var raftNodeId2 = new RaftNodeId(MetastorageGroupId.INSTANCE, 
membersConfiguration.peer(localMemberName(cluster.get(1))));
 
-        metaStorageRaftSrv2.startRaftNode(raftNodeId2, membersConfiguration, 
new MetaStorageListener(mockStorage), defaults());
+        metaStorageRaftSrv2.startRaftNode(raftNodeId2, membersConfiguration,
+                new MetaStorageListener(mockStorage, mock(ClusterTime.class)), 
defaults());
 
         var raftNodeId3 = new RaftNodeId(MetastorageGroupId.INSTANCE, 
membersConfiguration.peer(localMemberName(cluster.get(2))));
 
-        metaStorageRaftSrv3.startRaftNode(raftNodeId3, membersConfiguration, 
new MetaStorageListener(mockStorage), defaults());
+        metaStorageRaftSrv3.startRaftNode(raftNodeId3, membersConfiguration,
+                new MetaStorageListener(mockStorage, mock(ClusterTime.class)), 
defaults());
 
         metaStorageRaftGrpSvc1 = RaftGroupServiceImpl.start(
                 MetastorageGroupId.INSTANCE,
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutAllCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutAllCommand.java
index ea69cc0625..39cdf02765 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutAllCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutAllCommand.java
@@ -17,11 +17,7 @@
 
 package org.apache.ignite.internal.metastorage.command;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -29,7 +25,7 @@ import org.apache.ignite.network.annotations.Transferable;
  * previous entries for given keys.
  */
 @Transferable(MetastorageCommandsMessageGroup.GET_AND_PUT_ALL)
-public interface GetAndPutAllCommand extends WriteCommand {
+public interface GetAndPutAllCommand extends MetaStorageWriteCommand {
     /**
      * Returns keys.
      */
@@ -39,25 +35,4 @@ public interface GetAndPutAllCommand extends WriteCommand {
      * Returns values.
      */
     List<byte[]> values();
-
-    /**
-     * Static constructor.
-     *
-     * @param commandsFactory Commands factory.
-     * @param map Values.
-     */
-    static GetAndPutAllCommand getAndPutAllCommand(MetaStorageCommandsFactory 
commandsFactory, Map<ByteArray, byte[]> map) {
-        int size = map.size();
-
-        List<byte[]> keys = new ArrayList<>(size);
-        List<byte[]> values = new ArrayList<>(size);
-
-        for (Map.Entry<ByteArray, byte[]> e : map.entrySet()) {
-            keys.add(e.getKey().bytes());
-
-            values.add(e.getValue());
-        }
-
-        return 
commandsFactory.getAndPutAllCommand().keys(keys).values(values).build();
-    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutCommand.java
index 7042e505fc..fc2210610c 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutCommand.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.metastorage.command;
 
-import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -25,7 +24,7 @@ import org.apache.ignite.network.annotations.Transferable;
  * a previous entry for the given key.
  */
 @Transferable(MetastorageCommandsMessageGroup.GET_AND_PUT)
-public interface GetAndPutCommand extends WriteCommand {
+public interface GetAndPutCommand extends MetaStorageWriteCommand {
     /**
      * Returns the key. Couldn't be {@code null}.
      */
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveAllCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveAllCommand.java
index 5943405c73..98aadde70b 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveAllCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveAllCommand.java
@@ -17,36 +17,16 @@
 
 package org.apache.ignite.internal.metastorage.command;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
-import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Get and remove all command for MetaStorageCommandListener that removes 
entries for given keys and retrieves previous entries.
  */
 @Transferable(MetastorageCommandsMessageGroup.GET_AND_REMOVE_ALL)
-public interface GetAndRemoveAllCommand extends WriteCommand {
+public interface GetAndRemoveAllCommand extends MetaStorageWriteCommand {
     /**
      * Returns the keys collection. Couldn't be {@code null}.
      */
     List<byte[]> keys();
-
-    /**
-     * Static constructor.
-     *
-     * @param commandsFactory Commands factory.
-     * @param keys The keys collection. Couldn't be {@code null}.
-     */
-    static GetAndRemoveAllCommand 
getAndRemoveAllCommand(MetaStorageCommandsFactory commandsFactory, 
Set<ByteArray> keys) {
-        List<byte[]> keysList = new ArrayList<>(keys.size());
-
-        for (ByteArray key : keys) {
-            keysList.add(key.bytes());
-        }
-
-        return commandsFactory.getAndRemoveAllCommand().keys(keysList).build();
-    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveCommand.java
index a778695978..42c83df1e3 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveCommand.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.metastorage.command;
 
-import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -25,7 +24,7 @@ import org.apache.ignite.network.annotations.Transferable;
  * given key.
  */
 @Transferable(MetastorageCommandsMessageGroup.GET_AND_REMOVE)
-public interface GetAndRemoveCommand extends WriteCommand {
+public interface GetAndRemoveCommand extends MetaStorageWriteCommand {
     /**
      * Returns the key. Couldn't be {@code null}.
      */
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/HybridTimestampMessage.java
similarity index 68%
copy from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
copy to 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/HybridTimestampMessage.java
index 5b2bce2e2b..93e6d481b7 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/HybridTimestampMessage.java
@@ -17,16 +17,18 @@
 
 package org.apache.ignite.internal.metastorage.command;
 
-import org.apache.ignite.internal.raft.WriteCommand;
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
-/**
- * Remove command for MetaStorageCommandListener that removes an entry for the 
given key.
- */
-@Transferable(MetastorageCommandsMessageGroup.REMOVE)
-public interface RemoveCommand extends WriteCommand {
-    /**
-     * Returns the key. Couldn't be {@code null}.
-     */
-    byte[] key();
+@Transferable(MetastorageCommandsMessageGroup.HYBRID_TS)
+public interface HybridTimestampMessage extends NetworkMessage, Serializable {
+    long physical();
+
+    int logical();
+
+    default HybridTimestamp asHybridTimestamp() {
+        return new HybridTimestamp(physical(), logical());
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java
index be00fb59d6..b50f58de6a 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java
@@ -20,14 +20,13 @@ package org.apache.ignite.internal.metastorage.command;
 import java.util.Collection;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
-import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Represents invoke command for meta storage.
  */
 @Transferable(MetastorageCommandsMessageGroup.INVOKE)
-public interface InvokeCommand extends WriteCommand {
+public interface InvokeCommand extends MetaStorageWriteCommand {
     /**
      * Returns condition.
      *
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java
similarity index 72%
copy from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
copy to 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java
index 5b2bce2e2b..9015503f88 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java
@@ -18,15 +18,15 @@
 package org.apache.ignite.internal.metastorage.command;
 
 import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.network.annotations.WithSetter;
 
-/**
- * Remove command for MetaStorageCommandListener that removes an entry for the 
given key.
- */
-@Transferable(MetastorageCommandsMessageGroup.REMOVE)
-public interface RemoveCommand extends WriteCommand {
-    /**
-     * Returns the key. Couldn't be {@code null}.
-     */
-    byte[] key();
+public interface MetaStorageWriteCommand extends WriteCommand {
+    HybridTimestampMessage initiatorTime();
+
+    @WithSetter
+    HybridTimestampMessage safeTime();
+
+    default void safeTime(HybridTimestampMessage safeTime) {
+        // No-op.
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
index 8856166a9d..ba8da6d766 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
@@ -84,4 +84,8 @@ public interface MetastorageCommandsMessageGroup {
 
     /** Message type for {@link CloseAllCursorsCommand}. */
     short CLOSE_ALL_CURSORS = 64;
+
+    short HYBRID_TS = 65;
+
+    short SYNC_TIME = 66;
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java
index 843f418641..ae90fb8f9e 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java
@@ -18,14 +18,13 @@
 package org.apache.ignite.internal.metastorage.command;
 
 import org.apache.ignite.internal.metastorage.dsl.Iif;
-import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Represents invoke command with nested conditions and execution branches.
  */
 @Transferable(MetastorageCommandsMessageGroup.MULTI_INVOKE)
-public interface MultiInvokeCommand extends WriteCommand {
+public interface MultiInvokeCommand extends MetaStorageWriteCommand {
     /**
      * Returns if statement.
      *
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutAllCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutAllCommand.java
index e97e7a6906..402724daa2 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutAllCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutAllCommand.java
@@ -17,18 +17,14 @@
 
 package org.apache.ignite.internal.metastorage.command;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Put all command for MetaStorageCommandListener that inserts or updates 
entries with given keys and given values.
  */
 @Transferable(MetastorageCommandsMessageGroup.PUT_ALL)
-public interface PutAllCommand extends WriteCommand {
+public interface PutAllCommand extends MetaStorageWriteCommand {
     /**
      * Returns entries keys.
      */
@@ -38,35 +34,4 @@ public interface PutAllCommand extends WriteCommand {
      * Returns entries values.
      */
     List<byte[]> values();
-
-    /**
-     * Static constructor.
-     *
-     * @param commandsFactory Commands factory.
-     * @param vals The map of keys and corresponding values. Couldn't be 
{@code null} or empty.
-     */
-    static PutAllCommand putAllCommand(MetaStorageCommandsFactory 
commandsFactory, Map<ByteArray, byte[]> vals) {
-        assert !vals.isEmpty();
-
-        int size = vals.size();
-
-        List<byte[]> keys = new ArrayList<>(size);
-
-        List<byte[]> values = new ArrayList<>(size);
-
-        for (Map.Entry<ByteArray, byte[]> e : vals.entrySet()) {
-            byte[] key = e.getKey().bytes();
-
-            byte[] val = e.getValue();
-
-            assert key != null : "Key could not be null.";
-            assert val != null : "Value could not be null.";
-
-            keys.add(key);
-
-            values.add(val);
-        }
-
-        return 
commandsFactory.putAllCommand().keys(keys).values(values).build();
-    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java
index eb176875f1..3facff93fa 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.metastorage.command;
 
-import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -25,7 +24,7 @@ import org.apache.ignite.network.annotations.Transferable;
  * previous entry for the given key.
  */
 @Transferable(MetastorageCommandsMessageGroup.PUT)
-public interface PutCommand extends WriteCommand {
+public interface PutCommand extends MetaStorageWriteCommand {
     /**
      * Returns the key. Couldn't be {@code null}.
      */
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveAllCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveAllCommand.java
index 291d185500..c4824c1602 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveAllCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveAllCommand.java
@@ -17,36 +17,16 @@
 
 package org.apache.ignite.internal.metastorage.command;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
-import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Remove all command for MetaStorageCommandListener that removes entries for 
given keys.
  */
 @Transferable(MetastorageCommandsMessageGroup.REMOVE_ALL)
-public interface RemoveAllCommand extends WriteCommand {
+public interface RemoveAllCommand extends MetaStorageWriteCommand {
     /**
      * Returns the keys list. Couldn't be {@code null}.
      */
     List<byte[]> keys();
-
-    /**
-     * Static constructor.
-     *
-     * @param commandsFactory Commands factory.
-     * @param keys The keys collection. Couldn't be {@code null}.
-     */
-    static RemoveAllCommand removeAllCommand(MetaStorageCommandsFactory 
commandsFactory, Set<ByteArray> keys) {
-        List<byte[]> list = new ArrayList<>(keys.size());
-
-        for (ByteArray key : keys) {
-            list.add(key.bytes());
-        }
-
-        return commandsFactory.removeAllCommand().keys(list).build();
-    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
index 5b2bce2e2b..d78686b5cd 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
@@ -17,14 +17,13 @@
 
 package org.apache.ignite.internal.metastorage.command;
 
-import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Remove command for MetaStorageCommandListener that removes an entry for the 
given key.
  */
 @Transferable(MetastorageCommandsMessageGroup.REMOVE)
-public interface RemoveCommand extends WriteCommand {
+public interface RemoveCommand extends MetaStorageWriteCommand {
     /**
      * Returns the key. Couldn't be {@code null}.
      */
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
similarity index 79%
copy from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java
copy to 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
index 843f418641..ccf2506a8e 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
@@ -17,19 +17,13 @@
 
 package org.apache.ignite.internal.metastorage.command;
 
-import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Represents invoke command with nested conditions and execution branches.
  */
-@Transferable(MetastorageCommandsMessageGroup.MULTI_INVOKE)
-public interface MultiInvokeCommand extends WriteCommand {
-    /**
-     * Returns if statement.
-     *
-     * @return if statement.
-     */
-    Iif iif();
+@Transferable(MetastorageCommandsMessageGroup.SYNC_TIME)
+public interface SyncTimeCommand extends WriteCommand {
+    HybridTimestampMessage safeTime();
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index d1fe91c36e..d4d2889e68 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.atomic.AtomicBoolean;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -45,6 +46,7 @@ import 
org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageLearnerListener;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
@@ -103,6 +105,8 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     /** Prevents double stopping of the component. */
     private final AtomicBoolean isStopped = new AtomicBoolean();
 
+    private final ClusterTimeImpl clusterTime;
+
     /**
      * The constructor.
      *
@@ -119,7 +123,8 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
             ClusterManagementGroupManager cmgMgr,
             LogicalTopologyService logicalTopologyService,
             RaftManager raftMgr,
-            KeyValueStorage storage
+            KeyValueStorage storage,
+            HybridClock clock
     ) {
         this.vaultMgr = vaultMgr;
         this.clusterService = clusterService;
@@ -127,6 +132,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         this.cmgMgr = cmgMgr;
         this.logicalTopologyService = logicalTopologyService;
         this.storage = storage;
+        this.clusterTime = new ClusterTimeImpl(busyLock, clock);
     }
 
     private CompletableFuture<MetaStorageServiceImpl> 
initializeMetaStorage(Set<String> metaStorageNodes) {
@@ -148,8 +154,14 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
-                        new MetaStorageListener(storage),
-                        new MetaStorageRaftGroupEventsListener(busyLock, 
clusterService, logicalTopologyService, metaStorageSvcFut)
+                        new MetaStorageListener(storage, clusterTime),
+                        new MetaStorageRaftGroupEventsListener(
+                                busyLock,
+                                clusterService,
+                                logicalTopologyService,
+                                metaStorageSvcFut,
+                                clusterTime
+                        )
                 );
             } else {
                 PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
@@ -161,7 +173,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
-                        new MetaStorageLearnerListener(storage),
+                        new MetaStorageLearnerListener(storage, clusterTime),
                         RaftGroupEventsListener.noopLsnr
                 );
             }
@@ -169,7 +181,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
             return CompletableFuture.failedFuture(e);
         }
 
-        return raftServiceFuture.thenApply(raftService -> new 
MetaStorageServiceImpl(raftService, busyLock, thisNode));
+        return raftServiceFuture.thenApply(raftService -> new 
MetaStorageServiceImpl(raftService, busyLock, thisNode, clusterTime));
     }
 
     @Override
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java
index 9fe455fff4..9b09c02805 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java
@@ -31,6 +31,8 @@ import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
@@ -65,16 +67,20 @@ public class MetaStorageRaftGroupEventsListener implements 
RaftGroupEventsListen
 
     private final Object serializationFutureMux = new Object();
 
+    private final ClusterTimeImpl clusterTime;
+
     MetaStorageRaftGroupEventsListener(
             IgniteSpinBusyLock busyLock,
             ClusterService clusterService,
             LogicalTopologyService logicalTopologyService,
-            CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut
+            CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut,
+            ClusterTimeImpl clusterTime
     ) {
         this.busyLock = busyLock;
         this.nodeName = clusterService.localConfiguration().getName();
         this.logicalTopologyService = logicalTopologyService;
         this.metaStorageSvcFut = metaStorageSvcFut;
+        this.clusterTime = clusterTime;
     }
 
     @Override
@@ -84,6 +90,18 @@ public class MetaStorageRaftGroupEventsListener implements 
RaftGroupEventsListen
 
             // Update learner configuration (in case we missed some topology 
updates) and initialize the serialization future.
             serializationFuture = executeIfLeaderImpl(this::resetLearners);
+
+            serializationFuture = executeWithStatus((service, term1, isLeader) 
-> {
+                if (isLeader) {
+                    this.resetLearners(service, term1);
+
+                    clusterTime.startLeaderTimer(service);
+                } else {
+                    clusterTime.stopLeaderTimer();
+                }
+
+                return null;
+            });
         }
     }
 
@@ -127,6 +145,10 @@ public class MetaStorageRaftGroupEventsListener implements 
RaftGroupEventsListen
         CompletableFuture<Void> apply(MetaStorageServiceImpl service, long 
term);
     }
 
+    private interface OnStatusAction {
+        CompletableFuture<Void> apply(MetaStorageServiceImpl service, long 
term, boolean isLeader);
+    }
+
     /**
      * Executes the given action if the current node is the Meta Storage 
leader.
      */
@@ -155,11 +177,16 @@ public class MetaStorageRaftGroupEventsListener 
implements RaftGroupEventsListen
     }
 
     private CompletableFuture<Void> executeIfLeaderImpl(OnLeaderAction action) 
{
+        return executeWithStatus((service, term, isLeader) -> 
action.apply(service, term));
+    }
+
+    private CompletableFuture<Void> executeWithStatus(OnStatusAction action) {
         return metaStorageSvcFut.thenCompose(service -> 
service.raftGroupService().refreshAndGetLeaderWithTerm()
                 .thenCompose(leaderWithTerm -> {
                     String leaderName = leaderWithTerm.leader().consistentId();
 
-                    return leaderName.equals(nodeName) ? action.apply(service, 
leaderWithTerm.term()) : completedFuture(null);
+                    boolean isLeader = leaderName.equals(nodeName);
+                    return action.apply(service, leaderWithTerm.term(), 
isLeader);
                 }));
     }
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index eee5162582..824f1d31c6 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -18,11 +18,8 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import static 
org.apache.ignite.internal.metastorage.command.GetAllCommand.getAllCommand;
-import static 
org.apache.ignite.internal.metastorage.command.GetAndPutAllCommand.getAndPutAllCommand;
-import static 
org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand.getAndRemoveAllCommand;
-import static 
org.apache.ignite.internal.metastorage.command.PutAllCommand.putAllCommand;
-import static 
org.apache.ignite.internal.metastorage.command.RemoveAllCommand.removeAllCommand;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -40,6 +38,7 @@ import 
org.apache.ignite.internal.metastorage.command.GetAndPutCommand;
 import org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand;
 import org.apache.ignite.internal.metastorage.command.GetAndRemoveCommand;
 import org.apache.ignite.internal.metastorage.command.GetCommand;
+import org.apache.ignite.internal.metastorage.command.HybridTimestampMessage;
 import org.apache.ignite.internal.metastorage.command.InvokeCommand;
 import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
 import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
@@ -47,10 +46,12 @@ import 
org.apache.ignite.internal.metastorage.command.PutAllCommand;
 import org.apache.ignite.internal.metastorage.command.PutCommand;
 import org.apache.ignite.internal.metastorage.command.RemoveAllCommand;
 import org.apache.ignite.internal.metastorage.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.command.SyncTimeCommand;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -72,13 +73,16 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
     /** Local node. */
     private final ClusterNode localNode;
 
+    private final ClusterTime clusterTime;
+
     /**
      * Constructor.
      *
      * @param metaStorageRaftGrpSvc Meta storage raft group service.
      * @param localNode Local node.
      */
-    public MetaStorageServiceImpl(RaftGroupService metaStorageRaftGrpSvc, 
IgniteSpinBusyLock busyLock, ClusterNode localNode) {
+    public MetaStorageServiceImpl(RaftGroupService metaStorageRaftGrpSvc, 
IgniteSpinBusyLock busyLock, ClusterNode localNode,
+            ClusterTime clusterTime) {
         this.context = new MetaStorageServiceContext(
                 metaStorageRaftGrpSvc,
                 new MetaStorageCommandsFactory(),
@@ -88,6 +92,7 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
         );
 
         this.localNode = localNode;
+        this.clusterTime = clusterTime;
     }
 
     RaftGroupService raftGroupService() {
@@ -126,28 +131,36 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
 
     @Override
     public CompletableFuture<Void> put(ByteArray key, byte[] value) {
-        PutCommand putCommand = 
context.commandsFactory().putCommand().key(key.bytes()).value(value).build();
+        PutCommand putCommand = context.commandsFactory().putCommand()
+                .key(key.bytes())
+                .value(value)
+                .initiatorTime(hybridTimestamp(clusterTime.now()))
+                .build();
 
         return context.raftService().run(putCommand);
     }
 
     @Override
     public CompletableFuture<Entry> getAndPut(ByteArray key, byte[] value) {
-        GetAndPutCommand getAndPutCommand = 
context.commandsFactory().getAndPutCommand().key(key.bytes()).value(value).build();
+        GetAndPutCommand getAndPutCommand = 
context.commandsFactory().getAndPutCommand()
+                .key(key.bytes())
+                .value(value)
+                .initiatorTime(hybridTimestamp(clusterTime.now()))
+                .build();
 
         return context.raftService().run(getAndPutCommand);
     }
 
     @Override
     public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) {
-        PutAllCommand putAllCommand = putAllCommand(context.commandsFactory(), 
vals);
+        PutAllCommand putAllCommand = putAllCommand(context.commandsFactory(), 
vals, clusterTime.now());
 
         return context.raftService().run(putAllCommand);
     }
 
     @Override
     public CompletableFuture<Map<ByteArray, Entry>> 
getAndPutAll(Map<ByteArray, byte[]> vals) {
-        GetAndPutAllCommand getAndPutAllCommand = 
getAndPutAllCommand(context.commandsFactory(), vals);
+        GetAndPutAllCommand getAndPutAllCommand = 
getAndPutAllCommand(context.commandsFactory(), vals, clusterTime.now());
 
         return context.raftService().<List<Entry>>run(getAndPutAllCommand)
                 .thenApply(MetaStorageServiceImpl::multipleEntryResult);
@@ -155,28 +168,30 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
 
     @Override
     public CompletableFuture<Void> remove(ByteArray key) {
-        RemoveCommand removeCommand = 
context.commandsFactory().removeCommand().key(key.bytes()).build();
+        RemoveCommand removeCommand = 
context.commandsFactory().removeCommand().key(key.bytes())
+                .initiatorTime(hybridTimestamp(clusterTime.now())).build();
 
         return context.raftService().run(removeCommand);
     }
 
     @Override
     public CompletableFuture<Entry> getAndRemove(ByteArray key) {
-        GetAndRemoveCommand getAndRemoveCommand = 
context.commandsFactory().getAndRemoveCommand().key(key.bytes()).build();
+        GetAndRemoveCommand getAndRemoveCommand = 
context.commandsFactory().getAndRemoveCommand().key(key.bytes())
+                .initiatorTime(hybridTimestamp(clusterTime.now())).build();
 
         return context.raftService().run(getAndRemoveCommand);
     }
 
     @Override
     public CompletableFuture<Void> removeAll(Set<ByteArray> keys) {
-        RemoveAllCommand removeAllCommand = 
removeAllCommand(context.commandsFactory(), keys);
+        RemoveAllCommand removeAllCommand = 
removeAllCommand(context.commandsFactory(), keys, clusterTime.now());
 
         return context.raftService().run(removeAllCommand);
     }
 
     @Override
     public CompletableFuture<Map<ByteArray, Entry>> 
getAndRemoveAll(Set<ByteArray> keys) {
-        GetAndRemoveAllCommand getAndRemoveAllCommand = 
getAndRemoveAllCommand(context.commandsFactory(), keys);
+        GetAndRemoveAllCommand getAndRemoveAllCommand = 
getAndRemoveAllCommand(context.commandsFactory(), keys, clusterTime.now());
 
         return context.raftService().<List<Entry>>run(getAndRemoveAllCommand)
                 .thenApply(MetaStorageServiceImpl::multipleEntryResult);
@@ -197,6 +212,7 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
                 .condition(condition)
                 .success(success)
                 .failure(failure)
+                .initiatorTime(hybridTimestamp(clusterTime.now()))
                 .build();
 
         return context.raftService().run(invokeCommand);
@@ -206,6 +222,7 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
     public CompletableFuture<StatementResult> invoke(Iif iif) {
         MultiInvokeCommand multiInvokeCommand = 
context.commandsFactory().multiInvokeCommand()
                 .iif(iif)
+                .initiatorTime(hybridTimestamp(clusterTime.now()))
                 .build();
 
         return context.raftService().run(multiInvokeCommand);
@@ -258,6 +275,14 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
         return new CursorPublisher(context, createPrefixCommand);
     }
 
+    public CompletableFuture<Void> syncTime(HybridTimestamp hybridTimestamp) {
+        SyncTimeCommand syncTimeCommand = 
context.commandsFactory().syncTimeCommand()
+                .safeTime(hybridTimestamp(clusterTime.now()))
+                .build();
+
+        return context.raftService().run(syncTimeCommand);
+    }
+
     // TODO: IGNITE-14734 Implement.
     @Override
     public CompletableFuture<Void> compact() {
@@ -283,4 +308,106 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
 
         return res;
     }
+
+    /**
+     * Creates put all command.
+     *
+     * @param commandsFactory Commands factory.
+     * @param vals The map of keys and corresponding values. Couldn't be 
{@code null} or empty.
+     * @param ts Local time.
+     */
+    private PutAllCommand putAllCommand(MetaStorageCommandsFactory 
commandsFactory, Map<ByteArray, byte[]> vals, HybridTimestamp ts) {
+        assert !vals.isEmpty();
+
+        int size = vals.size();
+
+        List<byte[]> keys = new ArrayList<>(size);
+
+        List<byte[]> values = new ArrayList<>(size);
+
+        for (Map.Entry<ByteArray, byte[]> e : vals.entrySet()) {
+            byte[] key = e.getKey().bytes();
+
+            byte[] val = e.getValue();
+
+            assert key != null : "Key could not be null.";
+            assert val != null : "Value could not be null.";
+
+            keys.add(key);
+
+            values.add(val);
+        }
+
+        return commandsFactory.putAllCommand()
+                .keys(keys)
+                .values(values)
+                .initiatorTime(hybridTimestamp(ts))
+                .build();
+    }
+
+    /**
+     * Creates get and put all command.
+     *
+     * @param commandsFactory Commands factory.
+     * @param map Values.
+     * @param ts Local time.
+     */
+    private GetAndPutAllCommand getAndPutAllCommand(MetaStorageCommandsFactory 
commandsFactory, Map<ByteArray, byte[]> map,
+            HybridTimestamp ts) {
+        int size = map.size();
+
+        List<byte[]> keys = new ArrayList<>(size);
+        List<byte[]> values = new ArrayList<>(size);
+
+        for (Map.Entry<ByteArray, byte[]> e : map.entrySet()) {
+            keys.add(e.getKey().bytes());
+
+            values.add(e.getValue());
+        }
+
+        return commandsFactory.getAndPutAllCommand()
+                .keys(keys)
+                .values(values)
+                .initiatorTime(hybridTimestamp(ts))
+                .build();
+    }
+
+    /**
+     * Creates get and remove all command.
+     *
+     * @param commandsFactory Commands factory.
+     * @param keys The keys collection. Couldn't be {@code null}.
+     * @param ts Local time.
+     */
+    private GetAndRemoveAllCommand 
getAndRemoveAllCommand(MetaStorageCommandsFactory commandsFactory, 
Set<ByteArray> keys,
+            HybridTimestamp ts) {
+        List<byte[]> keysList = new ArrayList<>(keys.size());
+
+        for (ByteArray key : keys) {
+            keysList.add(key.bytes());
+        }
+
+        return 
commandsFactory.getAndRemoveAllCommand().keys(keysList).initiatorTime(hybridTimestamp(ts)).build();
+    }
+
+    /**
+     * Creates remove all command.
+     *
+     * @param commandsFactory Commands factory.
+     * @param keys The keys collection. Couldn't be {@code null}.
+     * @param ts Local time.
+     */
+    private RemoveAllCommand removeAllCommand(MetaStorageCommandsFactory 
commandsFactory, Set<ByteArray> keys, HybridTimestamp ts) {
+        List<byte[]> list = new ArrayList<>(keys.size());
+
+        for (ByteArray key : keys) {
+            list.add(key.bytes());
+        }
+
+        return 
commandsFactory.removeAllCommand().keys(list).initiatorTime(hybridTimestamp(ts)).build();
+    }
+
+    private HybridTimestampMessage hybridTimestamp(HybridTimestamp ts) {
+        return 
context.commandsFactory().hybridTimestampMessage().physical(ts.getPhysical()).logical(ts.getLogical()).build();
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java
index 7902e66781..7dd7e918f5 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java
@@ -21,6 +21,9 @@ import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
@@ -34,9 +37,9 @@ public class MetaStorageLearnerListener implements 
RaftGroupListener {
 
     private final MetaStorageWriteHandler writeHandler;
 
-    public MetaStorageLearnerListener(KeyValueStorage storage) {
+    public MetaStorageLearnerListener(KeyValueStorage storage, ClusterTimeImpl 
clusterTime) {
         this.storage = storage;
-        this.writeHandler = new MetaStorageWriteHandler(storage);
+        this.writeHandler = new MetaStorageWriteHandler(storage, clusterTime);
     }
 
     @Override
@@ -56,6 +59,11 @@ public class MetaStorageLearnerListener implements 
RaftGroupListener {
         }
     }
 
+    @Override
+    public void onBeforeApply(Command command) {
+        writeHandler.beforeApply(command);
+    }
+
     @Override
     public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
         storage.snapshot(path).whenComplete((unused, throwable) -> 
doneClo.accept(throwable));
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index d3c2ceaace..409380b88b 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -39,6 +39,9 @@ import 
org.apache.ignite.internal.metastorage.command.cursor.NextBatchCommand;
 import org.apache.ignite.internal.metastorage.command.response.BatchResponse;
 import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
@@ -64,9 +67,9 @@ public class MetaStorageListener implements RaftGroupListener 
{
      *
      * @param storage Storage.
      */
-    public MetaStorageListener(KeyValueStorage storage) {
+    public MetaStorageListener(KeyValueStorage storage, ClusterTimeImpl 
clusterTime) {
         this.storage = storage;
-        this.writeHandler = new MetaStorageWriteHandler(storage);
+        this.writeHandler = new MetaStorageWriteHandler(storage, clusterTime);
         this.cursors = new ConcurrentHashMap<>();
     }
 
@@ -217,6 +220,11 @@ public class MetaStorageListener implements 
RaftGroupListener {
         }
     }
 
+    @Override
+    public void onBeforeApply(Command command) {
+        writeHandler.beforeApply(command);
+    }
+
     private void closeCursor(IgniteUuid cursorId) {
         CursorMeta cursorMeta = cursors.remove(cursorId);
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index ddc91f2f65..663856744a 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -19,17 +19,22 @@ package org.apache.ignite.internal.metastorage.server.raft;
 
 import java.io.Serializable;
 import java.util.Collection;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.command.GetAndPutAllCommand;
 import org.apache.ignite.internal.metastorage.command.GetAndPutCommand;
 import org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand;
 import org.apache.ignite.internal.metastorage.command.GetAndRemoveCommand;
+import org.apache.ignite.internal.metastorage.command.HybridTimestampMessage;
 import org.apache.ignite.internal.metastorage.command.InvokeCommand;
+import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MetaStorageWriteCommand;
 import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
 import org.apache.ignite.internal.metastorage.command.PutAllCommand;
 import org.apache.ignite.internal.metastorage.command.PutCommand;
 import org.apache.ignite.internal.metastorage.command.RemoveAllCommand;
 import org.apache.ignite.internal.metastorage.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.command.SyncTimeCommand;
 import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
 import org.apache.ignite.internal.metastorage.dsl.ConditionType;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
@@ -46,6 +51,9 @@ import 
org.apache.ignite.internal.metastorage.server.RevisionCondition;
 import org.apache.ignite.internal.metastorage.server.Statement;
 import org.apache.ignite.internal.metastorage.server.TombstoneCondition;
 import org.apache.ignite.internal.metastorage.server.ValueCondition;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 
@@ -53,10 +61,13 @@ import 
org.apache.ignite.internal.raft.service.CommandClosure;
  * Class containing some common logic for Meta Storage Raft group listeners.
  */
 class MetaStorageWriteHandler {
+    private static final MetaStorageCommandsFactory 
META_STORAGE_COMMANDS_FACTORY = new MetaStorageCommandsFactory();
     private final KeyValueStorage storage;
+    private final ClusterTimeImpl clusterTime;
 
-    MetaStorageWriteHandler(KeyValueStorage storage) {
+    MetaStorageWriteHandler(KeyValueStorage storage, ClusterTimeImpl 
clusterTime) {
         this.storage = storage;
+        this.clusterTime = clusterTime;
     }
 
     /**
@@ -122,6 +133,10 @@ class MetaStorageWriteHandler {
             MultiInvokeCommand cmd = (MultiInvokeCommand) command;
 
             clo.result(storage.invoke(toIf(cmd.iif())));
+        } else if (command instanceof SyncTimeCommand) {
+            clusterTime.updateSafeTime(((SyncTimeCommand) 
command).safeTime().asHybridTimestamp());
+
+            clo.result(null);
         } else {
             return false;
         }
@@ -234,4 +249,16 @@ class MetaStorageWriteHandler {
                 throw new IllegalArgumentException("Unexpected revision 
condition type " + type);
         }
     }
+
+    void beforeApply(Command command) {
+        if (command instanceof MetaStorageWriteCommand) {
+            clusterTime.adjust(((MetaStorageWriteCommand) 
command).initiatorTime().asHybridTimestamp());
+
+            ((MetaStorageWriteCommand) 
command).safeTime(hybridTimestamp(clusterTime.now()));
+        }
+    }
+
+    private static HybridTimestampMessage hybridTimestamp(HybridTimestamp ts) {
+        return 
META_STORAGE_COMMANDS_FACTORY.hybridTimestampMessage().physical(ts.getPhysical()).logical(ts.getLogical()).build();
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java
similarity index 55%
copy from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java
copy to 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java
index eb176875f1..7f631ef8dd 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java
@@ -15,24 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.command;
+package org.apache.ignite.internal.metastorage.server.time;
 
-import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.annotations.Transferable;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
 
 /**
- * Put command for MetaStorageCommandListener that inserts or updates an entry 
with the given key and the given value and retrieves a
- * previous entry for the given key.
+ * Cluster time.
  */
-@Transferable(MetastorageCommandsMessageGroup.PUT)
-public interface PutCommand extends WriteCommand {
+public interface ClusterTime {
     /**
-     * Returns the key. Couldn't be {@code null}.
+     * Returns current cluster time.
+     *
+     * @return Current cluster time.
      */
-    byte[] key();
+    HybridTimestamp now();
 
     /**
-     * Returns the value. Couldn't be {@code null}.
+     * Provides the future that is completed when cluster time reaches given 
one. If the time is greater or equal
+     * then the given one, returns completed future.
+     *
+     * @param time Timestamp to wait for.
+     * @return Future.
      */
-    byte[] value();
+    CompletableFuture<Void> waitFor(HybridTimestamp time);
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
new file mode 100644
index 0000000000..2f33436bd7
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.jetbrains.annotations.Nullable;
+
+public class ClusterTimeImpl implements ClusterTime {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(ClusterTimeImpl.class);
+
+    private final IgniteSpinBusyLock busyLock;
+
+    private volatile @Nullable LeaderTimer leaderTimer;
+
+    private final HybridClock clock;
+
+    private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+    public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) {
+        this.busyLock = busyLock;
+        this.clock = clock;
+        this.safeTime = new PendingComparableValuesTracker<>(clock.now());
+    }
+
+    public void startLeaderTimer(MetaStorageServiceImpl service) {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            assert leaderTimer == null;
+
+            LeaderTimer newTimer = new LeaderTimer(service);
+
+            leaderTimer = newTimer;
+
+            newTimer.start();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    public void stopLeaderTimer() {
+        LeaderTimer timer = leaderTimer;
+
+        assert timer != null;
+
+        timer.stop();
+
+        leaderTimer = null;
+    }
+
+    @Override
+    public HybridTimestamp now() {
+        return clock.now();
+    }
+
+    @Override
+    public CompletableFuture<Void> waitFor(HybridTimestamp time) {
+        return safeTime.waitFor(time);
+    }
+
+    public void updateSafeTime(HybridTimestamp ts) {
+        this.safeTime.update(ts);
+    }
+
+    public void adjust(HybridTimestamp ts) {
+        this.clock.update(ts);
+    }
+
+    private class LeaderTimer {
+
+        private final MetaStorageServiceImpl service;
+
+        /** Scheduled executor for cluster time sync. */
+        private final ScheduledExecutorService 
scheduledClusterTimeSyncExecutor =
+                Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("scheduled-cluster-time-sync-thread", LOG));
+
+        private LeaderTimer(MetaStorageServiceImpl service) {
+            this.service = service;
+        }
+
+        void start() {
+            schedule();
+        }
+
+        private void schedule() {
+            try {
+                scheduledClusterTimeSyncExecutor.scheduleAtFixedRate(
+                        this::disseminateTime,
+                        0,
+                        1,
+                        TimeUnit.SECONDS
+                );
+            } catch (RejectedExecutionException ignored) {
+                // Scheduler was stopped, it is ok.
+            }
+        }
+
+        void disseminateTime() {
+            if (!busyLock.enterBusy()) {
+                // Shutting down.
+                return;
+            }
+
+            try {
+                HybridTimestamp now = clock.now();
+
+                service.syncTime(now);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        void stop() {
+            
IgniteUtils.shutdownAndAwaitTermination(scheduledClusterTimeSyncExecutor, 1, 
TimeUnit.SECONDS);
+        }
+    }
+}
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRangeCursorTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRangeCursorTest.java
index 95871e2399..e07ed4e66b 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRangeCursorTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRangeCursorTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.Serializable;
@@ -42,6 +43,7 @@ import java.util.stream.Stream;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
@@ -83,13 +85,14 @@ public class MetaStorageRangeCursorTest {
         when(storage.range(any(), any(), 
anyBoolean())).thenReturn(Cursor.fromIterable(expectedEntries));
         when(storage.range(any(), any(), anyLong(), 
anyBoolean())).thenReturn(Cursor.fromIterable(expectedEntries));
 
-        listener = new MetaStorageListener(storage);
+        listener = new MetaStorageListener(storage, mock(ClusterTime.class));
 
         when(raftGroupService.run(any())).thenAnswer(invocation -> 
runCommand(invocation.getArgument(0)));
 
         var localNode = new ClusterNode("test", "test", new 
NetworkAddress("localhost", 10000));
 
-        MetaStorageService metaStorageService = new 
MetaStorageServiceImpl(raftGroupService, new IgniteSpinBusyLock(), localNode);
+        MetaStorageService metaStorageService = new 
MetaStorageServiceImpl(raftGroupService, new IgniteSpinBusyLock(), localNode,
+                mock(ClusterTime.class));
 
         checkCursor(metaStorageService.range(intToBytes(0), 
intToBytes(keyTo)), expectedEntries);
         checkCursor(metaStorageService.range(intToBytes(0), intToBytes(keyTo), 
0), expectedEntries);
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 0da79d691e..e3342069c0 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -29,8 +29,10 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.ReadCommand;
@@ -95,7 +97,7 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
      */
     private StandaloneMetaStorageManager(VaultManager vaultMgr, ClusterService 
clusterService, ClusterManagementGroupManager cmgMgr,
             LogicalTopologyService logicalTopologyService, RaftManager 
raftMgr, KeyValueStorage storage) {
-        super(vaultMgr, clusterService, cmgMgr, logicalTopologyService, 
raftMgr, storage);
+        super(vaultMgr, clusterService, cmgMgr, logicalTopologyService, 
raftMgr, storage, mock(HybridClock.class));
     }
 
     private static ClusterService mockClusterService() {
diff --git 
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index 84e63cc954..98a1eee909 100644
--- 
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -53,6 +53,7 @@ import 
org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.WithSetter;
 
 /**
  * Class for generating implementations of the {@link NetworkMessage} 
interfaces and their builders, generated by a {@link
@@ -98,7 +99,7 @@ public class MessageImplGenerator {
         List<ExecutableElement> getters = message.getters();
 
         var fields = new ArrayList<FieldSpec>(getters.size());
-        var getterImpls = new ArrayList<MethodSpec>(getters.size());
+        var methodImpls = new ArrayList<MethodSpec>(getters.size());
 
         // create a field and a getter implementation for every getter in the 
message interface
         for (ExecutableElement getter : getters) {
@@ -111,8 +112,9 @@ public class MessageImplGenerator {
                     .addModifiers(Modifier.PRIVATE);
 
             boolean isMarshallable = getter.getAnnotation(Marshallable.class) 
!= null;
+            boolean generateSetter = getter.getAnnotation(WithSetter.class) != 
null;
 
-            if (!isMarshallable) {
+            if (!isMarshallable && !generateSetter) {
                 fieldBuilder.addModifiers(Modifier.FINAL);
             }
 
@@ -132,21 +134,32 @@ public class MessageImplGenerator {
                         .addStatement("return $N", marshallableFieldArray)
                         .build();
 
-                getterImpls.add(baGetterImpl);
+                methodImpls.add(baGetterImpl);
+            }
+
+            if (generateSetter) {
+                MethodSpec setterImpl = MethodSpec.methodBuilder(getterName)
+                        .returns(TypeName.VOID)
+                        .addModifiers(Modifier.PUBLIC)
+                        .addParameter(getterReturnType, getterName)
+                        .addStatement("this.$L = $L", getterName, getterName)
+                        .build();
+
+                methodImpls.add(setterImpl);
             }
 
             MethodSpec getterImpl = MethodSpec.overriding(getter)
                     .addStatement("return $N", field)
                     .build();
 
-            getterImpls.add(getterImpl);
+            methodImpls.add(getterImpl);
         }
 
         TypeSpec.Builder messageImpl = 
TypeSpec.classBuilder(messageImplClassName)
                 .addModifiers(Modifier.PUBLIC)
                 .addSuperinterface(message.className())
                 .addFields(fields)
-                .addMethods(getterImpls)
+                .addMethods(methodImpls)
                 .addMethod(constructor(fields));
 
         // group type constant and getter
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
 
b/modules/network-api/src/main/java/org/apache/ignite/network/annotations/WithSetter.java
similarity index 58%
copy from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
copy to 
modules/network-api/src/main/java/org/apache/ignite/network/annotations/WithSetter.java
index 5b2bce2e2b..61e27fb2e1 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java
+++ 
b/modules/network-api/src/main/java/org/apache/ignite/network/annotations/WithSetter.java
@@ -15,18 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.command;
-
-import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.annotations.Transferable;
+package org.apache.ignite.network.annotations;
 
 /**
- * Remove command for MetaStorageCommandListener that removes an entry for the 
given key.
+ * Annotation for the {@link Transferable} class methods. If a method is 
marked with this annotation,
+ * a setter for the field with the same name as method's will be generated.
+ * In order to have access to this setter via interface one can use default 
method:
+ * <pre>
+ * {@code
+ *  @WithSetter
+ *  HybridTimestampMessage safeTime();
+ *
+ *  default void safeTime(HybridTimestampMessage safeTime) {
+ *       // No-op.
+ *  }
+ * }
+ * </pre>
+ * Note that fields with setters will not be final.
  */
-@Transferable(MetastorageCommandsMessageGroup.REMOVE)
-public interface RemoveCommand extends WriteCommand {
-    /**
-     * Returns the key. Couldn't be {@code null}.
-     */
-    byte[] key();
+public @interface WithSetter {
 }
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index eb8c04df9a..e73840369f 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -153,7 +153,8 @@ public class PlacementDriverManagerTest extends 
IgniteAbstractTest {
                 cmgManager,
                 logicalTopologyService,
                 raftManager,
-                storage
+                storage,
+                mock(HybridClock.class)
         );
 
         placementDriverManager = new PlacementDriverManager(
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
index 212a34807c..c5e0a0ba35 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
@@ -19,9 +19,12 @@ package org.apache.ignite.internal.raft.service;
 
 import java.nio.file.Path;
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * A listener for replication group events.
@@ -81,4 +84,13 @@ public interface RaftGroupListener {
      * Invoked once after a raft node has been shut down.
      */
     void onShutdown();
+
+    /**
+     * Invoked before submitting a command to a raft group.
+     *
+     * @param command The command.
+     */
+    default void onBeforeApply(Command command) {
+        // No-op.
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
index b966f6e275..b91b60e3fe 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
@@ -19,8 +19,8 @@ package org.apache.ignite.raft.jraft.rpc.impl;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.concurrent.Executor;
-import org.apache.ignite.internal.logger.IgniteLogger;
+import java.util.concurrent.CompletableFuture;import 
java.util.concurrent.Executor;
+import java.util.function.BiFunction;import 
org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.Command;
@@ -73,6 +73,11 @@ public class ActionRequestProcessor implements 
RpcProcessor<ActionRequest> {
             return;
         }
 
+        JraftServerImpl.DelegatingStateMachine fsm = 
(JraftServerImpl.DelegatingStateMachine) node.getOptions().getFsm();
+
+        // Apply a filter before committing to STM.
+        fsm.getListener().onBeforeApply(request.command());
+
         if (request.command() instanceof WriteCommand) {
             applyWrite(node, request, rpcCtx);
         } else {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 15d061a732..d07d3487c1 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
 import java.util.List;
@@ -47,6 +48,7 @@ import org.apache.ignite.internal.configuration.storage.Data;
 import 
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -154,7 +156,8 @@ public class ItDistributedConfigurationPropertiesTest {
                     cmgManager,
                     new LogicalTopologyServiceImpl(logicalTopology, 
cmgManager),
                     raftManager,
-                    new SimpleInMemoryKeyValueStorage(name())
+                    new SimpleInMemoryKeyValueStorage(name()),
+                    mock(HybridClock.class)
             );
 
             // create a custom storage implementation that is able to "lose" 
some storage updates
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 4db3c7f5d2..694d475182 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.io.Serializable;
 import java.nio.file.Path;
@@ -40,6 +41,7 @@ import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologySer
 import org.apache.ignite.internal.configuration.SecurityConfiguration;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -129,7 +131,8 @@ public class ItDistributedConfigurationStorageTest {
                     cmgManager,
                     new LogicalTopologyServiceImpl(logicalTopology, 
cmgManager),
                     raftManager,
-                    new SimpleInMemoryKeyValueStorage(name())
+                    new SimpleInMemoryKeyValueStorage(name()),
+                    mock(HybridClock.class)
             );
 
             cfgStorage = new 
DistributedConfigurationStorage(metaStorageManager, vaultManager);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 8d03cfd3d9..8c85194bb8 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -35,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
@@ -677,7 +678,8 @@ public class ItRebalanceDistributedTest {
                     raftManager,
                     
testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class)
                             ? new RocksDbKeyValueStorage(nodeName, 
resolveDir(dir, "metaStorage"))
-                            : new SimpleInMemoryKeyValueStorage(nodeName)
+                            : new SimpleInMemoryKeyValueStorage(nodeName),
+                    mock(HybridClock.class)
             );
 
             cfgStorage = new 
DistributedConfigurationStorage(metaStorageManager, vaultManager);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 32345e60a7..d200a9f890 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -301,7 +301,8 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
                 cmgManager,
                 new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
                 raftMgr,
-                new RocksDbKeyValueStorage(name, dir.resolve("metastorage"))
+                new RocksDbKeyValueStorage(name, dir.resolve("metastorage")),
+                mock(HybridClock.class)
         );
 
         var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, 
vault);
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 52a494ac92..9fa07b1a1c 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -380,7 +380,8 @@ public class IgniteImpl implements Ignite {
                 cmgMgr,
                 logicalTopologyService,
                 raftMgr,
-                new RocksDbKeyValueStorage(name, 
workDir.resolve(METASTORAGE_DB_PATH))
+                new RocksDbKeyValueStorage(name, 
workDir.resolve(METASTORAGE_DB_PATH)),
+                clock
         );
 
         this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, 
vaultMgr);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
index 5b8396b5d8..90b692a848 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.impl.EntryImpl;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
@@ -147,7 +148,7 @@ public class TableManagerDistributionZonesTest extends 
IgniteAbstractTest {
 
         keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
 
-        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage);
+        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage, mock(ClusterTime.class));
 
         RaftGroupService metaStorageService = mock(RaftGroupService.class);
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java
index a9f62a449c..d00f2cbf8e 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java
@@ -54,6 +54,7 @@ import 
org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
@@ -122,7 +123,7 @@ public class RebalanceUtilUpdateAssignmentsTest extends 
IgniteAbstractTest {
 
         keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
 
-        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage);
+        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage, mock(ClusterTime.class));
 
         RaftGroupService metaStorageService = mock(RaftGroupService.class);
 

Reply via email to