This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 48b3a2992a IGNITE-18026 Rework HLC propagation in parts related to 
Raft (#1496)
48b3a2992a is described below

commit 48b3a2992ac9d4aed72ae33d298d5d819715f735
Author: Denis Chudov <[email protected]>
AuthorDate: Mon Jan 9 17:02:35 2023 +0200

    IGNITE-18026 Rework HLC propagation in parts related to Raft (#1496)
---
 .../internal/raft/ReplicationGroupOptions.java     |  48 -------
 .../apache/ignite/raft/server/ItSafeTimeTest.java  | 122 ------------------
 .../java/org/apache/ignite/internal/raft/Loza.java |  20 +--
 .../internal/raft/server/RaftGroupOptions.java     |  20 ---
 .../internal/raft/server/impl/JraftServerImpl.java |   4 -
 .../ignite/raft/jraft/core/FSMCallerImpl.java      |   6 -
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  14 ---
 .../ignite/raft/jraft/option/FSMCallerOptions.java |  10 --
 .../ignite/raft/jraft/option/NodeOptions.java      |  14 ---
 .../raft/jraft/util/SafeTimeCandidateManager.java  |  90 --------------
 .../ignite/raft/jraft/core/FSMCallerTest.java      |   5 -
 .../jraft/util/SafeTimeCandidatesManagerTest.java  |  51 --------
 .../command/HybridTimestampMessage.java            |   6 +-
 .../command/SafeTimePropagatingCommand.java}       |  11 +-
 .../replicator/command/SafeTimeSyncCommand.java    |   3 +-
 .../replicator/message/ReplicaMessageGroup.java    |   4 +
 .../ignite/distributed/ItTablePersistenceTest.java |   4 +-
 .../distributed/ItTxDistributedTestSingleNode.java |  12 +-
 .../internal/table/distributed/TableManager.java   |  18 ++-
 .../table/distributed/TableMessageGroup.java       |   4 -
 .../table/distributed/command/FinishTxCommand.java |   1 +
 .../distributed/command/PartitionCommand.java      |   4 +-
 .../distributed/command/TxCleanupCommand.java      |   1 +
 .../table/distributed/raft/PartitionListener.java  |  18 ++-
 .../replicator/PartitionReplicaListener.java       |  15 ++-
 .../PartitionRaftCommandsSerializationTest.java    |   7 +-
 .../raft/PartitionCommandListenerTest.java         | 138 +++++++++++++++------
 .../table/impl/DummyInternalTableImpl.java         |   7 +-
 28 files changed, 173 insertions(+), 484 deletions(-)

diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/ReplicationGroupOptions.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/ReplicationGroupOptions.java
deleted file mode 100644
index 07357f2963..0000000000
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/ReplicationGroupOptions.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.raft;
-
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-
-/**
- * Options that are specific for replication group.
- */
-public class ReplicationGroupOptions {
-    /** Safe time. */
-    private PendingComparableValuesTracker<HybridTimestamp> safeTime;
-
-    /**
-     * Safe time.
-     */
-    public PendingComparableValuesTracker<HybridTimestamp> safeTime() {
-        return safeTime;
-    }
-
-    /**
-     * Set the safe time clock.
-     *
-     * @param safeTime Safe time.
-     * @return This, for chaining.
-     */
-    public ReplicationGroupOptions 
safeTime(PendingComparableValuesTracker<HybridTimestamp> safeTime) {
-        this.safeTime = safeTime;
-
-        return this;
-    }
-}
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
deleted file mode 100644
index 8d89164025..0000000000
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.server;
-
-import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
-import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
-import static 
org.apache.ignite.raft.server.counter.IncrementAndGetCommand.incrementAndGetCommand;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.internal.TestHybridClock;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.ReplicationGroupOptions;
-import org.apache.ignite.internal.raft.server.RaftGroupOptions;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.raft.server.counter.CounterListener;
-import org.junit.jupiter.api.Test;
-
-/**
- * Integration test for checking safe time propagation.
- */
-public class ItSafeTimeTest extends JraftAbstractTest {
-    /** Raft group id. */
-    private static final ReplicationGroupId RAFT_GROUP_ID = new 
TestReplicationGroupId("testGroup");
-
-    /** Hybrid clocks. */
-    private final Map<String, HybridClock> clocks = new HashMap<>();
-
-    /** Safe times clocks. */
-    private final Map<String, PendingComparableValuesTracker<HybridTimestamp>> 
safeTimeContainers = new HashMap<>();
-
-    /**
-     * Starts a cluster for the test.
-     *
-     * @throws Exception If failed.
-     */
-    private void startCluster() throws Exception {
-        for (int i = 0; i < NODES; i++) {
-            HybridClock clock = new TestHybridClock(() -> 1L);
-            PendingComparableValuesTracker<HybridTimestamp> safeTime = new 
PendingComparableValuesTracker<>(clock.now());
-
-            startServer(i,
-                    raftServer -> {
-                        String localMemberName = 
raftServer.clusterService().topologyService().localMember().name();
-
-                        clocks.put(localMemberName, clock);
-                        safeTimeContainers.put(localMemberName, safeTime);
-
-                        RaftGroupOptions groupOptions = defaults()
-                                .replicationGroupOptions(new 
ReplicationGroupOptions().safeTime(safeTime));
-
-                        var nodeId = new RaftNodeId(RAFT_GROUP_ID, 
initialConf.peer(localMemberName));
-
-                        raftServer.startRaftNode(nodeId, initialConf, new 
CounterListener(), groupOptions);
-                    },
-                    opts -> {
-                        opts.setClock(clock);
-                        opts.setSafeTimeTracker(safeTime);
-                    }
-            );
-        }
-
-        startClient(RAFT_GROUP_ID);
-    }
-
-    /**
-     * Tests if a raft group become unavailable in case of a critical error.
-     */
-    @Test
-    public void test() throws Exception {
-        startCluster();
-
-        RaftGroupService client1 = clients.get(0);
-
-        client1.refreshLeader().get();
-
-        String leaderName = client1.leader().consistentId();
-
-        final long leaderPhysicalTime = 100;
-
-        clocks.get(leaderName).update(new HybridTimestamp(leaderPhysicalTime, 
0));
-
-        client1.run(incrementAndGetCommand(1)).get();
-
-        assertTrue(waitForCondition(() -> {
-            for (Peer peer : initialConf.peers()) {
-                String consistentId = peer.consistentId();
-
-                PendingComparableValuesTracker<HybridTimestamp> 
safeTimeContainer = safeTimeContainers.get(consistentId);
-
-                // As current time provider for safe time clocks always 
returns 1,
-                // the only way for physical component to reach 
leaderPhysicalTime is safe time propagation mechanism.
-                if (!consistentId.equals(leaderName) && 
safeTimeContainer.current().getPhysical() != leaderPhysicalTime) {
-                    return false;
-                }
-            }
-
-            return true;
-        }, 2000));
-    }
-}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index d293eeb412..25408e4cd8 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -39,7 +38,6 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -49,7 +47,6 @@ import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.util.Utils;
-import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -107,25 +104,11 @@ public class Loza implements RaftManager {
      * @param dataPath Data path.
      * @param clock A hybrid logical clock.
      */
-    public Loza(ClusterService clusterNetSvc, RaftConfiguration 
raftConfiguration, Path dataPath, HybridClock clock) {
-        this(clusterNetSvc, raftConfiguration, dataPath, clock, null);
-    }
-
-    /**
-     * The constructor.
-     *
-     * @param clusterNetSvc Cluster network service.
-     * @param raftConfiguration Raft configuration.
-     * @param dataPath Data path.
-     * @param clock A hybrid logical clock.
-     * @param safeTimeTracker Safe time tracker.
-     */
     public Loza(
             ClusterService clusterNetSvc,
             RaftConfiguration raftConfiguration,
             Path dataPath,
-            HybridClock clock,
-            @Nullable PendingComparableValuesTracker<HybridTimestamp> 
safeTimeTracker
+            HybridClock clock
     ) {
         this.clusterNetSvc = clusterNetSvc;
         this.raftConfiguration = raftConfiguration;
@@ -133,7 +116,6 @@ public class Loza implements RaftManager {
         NodeOptions options = new NodeOptions();
 
         options.setClock(clock);
-        options.setSafeTimeTracker(safeTimeTracker);
 
         this.opts = options;
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
index f7892d0b1e..d074ae35e8 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.raft.server;
 
-import org.apache.ignite.internal.raft.ReplicationGroupOptions;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.storage.RaftMetaStorageFactory;
 import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
@@ -38,9 +37,6 @@ public class RaftGroupOptions {
     /** Raft meta storage factory. */
     private RaftMetaStorageFactory raftMetaStorageFactory;
 
-    /** Options that are specific for replication group. */
-    private ReplicationGroupOptions replicationGroupOptions;
-
     /**
      * Returns default options as defined by classic Raft (so stores are 
persistent).
      *
@@ -130,20 +126,4 @@ public class RaftGroupOptions {
 
         return this;
     }
-
-    /**
-     * Replication group options.
-     */
-    public ReplicationGroupOptions replicationGroupOptions() {
-        return replicationGroupOptions;
-    }
-
-    /**
-     * Set the replication group options.
-     */
-    public RaftGroupOptions replicationGroupOptions(ReplicationGroupOptions 
replicationGroupOptions) {
-        this.replicationGroupOptions = replicationGroupOptions;
-
-        return this;
-    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 135a642d0f..10e7725036 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -424,10 +424,6 @@ public class JraftServerImpl implements RaftServer {
 
             nodeOptions.setRpcClient(client);
 
-            if (groupOptions.replicationGroupOptions() != null) {
-                
nodeOptions.setSafeTimeTracker(groupOptions.replicationGroupOptions().safeTime());
-            }
-
             var server = new RaftGroupService(
                     nodeId.groupId().toString(),
                     PeerId.fromPeer(nodeId.peer()),
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index bbe6bd5ccc..be0f8207d9 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -59,7 +59,6 @@ import 
org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
 import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
 import org.apache.ignite.raft.jraft.util.Requires;
-import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
 import org.apache.ignite.raft.jraft.util.Utils;
 
 /**
@@ -157,7 +156,6 @@ public class FSMCallerImpl implements FSMCaller {
     private NodeMetrics nodeMetrics;
     private final CopyOnWriteArrayList<LastAppliedLogIndexListener> 
lastAppliedLogIndexListeners = new CopyOnWriteArrayList<>();
     private RaftMessagesFactory msgFactory;
-    private SafeTimeCandidateManager safeTimeCandidateManager;
 
     public FSMCallerImpl() {
         super();
@@ -190,7 +188,6 @@ public class FSMCallerImpl implements FSMCaller {
         }
         this.error = new RaftException(ErrorType.ERROR_TYPE_NONE);
         this.msgFactory = opts.getRaftMessagesFactory();
-        this.safeTimeCandidateManager = opts.getSafeTimeCandidateManager();
         LOG.info("Starts FSMCaller successfully.");
         return true;
     }
@@ -541,9 +538,6 @@ public class FSMCallerImpl implements FSMCaller {
             this.lastAppliedTerm = lastTerm;
             this.logManager.setAppliedId(lastAppliedId);
             notifyLastAppliedIndexUpdated(lastIndex);
-            if (safeTimeCandidateManager != null) {
-                safeTimeCandidateManager.commitIndex(lastAppliedIndex, 
lastIndex, lastTerm);
-            }
         }
         finally {
             this.nodeMetrics.recordLatency("fsm-commit", Utils.monotonicMs() - 
startMs);
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 706c6f0584..4e2a3e5e14 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -116,7 +116,6 @@ import 
org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
 import org.apache.ignite.raft.jraft.util.RepeatedTimer;
 import org.apache.ignite.raft.jraft.util.Requires;
-import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
 import org.apache.ignite.raft.jraft.util.StringUtils;
 import org.apache.ignite.raft.jraft.util.SystemPropertyUtil;
 import org.apache.ignite.raft.jraft.util.ThreadHelper;
@@ -138,8 +137,6 @@ public class NodeImpl implements Node, RaftServerService {
 
     private volatile HybridClock clock;
 
-    private volatile SafeTimeCandidateManager safeTimeCandidateManager;
-
     /**
      * Internal states
      */
@@ -827,7 +824,6 @@ public class NodeImpl implements Node, RaftServerService {
         opts.setBootstrapId(bootstrapId);
         opts.setRaftMessagesFactory(raftOptions.getRaftMessagesFactory());
         
opts.setfSMCallerExecutorDisruptor(options.getfSMCallerExecutorDisruptor());
-        opts.setSafeTimeCandidateManager(safeTimeCandidateManager);
 
         return this.fsmCaller.init(opts);
     }
@@ -863,9 +859,6 @@ public class NodeImpl implements Node, RaftServerService {
         Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service 
factory");
         this.serviceFactory = opts.getServiceFactory();
         this.clock = opts.getNodeOptions().getClock();
-        if (opts.getNodeOptions().getSafeTimeTracker() != null) {
-            this.safeTimeCandidateManager = new 
SafeTimeCandidateManager(opts.getNodeOptions().getSafeTimeTracker());
-        }
         // Term is not an option since changing it is very dangerous
         final long bootstrapLogTerm = opts.getLastLogIndex() > 0 ? 1 : 0;
         final LogId bootstrapId = new LogId(opts.getLastLogIndex(), 
bootstrapLogTerm);
@@ -964,9 +957,6 @@ public class NodeImpl implements Node, RaftServerService {
         Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service 
factory");
         this.serviceFactory = opts.getServiceFactory();
         this.clock = opts.getClock();
-        if (opts.getSafeTimeTracker() != null) {
-            this.safeTimeCandidateManager = new 
SafeTimeCandidateManager(opts.getSafeTimeTracker());
-        }
         this.options = opts;
         this.raftOptions = opts.getRaftOptions();
         this.metrics = new NodeMetrics(opts.isEnableMetrics());
@@ -2228,10 +2218,6 @@ public class NodeImpl implements Node, RaftServerService 
{
                     }
                     entries.add(logEntry);
                 }
-
-                if (safeTimeCandidateManager != null) {
-                    safeTimeCandidateManager.addSafeTimeCandidate(index, 
request.term(), request.timestamp());
-                }
             }
 
             final FollowerStableClosure closure = new FollowerStableClosure(
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
index 5ceabf0192..d9855f3d4f 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
@@ -25,7 +25,6 @@ import org.apache.ignite.raft.jraft.core.NodeImpl;
 import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.LogId;
 import org.apache.ignite.raft.jraft.storage.LogManager;
-import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
 
 /**
  * FSM caller options.
@@ -39,7 +38,6 @@ public class FSMCallerOptions {
     private NodeImpl node;
     private RaftMessagesFactory raftMessagesFactory;
     private StripedDisruptor<FSMCallerImpl.ApplyTask> 
fSMCallerExecutorDisruptor;
-    private SafeTimeCandidateManager safeTimeCandidateManager;
 
     public StripedDisruptor<FSMCallerImpl.ApplyTask> 
getfSMCallerExecutorDisruptor() {
         return fSMCallerExecutorDisruptor;
@@ -106,12 +104,4 @@ public class FSMCallerOptions {
     public void setRaftMessagesFactory(RaftMessagesFactory 
raftMessagesFactory) {
         this.raftMessagesFactory = raftMessagesFactory;
     }
-
-    public SafeTimeCandidateManager getSafeTimeCandidateManager() {
-        return safeTimeCandidateManager;
-    }
-
-    public void setSafeTimeCandidateManager(SafeTimeCandidateManager 
safeTimeCandidateManager) {
-        this.safeTimeCandidateManager = safeTimeCandidateManager;
-    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index ae5b6725a6..a810fa131e 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -19,10 +19,8 @@ package org.apache.ignite.raft.jraft.option;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.raft.JraftGroupEventsListener;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.raft.jraft.JRaftServiceFactory;
 import org.apache.ignite.raft.jraft.StateMachine;
 import org.apache.ignite.raft.jraft.conf.Configuration;
@@ -239,9 +237,6 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
     /** A hybrid clock */
     private HybridClock clock = new HybridClockImpl();
 
-    /** A container for safe time. */
-    private PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker;
-
     /**
      * Amount of Disruptors that will handle the RAFT server.
      */
@@ -603,14 +598,6 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
         this.clock = clock;
     }
 
-    public PendingComparableValuesTracker<HybridTimestamp> 
getSafeTimeTracker() {
-        return safeTimeTracker;
-    }
-
-    public void 
setSafeTimeTracker(PendingComparableValuesTracker<HybridTimestamp> 
safeTimeTracker) {
-        this.safeTimeTracker = safeTimeTracker;
-    }
-
     @Override
     public NodeOptions copy() {
         final NodeOptions nodeOptions = new NodeOptions();
@@ -648,7 +635,6 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
         
nodeOptions.setRpcInstallSnapshotTimeout(this.getRpcInstallSnapshotTimeout());
         
nodeOptions.setElectionTimeoutStrategy(this.getElectionTimeoutStrategy());
         nodeOptions.setClock(this.getClock());
-        nodeOptions.setSafeTimeTracker(this.getSafeTimeTracker());
 
         return nodeOptions;
     }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidateManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidateManager.java
deleted file mode 100644
index cdfcedf008..0000000000
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidateManager.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.jraft.util;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-
-/**
- * This manager stores safe time candidates coming with appendEntries 
requests, and applies them after committing corresponding
- * indexes. Only candidates with correct term can be applied, other ones are 
discarded.
- */
-public class SafeTimeCandidateManager {
-    /** Safe time clock. */
-    private final PendingComparableValuesTracker<HybridTimestamp> 
safeTimeTracker;
-
-    /** Candidates map. */
-    private final Map<Long, Map<Long, HybridTimestamp>> safeTimeCandidates = 
new ConcurrentHashMap<>();
-
-    public 
SafeTimeCandidateManager(PendingComparableValuesTracker<HybridTimestamp> 
safeTimeTracker) {
-        this.safeTimeTracker = safeTimeTracker;
-    }
-
-    /**
-     * Add safe time candidate.
-     *
-     * @param index Corresponding log index.
-     * @param term Corresponding term.
-     * @param safeTime Safe time candidate.
-     */
-    public void addSafeTimeCandidate(long index, long term, HybridTimestamp 
safeTime) {
-        safeTimeCandidates.compute(index, (i, candidates) -> {
-            if (candidates == null) {
-                candidates = new HashMap<>();
-            }
-
-            candidates.put(term, safeTime);
-
-            return candidates;
-        });
-    }
-
-    /**
-     * Called on index commit, applies safe time for corresponding index.
-     *
-     * @param prevIndex Previous applied index.
-     * @param index Index.
-     * @param term Term.
-     */
-    public void commitIndex(long prevIndex, long index, long term) {
-        long currentIndex = prevIndex + 1;
-
-        while (currentIndex <= index) {
-            Map<Long, HybridTimestamp> candidates = 
safeTimeCandidates.remove(currentIndex);
-
-            if (candidates != null) {
-                HybridTimestamp safeTime = null;
-
-                for (Map.Entry<Long, HybridTimestamp> e : 
candidates.entrySet()) {
-                    if (e.getKey() == term) {
-                        safeTime = e.getValue();
-                    }
-                }
-
-                if (safeTime != null) {
-                    safeTimeTracker.update(safeTime);
-                }
-            }
-
-            currentIndex++;
-        }
-    }
-}
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
index 34b4dde924..d275c7f834 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
@@ -46,7 +46,6 @@ import 
org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
-import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -80,8 +79,6 @@ public class FSMCallerTest {
 
     private PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker = 
new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
 
-    private SafeTimeCandidateManager safeTimeCandidateManager = new 
SafeTimeCandidateManager(safeTimeTracker);
-
     @BeforeEach
     public void setup() {
         this.fsmCaller = new FSMCallerImpl();
@@ -103,7 +100,6 @@ public class FSMCallerTest {
             1024,
             () -> new FSMCallerImpl.ApplyTask(),
             1));
-        opts.setSafeTimeCandidateManager(safeTimeCandidateManager);
         assertTrue(this.fsmCaller.init(opts));
     }
 
@@ -141,7 +137,6 @@ public class FSMCallerTest {
 
     @Test
     public void testOnCommitted() throws Exception {
-        safeTimeCandidateManager.addSafeTimeCandidate(11, 1, new 
HybridTimestamp(1, 1));
         final LogEntry log = new LogEntry(EntryType.ENTRY_TYPE_DATA);
         log.getId().setIndex(11);
         log.getId().setTerm(1);
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidatesManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidatesManagerTest.java
deleted file mode 100644
index 4f07642fbb..0000000000
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidatesManagerTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.jraft.util;
-
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-/**
- * Test class for {@link SafeTimeCandidateManager}.
- */
-public class SafeTimeCandidatesManagerTest {
-    @Test
-    public void test() {
-        PendingComparableValuesTracker<HybridTimestamp> safeTimeClock = new 
PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
-
-        SafeTimeCandidateManager safeTimeCandidateManager = new 
SafeTimeCandidateManager(safeTimeClock);
-
-        safeTimeCandidateManager.addSafeTimeCandidate(1, 1, new 
HybridTimestamp(1, 1));
-
-        safeTimeCandidateManager.commitIndex(0, 1, 1);
-        assertEquals(new HybridTimestamp(1, 1), safeTimeClock.current());
-
-        safeTimeCandidateManager.addSafeTimeCandidate(2, 1, new 
HybridTimestamp(10, 1));
-        safeTimeCandidateManager.addSafeTimeCandidate(2, 2, new 
HybridTimestamp(100, 1));
-        safeTimeCandidateManager.addSafeTimeCandidate(3, 3, new 
HybridTimestamp(1000, 1));
-
-        safeTimeCandidateManager.commitIndex(1, 2, 2);
-        assertEquals(new HybridTimestamp(100, 1), safeTimeClock.current());
-
-        safeTimeCandidateManager.commitIndex(2, 3, 3);
-        assertEquals(new HybridTimestamp(1000, 1), safeTimeClock.current());
-    }
-}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/HybridTimestampMessage.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/HybridTimestampMessage.java
similarity index 87%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/HybridTimestampMessage.java
rename to 
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/HybridTimestampMessage.java
index b69545d1c5..055ab6107c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/HybridTimestampMessage.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/HybridTimestampMessage.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.command;
+package org.apache.ignite.internal.replicator.command;
 
 import java.io.Serializable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Interface to represent {@link HybridTimestamp} as a {@link NetworkMessage}.
  */
-@Transferable(TableMessageGroup.Commands.HYBRID_TIMESTAMP)
+@Transferable(ReplicaMessageGroup.HYBRID_TIMESTAMP)
 public interface HybridTimestampMessage extends NetworkMessage, Serializable {
     long physical();
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
similarity index 78%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
copy to 
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
index 33c7547567..d054104c06 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
@@ -15,17 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.command;
+package org.apache.ignite.internal.replicator.command;
 
-import java.util.UUID;
 import org.apache.ignite.internal.raft.WriteCommand;
 
 /**
- * Partition transactional command.
+ * Common interface for commands carrying safe time.
  */
-public interface PartitionCommand extends WriteCommand {
+public interface SafeTimePropagatingCommand extends WriteCommand {
     /**
-     * Returns a transaction id.
+     * Returns safe time.
      */
-    UUID txId();
+    HybridTimestampMessage safeTime();
 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
index 86549c26e7..ff4c794306 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.replicator.command;
 
-import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
 import org.apache.ignite.network.annotations.Transferable;
 
@@ -25,5 +24,5 @@ import org.apache.ignite.network.annotations.Transferable;
  * Write command to synchronize safe time periodically.
  */
 @Transferable(ReplicaMessageGroup.SAFE_TIME_SYNC_COMMAND)
-public interface SafeTimeSyncCommand extends WriteCommand {
+public interface SafeTimeSyncCommand extends SafeTimePropagatingCommand {
 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
index d01089d5f1..15eb985e6c 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.replicator.message;
 
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import org.apache.ignite.network.annotations.MessageGroup;
 
@@ -51,4 +52,7 @@ public interface ReplicaMessageGroup {
 
     /** Message type for {@link SafeTimeSyncCommand}. */
     short SAFE_TIME_SYNC_COMMAND = 40;
+
+    /** Message type for {@link HybridTimestampMessage}. */
+    short HYBRID_TIMESTAMP = 60;
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index d0bf0e540a..55863ca962 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
@@ -275,7 +276,8 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
                             new TestTxStateStorage(),
                             txManager,
                             Map::of,
-                            0
+                            0,
+                            new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0))
                     );
 
                     paths.put(listener, workDir);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 9673817b74..d7bdd1f777 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -264,8 +264,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                     cluster.get(i),
                     raftConfiguration,
                     workDir.resolve("node" + i),
-                    clock,
-                    new PendingComparableValuesTracker<>(clock.now())
+                    clock
             );
 
             raftSrv.start();
@@ -417,6 +416,9 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
 
                 PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(partAssignments);
 
+                PendingComparableValuesTracker<HybridTimestamp> safeTime =
+                        new 
PendingComparableValuesTracker<>(clocks.get(assignment).now());
+
                 CompletableFuture<Void> partitionReadyFuture = 
raftServers.get(assignment).startRaftGroupNode(
                         new RaftNodeId(grpId, configuration.peer(assignment)),
                         configuration,
@@ -425,15 +427,13 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                                 new TestTxStateStorage(),
                                 txManagers.get(assignment),
                                 () -> Map.of(pkStorage.get().id(), 
pkStorage.get()),
-                                partId
+                                partId,
+                                safeTime
                         ),
                         RaftGroupEventsListener.noopLsnr
                 ).thenAccept(
                         raftSvc -> {
                             try {
-                                
PendingComparableValuesTracker<HybridTimestamp> safeTime =
-                                        new 
PendingComparableValuesTracker<>(clocks.get(assignment).now());
-
                                 replicaManagers.get(assignment).startReplica(
                                         new TablePartitionId(tblId, partId),
                                         new PartitionReplicaListener(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e61c216837..d341c5397e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -93,7 +93,6 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.ReplicationGroupOptions;
 import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
@@ -756,8 +755,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                         RaftGroupOptions groupOptions = 
groupOptionsForPartition(
                                                 internalTbl.storage(),
                                                 internalTbl.txStateStorage(),
-                                                partitionKey(internalTbl, 
partId),
-                                                safeTime
+                                                partitionKey(internalTbl, 
partId)
                                         );
 
                                         Peer serverPeer = 
newConfiguration.peer(localMemberName);
@@ -774,7 +772,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                                             
txStatePartitionStorage,
                                                             txManager,
                                                             
table.indexStorageAdapters(partId),
-                                                            partId
+                                                            partId,
+                                                            safeTime
                                                     ),
                                                     new 
RebalanceRaftGroupEventsListener(
                                                             metaStorageMgr,
@@ -907,8 +906,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     private RaftGroupOptions groupOptionsForPartition(
             MvTableStorage mvTableStorage,
             TxStateTableStorage txStateTableStorage,
-            PartitionKey partitionKey,
-            PendingComparableValuesTracker<HybridTimestamp> safeTime
+            PartitionKey partitionKey
     ) {
         RaftGroupOptions raftGroupOptions;
 
@@ -928,8 +926,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 incomingSnapshotsExecutor
         ));
 
-        raftGroupOptions.replicationGroupOptions(new 
ReplicationGroupOptions().safeTime(safeTime));
-
         return raftGroupOptions;
     }
 
@@ -1841,8 +1837,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                         RaftGroupOptions groupOptions = 
groupOptionsForPartition(
                                 internalTable.storage(),
                                 internalTable.txStateStorage(),
-                                partitionKey(internalTable, partId),
-                                safeTime
+                                partitionKey(internalTable, partId)
                         );
 
                         RaftGroupListener raftGrpLsnr = new PartitionListener(
@@ -1850,7 +1845,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                 txStatePartitionStorage,
                                 txManager,
                                 tbl.indexStorageAdapters(partId),
-                                partId
+                                partId,
+                                safeTime
                         );
 
                         RaftGroupEventsListener raftGrpEvtsLsnr = new 
RebalanceRaftGroupEventsListener(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 7955b34108..8a2b1990d1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.table.distributed;
 import static 
org.apache.ignite.internal.table.distributed.TableMessageGroup.GROUP_TYPE;
 
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
-import 
org.apache.ignite.internal.table.distributed.command.HybridTimestampMessage;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
@@ -153,9 +152,6 @@ public interface TableMessageGroup {
         /** Message type for {@link UpdateCommand}. */
         short UPDATE = 43;
 
-        /** Message type for {@link HybridTimestampMessage}. */
-        short HYBRID_TIMESTAMP = 60;
-
         /** Message type for {@link TablePartitionIdMessage}. */
         short TABLE_PARTITION_ID = 61;
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
index 77a7f789e2..11b7a24913 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table.distributed.command;
 
 import java.util.List;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.network.annotations.Transferable;
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
index 33c7547567..d28ed66ea4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
@@ -18,12 +18,12 @@
 package org.apache.ignite.internal.table.distributed.command;
 
 import java.util.UUID;
-import org.apache.ignite.internal.raft.WriteCommand;
+import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
 
 /**
  * Partition transactional command.
  */
-public interface PartitionCommand extends WriteCommand {
+public interface PartitionCommand extends SafeTimePropagatingCommand {
     /**
      * Returns a transaction id.
      */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
index f336faaea2..2621731e00 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.network.annotations.Transferable;
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index aa7f0c7ab3..3e39959196 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.CommittedConfiguration;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -63,6 +64,7 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -91,25 +93,31 @@ public class PartitionListener implements RaftGroupListener 
{
     /** Rows that were inserted, updated or removed. */
     private final HashMap<UUID, Set<RowId>> txsPendingRowIds = new HashMap<>();
 
+    /** Safe time tracker. */
+    private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
     /**
      * The constructor.
      *
      * @param partitionDataStorage  The storage.
      * @param txManager Transaction manager.
      * @param partitionId Partition ID this listener serves.
+     * @param safeTime Safe time tracker.
      */
     public PartitionListener(
             PartitionDataStorage partitionDataStorage,
             TxStateStorage txStateStorage,
             TxManager txManager,
             Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes,
-            int partitionId
+            int partitionId,
+            PendingComparableValuesTracker<HybridTimestamp> safeTime
     ) {
         this.storage = partitionDataStorage;
         this.txStateStorage = txStateStorage;
         this.txManager = txManager;
         this.indexes = indexes;
         this.partitionId = partitionId;
+        this.safeTime = safeTime;
 
         // TODO: IGNITE-18502 Implement a pending update storage
         try (PartitionTimestampCursor cursor = 
partitionDataStorage.getStorage().scan(HybridTimestamp.MAX_VALUE)) {
@@ -162,6 +170,14 @@ public class PartitionListener implements 
RaftGroupListener {
 
             storage.acquirePartitionSnapshotsReadLock();
 
+            if (command instanceof SafeTimePropagatingCommand) {
+                SafeTimePropagatingCommand safeTimePropagatingCommand = 
(SafeTimePropagatingCommand) command;
+
+                assert safeTimePropagatingCommand.safeTime() != null;
+
+                
safeTime.update(safeTimePropagatingCommand.safeTime().asHybridTimestamp());
+            }
+
             try {
                 if (command instanceof UpdateCommand) {
                     handleUpdateCommand((UpdateCommand) command, commandIndex, 
commandTerm);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 607855ae41..8c080fd2e9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
 import org.apache.ignite.internal.replicator.exception.ReplicationException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
@@ -77,7 +78,6 @@ import 
org.apache.ignite.internal.table.distributed.SortedIndexLocker;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import 
org.apache.ignite.internal.table.distributed.command.FinishTxCommandBuilder;
-import 
org.apache.ignite.internal.table.distributed.command.HybridTimestampMessage;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
@@ -509,7 +509,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future.
      */
     private CompletionStage<Void> 
processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
-        return 
raftClient.run(replicaMessagesFactory.safeTimeSyncCommand().build());
+        return 
raftClient.run(replicaMessagesFactory.safeTimeSyncCommand().safeTime(hybridTimestamp(hybridClock.now())).build());
     }
 
     /**
@@ -987,11 +987,13 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         txTimestampUpdateMap.put(txId, fut);
 
-        HybridTimestamp commitTimestamp = commit ? hybridClock.now() : null;
+        HybridTimestamp currentTimestamp = hybridClock.now();
+        HybridTimestamp commitTimestamp = commit ? currentTimestamp : null;
 
         FinishTxCommandBuilder finishTxCmdBldr = msgFactory.finishTxCommand()
                 .txId(txId)
                 .commit(commit)
+                .safeTime(hybridTimestamp(currentTimestamp))
                 .tablePartitionIds(aggregatedGroupIds.stream()
                         .map(rgId -> tablePartitionId((TablePartitionId) 
rgId)).collect(Collectors.toList()));
 
@@ -1035,6 +1037,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .txId(request.txId())
                 .commit(request.commit())
                 .commitTimestamp(timestampMsg)
+                .safeTime(hybridTimestamp(hybridClock.now()))
                 .build();
 
         return raftClient
@@ -1916,7 +1919,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return {@link HybridTimestampMessage} object obtained from {@link 
HybridTimestamp}.
      */
     private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {
-        return tmstmp != null ? msgFactory.hybridTimestampMessage()
+        return tmstmp != null ? replicaMessagesFactory.hybridTimestampMessage()
                 .physical(tmstmp.getPhysical())
                 .logical(tmstmp.getLogical())
                 .build()
@@ -1936,7 +1939,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         UpdateCommandBuilder bldr = msgFactory.updateCommand()
                 .tablePartitionId(tablePartitionId(tablePartId))
                 .rowUuid(rowUuid)
-                .txId(txId);
+                .txId(txId)
+                .safeTime(hybridTimestamp(hybridClock.now()));
 
         if (rowBuf != null) {
             bldr.rowBuffer(rowBuf);
@@ -1958,6 +1962,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .tablePartitionId(tablePartitionId(tablePartId))
                 .rowsToUpdate(rowsToUpdate)
                 .txId(txId)
+                .safeTime(hybridTimestamp(hybridClock.now()))
                 .build();
     }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index 7d885903f9..5eab41ea95 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -33,6 +33,8 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -57,6 +59,9 @@ public class PartitionRaftCommandsSerializationTest extends 
IgniteAbstractTest {
     /** Message factory to create messages - RAFT commands. */
     private TableMessagesFactory msgFactory = new TableMessagesFactory();
 
+    /** Factory for replica messages. */
+    private ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
+
     @BeforeAll
     static void beforeAll() {
         var marshallerFactory = new ReflectionMarshallerFactory();
@@ -216,7 +221,7 @@ public class PartitionRaftCommandsSerializationTest extends 
IgniteAbstractTest {
     }
 
     private HybridTimestampMessage hybridTimestampMessage(HybridTimestamp 
tmstmp) {
-        return msgFactory.hybridTimestampMessage()
+        return replicaMessagesFactory.hybridTimestampMessage()
                 .logical(tmstmp.getLogical())
                 .physical(tmstmp.getPhysical())
                 .build();
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index d688b11635..f691e75bbf 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.raft;
 
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -50,6 +51,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.TestHybridClock;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -58,6 +60,8 @@ import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.CommittedConfiguration;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import 
org.apache.ignite.internal.replicator.command.SafeTimeSyncCommandBuilder;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
@@ -93,6 +97,7 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.BeforeEach;
@@ -158,6 +163,15 @@ public class PartitionCommandListenerTest {
     /** Factory for command messages. */
     private TableMessagesFactory msgFactory = new TableMessagesFactory();
 
+    /** Factory for replica messages. */
+    private ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
+
+    /** Hybrid clock. */
+    private HybridClock hybridClock;
+
+    /** Safe time tracker. */
+    private PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker;
+
     @Captor
     private ArgumentCaptor<Throwable> commandClosureResultCaptor;
 
@@ -174,12 +188,17 @@ public class PartitionCommandListenerTest {
 
         ReplicaService replicaService = mock(ReplicaService.class, 
RETURNS_DEEP_STUBS);
 
+        hybridClock = new HybridClockImpl();
+
+        safeTimeTracker = new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0));
+
         commandListener = new PartitionListener(
                 partitionDataStorage,
                 txStateStorage,
-                new TxManagerImpl(replicaService, new HeapLockManager(), new 
HybridClockImpl()),
+                new TxManagerImpl(replicaService, new HeapLockManager(), 
hybridClock),
                 () -> Map.of(pkStorage.id(), pkStorage),
-                PARTITION_ID
+                PARTITION_ID,
+                safeTimeTracker
         );
     }
 
@@ -268,7 +287,8 @@ public class PartitionCommandListenerTest {
                 txStateStorage,
                 new TxManagerImpl(replicaService, new HeapLockManager(), new 
HybridClockImpl()),
                 () -> Map.of(pkStorage.id(), pkStorage),
-                PARTITION_ID
+                PARTITION_ID,
+                new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0))
         );
 
         txStateStorage.lastApplied(3L, 1L);
@@ -306,12 +326,26 @@ public class PartitionCommandListenerTest {
     void testSkipWriteCommandByAppliedIndex() {
         mvPartitionStorage.lastApplied(10L, 1L);
 
+        HybridTimestampMessage timestamp = hybridTimestamp(hybridClock.now());
+
+        UpdateCommand updateCommand = mock(UpdateCommand.class);
+        when(updateCommand.safeTime()).thenReturn(timestamp);
+
+        TxCleanupCommand txCleanupCommand = mock(TxCleanupCommand.class);
+        when(txCleanupCommand.safeTime()).thenReturn(timestamp);
+
+        SafeTimeSyncCommand safeTimeSyncCommand = 
mock(SafeTimeSyncCommand.class);
+        when(safeTimeSyncCommand.safeTime()).thenReturn(timestamp);
+
+        FinishTxCommand finishTxCommand = mock(FinishTxCommand.class);
+        when(finishTxCommand.safeTime()).thenReturn(timestamp);
+
         // Checks for MvPartitionStorage.
         commandListener.onWrite(List.of(
-                writeCommandCommandClosure(3, 1, mock(UpdateCommand.class), 
commandClosureResultCaptor),
-                writeCommandCommandClosure(10, 1, mock(UpdateCommand.class), 
commandClosureResultCaptor),
-                writeCommandCommandClosure(4, 1, mock(TxCleanupCommand.class), 
commandClosureResultCaptor),
-                writeCommandCommandClosure(5, 1, 
mock(SafeTimeSyncCommand.class), commandClosureResultCaptor)
+                writeCommandCommandClosure(3, 1, updateCommand, 
commandClosureResultCaptor),
+                writeCommandCommandClosure(10, 1, updateCommand, 
commandClosureResultCaptor),
+                writeCommandCommandClosure(4, 1, txCleanupCommand, 
commandClosureResultCaptor),
+                writeCommandCommandClosure(5, 1, safeTimeSyncCommand, 
commandClosureResultCaptor)
         ).iterator());
 
         verify(mvPartitionStorage, 
never()).runConsistently(any(WriteClosure.class));
@@ -326,8 +360,8 @@ public class PartitionCommandListenerTest {
         commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class);
 
         commandListener.onWrite(List.of(
-                writeCommandCommandClosure(2, 1, mock(FinishTxCommand.class), 
commandClosureResultCaptor),
-                writeCommandCommandClosure(10, 1, mock(FinishTxCommand.class), 
commandClosureResultCaptor)
+                writeCommandCommandClosure(2, 1, finishTxCommand, 
commandClosureResultCaptor),
+                writeCommandCommandClosure(10, 1, finishTxCommand, 
commandClosureResultCaptor)
         ).iterator());
 
         verify(txStateStorage, never()).compareAndSet(any(UUID.class), 
any(TxState.class), any(TxMeta.class), anyLong(), anyLong());
@@ -338,7 +372,10 @@ public class PartitionCommandListenerTest {
 
     @Test
     void updatesLastAppliedForSafeTimeSyncCommands() {
-        SafeTimeSyncCommand safeTimeSyncCommand = new 
ReplicaMessagesFactory().safeTimeSyncCommand().build();
+        SafeTimeSyncCommand safeTimeSyncCommand = new ReplicaMessagesFactory()
+                .safeTimeSyncCommand()
+                .safeTime(hybridTimestamp(hybridClock.now()))
+                .build();
 
         commandListener.onWrite(List.of(
                 writeCommandCommandClosure(3, 2, safeTimeSyncCommand, 
commandClosureResultCaptor)
@@ -349,7 +386,9 @@ public class PartitionCommandListenerTest {
 
     @Test
     void locksOnCommandApplication() {
-        SafeTimeSyncCommandBuilder safeTimeSyncCommand = new 
ReplicaMessagesFactory().safeTimeSyncCommand();
+        SafeTimeSyncCommandBuilder safeTimeSyncCommand = new 
ReplicaMessagesFactory()
+                .safeTimeSyncCommand()
+                .safeTime(hybridTimestamp(hybridClock.now()));
 
         commandListener.onWrite(List.of(
                 writeCommandCommandClosure(3, 2, safeTimeSyncCommand.build(), 
commandClosureResultCaptor)
@@ -407,6 +446,25 @@ public class PartitionCommandListenerTest {
         
inOrder.verify(partitionDataStorage).releasePartitionSnapshotsReadLock();
     }
 
+    @Test
+    public void testSafeTime() {
+        HybridClock testClock = new TestHybridClock(() -> 1);
+
+        applySafeTimeCommand(SafeTimeSyncCommand.class, testClock.now());
+        applySafeTimeCommand(SafeTimeSyncCommand.class, testClock.now());
+    }
+
+    private void applySafeTimeCommand(Class<? extends 
SafeTimePropagatingCommand> cls, HybridTimestamp timestamp) {
+        HybridTimestampMessage safeTime = hybridTimestamp(timestamp);
+
+        SafeTimePropagatingCommand command = mock(cls);
+        when(command.safeTime()).thenReturn(safeTime);
+
+        CommandClosure<WriteCommand> closure = writeCommandCommandClosure(1, 
1, command, commandClosureResultCaptor);
+        commandListener.onWrite(asList(closure).iterator());
+        assertEquals(timestamp, safeTimeTracker.current());
+    }
+
     /**
      * Crate a command closure.
      *
@@ -516,14 +574,13 @@ public class PartitionCommandListenerTest {
                                 .build())
                 .rowsToUpdate(rows)
                 .txId(txId)
+                .safeTime(hybridTimestamp(hybridClock.now()))
                 .build());
         invokeBatchedCommand(msgFactory.txCleanupCommand()
                 .txId(txId)
                 .commit(true)
-                .commitTimestamp(msgFactory.hybridTimestampMessage()
-                        .physical(commitTimestamp.getPhysical())
-                        .logical(commitTimestamp.getLogical())
-                        .build())
+                .commitTimestamp(hybridTimestamp(commitTimestamp))
+                .safeTime(hybridTimestamp(hybridClock.now()))
                 .build());
     }
 
@@ -553,14 +610,13 @@ public class PartitionCommandListenerTest {
                                 .build())
                 .rowsToUpdate(rows)
                 .txId(txId)
+                .safeTime(hybridTimestamp(hybridClock.now()))
                 .build());
         invokeBatchedCommand(msgFactory.txCleanupCommand()
                 .txId(txId)
                 .commit(true)
-                .commitTimestamp(msgFactory.hybridTimestampMessage()
-                        .physical(commitTimestamp.getPhysical())
-                        .logical(commitTimestamp.getLogical())
-                        .build())
+                .commitTimestamp(hybridTimestamp(commitTimestamp))
+                .safeTime(hybridTimestamp(hybridClock.now()))
                 .build());
     }
 
@@ -588,14 +644,13 @@ public class PartitionCommandListenerTest {
                                 .build())
                 .rowsToUpdate(keyRows)
                 .txId(txId)
+                .safeTime(hybridTimestamp(hybridClock.now()))
                 .build());
         invokeBatchedCommand(msgFactory.txCleanupCommand()
                 .txId(txId)
                 .commit(true)
-                .commitTimestamp(msgFactory.hybridTimestampMessage()
-                        .physical(commitTimestamp.getPhysical())
-                        .logical(commitTimestamp.getLogical())
-                        .build())
+                .commitTimestamp(hybridTimestamp(commitTimestamp))
+                .safeTime(hybridTimestamp(hybridClock.now()))
                 .build());
     }
 
@@ -626,6 +681,7 @@ public class PartitionCommandListenerTest {
                             .rowUuid(rowId.uuid())
                             .rowBuffer(row.byteBuffer())
                             .txId(txId)
+                            .safeTime(hybridTimestamp(hybridClock.now()))
                             .build());
 
             doAnswer(invocation -> {
@@ -640,10 +696,8 @@ public class PartitionCommandListenerTest {
         txIds.forEach(txId -> 
invokeBatchedCommand(msgFactory.txCleanupCommand()
                 .txId(txId)
                 .commit(true)
-                .commitTimestamp(msgFactory.hybridTimestampMessage()
-                        .physical(commitTimestamp.getPhysical())
-                        .logical(commitTimestamp.getLogical())
-                        .build())
+                .commitTimestamp(hybridTimestamp(commitTimestamp))
+                .safeTime(hybridTimestamp(hybridClock.now()))
                 .build()));
     }
 
@@ -671,6 +725,7 @@ public class PartitionCommandListenerTest {
                                     .partitionId(PARTITION_ID).build())
                             .rowUuid(rowId.uuid())
                             .txId(txId)
+                            .safeTime(hybridTimestamp(hybridClock.now()))
                             .build());
 
             doAnswer(invocation -> {
@@ -685,10 +740,8 @@ public class PartitionCommandListenerTest {
         txIds.forEach(txId -> 
invokeBatchedCommand(msgFactory.txCleanupCommand()
                 .txId(txId)
                 .commit(true)
-                .commitTimestamp(msgFactory.hybridTimestampMessage()
-                        .physical(commitTimestamp.getPhysical())
-                        .logical(commitTimestamp.getLogical())
-                        .build())
+                .commitTimestamp(hybridTimestamp(commitTimestamp))
+                .safeTime(hybridTimestamp(hybridClock.now()))
                 .build()));
     }
 
@@ -747,6 +800,7 @@ public class PartitionCommandListenerTest {
                             .rowUuid(Timestamp.nextVersion().toUuid())
                             .rowBuffer(row.byteBuffer())
                             .txId(txId)
+                            .safeTime(hybridTimestamp(hybridClock.now()))
                             .build());
 
             doAnswer(invocation -> {
@@ -758,16 +812,12 @@ public class PartitionCommandListenerTest {
 
         HybridTimestamp now = CLOCK.now();
 
-
-
         txIds.forEach(txId -> invokeBatchedCommand(
                 msgFactory.txCleanupCommand()
                         .txId(txId)
                         .commit(true)
-                        .commitTimestamp(msgFactory.hybridTimestampMessage()
-                                .physical(now.getPhysical())
-                                .logical(now.getLogical())
-                                .build())
+                        .commitTimestamp(hybridTimestamp(now))
+                        .safeTime(hybridTimestamp(hybridClock.now()))
                         .build()));
     }
 
@@ -829,4 +879,18 @@ public class PartitionCommandListenerTest {
 
         return null;
     }
+
+    /**
+     * Method to convert from {@link HybridTimestamp} object to 
NetworkMessage-based {@link HybridTimestampMessage} object.
+     *
+     * @param tmstmp {@link HybridTimestamp} object to convert to {@link 
HybridTimestampMessage}.
+     * @return {@link HybridTimestampMessage} object obtained from {@link 
HybridTimestamp}.
+     */
+    private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {
+        return tmstmp != null ? replicaMessagesFactory.hybridTimestampMessage()
+                .physical(tmstmp.getPhysical())
+                .logical(tmstmp.getLogical())
+                .build()
+                : null;
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 64738defb3..407aac6d4a 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -33,6 +33,7 @@ import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.WriteCommand;
@@ -237,6 +238,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
         IndexLocker pkLocker = new HashIndexLocker(indexId, true, 
this.txManager.lockManager(), row2tuple);
 
         HybridClock clock = new HybridClockImpl();
+        PendingComparableValuesTracker<HybridTimestamp> safeTime = new 
PendingComparableValuesTracker<>(clock.now());
 
         replicaListener = new PartitionReplicaListener(
                 mvPartStorage,
@@ -250,7 +252,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 pkStorage,
                 () -> Map.of(),
                 clock,
-                new PendingComparableValuesTracker<>(clock.now()),
+                safeTime,
                 txStateStorage().getOrCreateTxStateStorage(0),
                 placementDriver,
                 peer -> true
@@ -261,7 +263,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 txStateStorage().getOrCreateTxStateStorage(0),
                 this.txManager,
                 () -> Map.of(pkStorage.get().id(), pkStorage.get()),
-                0
+                0,
+                safeTime
         );
     }
 

Reply via email to