This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 48b3a2992a IGNITE-18026 Rework HLC propagation in parts related to Raft (#1496)
48b3a2992a is described below
commit 48b3a2992ac9d4aed72ae33d298d5d819715f735
Author: Denis Chudov <[email protected]>
AuthorDate: Mon Jan 9 17:02:35 2023 +0200
IGNITE-18026 Rework HLC propagation in parts related to Raft (#1496)
---
.../internal/raft/ReplicationGroupOptions.java | 48 -------
.../apache/ignite/raft/server/ItSafeTimeTest.java | 122 ------------------
.../java/org/apache/ignite/internal/raft/Loza.java | 20 +--
.../internal/raft/server/RaftGroupOptions.java | 20 ---
.../internal/raft/server/impl/JraftServerImpl.java | 4 -
.../ignite/raft/jraft/core/FSMCallerImpl.java | 6 -
.../apache/ignite/raft/jraft/core/NodeImpl.java | 14 ---
.../ignite/raft/jraft/option/FSMCallerOptions.java | 10 --
.../ignite/raft/jraft/option/NodeOptions.java | 14 ---
.../raft/jraft/util/SafeTimeCandidateManager.java | 90 --------------
.../ignite/raft/jraft/core/FSMCallerTest.java | 5 -
.../jraft/util/SafeTimeCandidatesManagerTest.java | 51 --------
.../command/HybridTimestampMessage.java | 6 +-
.../command/SafeTimePropagatingCommand.java} | 11 +-
.../replicator/command/SafeTimeSyncCommand.java | 3 +-
.../replicator/message/ReplicaMessageGroup.java | 4 +
.../ignite/distributed/ItTablePersistenceTest.java | 4 +-
.../distributed/ItTxDistributedTestSingleNode.java | 12 +-
.../internal/table/distributed/TableManager.java | 18 ++-
.../table/distributed/TableMessageGroup.java | 4 -
.../table/distributed/command/FinishTxCommand.java | 1 +
.../distributed/command/PartitionCommand.java | 4 +-
.../distributed/command/TxCleanupCommand.java | 1 +
.../table/distributed/raft/PartitionListener.java | 18 ++-
.../replicator/PartitionReplicaListener.java | 15 ++-
.../PartitionRaftCommandsSerializationTest.java | 7 +-
.../raft/PartitionCommandListenerTest.java | 138 +++++++++++++++------
.../table/impl/DummyInternalTableImpl.java | 7 +-
28 files changed, 173 insertions(+), 484 deletions(-)
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/ReplicationGroupOptions.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/ReplicationGroupOptions.java
deleted file mode 100644
index 07357f2963..0000000000
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/ReplicationGroupOptions.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.raft;
-
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-
-/**
- * Options that are specific for replication group.
- */
-public class ReplicationGroupOptions {
- /** Safe time. */
- private PendingComparableValuesTracker<HybridTimestamp> safeTime;
-
- /**
- * Safe time.
- */
- public PendingComparableValuesTracker<HybridTimestamp> safeTime() {
- return safeTime;
- }
-
- /**
- * Set the safe time clock.
- *
- * @param safeTime Safe time.
- * @return This, for chaining.
- */
- public ReplicationGroupOptions safeTime(PendingComparableValuesTracker<HybridTimestamp> safeTime) {
- this.safeTime = safeTime;
-
- return this;
- }
-}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
deleted file mode 100644
index 8d89164025..0000000000
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.server;
-
-import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
-import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
-import static
org.apache.ignite.raft.server.counter.IncrementAndGetCommand.incrementAndGetCommand;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.internal.TestHybridClock;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.ReplicationGroupOptions;
-import org.apache.ignite.internal.raft.server.RaftGroupOptions;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.raft.server.counter.CounterListener;
-import org.junit.jupiter.api.Test;
-
-/**
- * Integration test for checking safe time propagation.
- */
-public class ItSafeTimeTest extends JraftAbstractTest {
- /** Raft group id. */
- private static final ReplicationGroupId RAFT_GROUP_ID = new
TestReplicationGroupId("testGroup");
-
- /** Hybrid clocks. */
- private final Map<String, HybridClock> clocks = new HashMap<>();
-
- /** Safe times clocks. */
- private final Map<String, PendingComparableValuesTracker<HybridTimestamp>>
safeTimeContainers = new HashMap<>();
-
- /**
- * Starts a cluster for the test.
- *
- * @throws Exception If failed.
- */
- private void startCluster() throws Exception {
- for (int i = 0; i < NODES; i++) {
- HybridClock clock = new TestHybridClock(() -> 1L);
- PendingComparableValuesTracker<HybridTimestamp> safeTime = new
PendingComparableValuesTracker<>(clock.now());
-
- startServer(i,
- raftServer -> {
- String localMemberName =
raftServer.clusterService().topologyService().localMember().name();
-
- clocks.put(localMemberName, clock);
- safeTimeContainers.put(localMemberName, safeTime);
-
- RaftGroupOptions groupOptions = defaults()
- .replicationGroupOptions(new
ReplicationGroupOptions().safeTime(safeTime));
-
- var nodeId = new RaftNodeId(RAFT_GROUP_ID,
initialConf.peer(localMemberName));
-
- raftServer.startRaftNode(nodeId, initialConf, new
CounterListener(), groupOptions);
- },
- opts -> {
- opts.setClock(clock);
- opts.setSafeTimeTracker(safeTime);
- }
- );
- }
-
- startClient(RAFT_GROUP_ID);
- }
-
- /**
- * Tests if a raft group become unavailable in case of a critical error.
- */
- @Test
- public void test() throws Exception {
- startCluster();
-
- RaftGroupService client1 = clients.get(0);
-
- client1.refreshLeader().get();
-
- String leaderName = client1.leader().consistentId();
-
- final long leaderPhysicalTime = 100;
-
- clocks.get(leaderName).update(new HybridTimestamp(leaderPhysicalTime,
0));
-
- client1.run(incrementAndGetCommand(1)).get();
-
- assertTrue(waitForCondition(() -> {
- for (Peer peer : initialConf.peers()) {
- String consistentId = peer.consistentId();
-
- PendingComparableValuesTracker<HybridTimestamp>
safeTimeContainer = safeTimeContainers.get(consistentId);
-
- // As current time provider for safe time clocks always returns 1,
- // the only way for physical component to reach leaderPhysicalTime is safe time propagation mechanism.
- if (!consistentId.equals(leaderName) &&
safeTimeContainer.current().getPhysical() != leaderPhysicalTime) {
- return false;
- }
- }
-
- return true;
- }, 2000));
- }
-}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index d293eeb412..25408e4cd8 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -39,7 +38,6 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.NodeStoppingException;
@@ -49,7 +47,6 @@ import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.util.Utils;
-import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
@@ -107,25 +104,11 @@ public class Loza implements RaftManager {
* @param dataPath Data path.
* @param clock A hybrid logical clock.
*/
- public Loza(ClusterService clusterNetSvc, RaftConfiguration raftConfiguration, Path dataPath, HybridClock clock) {
- this(clusterNetSvc, raftConfiguration, dataPath, clock, null);
- }
-
- /**
- * The constructor.
- *
- * @param clusterNetSvc Cluster network service.
- * @param raftConfiguration Raft configuration.
- * @param dataPath Data path.
- * @param clock A hybrid logical clock.
- * @param safeTimeTracker Safe time tracker.
- */
public Loza(
ClusterService clusterNetSvc,
RaftConfiguration raftConfiguration,
Path dataPath,
- HybridClock clock,
- @Nullable PendingComparableValuesTracker<HybridTimestamp>
safeTimeTracker
+ HybridClock clock
) {
this.clusterNetSvc = clusterNetSvc;
this.raftConfiguration = raftConfiguration;
@@ -133,7 +116,6 @@ public class Loza implements RaftManager {
NodeOptions options = new NodeOptions();
options.setClock(clock);
- options.setSafeTimeTracker(safeTimeTracker);
this.opts = options;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
index f7892d0b1e..d074ae35e8 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.raft.server;
-import org.apache.ignite.internal.raft.ReplicationGroupOptions;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.RaftMetaStorageFactory;
import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
@@ -38,9 +37,6 @@ public class RaftGroupOptions {
/** Raft meta storage factory. */
private RaftMetaStorageFactory raftMetaStorageFactory;
- /** Options that are specific for replication group. */
- private ReplicationGroupOptions replicationGroupOptions;
-
/**
* Returns default options as defined by classic Raft (so stores are
persistent).
*
@@ -130,20 +126,4 @@ public class RaftGroupOptions {
return this;
}
-
- /**
- * Replication group options.
- */
- public ReplicationGroupOptions replicationGroupOptions() {
- return replicationGroupOptions;
- }
-
- /**
- * Set the replication group options.
- */
- public RaftGroupOptions replicationGroupOptions(ReplicationGroupOptions
replicationGroupOptions) {
- this.replicationGroupOptions = replicationGroupOptions;
-
- return this;
- }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 135a642d0f..10e7725036 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -424,10 +424,6 @@ public class JraftServerImpl implements RaftServer {
nodeOptions.setRpcClient(client);
- if (groupOptions.replicationGroupOptions() != null) {
-
nodeOptions.setSafeTimeTracker(groupOptions.replicationGroupOptions().safeTime());
- }
-
var server = new RaftGroupService(
nodeId.groupId().toString(),
PeerId.fromPeer(nodeId.peer()),
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index bbe6bd5ccc..be0f8207d9 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -59,7 +59,6 @@ import
org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.Requires;
-import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
import org.apache.ignite.raft.jraft.util.Utils;
/**
@@ -157,7 +156,6 @@ public class FSMCallerImpl implements FSMCaller {
private NodeMetrics nodeMetrics;
private final CopyOnWriteArrayList<LastAppliedLogIndexListener>
lastAppliedLogIndexListeners = new CopyOnWriteArrayList<>();
private RaftMessagesFactory msgFactory;
- private SafeTimeCandidateManager safeTimeCandidateManager;
public FSMCallerImpl() {
super();
@@ -190,7 +188,6 @@ public class FSMCallerImpl implements FSMCaller {
}
this.error = new RaftException(ErrorType.ERROR_TYPE_NONE);
this.msgFactory = opts.getRaftMessagesFactory();
- this.safeTimeCandidateManager = opts.getSafeTimeCandidateManager();
LOG.info("Starts FSMCaller successfully.");
return true;
}
@@ -541,9 +538,6 @@ public class FSMCallerImpl implements FSMCaller {
this.lastAppliedTerm = lastTerm;
this.logManager.setAppliedId(lastAppliedId);
notifyLastAppliedIndexUpdated(lastIndex);
- if (safeTimeCandidateManager != null) {
- safeTimeCandidateManager.commitIndex(lastAppliedIndex,
lastIndex, lastTerm);
- }
}
finally {
this.nodeMetrics.recordLatency("fsm-commit", Utils.monotonicMs() -
startMs);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 706c6f0584..4e2a3e5e14 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -116,7 +116,6 @@ import
org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.RepeatedTimer;
import org.apache.ignite.raft.jraft.util.Requires;
-import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
import org.apache.ignite.raft.jraft.util.StringUtils;
import org.apache.ignite.raft.jraft.util.SystemPropertyUtil;
import org.apache.ignite.raft.jraft.util.ThreadHelper;
@@ -138,8 +137,6 @@ public class NodeImpl implements Node, RaftServerService {
private volatile HybridClock clock;
- private volatile SafeTimeCandidateManager safeTimeCandidateManager;
-
/**
* Internal states
*/
@@ -827,7 +824,6 @@ public class NodeImpl implements Node, RaftServerService {
opts.setBootstrapId(bootstrapId);
opts.setRaftMessagesFactory(raftOptions.getRaftMessagesFactory());
opts.setfSMCallerExecutorDisruptor(options.getfSMCallerExecutorDisruptor());
- opts.setSafeTimeCandidateManager(safeTimeCandidateManager);
return this.fsmCaller.init(opts);
}
@@ -863,9 +859,6 @@ public class NodeImpl implements Node, RaftServerService {
Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service
factory");
this.serviceFactory = opts.getServiceFactory();
this.clock = opts.getNodeOptions().getClock();
- if (opts.getNodeOptions().getSafeTimeTracker() != null) {
- this.safeTimeCandidateManager = new
SafeTimeCandidateManager(opts.getNodeOptions().getSafeTimeTracker());
- }
// Term is not an option since changing it is very dangerous
final long bootstrapLogTerm = opts.getLastLogIndex() > 0 ? 1 : 0;
final LogId bootstrapId = new LogId(opts.getLastLogIndex(),
bootstrapLogTerm);
@@ -964,9 +957,6 @@ public class NodeImpl implements Node, RaftServerService {
Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service
factory");
this.serviceFactory = opts.getServiceFactory();
this.clock = opts.getClock();
- if (opts.getSafeTimeTracker() != null) {
- this.safeTimeCandidateManager = new
SafeTimeCandidateManager(opts.getSafeTimeTracker());
- }
this.options = opts;
this.raftOptions = opts.getRaftOptions();
this.metrics = new NodeMetrics(opts.isEnableMetrics());
@@ -2228,10 +2218,6 @@ public class NodeImpl implements Node, RaftServerService {
}
entries.add(logEntry);
}
-
- if (safeTimeCandidateManager != null) {
- safeTimeCandidateManager.addSafeTimeCandidate(index,
request.term(), request.timestamp());
- }
}
final FollowerStableClosure closure = new FollowerStableClosure(
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
index 5ceabf0192..d9855f3d4f 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
@@ -25,7 +25,6 @@ import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.storage.LogManager;
-import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
/**
* FSM caller options.
@@ -39,7 +38,6 @@ public class FSMCallerOptions {
private NodeImpl node;
private RaftMessagesFactory raftMessagesFactory;
private StripedDisruptor<FSMCallerImpl.ApplyTask>
fSMCallerExecutorDisruptor;
- private SafeTimeCandidateManager safeTimeCandidateManager;
public StripedDisruptor<FSMCallerImpl.ApplyTask>
getfSMCallerExecutorDisruptor() {
return fSMCallerExecutorDisruptor;
@@ -106,12 +104,4 @@ public class FSMCallerOptions {
public void setRaftMessagesFactory(RaftMessagesFactory
raftMessagesFactory) {
this.raftMessagesFactory = raftMessagesFactory;
}
-
- public SafeTimeCandidateManager getSafeTimeCandidateManager() {
- return safeTimeCandidateManager;
- }
-
- public void setSafeTimeCandidateManager(SafeTimeCandidateManager
safeTimeCandidateManager) {
- this.safeTimeCandidateManager = safeTimeCandidateManager;
- }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index ae5b6725a6..a810fa131e 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -19,10 +19,8 @@ package org.apache.ignite.raft.jraft.option;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.raft.JraftGroupEventsListener;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
import org.apache.ignite.raft.jraft.StateMachine;
import org.apache.ignite.raft.jraft.conf.Configuration;
@@ -239,9 +237,6 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
/** A hybrid clock */
private HybridClock clock = new HybridClockImpl();
- /** A container for safe time. */
- private PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker;
-
/**
* Amount of Disruptors that will handle the RAFT server.
*/
@@ -603,14 +598,6 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
this.clock = clock;
}
- public PendingComparableValuesTracker<HybridTimestamp>
getSafeTimeTracker() {
- return safeTimeTracker;
- }
-
- public void
setSafeTimeTracker(PendingComparableValuesTracker<HybridTimestamp>
safeTimeTracker) {
- this.safeTimeTracker = safeTimeTracker;
- }
-
@Override
public NodeOptions copy() {
final NodeOptions nodeOptions = new NodeOptions();
@@ -648,7 +635,6 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
nodeOptions.setRpcInstallSnapshotTimeout(this.getRpcInstallSnapshotTimeout());
nodeOptions.setElectionTimeoutStrategy(this.getElectionTimeoutStrategy());
nodeOptions.setClock(this.getClock());
- nodeOptions.setSafeTimeTracker(this.getSafeTimeTracker());
return nodeOptions;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidateManager.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidateManager.java
deleted file mode 100644
index cdfcedf008..0000000000
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidateManager.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.jraft.util;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-
-/**
- * This manager stores safe time candidates coming with appendEntries requests, and applies them after committing corresponding
- * indexes. Only candidates with correct term can be applied, other ones are discarded.
- */
-public class SafeTimeCandidateManager {
- /** Safe time clock. */
- private final PendingComparableValuesTracker<HybridTimestamp>
safeTimeTracker;
-
- /** Candidates map. */
- private final Map<Long, Map<Long, HybridTimestamp>> safeTimeCandidates =
new ConcurrentHashMap<>();
-
- public
SafeTimeCandidateManager(PendingComparableValuesTracker<HybridTimestamp>
safeTimeTracker) {
- this.safeTimeTracker = safeTimeTracker;
- }
-
- /**
- * Add safe time candidate.
- *
- * @param index Corresponding log index.
- * @param term Corresponding term.
- * @param safeTime Safe time candidate.
- */
- public void addSafeTimeCandidate(long index, long term, HybridTimestamp
safeTime) {
- safeTimeCandidates.compute(index, (i, candidates) -> {
- if (candidates == null) {
- candidates = new HashMap<>();
- }
-
- candidates.put(term, safeTime);
-
- return candidates;
- });
- }
-
- /**
- * Called on index commit, applies safe time for corresponding index.
- *
- * @param prevIndex Previous applied index.
- * @param index Index.
- * @param term Term.
- */
- public void commitIndex(long prevIndex, long index, long term) {
- long currentIndex = prevIndex + 1;
-
- while (currentIndex <= index) {
- Map<Long, HybridTimestamp> candidates =
safeTimeCandidates.remove(currentIndex);
-
- if (candidates != null) {
- HybridTimestamp safeTime = null;
-
- for (Map.Entry<Long, HybridTimestamp> e :
candidates.entrySet()) {
- if (e.getKey() == term) {
- safeTime = e.getValue();
- }
- }
-
- if (safeTime != null) {
- safeTimeTracker.update(safeTime);
- }
- }
-
- currentIndex++;
- }
- }
-}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
index 34b4dde924..d275c7f834 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
@@ -46,7 +46,6 @@ import
org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
-import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -80,8 +79,6 @@ public class FSMCallerTest {
private PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker =
new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
- private SafeTimeCandidateManager safeTimeCandidateManager = new
SafeTimeCandidateManager(safeTimeTracker);
-
@BeforeEach
public void setup() {
this.fsmCaller = new FSMCallerImpl();
@@ -103,7 +100,6 @@ public class FSMCallerTest {
1024,
() -> new FSMCallerImpl.ApplyTask(),
1));
- opts.setSafeTimeCandidateManager(safeTimeCandidateManager);
assertTrue(this.fsmCaller.init(opts));
}
@@ -141,7 +137,6 @@ public class FSMCallerTest {
@Test
public void testOnCommitted() throws Exception {
- safeTimeCandidateManager.addSafeTimeCandidate(11, 1, new
HybridTimestamp(1, 1));
final LogEntry log = new LogEntry(EntryType.ENTRY_TYPE_DATA);
log.getId().setIndex(11);
log.getId().setTerm(1);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidatesManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidatesManagerTest.java
deleted file mode 100644
index 4f07642fbb..0000000000
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidatesManagerTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.jraft.util;
-
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-/**
- * Test class for {@link SafeTimeCandidateManager}.
- */
-public class SafeTimeCandidatesManagerTest {
- @Test
- public void test() {
- PendingComparableValuesTracker<HybridTimestamp> safeTimeClock = new
PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
-
- SafeTimeCandidateManager safeTimeCandidateManager = new
SafeTimeCandidateManager(safeTimeClock);
-
- safeTimeCandidateManager.addSafeTimeCandidate(1, 1, new
HybridTimestamp(1, 1));
-
- safeTimeCandidateManager.commitIndex(0, 1, 1);
- assertEquals(new HybridTimestamp(1, 1), safeTimeClock.current());
-
- safeTimeCandidateManager.addSafeTimeCandidate(2, 1, new
HybridTimestamp(10, 1));
- safeTimeCandidateManager.addSafeTimeCandidate(2, 2, new
HybridTimestamp(100, 1));
- safeTimeCandidateManager.addSafeTimeCandidate(3, 3, new
HybridTimestamp(1000, 1));
-
- safeTimeCandidateManager.commitIndex(1, 2, 2);
- assertEquals(new HybridTimestamp(100, 1), safeTimeClock.current());
-
- safeTimeCandidateManager.commitIndex(2, 3, 3);
- assertEquals(new HybridTimestamp(1000, 1), safeTimeClock.current());
- }
-}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/HybridTimestampMessage.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/HybridTimestampMessage.java
similarity index 87%
rename from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/HybridTimestampMessage.java
rename to modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/HybridTimestampMessage.java
index b69545d1c5..055ab6107c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/HybridTimestampMessage.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/HybridTimestampMessage.java
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.command;
+package org.apache.ignite.internal.replicator.command;
import java.io.Serializable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Interface to represent {@link HybridTimestamp} as a {@link NetworkMessage}.
*/
-@Transferable(TableMessageGroup.Commands.HYBRID_TIMESTAMP)
+@Transferable(ReplicaMessageGroup.HYBRID_TIMESTAMP)
public interface HybridTimestampMessage extends NetworkMessage, Serializable {
long physical();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
similarity index 78%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
copy to
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
index 33c7547567..d054104c06 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
@@ -15,17 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.command;
+package org.apache.ignite.internal.replicator.command;
-import java.util.UUID;
import org.apache.ignite.internal.raft.WriteCommand;
/**
- * Partition transactional command.
+ * Common interface for commands carrying safe time.
*/
-public interface PartitionCommand extends WriteCommand {
+public interface SafeTimePropagatingCommand extends WriteCommand {
/**
- * Returns a transaction id.
+ * Returns safe time.
*/
- UUID txId();
+ HybridTimestampMessage safeTime();
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
index 86549c26e7..ff4c794306 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.replicator.command;
-import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite.network.annotations.Transferable;
@@ -25,5 +24,5 @@ import org.apache.ignite.network.annotations.Transferable;
* Write command to synchronize safe time periodically.
*/
@Transferable(ReplicaMessageGroup.SAFE_TIME_SYNC_COMMAND)
-public interface SafeTimeSyncCommand extends WriteCommand {
+public interface SafeTimeSyncCommand extends SafeTimePropagatingCommand {
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
index d01089d5f1..15eb985e6c 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.replicator.message;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite.network.annotations.MessageGroup;
@@ -51,4 +52,7 @@ public interface ReplicaMessageGroup {
/** Message type for {@link SafeTimeSyncCommand}. */
short SAFE_TIME_SYNC_COMMAND = 40;
+
+ /** Message type for {@link HybridTimestampMessage}. */
+ short HYBRID_TIMESTAMP = 60;
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index d0bf0e540a..55863ca962 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
@@ -275,7 +276,8 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
new TestTxStateStorage(),
txManager,
Map::of,
- 0
+ 0,
+ new PendingComparableValuesTracker<>(new
HybridTimestamp(1, 0))
);
paths.put(listener, workDir);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 9673817b74..d7bdd1f777 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -264,8 +264,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
cluster.get(i),
raftConfiguration,
workDir.resolve("node" + i),
- clock,
- new PendingComparableValuesTracker<>(clock.now())
+ clock
);
raftSrv.start();
@@ -417,6 +416,9 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
PeersAndLearners configuration =
PeersAndLearners.fromConsistentIds(partAssignments);
+ PendingComparableValuesTracker<HybridTimestamp> safeTime =
+ new
PendingComparableValuesTracker<>(clocks.get(assignment).now());
+
CompletableFuture<Void> partitionReadyFuture =
raftServers.get(assignment).startRaftGroupNode(
new RaftNodeId(grpId, configuration.peer(assignment)),
configuration,
@@ -425,15 +427,13 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
new TestTxStateStorage(),
txManagers.get(assignment),
() -> Map.of(pkStorage.get().id(),
pkStorage.get()),
- partId
+ partId,
+ safeTime
),
RaftGroupEventsListener.noopLsnr
).thenAccept(
raftSvc -> {
try {
-
PendingComparableValuesTracker<HybridTimestamp> safeTime =
- new
PendingComparableValuesTracker<>(clocks.get(assignment).now());
-
replicaManagers.get(assignment).startReplica(
new TablePartitionId(tblId, partId),
new PartitionReplicaListener(
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e61c216837..d341c5397e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -93,7 +93,6 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.ReplicationGroupOptions;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
@@ -756,8 +755,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
RaftGroupOptions groupOptions =
groupOptionsForPartition(
internalTbl.storage(),
internalTbl.txStateStorage(),
- partitionKey(internalTbl,
partId),
- safeTime
+ partitionKey(internalTbl,
partId)
);
Peer serverPeer =
newConfiguration.peer(localMemberName);
@@ -774,7 +772,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
txStatePartitionStorage,
txManager,
table.indexStorageAdapters(partId),
- partId
+ partId,
+ safeTime
),
new
RebalanceRaftGroupEventsListener(
metaStorageMgr,
@@ -907,8 +906,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
private RaftGroupOptions groupOptionsForPartition(
MvTableStorage mvTableStorage,
TxStateTableStorage txStateTableStorage,
- PartitionKey partitionKey,
- PendingComparableValuesTracker<HybridTimestamp> safeTime
+ PartitionKey partitionKey
) {
RaftGroupOptions raftGroupOptions;
@@ -928,8 +926,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
incomingSnapshotsExecutor
));
- raftGroupOptions.replicationGroupOptions(new
ReplicationGroupOptions().safeTime(safeTime));
-
return raftGroupOptions;
}
@@ -1841,8 +1837,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
RaftGroupOptions groupOptions =
groupOptionsForPartition(
internalTable.storage(),
internalTable.txStateStorage(),
- partitionKey(internalTable, partId),
- safeTime
+ partitionKey(internalTable, partId)
);
RaftGroupListener raftGrpLsnr = new PartitionListener(
@@ -1850,7 +1845,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
txStatePartitionStorage,
txManager,
tbl.indexStorageAdapters(partId),
- partId
+ partId,
+ safeTime
);
RaftGroupEventsListener raftGrpEvtsLsnr = new
RebalanceRaftGroupEventsListener(
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 7955b34108..8a2b1990d1 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.table.distributed;
import static
org.apache.ignite.internal.table.distributed.TableMessageGroup.GROUP_TYPE;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
-import
org.apache.ignite.internal.table.distributed.command.HybridTimestampMessage;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
@@ -153,9 +152,6 @@ public interface TableMessageGroup {
/** Message type for {@link UpdateCommand}. */
short UPDATE = 43;
- /** Message type for {@link HybridTimestampMessage}. */
- short HYBRID_TIMESTAMP = 60;
-
/** Message type for {@link TablePartitionIdMessage}. */
short TABLE_PARTITION_ID = 61;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
index 77a7f789e2..11b7a24913 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table.distributed.command;
import java.util.List;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.network.annotations.Transferable;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
index 33c7547567..d28ed66ea4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
@@ -18,12 +18,12 @@
package org.apache.ignite.internal.table.distributed.command;
import java.util.UUID;
-import org.apache.ignite.internal.raft.WriteCommand;
+import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
/**
* Partition transactional command.
*/
-public interface PartitionCommand extends WriteCommand {
+public interface PartitionCommand extends SafeTimePropagatingCommand {
/**
* Returns a transaction id.
*/
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
index f336faaea2..2621731e00 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.command;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.network.annotations.Transferable;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index aa7f0c7ab3..3e39959196 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.CommittedConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -63,6 +64,7 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -91,25 +93,31 @@ public class PartitionListener implements RaftGroupListener
{
/** Rows that were inserted, updated or removed. */
private final HashMap<UUID, Set<RowId>> txsPendingRowIds = new HashMap<>();
+ /** Safe time tracker. */
+ private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
/**
* The constructor.
*
* @param partitionDataStorage The storage.
* @param txManager Transaction manager.
* @param partitionId Partition ID this listener serves.
+ * @param safeTime Safe time tracker.
*/
public PartitionListener(
PartitionDataStorage partitionDataStorage,
TxStateStorage txStateStorage,
TxManager txManager,
Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes,
- int partitionId
+ int partitionId,
+ PendingComparableValuesTracker<HybridTimestamp> safeTime
) {
this.storage = partitionDataStorage;
this.txStateStorage = txStateStorage;
this.txManager = txManager;
this.indexes = indexes;
this.partitionId = partitionId;
+ this.safeTime = safeTime;
// TODO: IGNITE-18502 Implement a pending update storage
try (PartitionTimestampCursor cursor =
partitionDataStorage.getStorage().scan(HybridTimestamp.MAX_VALUE)) {
@@ -162,6 +170,14 @@ public class PartitionListener implements
RaftGroupListener {
storage.acquirePartitionSnapshotsReadLock();
+ if (command instanceof SafeTimePropagatingCommand) {
+ SafeTimePropagatingCommand safeTimePropagatingCommand =
(SafeTimePropagatingCommand) command;
+
+ assert safeTimePropagatingCommand.safeTime() != null;
+
+
safeTime.update(safeTimePropagatingCommand.safeTime().asHybridTimestamp());
+ }
+
try {
if (command instanceof UpdateCommand) {
handleUpdateCommand((UpdateCommand) command, commandIndex,
commandTerm);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 607855ae41..8c080fd2e9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
@@ -77,7 +78,6 @@ import
org.apache.ignite.internal.table.distributed.SortedIndexLocker;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import
org.apache.ignite.internal.table.distributed.command.FinishTxCommandBuilder;
-import
org.apache.ignite.internal.table.distributed.command.HybridTimestampMessage;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
@@ -509,7 +509,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future.
*/
private CompletionStage<Void>
processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
- return
raftClient.run(replicaMessagesFactory.safeTimeSyncCommand().build());
+ return
raftClient.run(replicaMessagesFactory.safeTimeSyncCommand().safeTime(hybridTimestamp(hybridClock.now())).build());
}
/**
@@ -987,11 +987,13 @@ public class PartitionReplicaListener implements
ReplicaListener {
txTimestampUpdateMap.put(txId, fut);
- HybridTimestamp commitTimestamp = commit ? hybridClock.now() : null;
+ HybridTimestamp currentTimestamp = hybridClock.now();
+ HybridTimestamp commitTimestamp = commit ? currentTimestamp : null;
FinishTxCommandBuilder finishTxCmdBldr = msgFactory.finishTxCommand()
.txId(txId)
.commit(commit)
+ .safeTime(hybridTimestamp(currentTimestamp))
.tablePartitionIds(aggregatedGroupIds.stream()
.map(rgId -> tablePartitionId((TablePartitionId)
rgId)).collect(Collectors.toList()));
@@ -1035,6 +1037,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
.txId(request.txId())
.commit(request.commit())
.commitTimestamp(timestampMsg)
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build();
return raftClient
@@ -1916,7 +1919,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return {@link HybridTimestampMessage} object obtained from {@link
HybridTimestamp}.
*/
private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {
- return tmstmp != null ? msgFactory.hybridTimestampMessage()
+ return tmstmp != null ? replicaMessagesFactory.hybridTimestampMessage()
.physical(tmstmp.getPhysical())
.logical(tmstmp.getLogical())
.build()
@@ -1936,7 +1939,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
UpdateCommandBuilder bldr = msgFactory.updateCommand()
.tablePartitionId(tablePartitionId(tablePartId))
.rowUuid(rowUuid)
- .txId(txId);
+ .txId(txId)
+ .safeTime(hybridTimestamp(hybridClock.now()));
if (rowBuf != null) {
bldr.rowBuffer(rowBuf);
@@ -1958,6 +1962,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
.tablePartitionId(tablePartitionId(tablePartId))
.rowsToUpdate(rowsToUpdate)
.txId(txId)
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build();
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index 7d885903f9..5eab41ea95 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -33,6 +33,8 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -57,6 +59,9 @@ public class PartitionRaftCommandsSerializationTest extends
IgniteAbstractTest {
/** Message factory to create messages - RAFT commands. */
private TableMessagesFactory msgFactory = new TableMessagesFactory();
+ /** Factory for replica messages. */
+ private ReplicaMessagesFactory replicaMessagesFactory = new
ReplicaMessagesFactory();
+
@BeforeAll
static void beforeAll() {
var marshallerFactory = new ReflectionMarshallerFactory();
@@ -216,7 +221,7 @@ public class PartitionRaftCommandsSerializationTest extends
IgniteAbstractTest {
}
private HybridTimestampMessage hybridTimestampMessage(HybridTimestamp
tmstmp) {
- return msgFactory.hybridTimestampMessage()
+ return replicaMessagesFactory.hybridTimestampMessage()
.logical(tmstmp.getLogical())
.physical(tmstmp.getPhysical())
.build();
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index d688b11635..f691e75bbf 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.raft;
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -50,6 +51,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -58,6 +60,8 @@ import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.CommittedConfiguration;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import
org.apache.ignite.internal.replicator.command.SafeTimeSyncCommandBuilder;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
@@ -93,6 +97,7 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.BeforeEach;
@@ -158,6 +163,15 @@ public class PartitionCommandListenerTest {
/** Factory for command messages. */
private TableMessagesFactory msgFactory = new TableMessagesFactory();
+ /** Factory for replica messages. */
+ private ReplicaMessagesFactory replicaMessagesFactory = new
ReplicaMessagesFactory();
+
+ /** Hybrid clock. */
+ private HybridClock hybridClock;
+
+ /** Safe time tracker. */
+ private PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker;
+
@Captor
private ArgumentCaptor<Throwable> commandClosureResultCaptor;
@@ -174,12 +188,17 @@ public class PartitionCommandListenerTest {
ReplicaService replicaService = mock(ReplicaService.class,
RETURNS_DEEP_STUBS);
+ hybridClock = new HybridClockImpl();
+
+ safeTimeTracker = new PendingComparableValuesTracker<>(new
HybridTimestamp(1, 0));
+
commandListener = new PartitionListener(
partitionDataStorage,
txStateStorage,
- new TxManagerImpl(replicaService, new HeapLockManager(), new
HybridClockImpl()),
+ new TxManagerImpl(replicaService, new HeapLockManager(),
hybridClock),
() -> Map.of(pkStorage.id(), pkStorage),
- PARTITION_ID
+ PARTITION_ID,
+ safeTimeTracker
);
}
@@ -268,7 +287,8 @@ public class PartitionCommandListenerTest {
txStateStorage,
new TxManagerImpl(replicaService, new HeapLockManager(), new
HybridClockImpl()),
() -> Map.of(pkStorage.id(), pkStorage),
- PARTITION_ID
+ PARTITION_ID,
+ new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0))
);
txStateStorage.lastApplied(3L, 1L);
@@ -306,12 +326,26 @@ public class PartitionCommandListenerTest {
void testSkipWriteCommandByAppliedIndex() {
mvPartitionStorage.lastApplied(10L, 1L);
+ HybridTimestampMessage timestamp = hybridTimestamp(hybridClock.now());
+
+ UpdateCommand updateCommand = mock(UpdateCommand.class);
+ when(updateCommand.safeTime()).thenReturn(timestamp);
+
+ TxCleanupCommand txCleanupCommand = mock(TxCleanupCommand.class);
+ when(txCleanupCommand.safeTime()).thenReturn(timestamp);
+
+ SafeTimeSyncCommand safeTimeSyncCommand =
mock(SafeTimeSyncCommand.class);
+ when(safeTimeSyncCommand.safeTime()).thenReturn(timestamp);
+
+ FinishTxCommand finishTxCommand = mock(FinishTxCommand.class);
+ when(finishTxCommand.safeTime()).thenReturn(timestamp);
+
// Checks for MvPartitionStorage.
commandListener.onWrite(List.of(
- writeCommandCommandClosure(3, 1, mock(UpdateCommand.class),
commandClosureResultCaptor),
- writeCommandCommandClosure(10, 1, mock(UpdateCommand.class),
commandClosureResultCaptor),
- writeCommandCommandClosure(4, 1, mock(TxCleanupCommand.class),
commandClosureResultCaptor),
- writeCommandCommandClosure(5, 1,
mock(SafeTimeSyncCommand.class), commandClosureResultCaptor)
+ writeCommandCommandClosure(3, 1, updateCommand,
commandClosureResultCaptor),
+ writeCommandCommandClosure(10, 1, updateCommand,
commandClosureResultCaptor),
+ writeCommandCommandClosure(4, 1, txCleanupCommand,
commandClosureResultCaptor),
+ writeCommandCommandClosure(5, 1, safeTimeSyncCommand,
commandClosureResultCaptor)
).iterator());
verify(mvPartitionStorage,
never()).runConsistently(any(WriteClosure.class));
@@ -326,8 +360,8 @@ public class PartitionCommandListenerTest {
commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class);
commandListener.onWrite(List.of(
- writeCommandCommandClosure(2, 1, mock(FinishTxCommand.class),
commandClosureResultCaptor),
- writeCommandCommandClosure(10, 1, mock(FinishTxCommand.class),
commandClosureResultCaptor)
+ writeCommandCommandClosure(2, 1, finishTxCommand,
commandClosureResultCaptor),
+ writeCommandCommandClosure(10, 1, finishTxCommand,
commandClosureResultCaptor)
).iterator());
verify(txStateStorage, never()).compareAndSet(any(UUID.class),
any(TxState.class), any(TxMeta.class), anyLong(), anyLong());
@@ -338,7 +372,10 @@ public class PartitionCommandListenerTest {
@Test
void updatesLastAppliedForSafeTimeSyncCommands() {
- SafeTimeSyncCommand safeTimeSyncCommand = new
ReplicaMessagesFactory().safeTimeSyncCommand().build();
+ SafeTimeSyncCommand safeTimeSyncCommand = new ReplicaMessagesFactory()
+ .safeTimeSyncCommand()
+ .safeTime(hybridTimestamp(hybridClock.now()))
+ .build();
commandListener.onWrite(List.of(
writeCommandCommandClosure(3, 2, safeTimeSyncCommand,
commandClosureResultCaptor)
@@ -349,7 +386,9 @@ public class PartitionCommandListenerTest {
@Test
void locksOnCommandApplication() {
- SafeTimeSyncCommandBuilder safeTimeSyncCommand = new
ReplicaMessagesFactory().safeTimeSyncCommand();
+ SafeTimeSyncCommandBuilder safeTimeSyncCommand = new
ReplicaMessagesFactory()
+ .safeTimeSyncCommand()
+ .safeTime(hybridTimestamp(hybridClock.now()));
commandListener.onWrite(List.of(
writeCommandCommandClosure(3, 2, safeTimeSyncCommand.build(),
commandClosureResultCaptor)
@@ -407,6 +446,25 @@ public class PartitionCommandListenerTest {
inOrder.verify(partitionDataStorage).releasePartitionSnapshotsReadLock();
}
+ @Test
+ public void testSafeTime() {
+ HybridClock testClock = new TestHybridClock(() -> 1);
+
+ applySafeTimeCommand(SafeTimeSyncCommand.class, testClock.now());
+ applySafeTimeCommand(SafeTimeSyncCommand.class, testClock.now());
+ }
+
+ private void applySafeTimeCommand(Class<? extends
SafeTimePropagatingCommand> cls, HybridTimestamp timestamp) {
+ HybridTimestampMessage safeTime = hybridTimestamp(timestamp);
+
+ SafeTimePropagatingCommand command = mock(cls);
+ when(command.safeTime()).thenReturn(safeTime);
+
+ CommandClosure<WriteCommand> closure = writeCommandCommandClosure(1,
1, command, commandClosureResultCaptor);
+ commandListener.onWrite(asList(closure).iterator());
+ assertEquals(timestamp, safeTimeTracker.current());
+ }
+
/**
* Create a command closure.
*
@@ -516,14 +574,13 @@ public class PartitionCommandListenerTest {
.build())
.rowsToUpdate(rows)
.txId(txId)
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build());
invokeBatchedCommand(msgFactory.txCleanupCommand()
.txId(txId)
.commit(true)
- .commitTimestamp(msgFactory.hybridTimestampMessage()
- .physical(commitTimestamp.getPhysical())
- .logical(commitTimestamp.getLogical())
- .build())
+ .commitTimestamp(hybridTimestamp(commitTimestamp))
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build());
}
@@ -553,14 +610,13 @@ public class PartitionCommandListenerTest {
.build())
.rowsToUpdate(rows)
.txId(txId)
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build());
invokeBatchedCommand(msgFactory.txCleanupCommand()
.txId(txId)
.commit(true)
- .commitTimestamp(msgFactory.hybridTimestampMessage()
- .physical(commitTimestamp.getPhysical())
- .logical(commitTimestamp.getLogical())
- .build())
+ .commitTimestamp(hybridTimestamp(commitTimestamp))
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build());
}
@@ -588,14 +644,13 @@ public class PartitionCommandListenerTest {
.build())
.rowsToUpdate(keyRows)
.txId(txId)
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build());
invokeBatchedCommand(msgFactory.txCleanupCommand()
.txId(txId)
.commit(true)
- .commitTimestamp(msgFactory.hybridTimestampMessage()
- .physical(commitTimestamp.getPhysical())
- .logical(commitTimestamp.getLogical())
- .build())
+ .commitTimestamp(hybridTimestamp(commitTimestamp))
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build());
}
@@ -626,6 +681,7 @@ public class PartitionCommandListenerTest {
.rowUuid(rowId.uuid())
.rowBuffer(row.byteBuffer())
.txId(txId)
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build());
doAnswer(invocation -> {
@@ -640,10 +696,8 @@ public class PartitionCommandListenerTest {
txIds.forEach(txId ->
invokeBatchedCommand(msgFactory.txCleanupCommand()
.txId(txId)
.commit(true)
- .commitTimestamp(msgFactory.hybridTimestampMessage()
- .physical(commitTimestamp.getPhysical())
- .logical(commitTimestamp.getLogical())
- .build())
+ .commitTimestamp(hybridTimestamp(commitTimestamp))
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build()));
}
@@ -671,6 +725,7 @@ public class PartitionCommandListenerTest {
.partitionId(PARTITION_ID).build())
.rowUuid(rowId.uuid())
.txId(txId)
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build());
doAnswer(invocation -> {
@@ -685,10 +740,8 @@ public class PartitionCommandListenerTest {
txIds.forEach(txId ->
invokeBatchedCommand(msgFactory.txCleanupCommand()
.txId(txId)
.commit(true)
- .commitTimestamp(msgFactory.hybridTimestampMessage()
- .physical(commitTimestamp.getPhysical())
- .logical(commitTimestamp.getLogical())
- .build())
+ .commitTimestamp(hybridTimestamp(commitTimestamp))
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build()));
}
@@ -747,6 +800,7 @@ public class PartitionCommandListenerTest {
.rowUuid(Timestamp.nextVersion().toUuid())
.rowBuffer(row.byteBuffer())
.txId(txId)
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build());
doAnswer(invocation -> {
@@ -758,16 +812,12 @@ public class PartitionCommandListenerTest {
HybridTimestamp now = CLOCK.now();
-
-
txIds.forEach(txId -> invokeBatchedCommand(
msgFactory.txCleanupCommand()
.txId(txId)
.commit(true)
- .commitTimestamp(msgFactory.hybridTimestampMessage()
- .physical(now.getPhysical())
- .logical(now.getLogical())
- .build())
+ .commitTimestamp(hybridTimestamp(now))
+ .safeTime(hybridTimestamp(hybridClock.now()))
.build()));
}
@@ -829,4 +879,18 @@ public class PartitionCommandListenerTest {
return null;
}
+
+ /**
+ * Method to convert from {@link HybridTimestamp} object to NetworkMessage-based {@link HybridTimestampMessage} object.
+ *
+ * @param tmstmp {@link HybridTimestamp} object to convert to {@link HybridTimestampMessage}.
+ * @return {@link HybridTimestampMessage} object obtained from {@link HybridTimestamp}.
+ */
+ private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {
+ return tmstmp != null ? replicaMessagesFactory.hybridTimestampMessage()
+ .physical(tmstmp.getPhysical())
+ .logical(tmstmp.getLogical())
+ .build()
+ : null;
+ }
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 64738defb3..407aac6d4a 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -33,6 +33,7 @@ import javax.naming.OperationNotSupportedException;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.WriteCommand;
@@ -237,6 +238,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
IndexLocker pkLocker = new HashIndexLocker(indexId, true, this.txManager.lockManager(), row2tuple);
HybridClock clock = new HybridClockImpl();
+ PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(clock.now());
replicaListener = new PartitionReplicaListener(
mvPartStorage,
@@ -250,7 +252,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
pkStorage,
() -> Map.of(),
clock,
- new PendingComparableValuesTracker<>(clock.now()),
+ safeTime,
txStateStorage().getOrCreateTxStateStorage(0),
placementDriver,
peer -> true
@@ -261,7 +263,8 @@ public class DummyInternalTableImpl extends InternalTableImpl {
txStateStorage().getOrCreateTxStateStorage(0),
this.txManager,
() -> Map.of(pkStorage.get().id(), pkStorage.get()),
- 0
+ 0,
+ safeTime
);
}