This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch ignite-19028 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 42a805242eab98ca567451919e182a2634d30089 Author: Semyon Danilov <[email protected]> AuthorDate: Thu Mar 30 14:47:18 2023 +0400 .. --- .../ignite/internal/hlc/HybridClockImpl.java | 9 ++ .../ignite/internal/hlc/HybridTimestamp.java | 7 +- ...butionZoneManagerLogicalTopologyEventsTest.java | 4 +- .../DistributionZoneManagerScaleUpTest.java | 3 +- .../DistributionZoneManagerWatchListenerTest.java | 3 +- .../impl/ItMetaStorageManagerImplTest.java | 7 +- .../impl/ItMetaStorageMultipleNodesTest.java | 5 +- .../impl/ItMetaStorageServicePersistenceTest.java | 6 +- .../metastorage/impl/ItMetaStorageServiceTest.java | 16 ++- .../metastorage/impl/ItMetaStorageWatchTest.java | 5 +- .../server/raft/ItMetaStorageRaftGroupTest.java | 14 +- .../metastorage/command/GetAndPutAllCommand.java | 27 +--- .../metastorage/command/GetAndPutCommand.java | 3 +- .../command/GetAndRemoveAllCommand.java | 22 +-- .../metastorage/command/GetAndRemoveCommand.java | 3 +- ...oveCommand.java => HybridTimestampMessage.java} | 22 +-- .../metastorage/command/InvokeCommand.java | 3 +- ...veCommand.java => MetaStorageWriteCommand.java} | 20 +-- .../command/MetastorageCommandsMessageGroup.java | 4 + .../metastorage/command/MultiInvokeCommand.java | 3 +- .../metastorage/command/PutAllCommand.java | 37 +---- .../internal/metastorage/command/PutCommand.java | 3 +- .../metastorage/command/RemoveAllCommand.java | 22 +-- .../metastorage/command/RemoveCommand.java | 3 +- ...ultiInvokeCommand.java => SyncTimeCommand.java} | 12 +- .../metastorage/impl/MetaStorageManagerImpl.java | 22 ++- .../impl/MetaStorageRaftGroupEventsListener.java | 31 ++++- .../metastorage/impl/MetaStorageServiceImpl.java | 153 +++++++++++++++++++-- .../server/raft/MetaStorageLearnerListener.java | 12 +- .../server/raft/MetaStorageListener.java | 12 +- .../server/raft/MetaStorageWriteHandler.java | 29 +++- .../time/ClusterTime.java} | 27 ++-- .../metastorage/server/time/ClusterTimeImpl.java | 148 ++++++++++++++++++++ .../impl/MetaStorageRangeCursorTest.java | 7 +- .../impl/StandaloneMetaStorageManager.java | 4 +- .../processor/messages/MessageImplGenerator.java | 23 +++- .../ignite/network/annotations/WithSetter.java} | 27 ++-- .../PlacementDriverManagerTest.java | 3 +- .../internal/raft/service/RaftGroupListener.java | 12 ++ .../jraft/rpc/impl/ActionRequestProcessor.java | 9 +- .../ItDistributedConfigurationPropertiesTest.java | 5 +- .../ItDistributedConfigurationStorageTest.java | 5 +- .../storage/ItRebalanceDistributedTest.java | 4 +- .../runner/app/ItIgniteNodeRestartTest.java | 3 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 3 +- .../TableManagerDistributionZonesTest.java | 3 +- .../utils/RebalanceUtilUpdateAssignmentsTest.java | 3 +- 47 files changed, 577 insertions(+), 231 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java index 91f02c7fec..7965c3e0e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java @@ -49,6 +49,15 @@ public class HybridClockImpl implements HybridClock { this.latestTime = new HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), 0); } + /** + * The constructor which initializes the latest time to some initial time. + * + * @param initialTime Initial time. + */ + public HybridClockImpl(HybridTimestamp initialTime) { + this.latestTime = initialTime; + } + /** {@inheritDoc} */ @Override public HybridTimestamp now() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java index e06a975ae4..ed2301c35e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.hlc; import java.io.Serializable; import org.apache.ignite.internal.tostring.S; -import org.jetbrains.annotations.Nullable; /** * A hybrid timestamp that combines physical clock and logical clock. @@ -68,10 +67,8 @@ public final class HybridTimestamp implements Comparable<HybridTimestamp>, Seria * @param times Times for comparing. * @return The highest hybrid timestamp. */ - public static @Nullable HybridTimestamp max(HybridTimestamp... times) { - if (times.length == 0) { - return null; - } + public static HybridTimestamp max(HybridTimestamp... times) { + assert times.length > 0; HybridTimestamp maxTime = times[0]; diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java index 82185aa6bd..d56068dbb7 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java @@ -59,6 +59,8 @@ import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand; import org.apache.ignite.internal.metastorage.dsl.Iif; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; +import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.ReadCommand; import org.apache.ignite.internal.raft.WriteCommand; @@ -149,7 +151,7 @@ public class DistributionZoneManagerLogicalTopologyEventsTest { keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test")); - MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage); + MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage, mock(ClusterTimeImpl.class)); RaftGroupService metaStorageService = mock(RaftGroupService.class); diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java index 0bd9fa3878..aba45505f0 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.metastorage.dsl.Iif; import org.apache.ignite.internal.metastorage.impl.EntryImpl; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.schema.configuration.TableChange; import org.apache.ignite.internal.schema.configuration.TableConfiguration; @@ -190,7 +191,7 @@ public class DistributionZoneManagerScaleUpTest { keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test")); - MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage); + MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage, mock(ClusterTime.class)); metaStorageService = mock(RaftGroupService.class); diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java index 84f1a807e6..5ebc2f466e 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.metastorage.WatchListener; import org.apache.ignite.internal.metastorage.impl.EntryImpl; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.schema.configuration.TableChange; import org.apache.ignite.internal.schema.configuration.TableConfiguration; @@ -164,7 +165,7 @@ public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test")); - MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage); + MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage, mock(ClusterTime.class)); metaStorageService = mock(RaftGroupService.class); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java index c37073a221..7eeaf10ed3 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.Entry; @@ -107,7 +108,8 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { cmgManager, mock(LogicalTopologyService.class), raftManager, - storage + storage, + mock(HybridClock.class) ); vaultManager.start(); @@ -282,7 +284,8 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { cmgManager, mock(LogicalTopologyService.class), raftManager, - storage + storage, + mock(HybridClock.class) ); metaStorageManager.stop(); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java index 9a90c3d8cf..4c01301f53 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java @@ -30,6 +30,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; import java.nio.charset.StandardCharsets; import java.nio.file.Path; @@ -48,6 +49,7 @@ import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySer import org.apache.ignite.internal.configuration.SecurityConfiguration; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.Entry; @@ -142,7 +144,8 @@ public class ItMetaStorageMultipleNodesTest extends IgniteAbstractTest { cmgManager, new LogicalTopologyServiceImpl(logicalTopology, cmgManager), raftManager, - new SimpleInMemoryKeyValueStorage(name()) + new SimpleInMemoryKeyValueStorage(name()), + mock(HybridClock.class) ); } diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java index a629716cf6..a483f3979a 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.metastorage.impl; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; import java.nio.charset.StandardCharsets; import java.nio.file.Path; @@ -29,6 +30,7 @@ import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.service.ItAbstractListenerSnapshotTest; @@ -69,7 +71,7 @@ public class ItMetaStorageServicePersistenceTest extends ItAbstractListenerSnaps public void beforeFollowerStop(RaftGroupService service, RaftServer server) throws Exception { ClusterNode followerNode = getNode(server); - metaStorage = new MetaStorageServiceImpl(service, new IgniteSpinBusyLock(), followerNode); + metaStorage = new MetaStorageServiceImpl(service, new IgniteSpinBusyLock(), followerNode, mock(ClusterTime.class)); // Put some data in the metastorage metaStorage.put(FIRST_KEY, FIRST_VALUE).get(); @@ -145,7 +147,7 @@ public class ItMetaStorageServicePersistenceTest extends ItAbstractListenerSnaps return s; }); - return new MetaStorageListener(storage); + return new MetaStorageListener(storage, mock(ClusterTime.class)); } /** {@inheritDoc} */ diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java index 4b27d01123..8483aa744b 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java @@ -90,6 +90,8 @@ import org.apache.ignite.internal.metastorage.server.ValueCondition.Type; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageLearnerListener; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; +import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; @@ -177,6 +179,8 @@ public class ItMetaStorageServiceTest { private final KeyValueStorage mockStorage; + private final ClusterTime clusterTime; + private RaftGroupService metaStorageRaftService; private MetaStorageService metaStorageService; @@ -184,13 +188,17 @@ public class ItMetaStorageServiceTest { Node(ClusterService clusterService, RaftConfiguration raftConfiguration, Path dataPath) { this.clusterService = clusterService; + HybridClockImpl clock = new HybridClockImpl(); + this.raftManager = new Loza( clusterService, raftConfiguration, dataPath.resolve(name()), - new HybridClockImpl() + clock ); + this.clusterTime = new ClusterTimeImpl(new IgniteSpinBusyLock(), clock); + this.mockStorage = mock(KeyValueStorage.class); } @@ -206,7 +214,7 @@ public class ItMetaStorageServiceTest { ClusterNode node = clusterService.topologyService().localMember(); - metaStorageService = new MetaStorageServiceImpl(metaStorageRaftService, new IgniteSpinBusyLock(), node); + metaStorageService = new MetaStorageServiceImpl(metaStorageRaftService, new IgniteSpinBusyLock(), node, clusterTime); } String name() { @@ -222,7 +230,9 @@ public class ItMetaStorageServiceTest { assert peer != null; - RaftGroupListener listener = isLearner ? new MetaStorageLearnerListener(mockStorage) : new MetaStorageListener(mockStorage); + RaftGroupListener listener = isLearner + ? new MetaStorageLearnerListener(mockStorage, clusterTime) + : new MetaStorageListener(mockStorage, clusterTime); var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java index e5e758f862..b1bd82b840 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java @@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; import java.nio.charset.StandardCharsets; import java.nio.file.Path; @@ -46,6 +47,7 @@ import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySer import org.apache.ignite.internal.configuration.SecurityConfiguration; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.MetaStorageManager; @@ -141,7 +143,8 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { cmgManager, new LogicalTopologyServiceImpl(logicalTopology, cmgManager), raftManager, - new RocksDbKeyValueStorage(name(), basePath.resolve("storage")) + new RocksDbKeyValueStorage(name(), basePath.resolve("storage")), + mock(HybridClock.class) ); components.add(metaStorageManager); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java index f83f2a09ed..e791c93adf 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java @@ -28,6 +28,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.ArrayList; @@ -51,6 +52,7 @@ import org.apache.ignite.internal.metastorage.impl.EntryImpl; import org.apache.ignite.internal.metastorage.impl.MetaStorageService; import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupServiceImpl; import org.apache.ignite.internal.raft.RaftNodeId; @@ -249,7 +251,8 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { .value; MetaStorageService metaStorageSvc = new MetaStorageServiceImpl( - raftGroupServiceOfLiveServer, new IgniteSpinBusyLock(), liveServer.clusterService().topologyService().localMember()); + raftGroupServiceOfLiveServer, new IgniteSpinBusyLock(), liveServer.clusterService().topologyService().localMember(), + mock(ClusterTime.class)); var resultFuture = new CompletableFuture<Void>(); @@ -367,15 +370,18 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { var raftNodeId1 = new RaftNodeId(MetastorageGroupId.INSTANCE, membersConfiguration.peer(localMemberName(cluster.get(0)))); - metaStorageRaftSrv1.startRaftNode(raftNodeId1, membersConfiguration, new MetaStorageListener(mockStorage), defaults()); + metaStorageRaftSrv1.startRaftNode(raftNodeId1, membersConfiguration, + new MetaStorageListener(mockStorage, mock(ClusterTime.class)), defaults()); var raftNodeId2 = new RaftNodeId(MetastorageGroupId.INSTANCE, membersConfiguration.peer(localMemberName(cluster.get(1)))); - metaStorageRaftSrv2.startRaftNode(raftNodeId2, membersConfiguration, new MetaStorageListener(mockStorage), defaults()); + metaStorageRaftSrv2.startRaftNode(raftNodeId2, membersConfiguration, + new MetaStorageListener(mockStorage, mock(ClusterTime.class)), defaults()); var raftNodeId3 = new RaftNodeId(MetastorageGroupId.INSTANCE, membersConfiguration.peer(localMemberName(cluster.get(2)))); - metaStorageRaftSrv3.startRaftNode(raftNodeId3, membersConfiguration, new MetaStorageListener(mockStorage), defaults()); + metaStorageRaftSrv3.startRaftNode(raftNodeId3, membersConfiguration, + new MetaStorageListener(mockStorage, mock(ClusterTime.class)), defaults()); metaStorageRaftGrpSvc1 = RaftGroupServiceImpl.start( MetastorageGroupId.INSTANCE, diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutAllCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutAllCommand.java index ea69cc0625..39cdf02765 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutAllCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutAllCommand.java @@ -17,11 +17,7 @@ package org.apache.ignite.internal.metastorage.command; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.lang.ByteArray; import org.apache.ignite.network.annotations.Transferable; /** @@ -29,7 +25,7 @@ import org.apache.ignite.network.annotations.Transferable; * previous entries for given keys. */ @Transferable(MetastorageCommandsMessageGroup.GET_AND_PUT_ALL) -public interface GetAndPutAllCommand extends WriteCommand { +public interface GetAndPutAllCommand extends MetaStorageWriteCommand { /** * Returns keys. */ @@ -39,25 +35,4 @@ public interface GetAndPutAllCommand extends WriteCommand { * Returns values. */ List<byte[]> values(); - - /** - * Static constructor. - * - * @param commandsFactory Commands factory. - * @param map Values. - */ - static GetAndPutAllCommand getAndPutAllCommand(MetaStorageCommandsFactory commandsFactory, Map<ByteArray, byte[]> map) { - int size = map.size(); - - List<byte[]> keys = new ArrayList<>(size); - List<byte[]> values = new ArrayList<>(size); - - for (Map.Entry<ByteArray, byte[]> e : map.entrySet()) { - keys.add(e.getKey().bytes()); - - values.add(e.getValue()); - } - - return commandsFactory.getAndPutAllCommand().keys(keys).values(values).build(); - } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutCommand.java index 7042e505fc..fc2210610c 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndPutCommand.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.metastorage.command; -import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.network.annotations.Transferable; /** @@ -25,7 +24,7 @@ import org.apache.ignite.network.annotations.Transferable; * a previous entry for the given key. */ @Transferable(MetastorageCommandsMessageGroup.GET_AND_PUT) -public interface GetAndPutCommand extends WriteCommand { +public interface GetAndPutCommand extends MetaStorageWriteCommand { /** * Returns the key. Couldn't be {@code null}. */ diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveAllCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveAllCommand.java index 5943405c73..98aadde70b 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveAllCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveAllCommand.java @@ -17,36 +17,16 @@ package org.apache.ignite.internal.metastorage.command; -import java.util.ArrayList; import java.util.List; -import java.util.Set; -import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.lang.ByteArray; import org.apache.ignite.network.annotations.Transferable; /** * Get and remove all command for MetaStorageCommandListener that removes entries for given keys and retrieves previous entries. */ @Transferable(MetastorageCommandsMessageGroup.GET_AND_REMOVE_ALL) -public interface GetAndRemoveAllCommand extends WriteCommand { +public interface GetAndRemoveAllCommand extends MetaStorageWriteCommand { /** * Returns the keys collection. Couldn't be {@code null}. */ List<byte[]> keys(); - - /** - * Static constructor. - * - * @param commandsFactory Commands factory. - * @param keys The keys collection. Couldn't be {@code null}. - */ - static GetAndRemoveAllCommand getAndRemoveAllCommand(MetaStorageCommandsFactory commandsFactory, Set<ByteArray> keys) { - List<byte[]> keysList = new ArrayList<>(keys.size()); - - for (ByteArray key : keys) { - keysList.add(key.bytes()); - } - - return commandsFactory.getAndRemoveAllCommand().keys(keysList).build(); - } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveCommand.java index a778695978..42c83df1e3 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetAndRemoveCommand.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.metastorage.command; -import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.network.annotations.Transferable; /** @@ -25,7 +24,7 @@ import org.apache.ignite.network.annotations.Transferable; * given key. */ @Transferable(MetastorageCommandsMessageGroup.GET_AND_REMOVE) -public interface GetAndRemoveCommand extends WriteCommand { +public interface GetAndRemoveCommand extends MetaStorageWriteCommand { /** * Returns the key. Couldn't be {@code null}. */ diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/HybridTimestampMessage.java similarity index 68% copy from modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java copy to modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/HybridTimestampMessage.java index 5b2bce2e2b..93e6d481b7 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/HybridTimestampMessage.java @@ -17,16 +17,18 @@ package org.apache.ignite.internal.metastorage.command; -import org.apache.ignite.internal.raft.WriteCommand; +import java.io.Serializable; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; -/** - * Remove command for MetaStorageCommandListener that removes an entry for the given key. - */ -@Transferable(MetastorageCommandsMessageGroup.REMOVE) -public interface RemoveCommand extends WriteCommand { - /** - * Returns the key. Couldn't be {@code null}. - */ - byte[] key(); +@Transferable(MetastorageCommandsMessageGroup.HYBRID_TS) +public interface HybridTimestampMessage extends NetworkMessage, Serializable { + long physical(); + + int logical(); + + default HybridTimestamp asHybridTimestamp() { + return new HybridTimestamp(physical(), logical()); + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java index be00fb59d6..b50f58de6a 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java @@ -20,14 +20,13 @@ package org.apache.ignite.internal.metastorage.command; import java.util.Collection; import org.apache.ignite.internal.metastorage.dsl.Condition; import org.apache.ignite.internal.metastorage.dsl.Operation; -import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.network.annotations.Transferable; /** * Represents invoke command for meta storage. */ @Transferable(MetastorageCommandsMessageGroup.INVOKE) -public interface InvokeCommand extends WriteCommand { +public interface InvokeCommand extends MetaStorageWriteCommand { /** * Returns condition. * diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java similarity index 72% copy from modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java copy to modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java index 5b2bce2e2b..9015503f88 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java @@ -18,15 +18,15 @@ package org.apache.ignite.internal.metastorage.command; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.annotations.Transferable; +import org.apache.ignite.network.annotations.WithSetter; -/** - * Remove command for MetaStorageCommandListener that removes an entry for the given key. - */ -@Transferable(MetastorageCommandsMessageGroup.REMOVE) -public interface RemoveCommand extends WriteCommand { - /** - * Returns the key. Couldn't be {@code null}. - */ - byte[] key(); +public interface MetaStorageWriteCommand extends WriteCommand { + HybridTimestampMessage initiatorTime(); + + @WithSetter + HybridTimestampMessage safeTime(); + + default void safeTime(HybridTimestampMessage safeTime) { + // No-op. + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java index 8856166a9d..ba8da6d766 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java @@ -84,4 +84,8 @@ public interface MetastorageCommandsMessageGroup { /** Message type for {@link CloseAllCursorsCommand}. */ short CLOSE_ALL_CURSORS = 64; + + short HYBRID_TS = 65; + + short SYNC_TIME = 66; } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java index 843f418641..ae90fb8f9e 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java @@ -18,14 +18,13 @@ package org.apache.ignite.internal.metastorage.command; import org.apache.ignite.internal.metastorage.dsl.Iif; -import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.network.annotations.Transferable; /** * Represents invoke command with nested conditions and execution branches. */ @Transferable(MetastorageCommandsMessageGroup.MULTI_INVOKE) -public interface MultiInvokeCommand extends WriteCommand { +public interface MultiInvokeCommand extends MetaStorageWriteCommand { /** * Returns if statement. * diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutAllCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutAllCommand.java index e97e7a6906..402724daa2 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutAllCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutAllCommand.java @@ -17,18 +17,14 @@ package org.apache.ignite.internal.metastorage.command; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.lang.ByteArray; import org.apache.ignite.network.annotations.Transferable; /** * Put all command for MetaStorageCommandListener that inserts or updates entries with given keys and given values. */ @Transferable(MetastorageCommandsMessageGroup.PUT_ALL) -public interface PutAllCommand extends WriteCommand { +public interface PutAllCommand extends MetaStorageWriteCommand { /** * Returns entries keys. */ @@ -38,35 +34,4 @@ public interface PutAllCommand extends WriteCommand { * Returns entries values. */ List<byte[]> values(); - - /** - * Static constructor. - * - * @param commandsFactory Commands factory. - * @param vals The map of keys and corresponding values. Couldn't be {@code null} or empty. - */ - static PutAllCommand putAllCommand(MetaStorageCommandsFactory commandsFactory, Map<ByteArray, byte[]> vals) { - assert !vals.isEmpty(); - - int size = vals.size(); - - List<byte[]> keys = new ArrayList<>(size); - - List<byte[]> values = new ArrayList<>(size); - - for (Map.Entry<ByteArray, byte[]> e : vals.entrySet()) { - byte[] key = e.getKey().bytes(); - - byte[] val = e.getValue(); - - assert key != null : "Key could not be null."; - assert val != null : "Value could not be null."; - - keys.add(key); - - values.add(val); - } - - return commandsFactory.putAllCommand().keys(keys).values(values).build(); - } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java index eb176875f1..3facff93fa 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.metastorage.command; -import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.network.annotations.Transferable; /** @@ -25,7 +24,7 @@ import org.apache.ignite.network.annotations.Transferable; * previous entry for the given key. */ @Transferable(MetastorageCommandsMessageGroup.PUT) -public interface PutCommand extends WriteCommand { +public interface PutCommand extends MetaStorageWriteCommand { /** * Returns the key. Couldn't be {@code null}. */ diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveAllCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveAllCommand.java index 291d185500..c4824c1602 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveAllCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveAllCommand.java @@ -17,36 +17,16 @@ package org.apache.ignite.internal.metastorage.command; -import java.util.ArrayList; import java.util.List; -import java.util.Set; -import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.lang.ByteArray; import org.apache.ignite.network.annotations.Transferable; /** * Remove all command for MetaStorageCommandListener that removes entries for given keys. */ @Transferable(MetastorageCommandsMessageGroup.REMOVE_ALL) -public interface RemoveAllCommand extends WriteCommand { +public interface RemoveAllCommand extends MetaStorageWriteCommand { /** * Returns the keys list. Couldn't be {@code null}. */ List<byte[]> keys(); - - /** - * Static constructor. - * - * @param commandsFactory Commands factory. - * @param keys The keys collection. Couldn't be {@code null}. - */ - static RemoveAllCommand removeAllCommand(MetaStorageCommandsFactory commandsFactory, Set<ByteArray> keys) { - List<byte[]> list = new ArrayList<>(keys.size()); - - for (ByteArray key : keys) { - list.add(key.bytes()); - } - - return commandsFactory.removeAllCommand().keys(list).build(); - } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java index 5b2bce2e2b..d78686b5cd 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java @@ -17,14 +17,13 @@ package org.apache.ignite.internal.metastorage.command; -import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.network.annotations.Transferable; /** * Remove command for MetaStorageCommandListener that removes an entry for the given key. */ @Transferable(MetastorageCommandsMessageGroup.REMOVE) -public interface RemoveCommand extends WriteCommand { +public interface RemoveCommand extends MetaStorageWriteCommand { /** * Returns the key. Couldn't be {@code null}. */ diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java similarity index 79% copy from modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java copy to modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java index 843f418641..ccf2506a8e 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MultiInvokeCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java @@ -17,19 +17,13 @@ package org.apache.ignite.internal.metastorage.command; -import org.apache.ignite.internal.metastorage.dsl.Iif; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.network.annotations.Transferable; /** * Represents invoke command with nested conditions and execution branches. */ -@Transferable(MetastorageCommandsMessageGroup.MULTI_INVOKE) -public interface MultiInvokeCommand extends WriteCommand { - /** - * Returns if statement. - * - * @return if statement. - */ - Iif iif(); +@Transferable(MetastorageCommandsMessageGroup.SYNC_TIME) +public interface SyncTimeCommand extends WriteCommand { + HybridTimestampMessage safeTime(); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index d1fe91c36e..d4d2889e68 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; +import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.Entry; @@ -45,6 +46,7 @@ import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageLearnerListener; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; +import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupEventsListener; @@ -103,6 +105,8 @@ public class MetaStorageManagerImpl implements MetaStorageManager { /** Prevents double stopping of the component. */ private final AtomicBoolean isStopped = new AtomicBoolean(); + private final ClusterTimeImpl clusterTime; + /** * The constructor. * @@ -119,7 +123,8 @@ public class MetaStorageManagerImpl implements MetaStorageManager { ClusterManagementGroupManager cmgMgr, LogicalTopologyService logicalTopologyService, RaftManager raftMgr, - KeyValueStorage storage + KeyValueStorage storage, + HybridClock clock ) { this.vaultMgr = vaultMgr; this.clusterService = clusterService; @@ -127,6 +132,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { this.cmgMgr = cmgMgr; this.logicalTopologyService = logicalTopologyService; this.storage = storage; + this.clusterTime = new ClusterTimeImpl(busyLock, clock); } private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) { @@ -148,8 +154,14 @@ public class MetaStorageManagerImpl implements MetaStorageManager { raftServiceFuture = raftMgr.startRaftGroupNode( new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), configuration, - new MetaStorageListener(storage), - new MetaStorageRaftGroupEventsListener(busyLock, clusterService, logicalTopologyService, metaStorageSvcFut) + new MetaStorageListener(storage, clusterTime), + new MetaStorageRaftGroupEventsListener( + busyLock, + clusterService, + logicalTopologyService, + metaStorageSvcFut, + clusterTime + ) ); } else { PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName)); @@ -161,7 +173,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { raftServiceFuture = raftMgr.startRaftGroupNode( new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), configuration, - new MetaStorageLearnerListener(storage), + new MetaStorageLearnerListener(storage, clusterTime), RaftGroupEventsListener.noopLsnr ); } @@ -169,7 +181,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { return CompletableFuture.failedFuture(e); } - return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(raftService, busyLock, thisNode)); + return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(raftService, busyLock, thisNode, clusterTime)); } @Override diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java index 9fe455fff4..9b09c02805 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java @@ -31,6 +31,8 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; +import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupEventsListener; @@ -65,16 +67,20 @@ public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListen private final Object serializationFutureMux = new Object(); + private final ClusterTimeImpl clusterTime; + MetaStorageRaftGroupEventsListener( IgniteSpinBusyLock busyLock, ClusterService clusterService, LogicalTopologyService logicalTopologyService, - CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut + CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut, + ClusterTimeImpl clusterTime ) { this.busyLock = busyLock; this.nodeName = clusterService.localConfiguration().getName(); this.logicalTopologyService = logicalTopologyService; this.metaStorageSvcFut = metaStorageSvcFut; + this.clusterTime = clusterTime; } @Override @@ -84,6 +90,18 @@ public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListen // Update learner configuration (in case we missed some topology updates) and initialize the serialization future. serializationFuture = executeIfLeaderImpl(this::resetLearners); + + serializationFuture = executeWithStatus((service, term1, isLeader) -> { + if (isLeader) { + this.resetLearners(service, term1); + + clusterTime.startLeaderTimer(service); + } else { + clusterTime.stopLeaderTimer(); + } + + return null; + }); } } @@ -127,6 +145,10 @@ public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListen CompletableFuture<Void> apply(MetaStorageServiceImpl service, long term); } + private interface OnStatusAction { + CompletableFuture<Void> apply(MetaStorageServiceImpl service, long term, boolean isLeader); + } + /** * Executes the given action if the current node is the Meta Storage leader. */ @@ -155,11 +177,16 @@ public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListen } private CompletableFuture<Void> executeIfLeaderImpl(OnLeaderAction action) { + return executeWithStatus((service, term, isLeader) -> action.apply(service, term)); + } + + private CompletableFuture<Void> executeWithStatus(OnStatusAction action) { return metaStorageSvcFut.thenCompose(service -> service.raftGroupService().refreshAndGetLeaderWithTerm() .thenCompose(leaderWithTerm -> { String leaderName = leaderWithTerm.leader().consistentId(); - return leaderName.equals(nodeName) ? action.apply(service, leaderWithTerm.term()) : completedFuture(null); + boolean isLeader = leaderName.equals(nodeName); + return action.apply(service, leaderWithTerm.term(), isLeader); })); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java index eee5162582..824f1d31c6 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java @@ -18,11 +18,8 @@ package org.apache.ignite.internal.metastorage.impl; import static org.apache.ignite.internal.metastorage.command.GetAllCommand.getAllCommand; -import static org.apache.ignite.internal.metastorage.command.GetAndPutAllCommand.getAndPutAllCommand; -import static org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand.getAndRemoveAllCommand; -import static org.apache.ignite.internal.metastorage.command.PutAllCommand.putAllCommand; -import static org.apache.ignite.internal.metastorage.command.RemoveAllCommand.removeAllCommand; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -31,6 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.Flow.Publisher; import java.util.function.Function; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.Entry; @@ -40,6 +38,7 @@ import org.apache.ignite.internal.metastorage.command.GetAndPutCommand; import org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand; import org.apache.ignite.internal.metastorage.command.GetAndRemoveCommand; import org.apache.ignite.internal.metastorage.command.GetCommand; +import org.apache.ignite.internal.metastorage.command.HybridTimestampMessage; import org.apache.ignite.internal.metastorage.command.InvokeCommand; import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory; import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand; @@ -47,10 +46,12 @@ import org.apache.ignite.internal.metastorage.command.PutAllCommand; import org.apache.ignite.internal.metastorage.command.PutCommand; import org.apache.ignite.internal.metastorage.command.RemoveAllCommand; import org.apache.ignite.internal.metastorage.command.RemoveCommand; +import org.apache.ignite.internal.metastorage.command.SyncTimeCommand; import org.apache.ignite.internal.metastorage.dsl.Condition; import org.apache.ignite.internal.metastorage.dsl.Iif; import org.apache.ignite.internal.metastorage.dsl.Operation; import org.apache.ignite.internal.metastorage.dsl.StatementResult; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.thread.NamedThreadFactory; @@ -72,13 +73,16 @@ public class MetaStorageServiceImpl implements MetaStorageService { /** Local node. */ private final ClusterNode localNode; + private final ClusterTime clusterTime; + /** * Constructor. * * @param metaStorageRaftGrpSvc Meta storage raft group service. * @param localNode Local node. */ - public MetaStorageServiceImpl(RaftGroupService metaStorageRaftGrpSvc, IgniteSpinBusyLock busyLock, ClusterNode localNode) { + public MetaStorageServiceImpl(RaftGroupService metaStorageRaftGrpSvc, IgniteSpinBusyLock busyLock, ClusterNode localNode, + ClusterTime clusterTime) { this.context = new MetaStorageServiceContext( metaStorageRaftGrpSvc, new MetaStorageCommandsFactory(), @@ -88,6 +92,7 @@ public class MetaStorageServiceImpl implements MetaStorageService { ); this.localNode = localNode; + this.clusterTime = clusterTime; } RaftGroupService raftGroupService() { @@ -126,28 +131,36 @@ public class MetaStorageServiceImpl implements MetaStorageService { @Override public CompletableFuture<Void> put(ByteArray key, byte[] value) { - PutCommand putCommand = context.commandsFactory().putCommand().key(key.bytes()).value(value).build(); + PutCommand putCommand = context.commandsFactory().putCommand() + .key(key.bytes()) + .value(value) + .initiatorTime(hybridTimestamp(clusterTime.now())) + .build(); return context.raftService().run(putCommand); } @Override public CompletableFuture<Entry> getAndPut(ByteArray key, byte[] value) { - GetAndPutCommand getAndPutCommand = context.commandsFactory().getAndPutCommand().key(key.bytes()).value(value).build(); + GetAndPutCommand getAndPutCommand = context.commandsFactory().getAndPutCommand() + .key(key.bytes()) + .value(value) + .initiatorTime(hybridTimestamp(clusterTime.now())) + .build(); return context.raftService().run(getAndPutCommand); } @Override public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) { - PutAllCommand putAllCommand = putAllCommand(context.commandsFactory(), vals); + PutAllCommand putAllCommand = putAllCommand(context.commandsFactory(), vals, clusterTime.now()); return context.raftService().run(putAllCommand); } @Override public CompletableFuture<Map<ByteArray, Entry>> getAndPutAll(Map<ByteArray, byte[]> vals) { - GetAndPutAllCommand getAndPutAllCommand = getAndPutAllCommand(context.commandsFactory(), vals); + GetAndPutAllCommand getAndPutAllCommand = getAndPutAllCommand(context.commandsFactory(), vals, clusterTime.now()); return context.raftService().<List<Entry>>run(getAndPutAllCommand) .thenApply(MetaStorageServiceImpl::multipleEntryResult); @@ -155,28 +168,30 @@ public class MetaStorageServiceImpl implements MetaStorageService { @Override public CompletableFuture<Void> remove(ByteArray key) { - RemoveCommand removeCommand = context.commandsFactory().removeCommand().key(key.bytes()).build(); + RemoveCommand removeCommand = context.commandsFactory().removeCommand().key(key.bytes()) + .initiatorTime(hybridTimestamp(clusterTime.now())).build(); return context.raftService().run(removeCommand); } @Override public CompletableFuture<Entry> getAndRemove(ByteArray key) { - GetAndRemoveCommand getAndRemoveCommand = context.commandsFactory().getAndRemoveCommand().key(key.bytes()).build(); + GetAndRemoveCommand getAndRemoveCommand = context.commandsFactory().getAndRemoveCommand().key(key.bytes()) + .initiatorTime(hybridTimestamp(clusterTime.now())).build(); return context.raftService().run(getAndRemoveCommand); } @Override public CompletableFuture<Void> removeAll(Set<ByteArray> keys) { - RemoveAllCommand removeAllCommand = removeAllCommand(context.commandsFactory(), keys); + RemoveAllCommand removeAllCommand = removeAllCommand(context.commandsFactory(), keys, clusterTime.now()); return context.raftService().run(removeAllCommand); } @Override public CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(Set<ByteArray> keys) { - GetAndRemoveAllCommand getAndRemoveAllCommand = getAndRemoveAllCommand(context.commandsFactory(), keys); + GetAndRemoveAllCommand getAndRemoveAllCommand = getAndRemoveAllCommand(context.commandsFactory(), keys, clusterTime.now()); return context.raftService().<List<Entry>>run(getAndRemoveAllCommand) .thenApply(MetaStorageServiceImpl::multipleEntryResult); @@ -197,6 +212,7 @@ public class MetaStorageServiceImpl implements MetaStorageService { .condition(condition) .success(success) .failure(failure) + .initiatorTime(hybridTimestamp(clusterTime.now())) .build(); return context.raftService().run(invokeCommand); @@ -206,6 +222,7 @@ public class MetaStorageServiceImpl implements MetaStorageService { public CompletableFuture<StatementResult> invoke(Iif iif) { MultiInvokeCommand multiInvokeCommand = context.commandsFactory().multiInvokeCommand() .iif(iif) + .initiatorTime(hybridTimestamp(clusterTime.now())) .build(); return context.raftService().run(multiInvokeCommand); @@ -258,6 +275,14 @@ public class MetaStorageServiceImpl implements MetaStorageService { return new CursorPublisher(context, createPrefixCommand); } + public CompletableFuture<Void> syncTime(HybridTimestamp hybridTimestamp) { + SyncTimeCommand syncTimeCommand = context.commandsFactory().syncTimeCommand() + .safeTime(hybridTimestamp(clusterTime.now())) + .build(); + + return context.raftService().run(syncTimeCommand); + } + // TODO: IGNITE-14734 Implement. @Override public CompletableFuture<Void> compact() { @@ -283,4 +308,106 @@ public class MetaStorageServiceImpl implements MetaStorageService { return res; } + + /** + * Creates put all command. + * + * @param commandsFactory Commands factory. + * @param vals The map of keys and corresponding values. Couldn't be {@code null} or empty. + * @param ts Local time. + */ + private PutAllCommand putAllCommand(MetaStorageCommandsFactory commandsFactory, Map<ByteArray, byte[]> vals, HybridTimestamp ts) { + assert !vals.isEmpty(); + + int size = vals.size(); + + List<byte[]> keys = new ArrayList<>(size); + + List<byte[]> values = new ArrayList<>(size); + + for (Map.Entry<ByteArray, byte[]> e : vals.entrySet()) { + byte[] key = e.getKey().bytes(); + + byte[] val = e.getValue(); + + assert key != null : "Key could not be null."; + assert val != null : "Value could not be null."; + + keys.add(key); + + values.add(val); + } + + return commandsFactory.putAllCommand() + .keys(keys) + .values(values) + .initiatorTime(hybridTimestamp(ts)) + .build(); + } + + /** + * Creates get and put all command. + * + * @param commandsFactory Commands factory. + * @param map Values. + * @param ts Local time. + */ + private GetAndPutAllCommand getAndPutAllCommand(MetaStorageCommandsFactory commandsFactory, Map<ByteArray, byte[]> map, + HybridTimestamp ts) { + int size = map.size(); + + List<byte[]> keys = new ArrayList<>(size); + List<byte[]> values = new ArrayList<>(size); + + for (Map.Entry<ByteArray, byte[]> e : map.entrySet()) { + keys.add(e.getKey().bytes()); + + values.add(e.getValue()); + } + + return commandsFactory.getAndPutAllCommand() + .keys(keys) + .values(values) + .initiatorTime(hybridTimestamp(ts)) + .build(); + } + + /** + * Creates get and remove all command. + * + * @param commandsFactory Commands factory. + * @param keys The keys collection. Couldn't be {@code null}. + * @param ts Local time. + */ + private GetAndRemoveAllCommand getAndRemoveAllCommand(MetaStorageCommandsFactory commandsFactory, Set<ByteArray> keys, + HybridTimestamp ts) { + List<byte[]> keysList = new ArrayList<>(keys.size()); + + for (ByteArray key : keys) { + keysList.add(key.bytes()); + } + + return commandsFactory.getAndRemoveAllCommand().keys(keysList).initiatorTime(hybridTimestamp(ts)).build(); + } + + /** + * Creates remove all command. + * + * @param commandsFactory Commands factory. + * @param keys The keys collection. Couldn't be {@code null}. + * @param ts Local time. + */ + private RemoveAllCommand removeAllCommand(MetaStorageCommandsFactory commandsFactory, Set<ByteArray> keys, HybridTimestamp ts) { + List<byte[]> list = new ArrayList<>(keys.size()); + + for (ByteArray key : keys) { + list.add(key.bytes()); + } + + return commandsFactory.removeAllCommand().keys(list).initiatorTime(hybridTimestamp(ts)).build(); + } + + private HybridTimestampMessage hybridTimestamp(HybridTimestamp ts) { + return context.commandsFactory().hybridTimestampMessage().physical(ts.getPhysical()).logical(ts.getLogical()).build(); + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java index 7902e66781..7dd7e918f5 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java @@ -21,6 +21,9 @@ import java.nio.file.Path; import java.util.Iterator; import java.util.function.Consumer; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; +import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; +import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.ReadCommand; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.service.CommandClosure; @@ -34,9 +37,9 @@ public class MetaStorageLearnerListener implements RaftGroupListener { private final MetaStorageWriteHandler writeHandler; - public MetaStorageLearnerListener(KeyValueStorage storage) { + public MetaStorageLearnerListener(KeyValueStorage storage, ClusterTimeImpl clusterTime) { this.storage = storage; - this.writeHandler = new MetaStorageWriteHandler(storage); + this.writeHandler = new MetaStorageWriteHandler(storage, clusterTime); } @Override @@ -56,6 +59,11 @@ public class MetaStorageLearnerListener implements RaftGroupListener { } } + @Override + public void onBeforeApply(Command command) { + writeHandler.beforeApply(command); + } + @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) { storage.snapshot(path).whenComplete((unused, throwable) -> doneClo.accept(throwable)); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java index d3c2ceaace..409380b88b 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java @@ -39,6 +39,9 @@ import org.apache.ignite.internal.metastorage.command.cursor.NextBatchCommand; import org.apache.ignite.internal.metastorage.command.response.BatchResponse; import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; +import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; +import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.ReadCommand; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.service.CommandClosure; @@ -64,9 +67,9 @@ public class MetaStorageListener implements RaftGroupListener { * * @param storage Storage. */ - public MetaStorageListener(KeyValueStorage storage) { + public MetaStorageListener(KeyValueStorage storage, ClusterTimeImpl clusterTime) { this.storage = storage; - this.writeHandler = new MetaStorageWriteHandler(storage); + this.writeHandler = new MetaStorageWriteHandler(storage, clusterTime); this.cursors = new ConcurrentHashMap<>(); } @@ -217,6 +220,11 @@ public class MetaStorageListener implements RaftGroupListener { } } + @Override + public void onBeforeApply(Command command) { + writeHandler.beforeApply(command); + } + private void closeCursor(IgniteUuid cursorId) { CursorMeta cursorMeta = cursors.remove(cursorId); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java index ddc91f2f65..663856744a 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java @@ -19,17 +19,22 @@ package org.apache.ignite.internal.metastorage.server.raft; import java.io.Serializable; import java.util.Collection; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.command.GetAndPutAllCommand; import org.apache.ignite.internal.metastorage.command.GetAndPutCommand; import org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand; import org.apache.ignite.internal.metastorage.command.GetAndRemoveCommand; +import org.apache.ignite.internal.metastorage.command.HybridTimestampMessage; import org.apache.ignite.internal.metastorage.command.InvokeCommand; +import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory; +import org.apache.ignite.internal.metastorage.command.MetaStorageWriteCommand; import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand; import org.apache.ignite.internal.metastorage.command.PutAllCommand; import org.apache.ignite.internal.metastorage.command.PutCommand; import org.apache.ignite.internal.metastorage.command.RemoveAllCommand; import org.apache.ignite.internal.metastorage.command.RemoveCommand; +import org.apache.ignite.internal.metastorage.command.SyncTimeCommand; import org.apache.ignite.internal.metastorage.dsl.CompoundCondition; import org.apache.ignite.internal.metastorage.dsl.ConditionType; import org.apache.ignite.internal.metastorage.dsl.Iif; @@ -46,6 +51,9 @@ import org.apache.ignite.internal.metastorage.server.RevisionCondition; import org.apache.ignite.internal.metastorage.server.Statement; import org.apache.ignite.internal.metastorage.server.TombstoneCondition; import org.apache.ignite.internal.metastorage.server.ValueCondition; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; +import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; +import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.service.CommandClosure; @@ -53,10 +61,13 @@ import org.apache.ignite.internal.raft.service.CommandClosure; * Class containing some common logic for Meta Storage Raft group listeners. */ class MetaStorageWriteHandler { + private static final MetaStorageCommandsFactory META_STORAGE_COMMANDS_FACTORY = new MetaStorageCommandsFactory(); private final KeyValueStorage storage; + private final ClusterTimeImpl clusterTime; - MetaStorageWriteHandler(KeyValueStorage storage) { + MetaStorageWriteHandler(KeyValueStorage storage, ClusterTimeImpl clusterTime) { this.storage = storage; + this.clusterTime = clusterTime; } /** @@ -122,6 +133,10 @@ class MetaStorageWriteHandler { MultiInvokeCommand cmd = (MultiInvokeCommand) command; clo.result(storage.invoke(toIf(cmd.iif()))); + } else if (command instanceof SyncTimeCommand) { + clusterTime.updateSafeTime(((SyncTimeCommand) command).safeTime().asHybridTimestamp()); + + clo.result(null); } else { return false; } @@ -234,4 +249,16 @@ class MetaStorageWriteHandler { throw new IllegalArgumentException("Unexpected revision condition type " + type); } } + + void beforeApply(Command command) { + if (command instanceof MetaStorageWriteCommand) { + clusterTime.adjust(((MetaStorageWriteCommand) command).initiatorTime().asHybridTimestamp()); + + ((MetaStorageWriteCommand) command).safeTime(hybridTimestamp(clusterTime.now())); + } + } + + private static HybridTimestampMessage hybridTimestamp(HybridTimestamp ts) { + return META_STORAGE_COMMANDS_FACTORY.hybridTimestampMessage().physical(ts.getPhysical()).logical(ts.getLogical()).build(); + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java similarity index 55% copy from modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java copy to modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java index eb176875f1..7f631ef8dd 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/PutCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java @@ -15,24 +15,29 @@ * limitations under the License. */ -package org.apache.ignite.internal.metastorage.command; +package org.apache.ignite.internal.metastorage.server.time; -import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.annotations.Transferable; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl; /** - * Put command for MetaStorageCommandListener that inserts or updates an entry with the given key and the given value and retrieves a - * previous entry for the given key. + * Cluster time. */ -@Transferable(MetastorageCommandsMessageGroup.PUT) -public interface PutCommand extends WriteCommand { +public interface ClusterTime { /** - * Returns the key. Couldn't be {@code null}. + * Returns current cluster time. + * + * @return Current cluster time. */ - byte[] key(); + HybridTimestamp now(); /** - * Returns the value. Couldn't be {@code null}. + * Provides the future that is completed when cluster time reaches given one. If the time is greater or equal + * then the given one, returns completed future. + * + * @param time Timestamp to wait for. + * @return Future. */ - byte[] value(); + CompletableFuture<Void> waitFor(HybridTimestamp time); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java new file mode 100644 index 0000000000..2f33436bd7 --- /dev/null +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.server.time; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl; +import org.apache.ignite.internal.thread.NamedThreadFactory; +import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.jetbrains.annotations.Nullable; + +public class ClusterTimeImpl implements ClusterTime { + /** The logger. */ + private static final IgniteLogger LOG = Loggers.forClass(ClusterTimeImpl.class); + + private final IgniteSpinBusyLock busyLock; + + private volatile @Nullable LeaderTimer leaderTimer; + + private final HybridClock clock; + + private final PendingComparableValuesTracker<HybridTimestamp> safeTime; + + public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) { + this.busyLock = busyLock; + this.clock = clock; + this.safeTime = new PendingComparableValuesTracker<>(clock.now()); + } + + public void startLeaderTimer(MetaStorageServiceImpl service) { + if (!busyLock.enterBusy()) { + return; + } + + try { + assert leaderTimer == null; + + LeaderTimer newTimer = new LeaderTimer(service); + + leaderTimer = newTimer; + + newTimer.start(); + } finally { + busyLock.leaveBusy(); + } + } + + public void stopLeaderTimer() { + LeaderTimer timer = leaderTimer; + + assert timer != null; + + timer.stop(); + + leaderTimer = null; + } + + @Override + public HybridTimestamp now() { + return clock.now(); + } + + @Override + public CompletableFuture<Void> waitFor(HybridTimestamp time) { + return safeTime.waitFor(time); + } + + public void updateSafeTime(HybridTimestamp ts) { + this.safeTime.update(ts); + } + + public void adjust(HybridTimestamp ts) { + this.clock.update(ts); + } + + private class LeaderTimer { + + private final MetaStorageServiceImpl service; + + /** Scheduled executor for cluster time sync. */ + private final ScheduledExecutorService scheduledClusterTimeSyncExecutor = + Executors.newScheduledThreadPool(1, new NamedThreadFactory("scheduled-cluster-time-sync-thread", LOG)); + + private LeaderTimer(MetaStorageServiceImpl service) { + this.service = service; + } + + void start() { + schedule(); + } + + private void schedule() { + try { + scheduledClusterTimeSyncExecutor.scheduleAtFixedRate( + this::disseminateTime, + 0, + 1, + TimeUnit.SECONDS + ); + } catch (RejectedExecutionException ignored) { + // Scheduler was stopped, it is ok. + } + } + + void disseminateTime() { + if (!busyLock.enterBusy()) { + // Shutting down. + return; + } + + try { + HybridTimestamp now = clock.now(); + + service.syncTime(now); + } finally { + busyLock.leaveBusy(); + } + } + + void stop() { + IgniteUtils.shutdownAndAwaitTermination(scheduledClusterTimeSyncExecutor, 1, TimeUnit.SECONDS); + } + } +} diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRangeCursorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRangeCursorTest.java index 95871e2399..e07ed4e66b 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRangeCursorTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRangeCursorTest.java @@ -28,6 +28,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.Serializable; @@ -42,6 +43,7 @@ import java.util.stream.Stream; import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.ReadCommand; import org.apache.ignite.internal.raft.WriteCommand; @@ -83,13 +85,14 @@ public class MetaStorageRangeCursorTest { when(storage.range(any(), any(), anyBoolean())).thenReturn(Cursor.fromIterable(expectedEntries)); when(storage.range(any(), any(), anyLong(), anyBoolean())).thenReturn(Cursor.fromIterable(expectedEntries)); - listener = new MetaStorageListener(storage); + listener = new MetaStorageListener(storage, mock(ClusterTime.class)); when(raftGroupService.run(any())).thenAnswer(invocation -> runCommand(invocation.getArgument(0))); var localNode = new ClusterNode("test", "test", new NetworkAddress("localhost", 10000)); - MetaStorageService metaStorageService = new MetaStorageServiceImpl(raftGroupService, new IgniteSpinBusyLock(), localNode); + MetaStorageService metaStorageService = new MetaStorageServiceImpl(raftGroupService, new IgniteSpinBusyLock(), localNode, + mock(ClusterTime.class)); checkCursor(metaStorageService.range(intToBytes(0), intToBytes(keyTo)), expectedEntries); checkCursor(metaStorageService.range(intToBytes(0), intToBytes(keyTo), 0), expectedEntries); diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java index 0da79d691e..e3342069c0 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java @@ -29,8 +29,10 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; +import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.ReadCommand; @@ -95,7 +97,7 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { */ private StandaloneMetaStorageManager(VaultManager vaultMgr, ClusterService clusterService, ClusterManagementGroupManager cmgMgr, LogicalTopologyService logicalTopologyService, RaftManager raftMgr, KeyValueStorage storage) { - super(vaultMgr, clusterService, cmgMgr, logicalTopologyService, raftMgr, storage); + super(vaultMgr, clusterService, cmgMgr, logicalTopologyService, raftMgr, storage, mock(HybridClock.class)); } private static ClusterService mockClusterService() { diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java index 84e63cc954..98a1eee909 100644 --- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java +++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.tostring.IgniteToStringInclude; import org.apache.ignite.internal.tostring.S; import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Marshallable; +import org.apache.ignite.network.annotations.WithSetter; /** * Class for generating implementations of the {@link NetworkMessage} interfaces and their builders, generated by a {@link @@ -98,7 +99,7 @@ public class MessageImplGenerator { List<ExecutableElement> getters = message.getters(); var fields = new ArrayList<FieldSpec>(getters.size()); - var getterImpls = new ArrayList<MethodSpec>(getters.size()); + var methodImpls = new ArrayList<MethodSpec>(getters.size()); // create a field and a getter implementation for every getter in the message interface for (ExecutableElement getter : getters) { @@ -111,8 +112,9 @@ public class MessageImplGenerator { .addModifiers(Modifier.PRIVATE); boolean isMarshallable = getter.getAnnotation(Marshallable.class) != null; + boolean generateSetter = getter.getAnnotation(WithSetter.class) != null; - if (!isMarshallable) { + if (!isMarshallable && !generateSetter) { fieldBuilder.addModifiers(Modifier.FINAL); } @@ -132,21 +134,32 @@ public class MessageImplGenerator { .addStatement("return $N", marshallableFieldArray) .build(); - getterImpls.add(baGetterImpl); + methodImpls.add(baGetterImpl); + } + + if (generateSetter) { + MethodSpec setterImpl = MethodSpec.methodBuilder(getterName) + .returns(TypeName.VOID) + .addModifiers(Modifier.PUBLIC) + .addParameter(getterReturnType, getterName) + .addStatement("this.$L = $L", getterName, getterName) + .build(); + + methodImpls.add(setterImpl); } MethodSpec getterImpl = MethodSpec.overriding(getter) .addStatement("return $N", field) .build(); - getterImpls.add(getterImpl); + methodImpls.add(getterImpl); } TypeSpec.Builder messageImpl = TypeSpec.classBuilder(messageImplClassName) .addModifiers(Modifier.PUBLIC) .addSuperinterface(message.className()) .addFields(fields) - .addMethods(getterImpls) + .addMethods(methodImpls) .addMethod(constructor(fields)); // group type constant and getter diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java b/modules/network-api/src/main/java/org/apache/ignite/network/annotations/WithSetter.java similarity index 58% copy from modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java copy to modules/network-api/src/main/java/org/apache/ignite/network/annotations/WithSetter.java index 5b2bce2e2b..61e27fb2e1 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/RemoveCommand.java +++ b/modules/network-api/src/main/java/org/apache/ignite/network/annotations/WithSetter.java @@ -15,18 +15,23 @@ * limitations under the License. */ -package org.apache.ignite.internal.metastorage.command; - -import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.annotations.Transferable; +package org.apache.ignite.network.annotations; /** - * Remove command for MetaStorageCommandListener that removes an entry for the given key. + * Annotation for the {@link Transferable} class methods. If a method is marked with this annotation, + * a setter for the field with the same name as method's will be generated. + * In order to have access to this setter via interface one can use default method: + * <pre> + * {@code + * @WithSetter + * HybridTimestampMessage safeTime(); + * + * default void safeTime(HybridTimestampMessage safeTime) { + * // No-op. + * } + * } + * </pre> + * Note that fields with setters will not be final. */ -@Transferable(MetastorageCommandsMessageGroup.REMOVE) -public interface RemoveCommand extends WriteCommand { - /** - * Returns the key. Couldn't be {@code null}. - */ - byte[] key(); +public @interface WithSetter { } diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java index eb8c04df9a..e73840369f 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java @@ -153,7 +153,8 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest { cmgManager, logicalTopologyService, raftManager, - storage + storage, + mock(HybridClock.class) ); placementDriverManager = new PlacementDriverManager( diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java index 212a34807c..c5e0a0ba35 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java @@ -19,9 +19,12 @@ package org.apache.ignite.internal.raft.service; import java.nio.file.Path; import java.util.Iterator; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.ReadCommand; import org.apache.ignite.internal.raft.WriteCommand; +import org.jetbrains.annotations.Nullable; /** * A listener for replication group events. @@ -81,4 +84,13 @@ public interface RaftGroupListener { * Invoked once after a raft node has been shut down. */ void onShutdown(); + + /** + * Invoked before submitting a command to a raft group. + * + * @param command The command. + */ + default void onBeforeApply(Command command) { + // No-op. + } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java index b966f6e275..b91b60e3fe 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java @@ -19,8 +19,8 @@ package org.apache.ignite.raft.jraft.rpc.impl; import java.io.Serializable; import java.nio.ByteBuffer; import java.util.List; -import java.util.concurrent.Executor; -import org.apache.ignite.internal.logger.IgniteLogger; +import java.util.concurrent.CompletableFuture;import java.util.concurrent.Executor; +import java.util.function.BiFunction;import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.Command; @@ -73,6 +73,11 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> { return; } + JraftServerImpl.DelegatingStateMachine fsm = (JraftServerImpl.DelegatingStateMachine) node.getOptions().getFsm(); + + // Apply a filter before committing to STM. + fsm.getListener().onBeforeApply(request.command()); + if (request.command() instanceof WriteCommand) { applyWrite(node, request, rpcCtx); } else { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java index 15d061a732..d07d3487c1 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java @@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; import java.nio.file.Path; import java.util.List; @@ -47,6 +48,7 @@ import org.apache.ignite.internal.configuration.storage.Data; import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.MetaStorageManager; @@ -154,7 +156,8 @@ public class ItDistributedConfigurationPropertiesTest { cmgManager, new LogicalTopologyServiceImpl(logicalTopology, cmgManager), raftManager, - new SimpleInMemoryKeyValueStorage(name()) + new SimpleInMemoryKeyValueStorage(name()), + mock(HybridClock.class) ); // create a custom storage implementation that is able to "lose" some storage updates diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java index 4db3c7f5d2..694d475182 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java @@ -24,6 +24,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; import java.io.Serializable; import java.nio.file.Path; @@ -40,6 +41,7 @@ import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySer import org.apache.ignite.internal.configuration.SecurityConfiguration; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.MetaStorageManager; @@ -129,7 +131,8 @@ public class ItDistributedConfigurationStorageTest { cmgManager, new LogicalTopologyServiceImpl(logicalTopology, cmgManager), raftManager, - new SimpleInMemoryKeyValueStorage(name()) + new SimpleInMemoryKeyValueStorage(name()), + mock(HybridClock.class) ); cfgStorage = new DistributedConfigurationStorage(metaStorageManager, vaultManager); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java index 8d03cfd3d9..8c85194bb8 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java @@ -35,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -677,7 +678,8 @@ public class ItRebalanceDistributedTest { raftManager, testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class) ? new RocksDbKeyValueStorage(nodeName, resolveDir(dir, "metaStorage")) - : new SimpleInMemoryKeyValueStorage(nodeName) + : new SimpleInMemoryKeyValueStorage(nodeName), + mock(HybridClock.class) ); cfgStorage = new DistributedConfigurationStorage(metaStorageManager, vaultManager); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 32345e60a7..d200a9f890 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -301,7 +301,8 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest { cmgManager, new LogicalTopologyServiceImpl(logicalTopology, cmgManager), raftMgr, - new RocksDbKeyValueStorage(name, dir.resolve("metastorage")) + new RocksDbKeyValueStorage(name, dir.resolve("metastorage")), + mock(HybridClock.class) ); var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, vault); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 52a494ac92..9fa07b1a1c 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -380,7 +380,8 @@ public class IgniteImpl implements Ignite { cmgMgr, logicalTopologyService, raftMgr, - new RocksDbKeyValueStorage(name, workDir.resolve(METASTORAGE_DB_PATH)) + new RocksDbKeyValueStorage(name, workDir.resolve(METASTORAGE_DB_PATH)), + clock ); this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, vaultMgr); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java index 5b8396b5d8..90b692a848 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.metastorage.dsl.Iif; import org.apache.ignite.internal.metastorage.impl.EntryImpl; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; @@ -147,7 +148,7 @@ public class TableManagerDistributionZonesTest extends IgniteAbstractTest { keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test")); - MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage); + MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage, mock(ClusterTime.class)); RaftGroupService metaStorageService = mock(RaftGroupService.class); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java index a9f62a449c..d00f2cbf8e 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand; import org.apache.ignite.internal.metastorage.dsl.Iif; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; +import org.apache.ignite.internal.metastorage.server.time.ClusterTime; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.service.CommandClosure; @@ -122,7 +123,7 @@ public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest { keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test")); - MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage); + MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage, mock(ClusterTime.class)); RaftGroupService metaStorageService = mock(RaftGroupService.class);
